feat: Add collectWhile operator. (#964)
He-Pin authored Jan 16, 2024
1 parent 11ba3d7 commit 19da736
Showing 13 changed files with 347 additions and 4 deletions.
@@ -0,0 +1,38 @@
# collectWhile

Transform this stream by applying the given partial function to each element for which it is defined as it passes through this processing step, and cancel the upstream publisher once the partial function is not defined for an element.

@ref[Simple operators](../index.md#simple-operators)

## Signature

@apidoc[Source.collectWhile](Source) { scala="#collectWhile[T](pf:PartialFunction[Out,T]):FlowOps.this.Repr[T]" java="#collectWhile(scala.PartialFunction)" }
@apidoc[Flow.collectWhile](Flow) { scala="#collectWhile[T](pf:PartialFunction[Out,T]):FlowOps.this.Repr[T]" java="#collectWhile(scala.PartialFunction)" }


## Description

Transform this stream by applying the given partial function to each element for which it is defined as it passes
through this processing step, and cancel the upstream publisher once the partial function is not defined for an element.
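For illustration, a minimal self-contained sketch (separate from the snipped sources below; assumes Pekko 1.1.0+, where this operator was added):

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

implicit val system: ActorSystem = ActorSystem("collectWhile-sketch")

// Emits 10 and 30, then completes the stream (cancelling upstream) at 5,
// because the partial function is not defined for odd numbers; 8 is never processed.
Source(List(2, 6, 5, 8))
  .collectWhile { case even if even % 2 == 0 => even * 5 }
  .runWith(Sink.foreach(println))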

## Example

Scala
: @@snip [Collect.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/Collect.scala) { #collectWhile }

Java
: @@snip [SourceOrFlow.java](/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #collectWhile }

## Reactive Streams semantics

@@@div { .callout }

**emits** when the provided partial function is defined for the element

**backpressures** when the partial function is defined for the element and downstream backpressures

**completes** when upstream completes or the partial function is not defined for an element

**cancels** when downstream cancels

@@@
2 changes: 2 additions & 0 deletions docs/src/main/paradox/stream/operators/index.md
@@ -145,6 +145,7 @@ depending on being backpressured by downstream or not.
|Flow|<a name="asflowwithcontext"></a>@ref[asFlowWithContext](Flow/asFlowWithContext.md)|Extracts context data from the elements of a `Flow` so that it can be turned into a `FlowWithContext` which can propagate that context per element along a stream.|
|Source/Flow|<a name="collect"></a>@ref[collect](Source-or-Flow/collect.md)|Apply a partial function to each incoming element, if the partial function is defined for a value the returned value is passed downstream.|
|Source/Flow|<a name="collecttype"></a>@ref[collectType](Source-or-Flow/collectType.md)|Transform this stream by testing the type of each of the elements on which the element is an instance of the provided type as they pass through this processing step.|
|Source/Flow|<a name="collectwhile"></a>@ref[collectWhile](Source-or-Flow/collectWhile.md)|Transform this stream by applying the given partial function to each of the elements on which the function is defined as they pass through this processing step, and cancel the upstream publisher after the partial function is not applied.|
|Flow|<a name="completionstageflow"></a>@ref[completionStageFlow](Flow/completionStageFlow.md)|Streams the elements through the given future flow once it successfully completes.|
|Flow|<a name="contramap"></a>@ref[contramap](Flow/contramap.md)|Transform this Flow by applying a function to each *incoming* upstream element before it is passed to the Flow.|
|Source/Flow|<a name="detach"></a>@ref[detach](Source-or-Flow/detach.md)|Detach upstream demand from downstream demand without detaching the stream rates.|
@@ -413,6 +414,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [collect](Sink/collect.md)
* [collection](Sink/collection.md)
* [collectType](Source-or-Flow/collectType.md)
* [collectWhile](Source-or-Flow/collectWhile.md)
* [combine](Source/combine.md)
* [combine](Sink/combine.md)
* [completionStage](Source/completionStage.md)
11 changes: 11 additions & 0 deletions docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java
@@ -401,6 +401,17 @@ void collectExample() {
// #collect
}

void collectWhileExample() {
// #collectWhile
Flow<Message, Pong, NotUsed> flow =
Flow.of(Message.class)
.collectWhile(
PFBuilder.<Message, Pong>create()
.match(Ping.class, p -> p.id <= 100, p -> new Pong(p.id))
.build());
// #collectWhile
}

void collectTypeExample() {
// #collectType
Flow<Message, Pong, NotUsed> flow =
@@ -38,4 +38,12 @@ object Collect {
Flow[Message].collectType[Ping].filter(_.id != 0).map(p => Pong(p.id))
// #collectType
}

def collectWhile(): Unit = {
// #collectWhile
Flow[Message].collectWhile {
case Ping(id) if id <= 100 => Pong(id)
}
// #collectWhile
}
}
@@ -983,6 +983,20 @@ public void mustBeAbleToUseCollect() {
.expectComplete();
}

@Test
public void mustBeAbleToUseCollectWhile() {
Source.from(Arrays.asList(1, 3, 5, 6, 7, 8, 9))
.collectWhile(
PFBuilder.<Integer, Integer>create()
.match(Integer.class, elem -> elem % 2 != 0, elem -> elem)
.build())
.runWith(TestSink.create(system), system)
.ensureSubscription()
.request(5)
.expectNextN(Arrays.asList(1, 3, 5))
.expectComplete();
}

@Test
public void mustBeAbleToUseCollectType() throws Exception {
final TestKit probe = new TestKit(system);
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream.scaladsl

import org.apache.pekko
import pekko.stream.ActorAttributes._
import pekko.stream.OverflowStrategy
import pekko.stream.Supervision._
import pekko.stream.testkit.{ ScriptedTest, StreamSpec }
import pekko.stream.testkit.Utils.TE
import pekko.stream.testkit.scaladsl.TestSink

class FlowCollectWhileSpec extends StreamSpec with ScriptedTest {

"A CollectWhile" must {

"collect in happy path" in {
Source(List(1, 3, 5, 7, 8, 9, 10))
.collectWhile {
case elem if elem % 2 != 0 => elem
}
.runWith(TestSink())
.request(7)
.expectNextN(List(1, 3, 5, 7))
.expectComplete()
}

"complete with buffer even no explict request" in {
Source(List(2, 3, 5))
.collectWhile {
case elem if elem % 2 != 0 => elem
}
.buffer(1, overflowStrategy = OverflowStrategy.backpressure)
.runWith(TestSink())
.ensureSubscription()
.expectComplete()
}

"complete with empty Source" in {
Source.empty[Int].collectWhile {
case elem if elem % 2 != 0 => elem
}.runWith(TestSink[Int]())
.ensureSubscription()
.expectComplete()
}

"restart when pf throws" in {
Source(1 to 6)
.collectWhile { case x: Int => if (x % 2 == 0) throw TE("") else x }
.withAttributes(supervisionStrategy(restartingDecider))
.runWith(TestSink[Int]())
.request(1)
.expectNext(1)
.request(1)
.expectNext(3)
.request(1)
.expectNext(5)
.request(1)
.expectComplete()
}

"resume when pf throws" in {
Source(1 to 6)
.collectWhile { case x: Int => if (x % 2 == 0) throw TE("") else x }
.withAttributes(supervisionStrategy(resumingDecider))
.runWith(TestSink[Int]())
.request(1)
.expectNext(1)
.request(1)
.expectNext(3)
.request(1)
.expectNext(5)
.request(1)
.expectComplete()
}
}

}
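A further check one could add to this spec (a sketch only, not part of this commit; it assumes the TestSource() factory from pekko.stream.testkit.scaladsl and reuses the spec's package and imports) asserts that upstream is in fact cancelled once the partial function stops matching:

import pekko.stream.testkit.scaladsl.TestSource

val (upstream, downstream) = TestSource[Int]()
  .collectWhile { case elem if elem % 2 != 0 => elem }
  .toMat(TestSink[Int]())(Keep.both)
  .run()

downstream.request(2)
upstream.sendNext(1)
downstream.expectNext(1)
upstream.sendNext(2) // partial function not defined here
downstream.expectComplete()
upstream.expectCancellation()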
@@ -38,6 +38,7 @@ import pekko.stream.Attributes._
val filter = name("filter")
val filterNot = name("filterNot")
val collect = name("collect")
val collectWhile = name("collectWhile")
val recover = name("recover")
val mapError = name("mapError")
val mapAsync = name("mapAsync")
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream.impl.fusing

import scala.util.control.NonFatal

import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet, Supervision }
import pekko.stream.ActorAttributes.SupervisionStrategy
import pekko.stream.Attributes.SourceLocation
import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }

/**
* INTERNAL API
*/
@InternalApi
private[pekko] final class CollectWhile[In, Out](pf: PartialFunction[In, Out]) extends GraphStage[FlowShape[In, Out]] {
private val in = Inlet[In]("CollectWhile.in")
private val out = Outlet[Out]("CollectWhile.out")
override val shape = FlowShape(in, out)

override def initialAttributes: Attributes = DefaultAttributes.collectWhile and SourceLocation.forLambda(pf)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
import Collect.NotApplied

override final def onPush(): Unit =
try {
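// applyOrElse with the NotApplied sentinel evaluates the partial function (including its guard) at most once per element;
// when it is not defined for the element, the sentinel is returned and the stage completes,
// which also cancels the upstream publisher.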
pf.applyOrElse(grab(in), NotApplied) match {
case NotApplied => completeStage()
case result: Out @unchecked => push(out, result)
case _ => throw new RuntimeException() // won't happen, compiler exhaustiveness check pleaser
}
} catch {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop => failStage(ex)
case _ => pull(in)
}
}

override final def onPull(): Unit = pull(in)

setHandlers(in, out, this)
}

override def toString: String = "CollectWhile"
}
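The applyOrElse/NotApplied pattern used above can be tried in isolation; a standalone sketch with illustrative names (not Pekko API):

// A sentinel function that returns itself, so "not applied" can be told apart from any real result.
object NotApplied extends (Any => Any) {
  override def apply(x: Any): Any = this
}

val pf: PartialFunction[Int, String] = { case i if i % 2 != 0 => s"odd-$i" }

def step(elem: Int): Unit =
  pf.applyOrElse(elem, NotApplied) match {
    case NotApplied     => println(s"$elem: not defined -> complete stage, cancel upstream")
    case result: String => println(s"$elem: emit $result")
  }

List(1, 3, 6).foreach(step) // 1 and 3 are emitted; 6 would stop the stream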
22 changes: 22 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -1125,6 +1125,28 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
def collect[T](pf: PartialFunction[Out, T]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.collect(pf))

/**
* Transform this stream by applying the given partial function to each element
* for which it is defined as it passes through this processing step, and cancel the
* upstream publisher once the partial function is not defined for an element.
*
* The stream is completed without producing any elements if the partial function is not defined for
* the first stream element; this can happen even without any downstream request, e.g. when there is a downstream buffer.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the provided partial function is defined for the element
*
* '''Backpressures when''' the partial function is defined for the element and downstream backpressures
*
* '''Completes when''' upstream completes or the partial function is not defined for an element.
*
* '''Cancels when''' downstream cancels
* @since 1.1.0
*/
def collectWhile[T](pf: PartialFunction[Out, T]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.collectWhile(pf))

/**
* Transform this stream by testing the type of each of the elements
* on which the element is an instance of the provided type as they pass through this processing step.
22 changes: 22 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -2839,6 +2839,28 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def collect[T](pf: PartialFunction[Out, T]): javadsl.Source[T, Mat] =
new Source(delegate.collect(pf))

/**
* Transform this stream by applying the given partial function to each element
* for which it is defined as it passes through this processing step, and cancel the
* upstream publisher once the partial function is not defined for an element.
*
* The stream is completed without producing any elements if the partial function is not defined for
* the first stream element; this can happen even without any downstream request, e.g. when there is a downstream buffer.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the provided partial function is defined for the element
*
* '''Backpressures when''' the partial function is defined for the element and downstream backpressures
*
* '''Completes when''' upstream completes or the partial function is not defined for an element.
*
* '''Cancels when''' downstream cancels
* @since 1.1.0
*/
def collectWhile[T](pf: PartialFunction[Out, T]): javadsl.Source[T, Mat] =
new Source(delegate.collectWhile(pf))

/**
* Transform this stream by testing the type of each of the elements
* on which the element is an instance of the provided type as they pass through this processing step.
@@ -507,6 +507,28 @@ class SubFlow[In, Out, Mat](
def collect[T](pf: PartialFunction[Out, T]): SubFlow[In, T, Mat] =
new SubFlow(delegate.collect(pf))

/**
* Transform this stream by applying the given partial function to each element
* for which it is defined as it passes through this processing step, and cancel the
* upstream publisher once the partial function is not defined for an element.
*
* The stream is completed without producing any elements if the partial function is not defined for
* the first stream element; this can happen even without any downstream request, e.g. when there is a downstream buffer.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the provided partial function is defined for the element
*
* '''Backpressures when''' the partial function is defined for the element and downstream backpressures
*
* '''Completes when''' upstream completes or the partial function is not defined for an element.
*
* '''Cancels when''' downstream cancels
* @since 1.1.0
*/
def collectWhile[T](pf: PartialFunction[Out, T]): SubFlow[In, T, Mat] =
new SubFlow(delegate.collectWhile(pf))

/**
* Transform this stream by testing the type of each of the elements
* on which the element is an instance of the provided type as they pass through this processing step.