diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/ZipWithIndexBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/ZipWithIndexBenchmark.scala index 3393e54d849..3736656773a 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/stream/ZipWithIndexBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/ZipWithIndexBenchmark.scala @@ -30,6 +30,8 @@ import org.apache.pekko import pekko.actor.ActorSystem import pekko.stream.scaladsl._ +import scala.annotation.nowarn + object ZipWithIndexBenchmark { final val OperationsPerInvocation = 100000 } @@ -37,6 +39,7 @@ object ZipWithIndexBenchmark { @State(Scope.Benchmark) @OutputTimeUnit(TimeUnit.SECONDS) @BenchmarkMode(Array(Mode.Throughput)) +@nowarn("msg=deprecated") class ZipWithIndexBenchmark { import ZipWithIndexBenchmark._ diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/statefulMapConcat.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/statefulMapConcat.md index 0eb9b8a1b39..3315fe9d5a5 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/statefulMapConcat.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/statefulMapConcat.md @@ -4,6 +4,15 @@ Transform each element into zero or more elements that are individually passed d @ref[Simple operators](../index.md#simple-operators) +@@@ warning + +The `statefulMapConcat` operator has been deprecated. + +- for stateful mapping, use @ref:[statefulMap](./statefulMap.md) +- for stateful map concat, use @ref:[statefulMap](./statefulMap.md) with @ref:[mapConcat](./mapConcat.md). + +@@@ + ## Signature @apidoc[Flow.statefulMapConcat](Flow) { scala="#statefulMapConcat[T](f:()=>Out=>scala.collection.immutable.Iterable[T]):FlowOps.this.Repr[T]" java="#statefulMapConcat(org.apache.pekko.japi.function.Creator)" } diff --git a/docs/src/test/scala/docs/stream/operators/flow/StatefulMapConcat.scala b/docs/src/test/scala/docs/stream/operators/flow/StatefulMapConcat.scala index f22c38c6aa4..ebb7da274d7 100644 --- a/docs/src/test/scala/docs/stream/operators/flow/StatefulMapConcat.scala +++ b/docs/src/test/scala/docs/stream/operators/flow/StatefulMapConcat.scala @@ -17,6 +17,9 @@ import org.apache.pekko.actor.ActorSystem import org.apache.pekko.stream.scaladsl.Flow import org.apache.pekko.stream.scaladsl.Source +import scala.annotation.nowarn + +@nowarn("msg=deprecated") class StatefulMapConcat { implicit val system: ActorSystem = ??? diff --git a/docs/src/test/scala/docs/stream/operators/sourceorflow/Split.scala b/docs/src/test/scala/docs/stream/operators/sourceorflow/Split.scala index 054ac79224a..c4919925dac 100644 --- a/docs/src/test/scala/docs/stream/operators/sourceorflow/Split.scala +++ b/docs/src/test/scala/docs/stream/operators/sourceorflow/Split.scala @@ -22,6 +22,9 @@ import scala.concurrent.duration._ import org.apache.pekko.stream.scaladsl.Sink import org.apache.pekko.stream.scaladsl.Source +import scala.annotation.nowarn + +@nowarn("msg=deprecated") object Split { def splitWhenExample(args: Array[String]): Unit = { import org.apache.pekko.actor.ActorSystem diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapConcatSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapConcatSpec.scala index ce86a47502e..dc909475b39 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapConcatSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapConcatSpec.scala @@ -21,6 +21,9 @@ import pekko.stream.Supervision import pekko.stream.testkit._ import pekko.stream.testkit.scaladsl.TestSink +import scala.annotation.nowarn + +@nowarn("msg=deprecated") class FlowStatefulMapConcatSpec extends StreamSpec(""" pekko.stream.materializer.initial-input-buffer-size = 2 """) with ScriptedTest { diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 54b75b94e21..3d10ab4953d 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -784,6 +784,9 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * The returned `Iterable` MUST NOT contain `null` values, * as they are illegal as stream elements - according to the Reactive Streams specification. * + * This operator doesn't handle upstream's completion signal since the state kept in the closure can be lost. + * Use [[FlowOps.statefulMap]] instead. + * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. * * '''Emits when''' the mapping function returns an element or there are still remaining elements @@ -796,6 +799,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels */ + @deprecated("Use `statefulMap` with `mapConcat` instead.", "1.0.2") + @Deprecated def statefulMapConcat[T]( f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): javadsl.Flow[In, T, Mat] = new Flow(delegate.statefulMapConcat { () => diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 0156d0a60c0..a8fe6677671 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -2437,6 +2437,9 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * The returned `Iterable` MUST NOT contain `null` values, * as they are illegal as stream elements - according to the Reactive Streams specification. * + * This operator doesn't handle upstream's completion signal since the state kept in the closure can be lost. + * Use [[FlowOps.statefulMap]] instead. + * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. * * '''Emits when''' the mapping function returns an element or there are still remaining elements @@ -2449,6 +2452,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels */ + @deprecated("Use `statefulMap` with `mapConcat` instead.", "1.0.2") + @Deprecated def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): javadsl.Source[T, Mat] = new Source(delegate.statefulMapConcat { () => val fun = f.create() diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index e6e1997208d..b9c7d089ab2 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -258,6 +258,9 @@ class SubFlow[In, Out, Mat]( * The returned `Iterable` MUST NOT contain `null` values, * as they are illegal as stream elements - according to the Reactive Streams specification. * + * This operator doesn't handle upstream's completion signal since the state kept in the closure can be lost. + * Use [[FlowOps.statefulMap]] instead. + * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. * * '''Emits when''' the mapping function returns an element or there are still remaining elements @@ -270,6 +273,8 @@ class SubFlow[In, Out, Mat]( * * '''Cancels when''' downstream cancels */ + @deprecated("Use `statefulMap` with `mapConcat` instead.", "1.0.2") + @Deprecated def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): SubFlow[In, T, Mat] = new SubFlow(delegate.statefulMapConcat { () => val fun = f.create() diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index 43ba6d69400..0de91a896ad 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -249,6 +249,9 @@ class SubSource[Out, Mat]( * The returned `Iterable` MUST NOT contain `null` values, * as they are illegal as stream elements - according to the Reactive Streams specification. * + * This operator doesn't handle upstream's completion signal since the state kept in the closure can be lost. + * Use [[FlowOps.statefulMap]] instead. + * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. * * '''Emits when''' the mapping function returns an element or there are still remaining elements @@ -261,6 +264,8 @@ class SubSource[Out, Mat]( * * '''Cancels when''' downstream cancels */ + @deprecated("Use `statefulMap` with `mapConcat` instead.", "1.0.2") + @Deprecated def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): SubSource[T, Mat] = new SubSource(delegate.statefulMapConcat { () => val fun = f.create() diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 7c6c47b17c2..16422486397 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -1008,6 +1008,7 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels */ + @nowarn("msg=deprecated") def mapConcat[T](f: Out => IterableOnce[T]): Repr[T] = statefulMapConcat(() => f) /** @@ -1053,6 +1054,9 @@ trait FlowOps[+Out, +Mat] { * The returned `Iterable` MUST NOT contain `null` values, * as they are illegal as stream elements - according to the Reactive Streams specification. * + * This operator doesn't handle upstream's completion signal since the state kept in the closure can be lost. + * Use [[FlowOps.statefulMap]] instead. + * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. * * '''Emits when''' the mapping function returns an element or there are still remaining elements @@ -1067,6 +1071,7 @@ trait FlowOps[+Out, +Mat] { * * See also [[FlowOps.mapConcat]] */ + @deprecated("Use `statefulMap` with `mapConcat` instead.", "1.0.2") def statefulMapConcat[T](f: () => Out => IterableOnce[T]): Repr[T] = via(new StatefulMapConcat(f))