Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Refactor UnfoldResourceSourceAsync. #1240

Merged
merged 1 commit into from
Apr 15, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts.parasitic
import pekko.stream._
import pekko.stream.ActorAttributes.SupervisionStrategy
import pekko.stream.Attributes.SourceLocation
import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.stage._
import pekko.util.OptionVal

/**
* INTERNAL API
Expand All @@ -36,116 +38,116 @@ import pekko.stream.stage._
extends GraphStage[SourceShape[T]] {
val out = Outlet[T]("UnfoldResourceSourceAsync.out")
override val shape = SourceShape(out)
override def initialAttributes: Attributes = DefaultAttributes.unfoldResourceSourceAsync

def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler {
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
private implicit def ec: ExecutionContext = materializer.executionContext
private var state: Option[S] = None

private val createdCallback = getAsyncCallback[Try[S]] {
case Success(resource) =>
state = Some(resource)
if (isAvailable(out)) onPull()
case Failure(t) => failStage(t)
}.invokeWithFeedback _

private val errorHandler: PartialFunction[Throwable, Unit] = {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop =>
failStage(ex)
case Supervision.Restart =>
try {
restartResource()
} catch {
case NonFatal(ex) => failStage(ex)
}
case Supervision.Resume => onPull()
}
}
override def initialAttributes: Attributes =
DefaultAttributes.unfoldResourceSourceAsync and SourceLocation.forLambda(create)

def createLogic(inheritedAttributes: Attributes) =
new GraphStageLogic(shape) with OutHandler {
private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
private implicit def ec: ExecutionContext = materializer.executionContext
private var maybeResource: OptionVal[S] = OptionVal.none

private val createdCallback = getAsyncCallback[Try[S]] {
case Success(resource) =>
require(resource != null, "`create` method should not return a null resource.")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the resource can be null, but can not now.
I think no one will returning a null resource

Maybe we should do the same in unfoldResource

maybeResource = OptionVal(resource)
if (isAvailable(out)) onPull()
case Failure(t) => failStage(t)
}.invokeWithFeedback _

private val errorHandler: PartialFunction[Throwable, Unit] = {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop =>
failStage(ex)
case Supervision.Restart =>
try {
restartResource()
} catch {
case NonFatal(ex) => failStage(ex)
}
case Supervision.Resume => onPull()
}
}

private val readCallback = getAsyncCallback[Try[Option[T]]](handle).invoke _

private def handle(result: Try[Option[T]]): Unit = result match {
case Success(data) =>
data match {
case Some(d) => push(out, d)
case None =>
// end of resource reached, lets close it
state match {
case Some(resource) =>
close(resource).onComplete(getAsyncCallback[Try[Done]] {
case Success(Done) => completeStage()
case Failure(ex) => failStage(ex)
}.invoke)(parasitic)
state = None

case None =>
// cannot happen, but for good measure
throw new IllegalStateException("Reached end of data but there is no open resource")
}
}
case Failure(t) => errorHandler(t)
}
private val readCallback = getAsyncCallback[Try[Option[T]]](handle).invoke _

private def handle(result: Try[Option[T]]): Unit = result match {
case Success(data) =>
data match {
case Some(d) => push(out, d)
case None =>
// end of resource reached, lets close it
maybeResource match {
case OptionVal.Some(resource) =>
close(resource).onComplete(getAsyncCallback[Try[Done]] {
case Success(Done) => completeStage()
case Failure(ex) => failStage(ex)
}.invoke)(parasitic)
maybeResource = OptionVal.none

case _ =>
// cannot happen, but for good measure
throw new IllegalStateException("Reached end of data but there is no open resource")
}
}
case Failure(t) => errorHandler(t)
}

override def preStart(): Unit = createResource()
override def preStart(): Unit = createResource()

override def onPull(): Unit =
state match {
case Some(resource) =>
override def onPull(): Unit = maybeResource match {
case OptionVal.Some(resource) =>
try {
val future = readData(resource)
future.value match {
case Some(value) => handle(value)
case None => future.onComplete(readCallback)(parasitic)
}
} catch errorHandler
case None =>

case OptionVal.None =>
// we got a pull but there is no open resource, we are either
// currently creating/restarting then the read will be triggered when creating the
// resource completes, or shutting down and then the pull does not matter anyway
}

override def postStop(): Unit = {
state.foreach(r => close(r))
}
override def postStop(): Unit = maybeResource match {
case OptionVal.Some(resource) => close(resource)
case _ => // do nothing
}

private def restartResource(): Unit = {
state match {
case Some(resource) =>
private def restartResource(): Unit = maybeResource match {
case OptionVal.Some(resource) =>
// wait for the resource to close before restarting
close(resource).onComplete(getAsyncCallback[Try[Done]] {
case Success(Done) =>
createResource()
case Failure(ex) => failStage(ex)
case Success(Done) => createResource()
case Failure(ex) => failStage(ex)
}.invoke)(parasitic)
state = None
case None =>
createResource()
maybeResource = OptionVal.none

case _ => createResource()
}
}

private def createResource(): Unit = {
create().onComplete { resource =>
createdCallback(resource).failed.foreach {
case _: StreamDetachedException =>
// stream stopped before created callback could be invoked, we need
// to close the resource if it is was opened, to not leak it
resource match {
case Success(r) =>
close(r)
case Failure(ex) =>
throw ex // failed to open but stream is stopped already
}
case _ => // we don't care here
}
}(parasitic)
}
private def createResource(): Unit = {
create().onComplete { resource =>
createdCallback(resource).failed.foreach {
case _: StreamDetachedException =>
// stream stopped before created callback could be invoked, we need
// to close the resource if it is was opened, to not leak it
resource match {
case Success(r) =>
close(r)
case Failure(ex) =>
throw ex // failed to open but stream is stopped already
}
case _ => // we don't care here
}
}(parasitic)
}

setHandler(out, this)
setHandler(out, this)
}

}
override def toString = "UnfoldResourceSourceAsync"

}
Loading