Skip to content

Commit

Permalink
Support Supervision.restart for SubFlow's
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Mar 19, 2024
1 parent 689e30b commit 186f1dc
Showing 1 changed file with 31 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -512,13 +512,6 @@ import pekko.util.ccompat.JavaConverters._
private var substreamWaitingToBePushed = false
private var substreamCancelled = false

def propagateSubstreamCancel(ex: Throwable): Boolean =
decider(ex) match {
case Supervision.Stop => true
case Supervision.Resume => false
case Supervision.Restart => false
}

override def onPull(): Unit = {
if (substreamSource eq null) {
// can be already pulled from substream in case split after
Expand Down Expand Up @@ -610,14 +603,31 @@ import pekko.util.ccompat.JavaConverters._

override def onDownstreamFinish(cause: Throwable): Unit = {
substreamCancelled = true
if (isClosed(in) || propagateSubstreamCancel(cause)) {
cancelStage(cause)
} else {
// Start draining
if (!hasBeenPulled(in)) pull(in)
decider(cause) match {
case Supervision.Stop =>
cancelStage(cause)
case Supervision.Resume =>
if (isClosed(in)) cancelStage(cause)
else {
// Start draining
if (!hasBeenPulled(in)) pull(in)
}
case Supervision.Restart =>
if (isClosed(in)) completeStage()
else {
restartState()
// Start draining
if (!hasBeenPulled(in)) pull(in)
}
}
}

private def restartState(): Unit = {
substreamSource = null
substreamWaitingToBePushed = false
substreamCancelled = false
}

override def onPush(): Unit = {
val elem = grab(in)
try {
Expand All @@ -638,9 +648,15 @@ import pekko.util.ccompat.JavaConverters._
} catch {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Resume => pull(in)
case Supervision.Stop => onUpstreamFailure(ex)
case Supervision.Restart => onUpstreamFailure(ex) // TODO implement restart?
case Supervision.Resume => pull(in)
case Supervision.Stop => onUpstreamFailure(ex)
case Supervision.Restart =>
if (isClosed(in)) completeStage()
else {
restartState()
// Start draining
if (!hasBeenPulled(in)) pull(in)
}
}
}
}
Expand Down

0 comments on commit 186f1dc

Please sign in to comment.