Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix uncaught decider exception in Split with Supervision.resumingDecider #1207

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,6 @@ class FlowSplitAfterSpec extends StreamSpec("""
}

"resume stream when splitAfter function throws" in {
info("Supervision is not supported fully by GraphStages yet")
pending
val publisherProbeProbe = TestPublisher.manualProbe[Int]()
val exc = TE("test")
val publisher = Source
Expand Down Expand Up @@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
upstreamSubscription.sendNext(6)
substreamPuppet1.expectNext(6)
substreamPuppet1.expectComplete()
upstreamSubscription.sendNext(7)
Copy link
Contributor Author

@mdedetrich mdedetrich Mar 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this is a genuine mistake when the test was originally written (remember that this test never passed, it was written for the future when supervision strategy propagation would be implemented which occurred later as a result of #252).

Logically when you read the test, its expected that when you do expectNext that previously an element should have been sent with sendNext and its also the same in the equivalent groupBy test and also splitWhen test

Copy link
Contributor Author

@mdedetrich mdedetrich Mar 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this again, I may be wrong and the test is indeed valid as its written right now because its splitAfter so its its meant to be skipping the next element due to exception being thrown, in which case we have to drop one element and then do pull(in)???

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the element which case the exception is dropped now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the element which case the exception is dropped now.

So to confirm, the current PRs implementation is correct and the test as originally written was wrong?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the downstreamSubscription.request(100) , I think the orign test should be right too, but its a little strange now ,that we need send the next(7) to make the next sub source be ready.

I think we should call

      private def pushSubstreamSource(): Unit = {
        push(out, Source.fromGraph(substreamSource.source))
        scheduleOnce(SubscriptionTimer, timeout)
        substreamWaitingToBePushed = false
      }

now inside the SubstreamHandler#onPush when the decision is SplitAfter too to keep the behavior the same, as it just a dummy Source.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mdedetrich in the test, it requests 10, so the SubSource is expected to be there after 6.

Copy link
Contributor Author

@mdedetrich mdedetrich Mar 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mdedetrich in the test, it requests 10, so the SubSource is expected to be there after 6.

But its also sending a completion because of this condition if (elem == 3) throw exc else elem % 3 == 0 (i.e. 6 % 3 == 0 so the split happens right after that) which is why the test is failing with

Expected OnNext(_), yet no element signaled during 3 seconds
java.lang.AssertionError: Expected OnNext(_), yet no element signaled during 3 seconds
	at org.apache.pekko.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:398)
	at org.apache.pekko.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:389)
	at org.apache.pekko.stream.scaladsl.FlowSplitAfterSpec.$anonfun$new$11(FlowSplitAfterSpec.scala:251)

at val substream2 = subscriber.expectNext() because there isn't going to be a expectNext, it's expecting a completion instead.

Note that at this point, we are dealing with the normal logic of SplitWhen. The new functionality of skipping exceptions with SupervisionDecider.resume occurs earlier when elem == 3 so we are past that and just testing happy path logic of SplitAfter which is not changed at all (and it shouldn't be either!).

This is why I think that part of the test is written incorrectly. In fact you can argue it can even be removed since it has nothing to do with recovering from thrown exceptions with SupervisionDecider.resume, its testing something entirely different (but I would personally leave it there now so its consistent with the other tests).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

**1. val substream2 = subscriber.expectNext(), the subscriber is not expecting a complete, otherwise, the call sendNext(7) will cause issue because the origin source is already completed.

The problem here is when the stream2 be generated, I expected the behavior the same, just after the 6 % 3 == 0, wdyt @samueleresca @mdedetrich**

Copy link
Contributor Author

@mdedetrich mdedetrich Mar 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

**1. val substream2 = subscriber.expectNext(), the subscriber is not expecting a complete, otherwise, the call sendNext(7) will cause issue because the origin source is already completed.

The problem here is when the stream2 be generated, I expected the behavior the same, just after the 6 % 3 == 0, wdyt @samueleresca @mdedetrich**

I am not saying that this is necessarily wrong, just that its a completely separate issue from what this PR is changing/solving. Or to put it differently, this behaviour is the same as current Pekko since at this point i.e.

upstreamSubscription.sendNext(6)
substreamPuppet1.expectNext(6)
substreamPuppet1.expectComplete()
upstreamSubscription.sendNext(7)
val substream2 = subscriber.expectNext()
val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.asPublisher(false)))
substreamPuppet2.request(10)
substreamPuppet2.expectNext(7)

upstreamSubscription.sendComplete()
subscriber.expectComplete()
substreamPuppet2.expectComplete()

we are past recovering from an exception (furthermore if we are somehow changing some fundamental behaviour with SplitAfter then other tests would fail, but they are all passing without any changes). The critical part of the test specifically dealing with recovering from exception and resuming is

upstreamSubscription.sendNext(3)
upstreamSubscription.sendNext(4)
substreamPuppet1.expectNext(4) // note that 3 was dropped

And this part is completely unchanged from how the test was originally written.

Given that, I think it makes sense to file a separate issue if this behaviour about completing after a split needs to change and hence to tackle it separately. The only exception to this that I can think of is that some state is not being reset correctly

decider(ex) match {
  case Supervision.Resume  => pull(in)
  case Supervision.Stop    => onUpstreamFailure(ex)
  case Supervision.Restart => onUpstreamFailure(ex) // TODO implement restart?
}

in the case Supervision.Resume => pull(in) block, but if thats the case it would also error out earlier since that block is only executed when recovering from exceptions in onPush (and again thats only when elem is 3, not 6)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks folks, I appreciate the detailed explanation. And yes, I was diverting away (not on purpose) from the original goal of the test

Note that at this point, we are dealing with the normal logic of SplitWhen. The new functionality of skipping
exceptions with SupervisionDecider.resume occurs earlier when elem == 3 so we are past that and just testing happy > path logic of SplitAfter which is not changed at all (and it shouldn't be either!).

looks good to me

val substream2 = subscriber.expectNext()
val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.asPublisher(false)))
substreamPuppet2.request(10)
upstreamSubscription.sendNext(7)
substreamPuppet2.expectNext(7)

upstreamSubscription.sendComplete()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,9 +327,6 @@ class FlowSplitWhenSpec extends StreamSpec("""
}

"resume stream when splitWhen function throws" in {
info("Supervision is not supported fully by GraphStages yet")
pending

val publisherProbeProbe = TestPublisher.manualProbe[Int]()
val exc = TE("test")
val publisher = Source
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,12 @@ import pekko.util.ccompat.JavaConverters._
else substreamSource.push(elem)
}
} catch {
case NonFatal(ex) => onUpstreamFailure(ex)
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Resume => pull(in)
case Supervision.Stop => onUpstreamFailure(ex)
case Supervision.Restart => onUpstreamFailure(ex) // TODO implement restart?
}
}
}

Expand Down
Loading