Skip to content

Commit

Permalink
Update Running.scala
Browse files Browse the repository at this point in the history
  • Loading branch information
pjfanning committed May 21, 2024
1 parent 54c2302 commit e39bfb3
Showing 1 changed file with 5 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,12 @@ private[pekko] object Running {
/**
* A marker to indicate to upstream code that an unstash is needed.
*/
final class UnstashNeeded(val next: Behavior[InternalProtocol], context: ActorContext[InternalProtocol])
final class UnstashNeeded(val next: Behavior[InternalProtocol],
unstashFunction: Behavior[InternalProtocol] => Behavior[InternalProtocol],
context: ActorContext[InternalProtocol])
extends AbstractBehavior[InternalProtocol](context) {

/**
* @throws NotImplementedError as this behavior should not receive any messages
*/
override def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = ???
override def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = unstashFunction(next)
}

def startReplicationStream[C, E, S](
Expand Down Expand Up @@ -283,7 +282,7 @@ private[pekko] object Running {
def onCommand(state: RunningState[S], cmd: C): Behavior[InternalProtocol] = {
val effect = setup.commandHandler(state.state, cmd)
val (next, doUnstash) = applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast?
if (doUnstash) new Running.UnstashNeeded(next, setup.context)
if (doUnstash) new Running.UnstashNeeded(next, tryUnstashOne, setup.context)
else next
}

Expand Down

0 comments on commit e39bfb3

Please sign in to comment.