From db2125586f4cc1648f4324a4aa2be3426f0cca0a Mon Sep 17 00:00:00 2001 From: He-Pin Date: Sat, 30 Mar 2024 01:37:49 +0800 Subject: [PATCH] chore: Refactor UnfoldResourceSourceAsync. --- .../impl/UnfoldResourceSourceAsync.scala | 178 +++++++++--------- 1 file changed, 90 insertions(+), 88 deletions(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSourceAsync.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSourceAsync.scala index 7f783be6e85..6f6c1861866 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSourceAsync.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSourceAsync.scala @@ -23,8 +23,10 @@ import pekko.annotation.InternalApi import pekko.dispatch.ExecutionContexts.parasitic import pekko.stream._ import pekko.stream.ActorAttributes.SupervisionStrategy +import pekko.stream.Attributes.SourceLocation import pekko.stream.impl.Stages.DefaultAttributes import pekko.stream.stage._ +import pekko.util.OptionVal /** * INTERNAL API @@ -36,64 +38,66 @@ import pekko.stream.stage._ extends GraphStage[SourceShape[T]] { val out = Outlet[T]("UnfoldResourceSourceAsync.out") override val shape = SourceShape(out) - override def initialAttributes: Attributes = DefaultAttributes.unfoldResourceSourceAsync - - def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler { - lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider - private implicit def ec: ExecutionContext = materializer.executionContext - private var state: Option[S] = None - - private val createdCallback = getAsyncCallback[Try[S]] { - case Success(resource) => - state = Some(resource) - if (isAvailable(out)) onPull() - case Failure(t) => failStage(t) - }.invokeWithFeedback _ - - private val errorHandler: PartialFunction[Throwable, Unit] = { - case NonFatal(ex) => - decider(ex) match { - case Supervision.Stop => - failStage(ex) - case Supervision.Restart => - try { - restartResource() - } catch { - case NonFatal(ex) => failStage(ex) - } - case Supervision.Resume => onPull() - } - } + override def initialAttributes: Attributes = + DefaultAttributes.unfoldResourceSourceAsync and SourceLocation.forLambda(create) + + def createLogic(inheritedAttributes: Attributes) = + new GraphStageLogic(shape) with OutHandler { + private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider + private implicit def ec: ExecutionContext = materializer.executionContext + private var maybeResource: OptionVal[S] = OptionVal.none + + private val createdCallback = getAsyncCallback[Try[S]] { + case Success(resource) => + require(resource != null, "`create` method should not return a null resource.") + maybeResource = OptionVal(resource) + if (isAvailable(out)) onPull() + case Failure(t) => failStage(t) + }.invokeWithFeedback _ + + private val errorHandler: PartialFunction[Throwable, Unit] = { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => + failStage(ex) + case Supervision.Restart => + try { + restartResource() + } catch { + case NonFatal(ex) => failStage(ex) + } + case Supervision.Resume => onPull() + } + } - private val readCallback = getAsyncCallback[Try[Option[T]]](handle).invoke _ - - private def handle(result: Try[Option[T]]): Unit = result match { - case Success(data) => - data match { - case Some(d) => push(out, d) - case None => - // end of resource reached, lets close it - state match { - case Some(resource) => - close(resource).onComplete(getAsyncCallback[Try[Done]] { - case Success(Done) => completeStage() - case Failure(ex) => failStage(ex) - }.invoke)(parasitic) - state = None - - case None => - // cannot happen, but for good measure - throw new IllegalStateException("Reached end of data but there is no open resource") - } - } - case Failure(t) => errorHandler(t) - } + private val readCallback = getAsyncCallback[Try[Option[T]]](handle).invoke _ + + private def handle(result: Try[Option[T]]): Unit = result match { + case Success(data) => + data match { + case Some(d) => push(out, d) + case None => + // end of resource reached, lets close it + maybeResource match { + case OptionVal.Some(resource) => + close(resource).onComplete(getAsyncCallback[Try[Done]] { + case Success(Done) => completeStage() + case Failure(ex) => failStage(ex) + }.invoke)(parasitic) + maybeResource = OptionVal.none + + case _ => + // cannot happen, but for good measure + throw new IllegalStateException("Reached end of data but there is no open resource") + } + } + case Failure(t) => errorHandler(t) + } - override def preStart(): Unit = createResource() + override def preStart(): Unit = createResource() - override def onPull(): Unit = - state match { - case Some(resource) => + override def onPull(): Unit = maybeResource match { + case OptionVal.Some(resource) => try { val future = readData(resource) future.value match { @@ -101,51 +105,49 @@ import pekko.stream.stage._ case None => future.onComplete(readCallback)(parasitic) } } catch errorHandler - case None => + + case OptionVal.None => // we got a pull but there is no open resource, we are either // currently creating/restarting then the read will be triggered when creating the // resource completes, or shutting down and then the pull does not matter anyway } - override def postStop(): Unit = { - state.foreach(r => close(r)) - } + override def postStop(): Unit = maybeResource match { + case OptionVal.Some(resource) => close(resource) + case _ => // do nothing + } - private def restartResource(): Unit = { - state match { - case Some(resource) => + private def restartResource(): Unit = maybeResource match { + case OptionVal.Some(resource) => // wait for the resource to close before restarting close(resource).onComplete(getAsyncCallback[Try[Done]] { - case Success(Done) => - createResource() - case Failure(ex) => failStage(ex) + case Success(Done) => createResource() + case Failure(ex) => failStage(ex) }.invoke)(parasitic) - state = None - case None => - createResource() + maybeResource = OptionVal.none + + case _ => createResource() } - } - private def createResource(): Unit = { - create().onComplete { resource => - createdCallback(resource).failed.foreach { - case _: StreamDetachedException => - // stream stopped before created callback could be invoked, we need - // to close the resource if it is was opened, to not leak it - resource match { - case Success(r) => - close(r) - case Failure(ex) => - throw ex // failed to open but stream is stopped already - } - case _ => // we don't care here - } - }(parasitic) - } + private def createResource(): Unit = { + create().onComplete { resource => + createdCallback(resource).failed.foreach { + case _: StreamDetachedException => + // stream stopped before created callback could be invoked, we need + // to close the resource if it is was opened, to not leak it + resource match { + case Success(r) => + close(r) + case Failure(ex) => + throw ex // failed to open but stream is stopped already + } + case _ => // we don't care here + } + }(parasitic) + } - setHandler(out, this) + setHandler(out, this) + } - } override def toString = "UnfoldResourceSourceAsync" - }