Skip to content

Commit

Permalink
feat: Make SingleConsumerMultiProducer the default mailbox for stream.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Jan 8, 2024
1 parent 959c98d commit b1d8c93
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 7 deletions.
8 changes: 8 additions & 0 deletions stream/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ pekko {
# or full dispatcher configuration to be used by ActorMaterializer when creating Actors.
dispatcher = "pekko.actor.default-dispatcher"

# FQCN of the MailboxType. The Class of the FQCN must have a public
# constructor with
# (org.apache.pekko.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters.
# defaults to the single consumer mailbox for better performance.
mailbox {
mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox"
}

# Fully qualified config path which holds the dispatcher configuration
# or full dispatcher configuration to be used by stream operators that
# perform blocking operations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ import pekko.util.OptionVal
val effectiveProps = props.dispatcher match {
case Dispatchers.DefaultDispatcherId =>
// the caller said to use the default dispatcher, but that can been trumped by the dispatcher attribute
props.withDispatcher(context.effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher)
val dispatcher = context.effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher
props.withDispatcher(dispatcher)
.withMailbox(PhasedFusingActorMaterializer.MailboxConfigName)
case _ => props
}

Expand Down Expand Up @@ -195,10 +197,14 @@ private[pekko] class SubFusingActorMaterializerImpl(
* INTERNAL API
*/
@InternalApi private[pekko] object StreamSupervisor {
def props(attributes: Attributes, haveShutDown: AtomicBoolean): Props =
def props(attributes: Attributes, haveShutDown: AtomicBoolean): Props = {
val dispatcher = attributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher
Props(new StreamSupervisor(haveShutDown))
.withDeploy(Deploy.local)
.withDispatcher(attributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher)
.withDispatcher(dispatcher)
.withMailbox(PhasedFusingActorMaterializer.MailboxConfigName)
}

private[stream] val baseName = "StreamSupervisor"
private val actorName = SeqActorName(baseName)
def nextName(): String = actorName.next()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ import pekko.util.OptionVal

val Debug = false

val MailboxConfigName: String = "pekko.stream.materializer.mailbox"

val DefaultPhase: Phase[Any] = new Phase[Any] {
override def apply(
settings: ActorMaterializerSettings,
Expand Down Expand Up @@ -116,7 +118,10 @@ import pekko.util.OptionVal

val dispatcher = attributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher
val supervisorProps =
StreamSupervisor.props(attributes, haveShutDown).withDispatcher(dispatcher).withDeploy(Deploy.local)
StreamSupervisor.props(attributes, haveShutDown)
.withDispatcher(dispatcher)
.withMailbox(MailboxConfigName)
.withDeploy(Deploy.local)

// FIXME why do we need a global unique name for the child?
val streamSupervisor = context.actorOf(supervisorProps, StreamSupervisor.nextName())
Expand Down Expand Up @@ -624,7 +629,9 @@ private final case class SavedIslandData(
@InternalApi private[pekko] override def actorOf(context: MaterializationContext, props: Props): ActorRef = {
val effectiveProps = props.dispatcher match {
case Dispatchers.DefaultDispatcherId =>
props.withDispatcher(context.effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher)
val dispatcher = context.effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher
props.withDispatcher(dispatcher)
.withMailbox(MailboxConfigName)
case _ => props
}

Expand Down Expand Up @@ -816,9 +823,11 @@ private final case class SavedIslandData(
fuseIntoExistingInterpreter(shell)

case _ =>
val dispatcher = effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher
val props = ActorGraphInterpreter
.props(shell)
.withDispatcher(effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher)
.withDispatcher(dispatcher)
.withMailbox(PhasedFusingActorMaterializer.MailboxConfigName)

val actorName = fullIslandName match {
case OptionVal.Some(n) => n
Expand Down Expand Up @@ -974,7 +983,10 @@ private final case class SavedIslandData(
val maxInputBuffer = attributes.mandatoryAttribute[Attributes.InputBuffer].max

val props =
TLSActor.props(maxInputBuffer, tls.createSSLEngine, tls.verifySession, tls.closing).withDispatcher(dispatcher)
TLSActor.props(maxInputBuffer, tls.createSSLEngine, tls.verifySession, tls.closing)
.withDispatcher(dispatcher)
.withMailbox(PhasedFusingActorMaterializer.MailboxConfigName)

tlsActor = materializer.actorOf(props, "TLS-for-" + islandName)
def factory(id: Int) = new ActorPublisher[Any](tlsActor) {
override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
Expand Down

0 comments on commit b1d8c93

Please sign in to comment.