diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala index f3846b930e1..8f0fee6b945 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala @@ -117,6 +117,18 @@ private[pekko] object Running { } } + /** + * A marker to indicate to upstream code that an unstash is needed. + */ + final class UnstashNeeded(val next: Behavior[InternalProtocol], context: ActorContext[InternalProtocol]) + extends AbstractBehavior[InternalProtocol](context) { + + /** + * @throws NotImplementedError as this behavior should not receive any messages + */ + override def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = ??? + } + def startReplicationStream[C, E, S]( setup: BehaviorSetup[C, E, S], state: RunningState[S], @@ -271,7 +283,7 @@ private[pekko] object Running { def onCommand(state: RunningState[S], cmd: C): Behavior[InternalProtocol] = { val effect = setup.commandHandler(state.state, cmd) val (next, doUnstash) = applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast? - if (doUnstash) tryUnstashOne(next) + if (doUnstash) new Running.UnstashNeeded(next, setup.context) else next } diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/StashManagement.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/StashManagement.scala index 8a1d917f979..a8d6b38aeca 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/StashManagement.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/StashManagement.scala @@ -25,6 +25,8 @@ import pekko.actor.typed.scaladsl.adapter._ import pekko.annotation.InternalApi import pekko.util.ConstantFun +import scala.annotation.tailrec + /** INTERNAL API: Stash management for persistent behaviors */ @InternalApi private[pekko] trait StashManagement[C, E, S] { @@ -74,7 +76,11 @@ private[pekko] trait StashManagement[C, E, S] { /** * `tryUnstashOne` is called at the end of processing each command, published event, or when persist is completed */ - protected def tryUnstashOne(behavior: Behavior[InternalProtocol]): Behavior[InternalProtocol] = { + protected def tryUnstashOne(behavior: Behavior[InternalProtocol]): Behavior[InternalProtocol] = + unstashOne(behavior) + + @tailrec + private def unstashOne(behavior: Behavior[InternalProtocol]): Behavior[InternalProtocol] = { val buffer = if (stashState.isUnstashAllInProgress) stashState.userStashBuffer else stashState.internalStashBuffer @@ -84,7 +90,10 @@ private[pekko] trait StashManagement[C, E, S] { stashState.decrementUnstashAllProgress() - buffer.unstash(behavior, 1, ConstantFun.scalaIdentityFunction) + buffer.unstash(behavior, 1, ConstantFun.scalaIdentityFunction) match { + case unstash: Running.UnstashNeeded => unstashOne(unstash.next) + case next => next + } } else behavior }