Skip to content

Commit

Permalink
Support Supervision.restart for SubFlow's
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Jan 19, 2024
1 parent 19da736 commit e555122
Showing 1 changed file with 22 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -511,13 +511,6 @@ import pekko.util.ccompat.JavaConverters._
private var substreamWaitingToBePushed = false
private var substreamCancelled = false

def propagateSubstreamCancel(ex: Throwable): Boolean =
decider(ex) match {
case Supervision.Stop => true
case Supervision.Resume => false
case Supervision.Restart => false
}

setHandler(
out,
new OutHandler {
Expand Down Expand Up @@ -614,14 +607,31 @@ import pekko.util.ccompat.JavaConverters._

override def onDownstreamFinish(cause: Throwable): Unit = {
substreamCancelled = true
if (isClosed(in) || propagateSubstreamCancel(cause)) {
cancelStage(cause)
} else {
// Start draining
if (!hasBeenPulled(in)) pull(in)
decider(cause) match {
case Supervision.Stop =>
cancelStage(cause)
case Supervision.Resume =>
if (isClosed(in)) cancelStage(cause)
else {
// Start draining
if (!hasBeenPulled(in)) pull(in)
}
case Supervision.Restart =>
if (isClosed(in)) completeStage()
else {
restartState()
// Start draining
if (!hasBeenPulled(in)) pull(in)
}
}
}

private def restartState(): Unit = {
substreamSource = null
substreamWaitingToBePushed = false
substreamCancelled = false
}

override def onPush(): Unit = {
val elem = grab(in)
try {
Expand Down

0 comments on commit e555122

Please sign in to comment.