Feature request: RestartWithBackOffAndFallback for Flow & Sink #980

Open
He-Pin opened this issue Jan 19, 2024 · 16 comments

@He-Pin
Member

He-Pin commented Jan 19, 2024

Motivation:
I saw that in akka/akka#30267 @nvollmar added this feature for actors, and I think it would be nice to have it in pekko-stream as well, so that users can log the error on a restart/resume; currently we just ignore it.

I found a related case from @tg44 in https://discuss.lightbend.com/t/recover-from-sink-exceptions/10554

I think this would be very useful for real production use cases.

Result:
More convenient control over exceptions

@He-Pin He-Pin added the t:stream Pekko Streams label Jan 19, 2024
@He-Pin He-Pin added this to the 1.1.0 milestone Jan 19, 2024
@mdedetrich
Contributor

You can already do this thanks to #252, which has already landed.

You can just write your own supervision decider, e.g.

val decider: Supervision.Decider = { e =>
  logger.error("Unhandled exception in stream", e)
  Supervision.Stop
}

or am I missing something?
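
For reference, a minimal, self-contained sketch of wiring such a decider into a stream (the logger and the sample stream below are just placeholders):

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.{ ActorAttributes, Supervision }
import org.apache.pekko.stream.scaladsl.{ Sink, Source }
import org.slf4j.LoggerFactory

implicit val system: ActorSystem = ActorSystem("example")
val logger = LoggerFactory.getLogger("stream")

val decider: Supervision.Decider = { e =>
  logger.error("Unhandled exception in stream", e)
  Supervision.Stop
}

// The failing element triggers the decider, which logs and then stops the stream.
Source(1 to 10)
  .map(n => if (n == 5) throw new RuntimeException("boom") else n)
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
  .runWith(Sink.foreach(println))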

@He-Pin
Member Author

He-Pin commented Jan 19, 2024

Yes, I was expecting some factory method, e.g. Decider.resume(level: LoggingLevel). Let's wait for some feedback from @tg44 and @nvollmar
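
Something along these lines, purely as a sketch (this factory does not exist; the object name, the slf4j Level parameter and the messages are all illustrative):

import org.apache.pekko.stream.Supervision
import org.slf4j.LoggerFactory
import org.slf4j.event.Level

// Hypothetical factory for "log and resume" deciders.
object Deciders {
  private val log = LoggerFactory.getLogger("stream-supervision")

  def resume(level: Level): Supervision.Decider = { e =>
    level match {
      case Level.ERROR => log.error("Element dropped, resuming stream", e)
      case Level.WARN  => log.warn("Element dropped, resuming stream", e)
      case _           => log.info("Element dropped, resuming stream", e)
    }
    Supervision.Resume
  }
}

// Usage: .withAttributes(ActorAttributes.supervisionStrategy(Deciders.resume(Level.WARN)))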

@tg44
Contributor

tg44 commented Jan 19, 2024

So, my initial problem was that I had a substream with a FileIO in it. The FileIO was only a safety measure, so that if we lose the db connection we don't lose (too much) data. Then one day the FileIO failed (we couldn't recreate it because it had opened a file which failed due to hosting provider issues), it brought down the whole stream, and we lost hours of data before we finally noticed that the stream was down...

My idea was to lazily create a sink: if it fails I can recreate it, and if it fails multiple times I can simply swap it for a Sink.ignore if I want to.

@mdedetrich
Contributor

mdedetrich commented Jan 19, 2024

@tg44 So if by Sub Stream you mean SubFlow, the issue you describe is partially solved in Pekko 1.1.x by this PR (which isn't released yet, but there are snapshots if you want to try it). You can read the migration notes here

What I mean by partially is that, as noted in the migration notes, we don't support Supervision.restart. When initially implementing that PR I had difficulties supporting Supervision.restart, but since there seems to be some demand for it I can look into it if you confirm that Supervision.restart support for SubFlows is indeed what you are actually asking for.

@He-Pin Aside from what I just said, I don't think we need to add anything else to Pekko; it appears that @tg44 just wants support for automatic restarting of SubFlows

@He-Pin
Member Author

He-Pin commented Jan 19, 2024

@mdedetrich I think what @tg44 needs is:

  1. a RestartSink, which is currently missing; maybe that can be done with RestartFlow.to(Sink.ignore)
  2. a fallback, but for the sink: the main sink was a FileIO sink, but after some max number of retries, switch to a Sink.ignore.
Data Source X
   |
  \|/
Datas --> Process A ----> Process FileIO Sink (main, can retry with backoff restart)
                                      |
                                      |  (only switch after the main sink die and restart up to the max limit)
                                      |
                                      |----> fallback Ignore Sink(logging) maybe.
           

And I think this is what he really wants.
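
Something close to this can already be hand-rolled today by combining RestartSink.withBackoff with a stateful sink factory, though there is no dedicated fallback API; a rough sketch (the path, the limit and the counter-based switch are illustrative only):

import java.nio.file.Paths
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
import org.apache.pekko.stream.RestartSettings
import org.apache.pekko.stream.scaladsl.{ FileIO, RestartSink, Sink }
import org.apache.pekko.util.ByteString

val maxRestarts = 100
val starts = new AtomicInteger(0)

val settings = RestartSettings(minBackoff = 1.second, maxBackoff = 30.seconds, randomFactor = 0.2)

// The factory is invoked on every (re)start; once the main FileIO sink has
// died too often, it hands back Sink.ignore as the fallback.
val fileSinkWithFallback: Sink[ByteString, _] =
  RestartSink.withBackoff[ByteString](settings) { () =>
    if (starts.incrementAndGet() > maxRestarts) Sink.ignore
    else FileIO.toPath(Paths.get("backup.log"))
  }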

@mdedetrich
Contributor

@He-Pin But he is talking about a SubFlow, so from what he seems to be saying, he just wants the SubFlow to have the ability to restart (which by implication means that if the SubFlow is just a sink, then the Sink will restart).

Then you can just do

val restartingDecider: Supervision.Decider = { e =>
  logger.error("Unhandled exception in stream, restarting", e)
  Supervision.Restart
}

source
  .splitAfter(_ == somePredicate)
  .withAttributes(ActorAttributes.supervisionStrategy(restartingDecider))

@He-Pin
Member Author

He-Pin commented Jan 19, 2024

@mdedetrich But your proposal cannot switch to an ignore sink.

I see you just updated a PR; what about extending RestartFlow to support a fallback too?

@He-Pin He-Pin changed the title from "Feature request: Resume with Error logging for Stream" to "Feature request: RestartWithBackOffAndFallback for Flow & Sink" Jan 19, 2024
@mdedetrich
Contributor

mdedetrich commented Jan 19, 2024

@He-Pin #981 is what I was thinking of (it still needs to be tested; there may be some mistakes)

But your proposal cannot switch to an ignore sink.

It's not necessary though; can't you do this yourself by making a custom sink with Partition?
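
Something in this direction, for example, where the switch-over is driven by a flag (sketch only; how and when the flag gets flipped is left out, and the names are illustrative):

import java.util.concurrent.atomic.AtomicBoolean
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.SinkShape
import org.apache.pekko.stream.scaladsl.{ GraphDSL, Partition, Sink }

// Once `degraded` is flipped, every element is routed to Sink.ignore instead
// of the main sink.
val degraded = new AtomicBoolean(false)

def withIgnoreFallback[T](main: Sink[T, _]): Sink[T, NotUsed] =
  Sink.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    val partition = b.add(Partition[T](2, _ => if (degraded.get()) 1 else 0))
    partition.out(0) ~> main
    partition.out(1) ~> Sink.ignore
    SinkShape(partition.in)
  })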

@He-Pin
Member Author

He-Pin commented Jan 19, 2024

An out-of-the-box solution seems better; I'm thinking of something like ZIO ZSink's orElse, but which only switches over after some kind of maxRetriesWithBackOff.
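
Roughly this shape of API, purely as a sketch of the idea (the name and parameters are made up, nothing like this exists yet):

import scala.concurrent.duration.FiniteDuration
import org.apache.pekko.stream.scaladsl.Sink

// Hypothetical: run `main`, restarting it with backoff on failure, and after
// `maxRetries` failed restarts switch permanently to `fallback`.
def orElseAfterRetries[T](
    main: () => Sink[T, _],
    fallback: Sink[T, _],
    maxRetries: Int,
    minBackoff: FiniteDuration,
    maxBackoff: FiniteDuration): Sink[T, _] = ???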

@mdedetrich
Contributor

An out-of-the-box solution seems better; I'm thinking of something like ZIO ZSink's orElse, but which only switches over after some kind of maxRetriesWithBackOff.

Yeah, using Partition right now is a bit clunky because of how manual it is, but that's another topic

@tg44 Let me know if I am on the right track and if so I will work on #981

@tg44
Contributor

tg44 commented Jan 19, 2024

My use case looks like this:

Data Source X
   |
  \|/
Datas
   |
  \|/
Process A 
   |
  \|/
wireTap()----> Process FileIO Sink (main, can retry with backoff restart)
   |                   |  (if it is restarted with supervision it will fail at every retry effectively killing the database save)
  \|/                  |  (only switch after the main sink die and restart up to the max limit)
database save          |
                       |----> fallback Ignore Sink(logging) maybe.

I don't think we have a Sink which can handle the right side of the graph. It could be something like:

def sinkWithFallback[T, Mat](factory: () => Option[Throwable] => Sink[T, Mat]): Sink[T, Mat]

And it should be easy to implement on top of lazySink, but lazySink is internal (or at least the akka version I'm using right now has it in an internal package), so it is messy...

(We need the factory to work like the one in statefulMapConcat, so we can pass a factory like

() => {
  var retries = 0
  (exception: Option[Throwable]) => {
    retries += 1
    if (retries > 100) {
      Sink.ignore
    } else {
      someProbablyFailingSink
    }
  }
}

also, sorry if I'm not using proper Scala syntax; I've been coding in TS a lot recently :( )

@mdedetrich
Contributor

@tg44 So just to confirm: by sub stream you didn't mean splitWhen (which creates a SubFlow) but rather wireTap().

Also, is there a reason why you aren't using alsoTo instead of wireTap? Usually for your scenario you would use alsoTo, since wireTap will not backpressure the flow it is tapped into (hence wireTap is typically only used for logging/metrics and the like).

@tg44
Contributor

tg44 commented Jan 19, 2024

@mdedetrich Nah, when I started working with akka streams we tended to call every "part of the stream after a fan-out" a substream; groupBy and the "real" substream/subflow terminology were introduced/standardized later on. (I was a really, really early adopter; I think I have merged PRs in almost every akka repo (akka, akka-http), and I wrote/rewrote components for alpakka.) So sorry for the misunderstanding.

For me the fileIO is kind of a log to a file, so I chose wireTap intentionally: I have buffers and a log rotator and everything in between the wireTap and the actual fileIO. I can afford not to write everything to the file log, but I need to write everything to the database as fast as possible.

@He-Pin
Member Author

He-Pin commented Jan 19, 2024

Yes, I love your contributions @tg44, and I have used them too.

And I think what you did is a kind of isolation of strong and weak dependencies, to prevent the strong dependencies from being affected by the weak ones.

@mdedetrich
Contributor

mdedetrich commented Jan 19, 2024

Okay, so in the end this seems to be a request for an easier DSL for constructing a specific Sink, and while #981 is valid, it's also not related.

Now that I think of it, this honestly seems to be asking for a Sink equivalent of RetryFlow.withBackoff. @He-Pin wdyt?
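
For reference, a sketch of how the existing Flow-side combinator is used (assuming I recall the parameter order correctly: minBackoff, maxBackoff, randomFactor, maxRetries, flow; the parsing flow is just a placeholder); the request here amounts to a Sink-shaped sibling of this:

import scala.concurrent.duration._
import scala.util.{ Failure, Success, Try }
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.{ Flow, RetryFlow }

// A fallible one-in-one-out flow that we want to retry with backoff.
val parse: Flow[String, Try[Int], NotUsed] = Flow[String].map(s => Try(s.toInt))

// Parameters: minBackoff, maxBackoff, randomFactor, maxRetries, flow
val retried: Flow[String, Try[Int], NotUsed] =
  RetryFlow.withBackoff(100.millis, 5.seconds, 0.2, 3, parse) { (in, result) =>
    result match {
      case Failure(_) => Some(in) // feed the same element in again
      case Success(_) => None     // success, don't retry
    }
  }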

@He-Pin
Member Author

He-Pin commented Jan 19, 2024

@mdedetrich That's true, but we can extend it to Retry/Restart*.withBackOff plus a fallback; that's what I have in mind.

@pjfanning pjfanning modified the milestones: 1.1.0-M1, 1.1.0-M2 Mar 18, 2024
@pjfanning pjfanning modified the milestones: 1.1.0, 1.1.1 Aug 23, 2024
@pjfanning pjfanning modified the milestones: 1.1.1, 1.1.2 Sep 9, 2024
@pjfanning pjfanning modified the milestones: 1.1.2, 1.1.x Sep 27, 2024