Skip to content

Commit

Permalink
+str Add Emitter api for unsafe transformation.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Sep 5, 2023
1 parent 131e774 commit d245d4d
Show file tree
Hide file tree
Showing 7 changed files with 223 additions and 4 deletions.
76 changes: 76 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/Emitter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream

import org.apache.pekko.annotation.InternalApi

import scala.util.{ Failure, Success, Try }

object EmitterOps {
def emit[T](value: T)(implicit emitter: Emitter[T]): Unit = emitter.emit(value)
def fail(throwable: Throwable)(implicit emitter: Emitter[_]): Unit = emitter.fail(throwable)
def complete()(implicit emitter: Emitter[_]): Unit = emitter.complete()
def handle[T](result: Try[T])(implicit emitter: Emitter[T]): Unit = emitter.handle(result)
def handle[T](result: Either[Throwable, T])(implicit emitter: Emitter[T]): Unit = emitter.handle(result)
}

object EmitterUnsafeOps {
def emitNow[T](value: T)(implicit emitter: Emitter[T]): Unit = emitter.asInstanceOf[UnsafeEmitter[T]].emitNow(value)
def failNow(throwable: Throwable)(implicit emitter: Emitter[_]): Unit =
emitter.asInstanceOf[UnsafeEmitter[_]].failNow(throwable)
def completeNow()(implicit emitter: Emitter[_]): Unit = emitter.asInstanceOf[UnsafeEmitter[_]].completeNow()
def handleNow[T](result: Try[T])(implicit emitter: Emitter[T]): Unit =
emitter.asInstanceOf[UnsafeEmitter[T]].handleNow(result)
def handleNow[T](result: Either[Throwable, T])(implicit emitter: Emitter[T]): Unit =
emitter.asInstanceOf[UnsafeEmitter[T]].handleNow(result)
}

trait Emitter[T] {

def emit(value: T): Unit

def fail(throwable: Throwable): Unit

def complete(): Unit

def handle(result: Try[T]): Unit = result match {
case Success(value) => emit(value)
case Failure(ex) => fail(ex)
}

def handle(result: Either[Throwable, T]): Unit = result match {
case Right(value) => emit(value)
case Left(ex) => fail(ex)
}
}

@InternalApi
private[pekko] trait UnsafeEmitter[T] extends Emitter[T] {
def emitNow(value: T): Unit
def failNow(throwable: Throwable): Unit

def completeNow(): Unit
def handleNow(result: Try[T]): Unit = result match {
case Success(value) => emitNow(value)
case Failure(ex) => failNow(ex)
}
def handleNow(result: Either[Throwable, T]): Unit = result match {
case Right(value) => emitNow(value)
case Left(ex) => failNow(ex)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream.impl.fusing

import org.apache.pekko.stream.ActorAttributes.SupervisionStrategy
import org.apache.pekko.stream.Attributes.SourceLocation
import org.apache.pekko.stream.impl.ActorSubscriberMessage
import org.apache.pekko.stream.impl.Stages.DefaultAttributes
import org.apache.pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import org.apache.pekko.stream._

import scala.util.control.NonFatal

private[pekko] class UnsafeTransformUnordered[In, Out](
parallelism: Int,
transform: (In, Emitter[Out]) => Unit)
extends GraphStage[FlowShape[In, Out]] {
private val in = Inlet[In]("UnsafeTransformOrdered.in")
private val out = Outlet[Out]("UnsafeTransformOrdered.out")

override def initialAttributes = DefaultAttributes.mapAsyncUnordered and SourceLocation.forLambda(transform)

override val shape = FlowShape(in, out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler with UnsafeEmitter[Out] {
override def toString = s"UnsafeTransformOrdered.Logic(inFlight=$inFlight, buffer=$buffer)"
import org.apache.pekko.stream.impl.{ Buffer => BufferImpl }

private val decider =
inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
private var inFlight = 0
private var buffer: org.apache.pekko.stream.impl.Buffer[Out] = _

import ActorSubscriberMessage._

private val callback: ActorSubscriberMessage => Unit = getAsyncCallback[ActorSubscriberMessage](handle).invoke

override def emitNow(value: Out): Unit = handle(OnNext(value))
override def failNow(throwable: Throwable): Unit = handle(OnError(throwable))
override def completeNow(): Unit = completeStage()

// TODO check permit
override final def emit(value: Out): Unit = callback(OnNext(value))
override final def fail(throwable: Throwable): Unit = callback(OnError(throwable))
override final def complete(): Unit = callback(OnComplete)

//
private[this] def todo: Int = inFlight + buffer.used

override def preStart(): Unit = buffer = BufferImpl(parallelism, inheritedAttributes)

private def isCompleted = isClosed(in) && todo == 0

def handle(msg: ActorSubscriberMessage): Unit = {
inFlight -= 1

msg match {
case OnNext(elem: Out @unchecked) if elem != null =>
if (isAvailable(out)) {
if (!hasBeenPulled(in)) tryPull(in)
push(out, elem)
if (isCompleted) completeStage()
} else buffer.enqueue(elem)
case OnNext(_) =>
if (isCompleted) completeStage()
else if (!hasBeenPulled(in)) tryPull(in)
case OnComplete =>
completeStage()
case OnError(ex) =>
if (decider(ex) == Supervision.Stop) failStage(ex)
else if (isCompleted) completeStage()
else if (!hasBeenPulled(in)) tryPull(in)
}
}

override def onPush(): Unit = {
try {
val elem = grab(in)
transform(elem, this)
inFlight += 1
} catch {
case NonFatal(ex) => if (decider(ex) == Supervision.Stop) failStage(ex)
}
if (todo < parallelism && !hasBeenPulled(in)) tryPull(in)
}

override def onUpstreamFinish(): Unit = {
if (todo == 0) completeStage()
}

override def onPull(): Unit = {
if (!buffer.isEmpty) push(out, buffer.dequeue())

val leftTodo = todo
if (isClosed(in) && leftTodo == 0) completeStage()
else if (leftTodo < parallelism && !hasBeenPulled(in)) tryPull(in)
}

setHandlers(in, out, this)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,10 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.mapAsyncUnordered(parallelism)(x => f(x).asScala))

def unsafeTransformUnordered[T](
parallelism: Int, transform: function.Function2[Out, Emitter[T], Unit]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.unsafeTransformUnordered[T](parallelism)(emitter => out => transform(out, emitter)))

/**
* Use the `ask` pattern to send a request-reply message to the target `ref` actor.
* If any of the asks times out it will fail the stream with a [[pekko.pattern.AskTimeoutException]].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2527,6 +2527,10 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): javadsl.Source[T, Mat] =
new Source(delegate.mapAsyncUnordered(parallelism)(x => f(x).asScala))

def unsafeTransformUnordered[T](
parallelism: Int, transform: function.Function2[Out, Emitter[T], Unit]): javadsl.Source[T, Mat] =
new Source(delegate.unsafeTransformUnordered[T](parallelism)(emitter => out => transform(out, emitter)))

/**
* Use the `ask` pattern to send a request-reply message to the target `ref` actor.
* If any of the asks times out it will fail the stream with a [[pekko.pattern.AskTimeoutException]].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@ package org.apache.pekko.stream.javadsl
import java.util.{ Comparator, Optional }
import java.util.concurrent.CompletionStage
import java.util.function.Supplier

import scala.annotation.{ nowarn, varargs }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag

import org.apache.pekko
import pekko.NotUsed
import pekko.annotation.ApiMayChange
Expand Down Expand Up @@ -348,6 +346,10 @@ class SubFlow[In, Out, Mat](
def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): SubFlow[In, T, Mat] =
new SubFlow(delegate.mapAsyncUnordered(parallelism)(x => f(x).asScala))

def unsafeTransformUnordered[T](
parallelism: Int, transform: function.Function2[Out, Emitter[T], Unit]): javadsl.SubFlow[In, T, Mat] =
new SubFlow(delegate.unsafeTransformUnordered[T](parallelism)(emitter => out => transform(out, emitter)))

/**
* Only pass on those elements that satisfy the given predicate.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,10 @@ class SubSource[Out, Mat](
def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): SubSource[T, Mat] =
new SubSource(delegate.mapAsyncUnordered(parallelism)(x => f(x).asScala))

def unsafeTransformUnordered[T](
parallelism: Int, transform: function.Function2[Out, Emitter[T], Unit]): javadsl.SubSource[T, Mat] =
new SubSource(delegate.unsafeTransformUnordered[T](parallelism)(emitter => out => transform(out, emitter)))

/**
* Only pass on those elements that satisfy the given predicate.
*
Expand Down
16 changes: 14 additions & 2 deletions stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,6 @@ final case class RunnableGraph[+Mat](override val traversalBuilder: TraversalBui
@ccompatUsedUntil213
trait FlowOps[+Out, +Mat] {
import GraphDSL.Implicits._

import org.apache.pekko.stream.impl.Stages._

type Repr[+O] <: FlowOps[O, Mat] {
Expand Down Expand Up @@ -1141,7 +1140,20 @@ trait FlowOps[+Out, +Mat] {
*
* @see [[#mapAsync]]
*/
def mapAsyncUnordered[T](parallelism: Int)(f: Out => Future[T]): Repr[T] = via(MapAsyncUnordered(parallelism, f))
def mapAsyncUnordered[T](parallelism: Int)(f: Out => Future[T]): Repr[T] =
unsafeTransformUnordered[T](parallelism) { implicit emitter => out =>
import EmitterOps._
import EmitterUnsafeOps._
val future = f(out)
future.value match {
case Some(elem) => handleNow(elem)
case None => future.onComplete(handle(_))(pekko.dispatch.ExecutionContexts.parasitic)
}
}

@ApiMayChange
def unsafeTransformUnordered[T](parallelism: Int)(transform: Emitter[T] => Out => Unit): Repr[T] =
via(new UnsafeTransformUnordered[Out, T](parallelism, (out, emitter) => transform(emitter)(out)))

/**
* Use the `ask` pattern to send a request-reply message to the target `ref` actor.
Expand Down

0 comments on commit d245d4d

Please sign in to comment.