Skip to content

Commit

Permalink
possible solution to stackoverflow
Browse files Browse the repository at this point in the history
  • Loading branch information
pjfanning committed May 21, 2024
1 parent e5ee3d2 commit 54c2302
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,18 @@ private[pekko] object Running {
}
}

/**
* A marker to indicate to upstream code that an unstash is needed.
*/
final class UnstashNeeded(val next: Behavior[InternalProtocol], context: ActorContext[InternalProtocol])
extends AbstractBehavior[InternalProtocol](context) {

/**
* @throws NotImplementedError as this behavior should not receive any messages
*/
override def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = ???
}

def startReplicationStream[C, E, S](
setup: BehaviorSetup[C, E, S],
state: RunningState[S],
Expand Down Expand Up @@ -271,7 +283,7 @@ private[pekko] object Running {
def onCommand(state: RunningState[S], cmd: C): Behavior[InternalProtocol] = {
val effect = setup.commandHandler(state.state, cmd)
val (next, doUnstash) = applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast?
if (doUnstash) tryUnstashOne(next)
if (doUnstash) new Running.UnstashNeeded(next, setup.context)
else next
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import pekko.actor.typed.scaladsl.adapter._
import pekko.annotation.InternalApi
import pekko.util.ConstantFun

import scala.annotation.tailrec

/** INTERNAL API: Stash management for persistent behaviors */
@InternalApi
private[pekko] trait StashManagement[C, E, S] {
Expand Down Expand Up @@ -74,7 +76,11 @@ private[pekko] trait StashManagement[C, E, S] {
/**
* `tryUnstashOne` is called at the end of processing each command, published event, or when persist is completed
*/
protected def tryUnstashOne(behavior: Behavior[InternalProtocol]): Behavior[InternalProtocol] = {
protected def tryUnstashOne(behavior: Behavior[InternalProtocol]): Behavior[InternalProtocol] =
unstashOne(behavior)

@tailrec
private def unstashOne(behavior: Behavior[InternalProtocol]): Behavior[InternalProtocol] = {
val buffer =
if (stashState.isUnstashAllInProgress) stashState.userStashBuffer
else stashState.internalStashBuffer
Expand All @@ -84,7 +90,10 @@ private[pekko] trait StashManagement[C, E, S] {

stashState.decrementUnstashAllProgress()

buffer.unstash(behavior, 1, ConstantFun.scalaIdentityFunction)
buffer.unstash(behavior, 1, ConstantFun.scalaIdentityFunction) match {
case unstash: Running.UnstashNeeded => unstashOne(unstash.next)
case next => next
}
} else behavior

}
Expand Down

0 comments on commit 54c2302

Please sign in to comment.