Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Make SingleConsumerMultiProducer the default mailbox for stream. #917

Merged
merged 1 commit into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions stream/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ pekko {
# or full dispatcher configuration to be used by ActorMaterializer when creating Actors.
dispatcher = "pekko.actor.default-dispatcher"

# FQCN of the MailboxType. The Class of the FQCN must have a public
# constructor with
# (org.apache.pekko.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters.
# defaults to the single consumer mailbox for better performance.
mailbox {
mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox"
}

# Fully qualified config path which holds the dispatcher configuration
# or full dispatcher configuration to be used by stream operators that
# perform blocking operations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import pekko.util.OptionVal
case Dispatchers.DefaultDispatcherId =>
// the caller said to use the default dispatcher, but that can been trumped by the dispatcher attribute
props.withDispatcher(context.effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher)
.withMailbox(PhasedFusingActorMaterializer.MailboxConfigName)
case _ => props
}

Expand Down Expand Up @@ -195,10 +196,13 @@ private[pekko] class SubFusingActorMaterializerImpl(
* INTERNAL API
*/
@InternalApi private[pekko] object StreamSupervisor {
def props(attributes: Attributes, haveShutDown: AtomicBoolean): Props =
def props(attributes: Attributes, haveShutDown: AtomicBoolean): Props = {
Props(new StreamSupervisor(haveShutDown))
.withDeploy(Deploy.local)
.withDispatcher(attributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher)
.withMailbox(PhasedFusingActorMaterializer.MailboxConfigName)
He-Pin marked this conversation as resolved.
Show resolved Hide resolved
}

private[stream] val baseName = "StreamSupervisor"
private val actorName = SeqActorName(baseName)
def nextName(): String = actorName.next()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ import pekko.util.OptionVal

val Debug = false

val MailboxConfigName: String = "pekko.stream.materializer.mailbox"
He-Pin marked this conversation as resolved.
Show resolved Hide resolved

val DefaultPhase: Phase[Any] = new Phase[Any] {
override def apply(
settings: ActorMaterializerSettings,
Expand Down Expand Up @@ -116,7 +118,10 @@ import pekko.util.OptionVal

val dispatcher = attributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher
val supervisorProps =
StreamSupervisor.props(attributes, haveShutDown).withDispatcher(dispatcher).withDeploy(Deploy.local)
StreamSupervisor.props(attributes, haveShutDown)
.withDispatcher(dispatcher)
.withMailbox(MailboxConfigName)
.withDeploy(Deploy.local)

// FIXME why do we need a global unique name for the child?
val streamSupervisor = context.actorOf(supervisorProps, StreamSupervisor.nextName())
Expand Down Expand Up @@ -625,6 +630,7 @@ private final case class SavedIslandData(
val effectiveProps = props.dispatcher match {
case Dispatchers.DefaultDispatcherId =>
props.withDispatcher(context.effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher)
.withMailbox(MailboxConfigName)
case _ => props
}

Expand Down Expand Up @@ -819,6 +825,7 @@ private final case class SavedIslandData(
val props = ActorGraphInterpreter
.props(shell)
.withDispatcher(effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher)
.withMailbox(PhasedFusingActorMaterializer.MailboxConfigName)

val actorName = fullIslandName match {
case OptionVal.Some(n) => n
Expand Down Expand Up @@ -974,7 +981,10 @@ private final case class SavedIslandData(
val maxInputBuffer = attributes.mandatoryAttribute[Attributes.InputBuffer].max

val props =
TLSActor.props(maxInputBuffer, tls.createSSLEngine, tls.verifySession, tls.closing).withDispatcher(dispatcher)
TLSActor.props(maxInputBuffer, tls.createSSLEngine, tls.verifySession, tls.closing)
.withDispatcher(dispatcher)
.withMailbox(PhasedFusingActorMaterializer.MailboxConfigName)

tlsActor = materializer.actorOf(props, "TLS-for-" + islandName)
def factory(id: Int) = new ActorPublisher[Any](tlsActor) {
override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
Expand Down
Loading