From e39bfb302275fc1fe2aa147e0b002ccb767c3af8 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Tue, 21 May 2024 20:28:26 +0100 Subject: [PATCH] Update Running.scala --- .../pekko/persistence/typed/internal/Running.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala index 8f0fee6b94..c3f6285a38 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala @@ -120,13 +120,12 @@ private[pekko] object Running { /** * A marker to indicate to upstream code that an unstash is needed. */ - final class UnstashNeeded(val next: Behavior[InternalProtocol], context: ActorContext[InternalProtocol]) + final class UnstashNeeded(val next: Behavior[InternalProtocol], + unstashFunction: Behavior[InternalProtocol] => Behavior[InternalProtocol], + context: ActorContext[InternalProtocol]) extends AbstractBehavior[InternalProtocol](context) { - /** - * @throws NotImplementedError as this behavior should not receive any messages - */ - override def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = ??? + override def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = unstashFunction(next) } def startReplicationStream[C, E, S]( @@ -283,7 +282,7 @@ private[pekko] object Running { def onCommand(state: RunningState[S], cmd: C): Behavior[InternalProtocol] = { val effect = setup.commandHandler(state.state, cmd) val (next, doUnstash) = applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast? - if (doUnstash) new Running.UnstashNeeded(next, setup.context) + if (doUnstash) new Running.UnstashNeeded(next, tryUnstashOne, setup.context) else next }