From 597074945c6d248f2a46a9eb574116462755b596 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Tue, 5 Sep 2023 01:43:36 +0800 Subject: [PATCH] +str Add Emitter api for unsafe transformation. --- .../org/apache/pekko/stream/Emitter.scala | 57 +++++++++ .../fusing/UnsafeTransformUnordered.scala | 117 ++++++++++++++++++ .../apache/pekko/stream/javadsl/Flow.scala | 4 + .../apache/pekko/stream/javadsl/Source.scala | 4 + .../apache/pekko/stream/javadsl/SubFlow.scala | 6 +- .../pekko/stream/javadsl/SubSource.scala | 4 + .../apache/pekko/stream/scaladsl/Flow.scala | 14 ++- 7 files changed, 202 insertions(+), 4 deletions(-) create mode 100644 stream/src/main/scala/org/apache/pekko/stream/Emitter.scala create mode 100644 stream/src/main/scala/org/apache/pekko/stream/impl/fusing/UnsafeTransformUnordered.scala diff --git a/stream/src/main/scala/org/apache/pekko/stream/Emitter.scala b/stream/src/main/scala/org/apache/pekko/stream/Emitter.scala new file mode 100644 index 00000000000..62fc0eaf999 --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/Emitter.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream + +import org.apache.pekko.annotation.InternalApi + +import scala.util.{ Failure, Success, Try } + +trait Emitter[T] { + + def emit(value: T): Unit + + def fail(throwable: Throwable): Unit + + def complete(): Unit + + def handle(result: Try[T]): Unit = result match { + case Success(value) => emit(value) + case Failure(ex) => fail(ex) + } + + def handle(result: Either[Throwable, T]): Unit = result match { + case Right(value) => emit(value) + case Left(ex) => fail(ex) + } +} + +@InternalApi +private[pekko] trait UnsafeEmitter[T] extends Emitter[T] { + def emitNow(value: T): Unit + def failNow(throwable: Throwable): Unit + + def completeNow(): Unit + def handleNow(result: Try[T]): Unit = result match { + case Success(value) => emitNow(value) + case Failure(ex) => failNow(ex) + } + def handleNow(result: Either[Throwable, T]): Unit = result match { + case Right(value) => emitNow(value) + case Left(ex) => failNow(ex) + } +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/UnsafeTransformUnordered.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/UnsafeTransformUnordered.scala new file mode 100644 index 00000000000..7200d35b6cb --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/UnsafeTransformUnordered.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.impl.fusing + +import org.apache.pekko.stream.ActorAttributes.SupervisionStrategy +import org.apache.pekko.stream.Attributes.SourceLocation +import org.apache.pekko.stream.impl.ActorSubscriberMessage +import org.apache.pekko.stream.impl.Stages.DefaultAttributes +import org.apache.pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } +import org.apache.pekko.stream._ + +import scala.util.control.NonFatal + +private[pekko] class UnsafeTransformUnordered[In, Out]( + parallelism: Int, + transform: (In, Emitter[Out]) => Unit) + extends GraphStage[FlowShape[In, Out]] { + private val in = Inlet[In]("UnsafeTransformOrdered.in") + private val out = Outlet[Out]("UnsafeTransformOrdered.out") + + override def initialAttributes = DefaultAttributes.mapAsyncUnordered and SourceLocation.forLambda(transform) + + override val shape = FlowShape(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler with UnsafeEmitter[Out] { + override def toString = s"UnsafeTransformOrdered.Logic(inFlight=$inFlight, buffer=$buffer)" + import org.apache.pekko.stream.impl.{ Buffer => BufferImpl } + + private val decider = + inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider + private var inFlight = 0 + private var buffer: org.apache.pekko.stream.impl.Buffer[Out] = _ + + import ActorSubscriberMessage._ + + private val callback: ActorSubscriberMessage => Unit = getAsyncCallback[ActorSubscriberMessage](handle).invoke + + override def emitNow(value: Out): Unit = handle(OnNext(value)) + override def failNow(throwable: Throwable): Unit = handle(OnError(throwable)) + override def completeNow(): Unit = completeStage() + + // TODO check permit + override final def emit(value: Out): Unit = callback(OnNext(value)) + override final def fail(throwable: Throwable): Unit = callback(OnError(throwable)) + override final def complete(): Unit = callback(OnComplete) + + // + private[this] def todo: Int = inFlight + buffer.used + + override def preStart(): Unit = buffer = BufferImpl(parallelism, inheritedAttributes) + + private def isCompleted = isClosed(in) && todo == 0 + + def handle(msg: ActorSubscriberMessage): Unit = { + inFlight -= 1 + + msg match { + case OnNext(elem: Out @unchecked) if elem != null => + if (isAvailable(out)) { + if (!hasBeenPulled(in)) tryPull(in) + push(out, elem) + if (isCompleted) completeStage() + } else buffer.enqueue(elem) + case OnNext(_) => + if (isCompleted) completeStage() + else if (!hasBeenPulled(in)) tryPull(in) + case OnComplete => + completeStage() + case OnError(ex) => + if (decider(ex) == Supervision.Stop) failStage(ex) + else if (isCompleted) completeStage() + else if (!hasBeenPulled(in)) tryPull(in) + } + } + + override def onPush(): Unit = { + try { + val elem = grab(in) + transform(elem, this) + inFlight += 1 + } catch { + case NonFatal(ex) => if (decider(ex) == Supervision.Stop) failStage(ex) + } + if (todo < parallelism && !hasBeenPulled(in)) tryPull(in) + } + + override def onUpstreamFinish(): Unit = { + if (todo == 0) completeStage() + } + + override def onPull(): Unit = { + if (!buffer.isEmpty) push(out, buffer.dequeue()) + + val leftTodo = todo + if (isClosed(in) && leftTodo == 0) completeStage() + else if (leftTodo < parallelism && !hasBeenPulled(in)) tryPull(in) + } + + setHandlers(in, out, this) + } +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index f979d1bffc3..75296f45610 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -873,6 +873,10 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] = new Flow(delegate.mapAsyncUnordered(parallelism)(x => f(x).asScala)) + def unsafeTransformUnordered[T]( + parallelism: Int, transform: function.Function2[Out, Emitter[T], Unit]): javadsl.Flow[In, T, Mat] = + new Flow(delegate.unsafeTransformUnordered[T](parallelism)((out, emitter) => transform(out, emitter))) + /** * Use the `ask` pattern to send a request-reply message to the target `ref` actor. * If any of the asks times out it will fail the stream with a [[pekko.pattern.AskTimeoutException]]. diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index e2dee057f04..c47f8f28ee8 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -2527,6 +2527,10 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): javadsl.Source[T, Mat] = new Source(delegate.mapAsyncUnordered(parallelism)(x => f(x).asScala)) + def unsafeTransformUnordered[T]( + parallelism: Int, transform: function.Function2[Out, Emitter[T], Unit]): javadsl.Source[T, Mat] = + new Source(delegate.unsafeTransformUnordered(parallelism)((out, emitter) => transform(out, emitter))) + /** * Use the `ask` pattern to send a request-reply message to the target `ref` actor. * If any of the asks times out it will fail the stream with a [[pekko.pattern.AskTimeoutException]]. diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index 1b6309ff83d..3223a1db4ed 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -16,13 +16,11 @@ package org.apache.pekko.stream.javadsl import java.util.{ Comparator, Optional } import java.util.concurrent.CompletionStage import java.util.function.Supplier - import scala.annotation.{ nowarn, varargs } import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag - import org.apache.pekko import pekko.NotUsed import pekko.annotation.ApiMayChange @@ -348,6 +346,10 @@ class SubFlow[In, Out, Mat]( def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): SubFlow[In, T, Mat] = new SubFlow(delegate.mapAsyncUnordered(parallelism)(x => f(x).asScala)) + def unsafeTransformUnordered[T]( + parallelism: Int, transform: function.Function2[Out, Emitter[T], Unit]): javadsl.SubFlow[In, T, Mat] = + new SubFlow(delegate.unsafeTransformUnordered[T](parallelism)((out, emitter) => transform(out, emitter))) + /** * Only pass on those elements that satisfy the given predicate. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index 6104422b66b..20c38d46810 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -339,6 +339,10 @@ class SubSource[Out, Mat]( def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): SubSource[T, Mat] = new SubSource(delegate.mapAsyncUnordered(parallelism)(x => f(x).asScala)) + def unsafeTransformUnordered[T]( + parallelism: Int, transform: function.Function2[Out, Emitter[T], Unit]): javadsl.SubSource[T, Mat] = + new SubSource(delegate.unsafeTransformUnordered(parallelism)((out, emitter) => transform(out, emitter))) + /** * Only pass on those elements that satisfy the given predicate. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 16422486397..2cbde8e42b8 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -829,7 +829,6 @@ final case class RunnableGraph[+Mat](override val traversalBuilder: TraversalBui @ccompatUsedUntil213 trait FlowOps[+Out, +Mat] { import GraphDSL.Implicits._ - import org.apache.pekko.stream.impl.Stages._ type Repr[+O] <: FlowOps[O, Mat] { @@ -1141,7 +1140,18 @@ trait FlowOps[+Out, +Mat] { * * @see [[#mapAsync]] */ - def mapAsyncUnordered[T](parallelism: Int)(f: Out => Future[T]): Repr[T] = via(MapAsyncUnordered(parallelism, f)) + def mapAsyncUnordered[T](parallelism: Int)(f: Out => Future[T]): Repr[T] = + unsafeTransformUnordered[T](parallelism) { (out, emitter) => + val future = f(out) + future.value match { + case Some(elem) => emitter.asInstanceOf[UnsafeEmitter[T]].handleNow(elem) + case None => future.onComplete(emitter.handle)(pekko.dispatch.ExecutionContexts.parasitic) + } + } + + @ApiMayChange + def unsafeTransformUnordered[T](parallelism: Int)(transform: (Out, Emitter[T]) => Unit): Repr[T] = + via(new UnsafeTransformUnordered(parallelism, transform)) /** * Use the `ask` pattern to send a request-reply message to the target `ref` actor.