Skip to content

Commit

Permalink
=str Deprecate statefulMapConcat operator.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Sep 1, 2023
1 parent 7864485 commit 8d2141a
Show file tree
Hide file tree
Showing 10 changed files with 46 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@ import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.stream.scaladsl._

import scala.annotation.nowarn

object ZipWithIndexBenchmark {
final val OperationsPerInvocation = 100000
}

@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.SECONDS)
@BenchmarkMode(Array(Mode.Throughput))
@nowarn("msg=deprecated")
class ZipWithIndexBenchmark {
import ZipWithIndexBenchmark._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@ Transform each element into zero or more elements that are individually passed d

@ref[Simple operators](../index.md#simple-operators)

@@@ warning

The `statefulMapConcat` operator has been deprecated.

- for stateful mapping, use @ref:[statefulMap](./statefulMap.md)
- for stateful map concat, use @ref:[statefulMap](./statefulMap.md) with @ref:[mapConcat](./mapConcat.md).

@@@

## Signature

@apidoc[Flow.statefulMapConcat](Flow) { scala="#statefulMapConcat[T](f:()=>Out=>scala.collection.immutable.Iterable[T]):FlowOps.this.Repr[T]" java="#statefulMapConcat(org.apache.pekko.japi.function.Creator)" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.Flow
import org.apache.pekko.stream.scaladsl.Source

import scala.annotation.nowarn

@nowarn("msg=deprecated")
class StatefulMapConcat {

implicit val system: ActorSystem = ???
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import scala.concurrent.duration._
import org.apache.pekko.stream.scaladsl.Sink
import org.apache.pekko.stream.scaladsl.Source

import scala.annotation.nowarn

@nowarn("msg=deprecated")
object Split {
def splitWhenExample(args: Array[String]): Unit = {
import org.apache.pekko.actor.ActorSystem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import pekko.stream.Supervision
import pekko.stream.testkit._
import pekko.stream.testkit.scaladsl.TestSink

import scala.annotation.nowarn

@nowarn("msg=deprecated")
class FlowStatefulMapConcatSpec extends StreamSpec("""
pekko.stream.materializer.initial-input-buffer-size = 2
""") with ScriptedTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,9 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive Streams specification.
*
* This operator doesn't handle upstream's completion signal since the state kept in the closure can be lost.
* Use [[FlowOps.statefulMap]] instead.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the mapping function returns an element or there are still remaining elements
Expand All @@ -796,6 +799,8 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
*
* '''Cancels when''' downstream cancels
*/
@deprecated("Use `statefulMap` with `mapConcat` instead.", "1.0.2")
@Deprecated
def statefulMapConcat[T](
f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.statefulMapConcat { () =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2437,6 +2437,9 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
* The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive Streams specification.
*
* This operator doesn't handle upstream's completion signal since the state kept in the closure can be lost.
* Use [[FlowOps.statefulMap]] instead.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the mapping function returns an element or there are still remaining elements
Expand All @@ -2449,6 +2452,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*
* '''Cancels when''' downstream cancels
*/
@deprecated("Use `statefulMap` with `mapConcat` instead.", "1.0.2")
@Deprecated
def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): javadsl.Source[T, Mat] =
new Source(delegate.statefulMapConcat { () =>
val fun = f.create()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,9 @@ class SubFlow[In, Out, Mat](
* The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive Streams specification.
*
* This operator doesn't handle upstream's completion signal since the state kept in the closure can be lost.
* Use [[FlowOps.statefulMap]] instead.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the mapping function returns an element or there are still remaining elements
Expand All @@ -270,6 +273,8 @@ class SubFlow[In, Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
@deprecated("Use `statefulMap` with `mapConcat` instead.", "1.0.2")
@Deprecated
def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): SubFlow[In, T, Mat] =
new SubFlow(delegate.statefulMapConcat { () =>
val fun = f.create()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,9 @@ class SubSource[Out, Mat](
* The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive Streams specification.
*
* This operator doesn't handle upstream's completion signal since the state kept in the closure can be lost.
* Use [[FlowOps.statefulMap]] instead.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the mapping function returns an element or there are still remaining elements
Expand All @@ -261,6 +264,8 @@ class SubSource[Out, Mat](
*
* '''Cancels when''' downstream cancels
*/
@deprecated("Use `statefulMap` with `mapConcat` instead.", "1.0.2")
@Deprecated
def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): SubSource[T, Mat] =
new SubSource(delegate.statefulMapConcat { () =>
val fun = f.create()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,7 @@ trait FlowOps[+Out, +Mat] {
*
* '''Cancels when''' downstream cancels
*/
@nowarn("msg=deprecated")
def mapConcat[T](f: Out => IterableOnce[T]): Repr[T] = statefulMapConcat(() => f)

/**
Expand Down Expand Up @@ -1053,6 +1054,9 @@ trait FlowOps[+Out, +Mat] {
* The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive Streams specification.
*
* This operator doesn't handle upstream's completion signal since the state kept in the closure can be lost.
* Use [[FlowOps.statefulMap]] instead.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the mapping function returns an element or there are still remaining elements
Expand All @@ -1067,6 +1071,7 @@ trait FlowOps[+Out, +Mat] {
*
* See also [[FlowOps.mapConcat]]
*/
@deprecated("Use `statefulMap` with `mapConcat` instead.", "1.0.2")
def statefulMapConcat[T](f: () => Out => IterableOnce[T]): Repr[T] =
via(new StatefulMapConcat(f))

Expand Down

0 comments on commit 8d2141a

Please sign in to comment.