From f25ef8beff922a58edadcf2ba315450164c584a4 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Sat, 2 Sep 2023 21:00:09 +0800 Subject: [PATCH] =str Switch the type parameter order of UnfoldResourceSourceAsync --- .../impl/UnfoldResourceSourceAsync.scala | 226 +++++++++--------- .../apache/pekko/stream/javadsl/Source.scala | 20 +- .../apache/pekko/stream/scaladsl/Source.scala | 10 +- 3 files changed, 133 insertions(+), 123 deletions(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSourceAsync.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSourceAsync.scala index 8431e4afd30..37120b0578a 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSourceAsync.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSourceAsync.scala @@ -13,139 +13,145 @@ package org.apache.pekko.stream.impl +import scala.concurrent.{ ExecutionContext, Future } +import scala.util.{ Failure, Success, Try } +import scala.util.control.NonFatal + import org.apache.pekko import pekko.Done import pekko.annotation.InternalApi import pekko.dispatch.ExecutionContexts.parasitic -import pekko.stream.ActorAttributes.SupervisionStrategy import pekko.stream._ +import pekko.stream.ActorAttributes.SupervisionStrategy +import pekko.stream.Attributes.SourceLocation import pekko.stream.impl.Stages.DefaultAttributes import pekko.stream.stage._ - -import scala.concurrent.{ ExecutionContext, Future } -import scala.util.control.NonFatal -import scala.util.{ Failure, Success, Try } +import pekko.util.OptionVal /** * INTERNAL API */ -@InternalApi private[pekko] final class UnfoldResourceSourceAsync[T, S]( - create: () => Future[S], - readData: (S) => Future[Option[T]], - close: (S) => Future[Done]) +@InternalApi private[pekko] final class UnfoldResourceSourceAsync[R, T]( + create: () => Future[R], + readData: R => Future[Option[T]], + close: R => Future[Done]) extends GraphStage[SourceShape[T]] { val out = Outlet[T]("UnfoldResourceSourceAsync.out") override val shape = SourceShape(out) - override def initialAttributes: Attributes = DefaultAttributes.unfoldResourceSourceAsync - - def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler { - lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider - private implicit def ec: ExecutionContext = materializer.executionContext - private var state: Option[S] = None - - private val createdCallback = getAsyncCallback[Try[S]] { - case Success(resource) => - state = Some(resource) - if (isAvailable(out)) onPull() - case Failure(t) => failStage(t) - }.invokeWithFeedback _ - - private val errorHandler: PartialFunction[Throwable, Unit] = { - case NonFatal(ex) => - decider(ex) match { - case Supervision.Stop => - failStage(ex) - case Supervision.Restart => - try { - restartResource() - } catch { - case NonFatal(ex) => failStage(ex) - } - case Supervision.Resume => onPull() - } - } - - private val readCallback = getAsyncCallback[Try[Option[T]]](handle).invoke _ - - private def handle(result: Try[Option[T]]): Unit = result match { - case Success(data) => - data match { - case Some(d) => push(out, d) - case None => - // end of resource reached, lets close it - state match { - case Some(resource) => - close(resource).onComplete(getAsyncCallback[Try[Done]] { - case Success(Done) => completeStage() - case Failure(ex) => failStage(ex) - }.invoke)(parasitic) - state = None - - case None => - // cannot happen, but for good measure - throw new IllegalStateException("Reached end of data but there is no open resource") - } - } - case Failure(t) => errorHandler(t) - } + override def initialAttributes: Attributes = + DefaultAttributes.unfoldResourceSourceAsync and SourceLocation.forLambda(create) + + def createLogic(inheritedAttributes: Attributes) = + new GraphStageLogic(shape) with OutHandler { + lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider + private implicit def ec: ExecutionContext = materializer.executionContext + private var maybeResource: OptionVal[R] = OptionVal.none + + private val createdCallback = getAsyncCallback[Try[R]] { + case Success(resource) => + require(resource != null, "`create` method must not return a null resource.") + maybeResource = OptionVal(resource) + if (isAvailable(out)) onPull() + case Failure(t) => failStage(t) + }.invokeWithFeedback _ + + private val errorHandler: PartialFunction[Throwable, Unit] = { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => + failStage(ex) + case Supervision.Restart => + try { + restartResource() + } catch { + case NonFatal(ex) => failStage(ex) + } + case Supervision.Resume => onPull() + } + } - override def preStart(): Unit = createResource() - - override def onPull(): Unit = - state match { - case Some(resource) => - try { - val future = readData(resource) - future.value match { - case Some(value) => handle(value) - case None => future.onComplete(readCallback)(parasitic) - } - } catch errorHandler - case None => - // we got a pull but there is no open resource, we are either - // currently creating/restarting then the read will be triggered when creating the - // resource completes, or shutting down and then the pull does not matter anyway + private val readCallback = getAsyncCallback[Try[Option[T]]](handle).invoke _ + + private def handle(result: Try[Option[T]]): Unit = result match { + case Success(data) => + data match { + case Some(d) => push(out, d) + case None => + // end of resource reached, lets close it + maybeResource match { + case OptionVal.Some(resource) => + close(resource).onComplete(getAsyncCallback[Try[Done]] { + case Success(Done) => completeStage() + case Failure(ex) => failStage(ex) + }.invoke)(parasitic) + maybeResource = OptionVal.none + + case _ => + // cannot happen, but for good measure + throw new IllegalStateException("Reached end of data but there is no open resource") + } + } + case Failure(t) => errorHandler(t) } - override def postStop(): Unit = { - state.foreach(r => close(r)) - } + override def preStart(): Unit = createResource() + + override def onPull(): Unit = + maybeResource match { + case OptionVal.Some(resource) => + try { + val future = readData(resource) + future.value match { + case Some(value) => handle(value) + case None => future.onComplete(readCallback)(parasitic) + } + } catch errorHandler + case _ => + // we got a pull but there is no open resource, we are either + // currently creating/restarting then the read will be triggered when creating the + // resource completes, or shutting down and then the pull does not matter anyway + } - private def restartResource(): Unit = { - state match { - case Some(resource) => - // wait for the resource to close before restarting - close(resource).onComplete(getAsyncCallback[Try[Done]] { - case Success(Done) => - createResource() - case Failure(ex) => failStage(ex) - }.invoke)(parasitic) - state = None - case None => - createResource() + override def postStop(): Unit = maybeResource match { + case OptionVal.Some(resource) => close(resource) + case _ => // do nothing } - } - private def createResource(): Unit = { - create().onComplete { resource => - createdCallback(resource).failed.foreach { - case _: StreamDetachedException => - // stream stopped before created callback could be invoked, we need - // to close the resource if it is was opened, to not leak it - resource match { - case Success(r) => - close(r) - case Failure(ex) => - throw ex // failed to open but stream is stopped already - } - case _ => // we don't care here + private def restartResource(): Unit = { + maybeResource match { + case OptionVal.Some(resource) => + // wait for the resource to close before restarting + close(resource).onComplete(getAsyncCallback[Try[Done]] { + case Success(Done) => + createResource() + case Failure(ex) => failStage(ex) + }.invoke)(parasitic) + maybeResource = OptionVal.none + case _ => + createResource() } - }(parasitic) - } + } - setHandler(out, this) + private def createResource(): Unit = { + create().onComplete { resource => + createdCallback(resource).failed.foreach { + case _: StreamDetachedException => + // stream stopped before created callback could be invoked, we need + // to close the resource if it is was opened, to not leak it + resource match { + case Success(r) => + close(r) + case Failure(ex) => + throw ex // failed to open but stream is stopped already + } + case _ => // we don't care here + } + }(parasitic) + } + + setHandler(out, this) - } + } override def toString = "UnfoldResourceSourceAsync" } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index a8fe6677671..729036255ea 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -25,8 +25,6 @@ import scala.concurrent.{ Future, Promise } import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag -import org.reactivestreams.{ Publisher, Subscriber } - import org.apache.pekko import pekko.{ Done, NotUsed } import pekko.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider } @@ -43,6 +41,8 @@ import pekko.util.JavaDurationConverters._ import pekko.util.OptionConverters._ import pekko.util.ccompat.JavaConverters._ +import org.reactivestreams.{ Publisher, Subscriber } + /** Java API */ object Source { private[this] val _empty = new Source[Any, NotUsed](scaladsl.Source.empty) @@ -851,16 +851,18 @@ object Source { * @param read - function that reads data from opened resource. It is called each time backpressure signal * is received. Stream calls close and completes when `CompletionStage` from read function returns None. * @param close - function that closes resource + * @tparam T the element type + * @tparam R the resource type */ - def unfoldResourceAsync[T, S]( - create: function.Creator[CompletionStage[S]], - read: function.Function[S, CompletionStage[Optional[T]]], - close: function.Function[S, CompletionStage[Done]]): javadsl.Source[T, NotUsed] = + def unfoldResourceAsync[T, R]( + create: function.Creator[CompletionStage[R]], + read: function.Function[R, CompletionStage[Optional[T]]], + close: function.Function[R, CompletionStage[Done]]): javadsl.Source[T, NotUsed] = new Source( - scaladsl.Source.unfoldResourceAsync[T, S]( + scaladsl.Source.unfoldResourceAsync[T, R]( () => create.create().asScala, - (s: S) => read.apply(s).asScala.map(_.toScala)(pekko.dispatch.ExecutionContexts.parasitic), - (s: S) => close.apply(s).asScala)) + (resource: R) => read.apply(resource).asScala.map(_.toScala)(pekko.dispatch.ExecutionContexts.parasitic), + (resource: R) => close.apply(resource).asScala)) /** * Upcast a stream of elements to a stream of supertypes of that element. Useful in combination with diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index cd33cfb799e..0665fc51da8 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -961,11 +961,13 @@ object Source { * @param read - function that reads data from opened resource. It is called each time backpressure signal * is received. Stream calls close and completes when `Future` from read function returns None. * @param close - function that closes resource + * @tparam T the element type + * @tparam R the resource type */ - def unfoldResourceAsync[T, S]( - create: () => Future[S], - read: (S) => Future[Option[T]], - close: (S) => Future[Done]): Source[T, NotUsed] = + def unfoldResourceAsync[T, R]( + create: () => Future[R], + read: (R) => Future[Option[T]], + close: (R) => Future[Done]): Source[T, NotUsed] = Source.fromGraph(new UnfoldResourceSourceAsync(create, read, close)) /**