Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix uncaught decider exception in Split with Supervision.resumingDecider #1207

Conversation

mdedetrich
Copy link
Contributor

@mdedetrich mdedetrich commented Mar 18, 2024

Resolves: #1205

Turns out the error was quite simple, we weren't using a decider to caught the exception in onPush

@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
upstreamSubscription.sendNext(6)
substreamPuppet1.expectNext(6)
substreamPuppet1.expectComplete()
upstreamSubscription.sendNext(7)
Copy link
Contributor Author

@mdedetrich mdedetrich Mar 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this is a genuine mistake when the test was originally written (remember that this test never passed, it was written for the future when supervision strategy propagation would be implemented which occurred later as a result of #252).

Logically when you read the test, its expected that when you do expectNext that previously an element should have been sent with sendNext and its also the same in the equivalent groupBy test and also splitWhen test

Copy link
Contributor Author

@mdedetrich mdedetrich Mar 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this again, I may be wrong and the test is indeed valid as its written right now because its splitAfter so its its meant to be skipping the next element due to exception being thrown, in which case we have to drop one element and then do pull(in)???

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the element which case the exception is dropped now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the element which case the exception is dropped now.

So to confirm, the current PRs implementation is correct and the test as originally written was wrong?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the downstreamSubscription.request(100) , I think the orign test should be right too, but its a little strange now ,that we need send the next(7) to make the next sub source be ready.

I think we should call

      private def pushSubstreamSource(): Unit = {
        push(out, Source.fromGraph(substreamSource.source))
        scheduleOnce(SubscriptionTimer, timeout)
        substreamWaitingToBePushed = false
      }

now inside the SubstreamHandler#onPush when the decision is SplitAfter too to keep the behavior the same, as it just a dummy Source.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mdedetrich in the test, it requests 10, so the SubSource is expected to be there after 6.

Copy link
Contributor Author

@mdedetrich mdedetrich Mar 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mdedetrich in the test, it requests 10, so the SubSource is expected to be there after 6.

But its also sending a completion because of this condition if (elem == 3) throw exc else elem % 3 == 0 (i.e. 6 % 3 == 0 so the split happens right after that) which is why the test is failing with

Expected OnNext(_), yet no element signaled during 3 seconds
java.lang.AssertionError: Expected OnNext(_), yet no element signaled during 3 seconds
	at org.apache.pekko.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:398)
	at org.apache.pekko.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:389)
	at org.apache.pekko.stream.scaladsl.FlowSplitAfterSpec.$anonfun$new$11(FlowSplitAfterSpec.scala:251)

at val substream2 = subscriber.expectNext() because there isn't going to be a expectNext, it's expecting a completion instead.

Note that at this point, we are dealing with the normal logic of SplitWhen. The new functionality of skipping exceptions with SupervisionDecider.resume occurs earlier when elem == 3 so we are past that and just testing happy path logic of SplitAfter which is not changed at all (and it shouldn't be either!).

This is why I think that part of the test is written incorrectly. In fact you can argue it can even be removed since it has nothing to do with recovering from thrown exceptions with SupervisionDecider.resume, its testing something entirely different (but I would personally leave it there now so its consistent with the other tests).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

**1. val substream2 = subscriber.expectNext(), the subscriber is not expecting a complete, otherwise, the call sendNext(7) will cause issue because the origin source is already completed.

The problem here is when the stream2 be generated, I expected the behavior the same, just after the 6 % 3 == 0, wdyt @samueleresca @mdedetrich**

Copy link
Contributor Author

@mdedetrich mdedetrich Mar 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

**1. val substream2 = subscriber.expectNext(), the subscriber is not expecting a complete, otherwise, the call sendNext(7) will cause issue because the origin source is already completed.

The problem here is when the stream2 be generated, I expected the behavior the same, just after the 6 % 3 == 0, wdyt @samueleresca @mdedetrich**

I am not saying that this is necessarily wrong, just that its a completely separate issue from what this PR is changing/solving. Or to put it differently, this behaviour is the same as current Pekko since at this point i.e.

upstreamSubscription.sendNext(6)
substreamPuppet1.expectNext(6)
substreamPuppet1.expectComplete()
upstreamSubscription.sendNext(7)
val substream2 = subscriber.expectNext()
val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.asPublisher(false)))
substreamPuppet2.request(10)
substreamPuppet2.expectNext(7)

upstreamSubscription.sendComplete()
subscriber.expectComplete()
substreamPuppet2.expectComplete()

we are past recovering from an exception (furthermore if we are somehow changing some fundamental behaviour with SplitAfter then other tests would fail, but they are all passing without any changes). The critical part of the test specifically dealing with recovering from exception and resuming is

upstreamSubscription.sendNext(3)
upstreamSubscription.sendNext(4)
substreamPuppet1.expectNext(4) // note that 3 was dropped

And this part is completely unchanged from how the test was originally written.

Given that, I think it makes sense to file a separate issue if this behaviour about completing after a split needs to change and hence to tackle it separately. The only exception to this that I can think of is that some state is not being reset correctly

decider(ex) match {
  case Supervision.Resume  => pull(in)
  case Supervision.Stop    => onUpstreamFailure(ex)
  case Supervision.Restart => onUpstreamFailure(ex) // TODO implement restart?
}

in the case Supervision.Resume => pull(in) block, but if thats the case it would also error out earlier since that block is only executed when recovering from exceptions in onPush (and again thats only when elem is 3, not 6)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks folks, I appreciate the detailed explanation. And yes, I was diverting away (not on purpose) from the original goal of the test

Note that at this point, we are dealing with the normal logic of SplitWhen. The new functionality of skipping
exceptions with SupervisionDecider.resume occurs earlier when elem == 3 so we are past that and just testing happy > path logic of SplitAfter which is not changed at all (and it shouldn't be either!).

looks good to me

@mdedetrich
Copy link
Contributor Author

@He-Pin @Roiocam @jxnu-liguobin @pjfanning @raboof Please take a note of #1207 (review) , I might be missing something here

@mdedetrich mdedetrich force-pushed the fix-exception-not-being-caught-in-split-resuming-decider branch from d1e9a99 to 455d8c5 Compare March 18, 2024 12:25
@mdedetrich mdedetrich changed the title Fix uncaught exception in Split with Supervision.resumingDecider Fix uncaught decider exception in Split with Supervision.resumingDecider Mar 18, 2024
@mdedetrich mdedetrich force-pushed the fix-exception-not-being-caught-in-split-resuming-decider branch from 455d8c5 to df3b9fc Compare March 18, 2024 12:33
pjfanning
pjfanning previously approved these changes Mar 18, 2024
Copy link
Contributor

@pjfanning pjfanning left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lgtm

@pjfanning pjfanning added this to the 1.1.0-M1 milestone Mar 18, 2024
@mdedetrich mdedetrich force-pushed the fix-exception-not-being-caught-in-split-resuming-decider branch from df3b9fc to 31d26d1 Compare March 19, 2024 06:50
Copy link
Member

@He-Pin He-Pin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm, we will see what will happen with nightly build.

@mdedetrich
Copy link
Contributor Author

Thanks, ill go ahead and merge this. With it being included in -M1 it will get plenty of testing/usage before the final release incase there is any hypothetical issue

@mdedetrich mdedetrich merged commit 689e30b into apache:main Mar 19, 2024
18 checks passed
@mdedetrich mdedetrich deleted the fix-exception-not-being-caught-in-split-resuming-decider branch March 19, 2024 15:41
@pjfanning pjfanning added the late-release-note late breaking changes that will require release notes changes label Mar 24, 2024
@pjfanning pjfanning removed the late-release-note late breaking changes that will require release notes changes label May 7, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

SubFlow's don't correctly propagate Supervision.resumeStrategy
4 participants