From 0f472e918c9c3c48a2d8ffb8f2a400afd0ac94dc Mon Sep 17 00:00:00 2001 From: Matthew de Detrich Date: Fri, 19 Jan 2024 20:52:25 +1100 Subject: [PATCH] Support Supervision.restart for SubFlow's --- .../stream/impl/fusing/StreamOfStreams.scala | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala index aba8c7f680c..4235ad43353 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala @@ -511,13 +511,6 @@ import pekko.util.ccompat.JavaConverters._ private var substreamWaitingToBePushed = false private var substreamCancelled = false - def propagateSubstreamCancel(ex: Throwable): Boolean = - decider(ex) match { - case Supervision.Stop => true - case Supervision.Resume => false - case Supervision.Restart => false - } - setHandler( out, new OutHandler { @@ -614,11 +607,22 @@ import pekko.util.ccompat.JavaConverters._ override def onDownstreamFinish(cause: Throwable): Unit = { substreamCancelled = true - if (isClosed(in) || propagateSubstreamCancel(cause)) { - cancelStage(cause) - } else { - // Start draining - if (!hasBeenPulled(in)) pull(in) + decider(cause) match { + case Supervision.Stop => + cancelStage(cause) + case Supervision.Resume => + if (isClosed(in)) cancelStage(cause) + else { + // Start draining + if (!hasBeenPulled(in)) pull(in) + } + case Supervision.Restart => + if (isClosed(in)) completeStage() + else { + substreamCancelled = false + // Start draining + if (!hasBeenPulled(in)) pull(in) + } } }