From 7a8111267e596754db090b9f82f58fe82135dc7d Mon Sep 17 00:00:00 2001 From: Matthew de Detrich Date: Fri, 19 Jan 2024 20:52:25 +1100 Subject: [PATCH] Support Supervision.restart for SubFlow's --- .../stream/impl/fusing/StreamOfStreams.scala | 34 ++++++++++++------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala index 7b0e37e9429..cea1f64bb31 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala @@ -512,13 +512,6 @@ import pekko.util.ccompat.JavaConverters._ private var substreamWaitingToBePushed = false private var substreamCancelled = false - def propagateSubstreamCancel(ex: Throwable): Boolean = - decider(ex) match { - case Supervision.Stop => true - case Supervision.Resume => false - case Supervision.Restart => false - } - override def onPull(): Unit = { if (substreamSource eq null) { // can be already pulled from substream in case split after @@ -610,14 +603,31 @@ import pekko.util.ccompat.JavaConverters._ override def onDownstreamFinish(cause: Throwable): Unit = { substreamCancelled = true - if (isClosed(in) || propagateSubstreamCancel(cause)) { - cancelStage(cause) - } else { - // Start draining - if (!hasBeenPulled(in)) pull(in) + decider(cause) match { + case Supervision.Stop => + cancelStage(cause) + case Supervision.Resume => + if (isClosed(in)) cancelStage(cause) + else { + // Start draining + if (!hasBeenPulled(in)) pull(in) + } + case Supervision.Restart => + if (isClosed(in)) completeStage() + else { + restartState() + // Start draining + if (!hasBeenPulled(in)) pull(in) + } } } + private def restartState(): Unit = { + substreamSource = null + substreamWaitingToBePushed = false + substreamCancelled = false + } + override def onPush(): Unit = { val elem = grab(in) try {