Skip to content

Commit

Permalink
Fix uncaught decider exception in Split with Supervision.resumingDecider
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Mar 18, 2024
1 parent 5376692 commit df3b9fc
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,6 @@ class FlowSplitAfterSpec extends StreamSpec("""
}

"resume stream when splitAfter function throws" in {
info("Supervision is not supported fully by GraphStages yet")
pending
val publisherProbeProbe = TestPublisher.manualProbe[Int]()
val exc = TE("test")
val publisher = Source
Expand Down Expand Up @@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
upstreamSubscription.sendNext(6)
substreamPuppet1.expectNext(6)
substreamPuppet1.expectComplete()
upstreamSubscription.sendNext(7)
val substream2 = subscriber.expectNext()
val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.asPublisher(false)))
substreamPuppet2.request(10)
upstreamSubscription.sendNext(7)
substreamPuppet2.expectNext(7)

upstreamSubscription.sendComplete()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,9 +327,6 @@ class FlowSplitWhenSpec extends StreamSpec("""
}

"resume stream when splitWhen function throws" in {
info("Supervision is not supported fully by GraphStages yet")
pending

val publisherProbeProbe = TestPublisher.manualProbe[Int]()
val exc = TE("test")
val publisher = Source
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,12 @@ import pekko.util.ccompat.JavaConverters._
else substreamSource.push(elem)
}
} catch {
case NonFatal(ex) => onUpstreamFailure(ex)
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Resume => pull(in)
case Supervision.Stop => onUpstreamFailure(ex)
case Supervision.Restart => onUpstreamFailure(ex) // TODO implement restart?
}
}
}

Expand Down

0 comments on commit df3b9fc

Please sign in to comment.