From 5aa235b9ae371cfc99d6df0668cc43113d67dc0c Mon Sep 17 00:00:00 2001 From: Adib Faramarzi Date: Sun, 28 Apr 2019 01:21:36 +0430 Subject: [PATCH 01/12] Add sample for channels and flows --- .../kotlinx-coroutines-core.txt | 2 + .../common/src/flow/operators/Delay.kt | 109 +++++++++++++++--- 2 files changed, 96 insertions(+), 15 deletions(-) diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index b1375b257e..a1a5a6be81 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -835,6 +835,8 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow; public static synthetic fun retry$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; public static final fun sample (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow; + public static final fun sampleBy (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlinx/coroutines/flow/Flow; + public static final fun sampleBy (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; public static final fun single (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun singleOrNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun switchMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt index 111ef7cf9a..17f30f44bd 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt @@ -116,21 +116,59 @@ public fun Flow.debounce(timeoutMillis: Long): Flow { * Note that the latest element is not emitted if it does not fit into the sampling window. */ public fun Flow.sample(periodMillis: Long): Flow { - require(periodMillis > 0) { "Sample period should be positive" } + return sampleBy(flow { + delay(periodMillis) + while (true) { + emit(Unit) + delay(periodMillis) + } + }) +} + +/** + * Returns a flow that emits only the latest value emitted by the original flow only when the [sampler] emits. + * + * Example: + * ``` + * flow { + * repeat(10) { + * emit(it) + * delay(50) + * } + * }.sampleBy(flow { + * repeat(10) { + * delay(100) + * emit(it) + * } + * }) + * + * ``` + * produces `0, 2, 4, 6, 8`. + * + * Note that the latest element is not emitted if it does not fit into the sampling window. + */ +public fun Flow.sampleBy(sampler: Flow): Flow { return flow { coroutineScope { - val values = produce(capacity = Channel.CONFLATED) { // Actually Any, KT-30796 + val values = produce(capacity = Channel.CONFLATED) { + // Actually Any, KT-30796 collect { value -> send(value ?: NullSurrogate) } } + val otherChannel = Channel(Channel.CONFLATED) + val job = launch { + sampler.collect { + otherChannel.send(it) + } + } var isDone = false var lastValue: Any? = null - val ticker = fixedPeriodTicker(periodMillis) while (!isDone) { select { values.onReceiveOrNull { if (it == null) { - ticker.cancel() + otherChannel.cancel() + job.cancel() isDone = true } else { lastValue = it @@ -138,7 +176,7 @@ public fun Flow.sample(periodMillis: Long): Flow { } // todo: shall be start sampling only when an element arrives or sample aways as here? - ticker.onReceive { + otherChannel.onReceive { val value = lastValue ?: return@onReceive lastValue = null // Consume the value emit(NullSurrogate.unbox(value)) @@ -149,17 +187,58 @@ public fun Flow.sample(periodMillis: Long): Flow { } } -/* - * TODO this design (and design of the corresponding operator) depends on #540 +/** + * Returns a flow that emits only the latest value emitted by the original flow only when the [sampler] channel emits. + * + * Example: + * ``` + * val receiveChannel = produce { + * repeat(10) { + * delay(100) + * send(it) + * } + * } + * launch { + * flow { + * repeat(10) { + * emit(it) + * delay(50) + * } + * }.sampleBy(receiveChannel) + * ``` + * produces `0, 2, 4, 6, 8`. + * + * Note that the latest element is not emitted if it does not fit into the sampling window. */ -internal fun CoroutineScope.fixedPeriodTicker(delayMillis: Long, initialDelayMillis: Long = delayMillis): ReceiveChannel { - require(delayMillis >= 0) { "Expected non-negative delay, but has $delayMillis ms" } - require(initialDelayMillis >= 0) { "Expected non-negative initial delay, but has $initialDelayMillis ms" } - return produce(capacity = 0) { - delay(initialDelayMillis) - while (true) { - channel.send(Unit) - delay(delayMillis) +public fun Flow.sampleBy(sampler: ReceiveChannel): Flow { + return flow { + coroutineScope { + val values = produce(capacity = Channel.CONFLATED) { + // Actually Any, KT-30796 + collect { value -> send(value ?: NullSurrogate) } + } + + var isDone = false + var lastValue: Any? = null + while (!isDone) { + select { + values.onReceiveOrNull { + if (it == null) { + sampler.cancel() + isDone = true + } else { + lastValue = it + } + } + + // todo: shall be start sampling only when an element arrives or sample aways as here? + sampler.onReceive { + val value = lastValue ?: return@onReceive + lastValue = null // Consume the value + emit(NullSurrogate.unbox(value)) + } + } + } } } } From 4a3977957a479a27a76ed0b6d25c0e1fd08646ec Mon Sep 17 00:00:00 2001 From: Adib Faramarzi Date: Sun, 28 Apr 2019 10:24:27 +0430 Subject: [PATCH 02/12] Add tests for flow --- .../common/src/flow/operators/Delay.kt | 16 +- .../test/flow/operators/SampleByTest.kt | 716 ++++++++++++++++++ 2 files changed, 726 insertions(+), 6 deletions(-) create mode 100644 kotlinx-coroutines-core/common/test/flow/operators/SampleByTest.kt diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt index 17f30f44bd..87b829e1bb 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt @@ -156,7 +156,7 @@ public fun Flow.sampleBy(sampler: Flow): Flow { } val otherChannel = Channel(Channel.CONFLATED) - val job = launch { + val samplerJob = launch { sampler.collect { otherChannel.send(it) } @@ -168,7 +168,7 @@ public fun Flow.sampleBy(sampler: Flow): Flow { values.onReceiveOrNull { if (it == null) { otherChannel.cancel() - job.cancel() + samplerJob.cancel() isDone = true } else { lastValue = it @@ -213,6 +213,8 @@ public fun Flow.sampleBy(sampler: Flow): Flow { public fun Flow.sampleBy(sampler: ReceiveChannel): Flow { return flow { coroutineScope { + + val values = produce(capacity = Channel.CONFLATED) { // Actually Any, KT-30796 collect { value -> send(value ?: NullSurrogate) } @@ -232,10 +234,12 @@ public fun Flow.sampleBy(sampler: ReceiveChannel): Flow { } // todo: shall be start sampling only when an element arrives or sample aways as here? - sampler.onReceive { - val value = lastValue ?: return@onReceive - lastValue = null // Consume the value - emit(NullSurrogate.unbox(value)) + sampler.onReceiveOrNull { + if(it != null){ + val value = lastValue ?: return@onReceiveOrNull + lastValue = null // Consume the value + emit(NullSurrogate.unbox(value)) + } } } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/SampleByTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/SampleByTest.kt new file mode 100644 index 0000000000..3e4f101e9a --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/operators/SampleByTest.kt @@ -0,0 +1,716 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow.operators + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.* +import kotlin.test.* + +class SampleByTest : TestBase() { + @Test + public fun testBasicWithFlow() = withVirtualTime { + expect(1) + val flow = flow { + expect(3) + delay(200) + emit("A") + expect(4) + delay(600) + emit("B") + delay(200) + emit("C") + delay(200) + expect(6) + emit("D") + delay(800) + expect(7) + emit("E") + } + + + val samplerFlow = flow { + delay(1000) + expect(5) + emit("AA") + delay(1000) + emit("BB") + } + expect(2) + val result = flow.sampleBy(samplerFlow).toList() + assertEquals(listOf("B", "D"), result) + finish(8) + } + + @Test + public fun testBasicWithChannel() = withVirtualTime { + expect(1) + val flow = flow { + expect(3) + delay(200) + emit("A") + expect(4) + delay(600) + emit("B") + delay(200) + emit("C") + delay(200) + expect(6) + emit("D") + delay(800) + expect(7) + emit("E") + } + + + val samplerChannel = produce { + delay(1000) + expect(5) + send("AA") + delay(1000) + send("BB") + } + expect(2) + val result = flow.sampleBy(samplerChannel).toList() + assertEquals(listOf("B", "D"), result) + finish(8) + } + + @Test + fun testDelayedFirstWithFlow() = withVirtualTime { + val flow = flow { + delay(60) + emit(1) + expect(1) + delay(60) + expect(3) + }.sampleBy(flow { + delay(100) + emit(4) + expect(2) + }) + assertEquals(1, flow.singleOrNull()) + finish(4) + } + + @Test + fun testDelayedFirstWithChannel() = withVirtualTime { + val channel = Channel(Channel.CONFLATED) + launch { + delay(100) + channel.send(4) + expect(2) + } + val flow = flow { + delay(60) + emit(1) + expect(1) + delay(60) + expect(3) + }.sampleBy(channel) + assertEquals(1, flow.singleOrNull()) + finish(4) + } + + @Test + fun testBasicFlow2() = withVirtualTime { + expect(1) + val flow = flow { + expect(3) + emit(1) + emit(2) + delay(501) + emit(3) + delay(100) + emit(4) + delay(100) + emit(5) + emit(6) + delay(301) + emit(7) + delay(501) + expect(4) + } + val samplerFlow = flow { + delay(500) + repeat(10) { + emit(1) + delay(500) + } + } + expect(2) + val result = flow.sampleBy(samplerFlow).toList() + assertEquals(listOf(2, 6, 7), result) + finish(5) + } + + @Test + fun testBasicChannel2() = withVirtualTime { + expect(1) + val flow = flow { + expect(3) + emit(1) + emit(2) + delay(501) + emit(3) + delay(100) + emit(4) + delay(100) + emit(5) + emit(6) + delay(301) + emit(7) + delay(501) + expect(4) + } + val channel = produce { + delay(500) + repeat(10) { + send(1) + delay(500) + } + } + expect(2) + val result = flow.sampleBy(channel).toList() + assertEquals(listOf(2, 6, 7), result) + finish(5) + } + + @Test + fun testFixedDelayWithFlow() = withVirtualTime { + val flow = flow { + emit("A") + expect(1) + delay(150) + emit("B") + expect(3) + }.sampleBy(flow { + delay(100) + emit("A") + expect(2) + }) + assertEquals("A", flow.single()) + finish(4) + } + + @Test + fun testFixedDelayWithChannel() = withVirtualTime { + throw Exception() + val channel = produce { + delay(100) + send(1) + expect(2) + } + val flow = flow { + emit("A") + expect(1) + delay(150) + emit("B") + expect(3) + }.sampleBy(channel) + assertEquals("A", flow.single()) + finish(4) + } + + @Test + fun testSingleNullWithFlow() = withVirtualTime { + val flow = flow { + emit(null) + delay(2) + expect(1) + }.sampleBy(flow { + delay(1) + emit(1) + }) + assertNull(flow.single()) + finish(2) + } + + + @Test + fun testSingleNullWithChannel() = withVirtualTime { + val channel = produce { + delay(1) + send(1) + } + val flow = flow { + emit(null) + delay(2) + expect(1) + }.sampleBy(channel) + assertNull(flow.single()) + finish(2) + } + + @Test + fun testBasicWithNullsWithFlow() = withVirtualTime { + expect(1) + val flow = flow { + expect(3) + emit("A") + delay(1500) + emit(null) + delay(500) + emit("C") + delay(250) + emit(null) + delay(2000) + emit("E") + expect(4) + } + + expect(2) + val sampler = flow { + repeat(20) { + delay(1000) + repeat(10) { + emit(1) + delay(1000) + } + } + } + val result = flow.sampleBy(sampler).toList() + assertEquals(listOf("A", null, null), result) + finish(5) + } + + @Test + fun testBasicWithNullsWithChannel() = withVirtualTime { + expect(1) + val flow = flow { + expect(3) + emit("A") + delay(1500) + emit(null) + delay(500) + emit("C") + delay(250) + emit(null) + delay(2000) + emit("E") + expect(4) + } + + expect(2) + val channel = produce { + repeat(20) { + delay(1000) + repeat(10) { + send(1) + delay(1000) + } + } + } + val result = flow.sampleBy(channel).toList() + assertEquals(listOf("A", null, null), result) + finish(5) + } + + @Test + fun testEmptyWithFlow() = runTest { + val flow = emptyFlow().sampleBy(flow { + delay(Long.MAX_VALUE) + emit(1) + }) + assertNull(flow.singleOrNull()) + } + + @Test + fun testEmptyWithChannel() = runTest { + val channel = produce { + delay(Long.MAX_VALUE) + send(1) + } + + val flow = emptyFlow().sampleBy(channel) + assertNull(flow.singleOrNull()) + } + + @Test + fun testScalarWithFlow() = runTest { + val flow = flowOf(1, 2, 3).sampleBy(flow { + delay(Long.MAX_VALUE) + emit(1) + }) + assertNull(flow.singleOrNull()) + } + + @Test + fun testScalarWithChannel() = runTest { + val flow = flowOf(1, 2, 3).sampleBy(produce { + delay(Long.MAX_VALUE) + send(1) + }) + assertNull(flow.singleOrNull()) + } + + @Test + // note that this test depends on the sampling strategy -- when sampling time starts on a quiescent flow that suddenly emits + fun testLongWaitWithFlow() = withVirtualTime { + expect(1) + val flow = flow { + expect(2) + emit("A") + delay(3500) // long delay -- multiple sampling intervals + emit("B") + delay(900) // crosses time = 4000 barrier + emit("C") + delay(3000) // long wait again + + } + val result = flow.sampleBy(flow { + repeat(10){ + delay(1000) + emit(1) + } + }).toList() + assertEquals(listOf("A", "B", "C"), result) + finish(3) + } + + + @Test + // note that this test depends on the sampling strategy -- when sampling time starts on a quiescent flow that suddenly emits + fun testLongWaitWithChannel() = withVirtualTime { + expect(1) + val flow = flow { + expect(2) + emit("A") + delay(3500) // long delay -- multiple sampling intervals + emit("B") + delay(900) // crosses time = 4000 barrier + emit("C") + delay(3000) // long wait again + + } + val result = flow.sampleBy(produce { + repeat(10){ + delay(1000) + send(1) + } + }).toList() + assertEquals(listOf("A", "B", "C"), result) + finish(3) + } + + @Test + fun testPaceWithFlow() = withVirtualTime { + val flow = flow { + expect(1) + repeat(4) { + emit(-it) + delay(50) + } + + repeat(4) { + emit(it) + delay(100) + } + expect(2) + }.sampleBy( flow { + repeat(10) { + delay(100) + emit(1) + } + }) + + assertEquals(listOf(-1, -3, 0, 1, 2, 3), flow.toList()) + finish(3) + + } + + @Test + fun testPaceWithChannel() = withVirtualTime { + val flow = flow { + expect(1) + repeat(4) { + emit(-it) + delay(50) + } + + repeat(4) { + emit(it) + delay(100) + } + expect(2) + }.sampleBy( produce { + repeat(10) { + delay(100) + send(1) + } + }) + assertEquals(listOf(-1, -3, 0, 1, 2, 3), flow.toList()) + finish(3) + + } + + @Test + fun testUpstreamErrorWithFlow() = runTest { + val latch = Channel() + val flow = flow { + expect(1) + emit(1) + expect(2) + latch.receive() + throw TestException() + }.sampleBy(flow { + repeat(10) { + delay(1) + emit(1) + } + }).map { + latch.send(Unit) + hang { expect(3) } + } + + assertFailsWith(flow) + finish(4) + } + + @Test + fun testUpstreamErrorWithChannel() = runTest { + val channel = Channel(Channel.CONFLATED) + launch { + repeat(10) { + delay(1) + channel.send(1) + } + } + val latch = Channel() + val flow = flow { + expect(1) + emit(1) + expect(2) + latch.receive() + throw TestException() + }.sampleBy(channel).map { + latch.send(Unit) + hang { expect(3) } + } + + assertFailsWith(flow) + finish(4) + } + + @Test + fun testUpstreamErrorIsolatedContextWithFlow() = runTest { + val latch = Channel() + val flow = flow { + assertEquals("upstream", NamedDispatchers.name()) + expect(1) + emit(1) + expect(2) + latch.receive() + throw TestException() + }.flowOn(NamedDispatchers("upstream")).sampleBy(flow { + repeat(10) { + delay(1) + emit(1) + } + }).map { + latch.send(Unit) + hang { expect(3) } + } + + assertFailsWith(flow) + finish(4) + } + + @Test + fun testUpstreamErrorIsolatedContextWithChannel() = runTest { + val latch = Channel() + val flow = flow { + assertEquals("upstream", NamedDispatchers.name()) + expect(1) + emit(1) + expect(2) + latch.receive() + throw TestException() + }.flowOn(NamedDispatchers("upstream")).sampleBy(produce { + repeat(10) { + delay(1) + send(1) + } + }).map { + latch.send(Unit) + hang { expect(3) } + } + + assertFailsWith(flow) + finish(4) + } + + @Test + fun testUpstreamErrorSampleNotTriggeredWithFlow() = runTest { + val flow = flow { + expect(1) + emit(1) + expect(2) + throw TestException() + }.sampleBy(flow{ + delay(Long.MAX_VALUE) + emit(1) + }).map { + expectUnreached() + } + assertFailsWith(flow) + finish(3) + } + + @Test + fun testUpstreamErrorSampleNotTriggeredWithChannel() = withVirtualTime { + + val channel = Channel(Channel.CONFLATED) + launch { + delay(Long.MAX_VALUE) + channel.send(1) + } + val flow = flow { + expect(1) + expect(2) + throw TestException() + }.sampleBy(channel).map { + expectUnreached() + } + assertFailsWith(flow) + finish(3) + } + + + @Test + fun testUpstreamErrorSampleNotTriggeredInIsolatedContextWithFlow() = runTest { + val flow = flow { + expect(1) + emit(1) + expect(2) + throw TestException() + }.flowWith(NamedDispatchers("unused")) { + sampleBy(flow{ + delay(Long.MAX_VALUE) + emit(1) + }).map { + expectUnreached() + } + } + + assertFailsWith(flow) + finish(3) + } + + @Test + fun testUpstreamErrorSampleNotTriggeredInIsolatedContextWithChannel() = runTest { + throw Exception() + val channel = Channel(Channel.CONFLATED) + launch { + delay(Long.MAX_VALUE) + channel.send(1) + } + val flow = flow { + expect(1) + emit(1) + expect(2) + throw TestException() + }.flowWith(NamedDispatchers("unused")) { + sampleBy(channel).map { + expectUnreached() + } + } + + assertFailsWith(flow) + finish(3) + } + + @Test + fun testDownstreamErrorWithFlow() = runTest { + val flow = flow { + expect(1) + emit(1) + hang { expect(3) } + }.sampleBy(flow { + repeat(100) { + delay(1) + emit(1) + } + }).map { + expect(2) + yield() + throw TestException() + it + } + + assertFailsWith(flow) + finish(4) + } + + @Test + fun testDownstreamErrorWithChannel() = runTest { + val channel = Channel(Channel.CONFLATED) + launch { + repeat(100) { + delay(1) + channel.send(1) + } + } + val flow = flow { + expect(1) + emit(1) + hang { expect(3) } + }.sampleBy(channel).map { + expect(2) + yield() + throw TestException() + it + } + + assertFailsWith(flow) + finish(4) + } + + @Test + fun testDownstreamErrorIsolatedContextWithFlow() = runTest { + val flow = flow { + assertEquals("upstream", NamedDispatchers.name()) + expect(1) + emit(1) + hang { expect(3) } + }.flowOn(NamedDispatchers("upstream")).sampleBy(flow { + repeat(100) { + delay(1) + emit(1) + } + }).map { + expect(2) + yield() + throw TestException() + it + } + + assertFailsWith(flow) + finish(4) + } + + @Test + fun testDownstreamErrorIsolatedContextWithChannel() = runTest { + val flow = flow { + assertEquals("upstream", NamedDispatchers.name()) + expect(1) + emit(1) + hang { expect(3) } + }.flowOn(NamedDispatchers("upstream")).sampleBy(produce { + repeat(100) { + delay(1) + send(1) + } + }).map { + expect(2) + yield() + throw TestException() + it + } + + assertFailsWith(flow) + finish(4) + } +} From cb4ae3cd5c6bcca3c34b184a6d140c998995966c Mon Sep 17 00:00:00 2001 From: Adib Faramarzi Date: Sun, 28 Apr 2019 21:27:48 +0430 Subject: [PATCH 03/12] Add tests for flow with another flow/channel --- .../common/src/flow/operators/Delay.kt | 9 ++-- .../test/flow/operators/SampleByTest.kt | 44 ++++++++++--------- 2 files changed, 28 insertions(+), 25 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt index 87b829e1bb..03735a1401 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt @@ -213,13 +213,12 @@ public fun Flow.sampleBy(sampler: Flow): Flow { public fun Flow.sampleBy(sampler: ReceiveChannel): Flow { return flow { coroutineScope { - - val values = produce(capacity = Channel.CONFLATED) { // Actually Any, KT-30796 - collect { value -> send(value ?: NullSurrogate) } + collect { value -> + send(value ?: NullSurrogate) + } } - var isDone = false var lastValue: Any? = null while (!isDone) { @@ -239,6 +238,8 @@ public fun Flow.sampleBy(sampler: ReceiveChannel): Flow { val value = lastValue ?: return@onReceiveOrNull lastValue = null // Consume the value emit(NullSurrogate.unbox(value)) + }else{ + isDone = true } } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/SampleByTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/SampleByTest.kt index 3e4f101e9a..64488fd11a 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/SampleByTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/SampleByTest.kt @@ -25,9 +25,8 @@ class SampleByTest : TestBase() { delay(200) expect(6) emit("D") - delay(800) + delay(10000) expect(7) - emit("E") } @@ -59,9 +58,9 @@ class SampleByTest : TestBase() { delay(200) expect(6) emit("D") - delay(800) - expect(7) + delay(1000) emit("E") + expect(7) } @@ -197,7 +196,6 @@ class SampleByTest : TestBase() { @Test fun testFixedDelayWithChannel() = withVirtualTime { - throw Exception() val channel = produce { delay(100) send(1) @@ -519,7 +517,14 @@ class SampleByTest : TestBase() { } @Test - fun testUpstreamErrorIsolatedContextWithChannel() = runTest { + fun testUpstreamErrorIsolatedContextWithChannel() = withVirtualTime { + val sampler = Channel(Channel.CONFLATED) + launch { + repeat(10) { + delay(1) + sampler.send(1) + } + } val latch = Channel() val flow = flow { assertEquals("upstream", NamedDispatchers.name()) @@ -528,12 +533,7 @@ class SampleByTest : TestBase() { expect(2) latch.receive() throw TestException() - }.flowOn(NamedDispatchers("upstream")).sampleBy(produce { - repeat(10) { - delay(1) - send(1) - } - }).map { + }.flowOn(NamedDispatchers("upstream")).sampleBy(sampler).map { latch.send(Unit) hang { expect(3) } } @@ -600,8 +600,7 @@ class SampleByTest : TestBase() { } @Test - fun testUpstreamErrorSampleNotTriggeredInIsolatedContextWithChannel() = runTest { - throw Exception() + fun testUpstreamErrorSampleNotTriggeredInIsolatedContextWithChannel() = withVirtualTime { val channel = Channel(Channel.CONFLATED) launch { delay(Long.MAX_VALUE) @@ -618,6 +617,7 @@ class SampleByTest : TestBase() { } } + assertFailsWith(flow) finish(3) } @@ -692,18 +692,20 @@ class SampleByTest : TestBase() { } @Test - fun testDownstreamErrorIsolatedContextWithChannel() = runTest { + fun testDownstreamErrorIsolatedContextWithChannel() = withVirtualTime { + val channel = Channel(Channel.CONFLATED) + launch { + repeat(100) { + delay(1) + channel.send(1) + } + } val flow = flow { assertEquals("upstream", NamedDispatchers.name()) expect(1) emit(1) hang { expect(3) } - }.flowOn(NamedDispatchers("upstream")).sampleBy(produce { - repeat(100) { - delay(1) - send(1) - } - }).map { + }.flowOn(NamedDispatchers("upstream")).sampleBy(channel).map { expect(2) yield() throw TestException() From 9c941611a49660554a791020172c6378a7274eee Mon Sep 17 00:00:00 2001 From: Adib Faramarzi Date: Sun, 28 Apr 2019 21:52:11 +0430 Subject: [PATCH 04/12] Remove sampleBy with a ReceiveChannel since they can easily be converted to flows beforehand --- .../common/src/flow/operators/Delay.kt | 63 +-- .../test/flow/operators/SampleByTest.kt | 364 ------------------ 2 files changed, 1 insertion(+), 426 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt index 03735a1401..28155b63df 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt @@ -185,65 +185,4 @@ public fun Flow.sampleBy(sampler: Flow): Flow { } } } -} - -/** - * Returns a flow that emits only the latest value emitted by the original flow only when the [sampler] channel emits. - * - * Example: - * ``` - * val receiveChannel = produce { - * repeat(10) { - * delay(100) - * send(it) - * } - * } - * launch { - * flow { - * repeat(10) { - * emit(it) - * delay(50) - * } - * }.sampleBy(receiveChannel) - * ``` - * produces `0, 2, 4, 6, 8`. - * - * Note that the latest element is not emitted if it does not fit into the sampling window. - */ -public fun Flow.sampleBy(sampler: ReceiveChannel): Flow { - return flow { - coroutineScope { - val values = produce(capacity = Channel.CONFLATED) { - // Actually Any, KT-30796 - collect { value -> - send(value ?: NullSurrogate) - } - } - var isDone = false - var lastValue: Any? = null - while (!isDone) { - select { - values.onReceiveOrNull { - if (it == null) { - sampler.cancel() - isDone = true - } else { - lastValue = it - } - } - - // todo: shall be start sampling only when an element arrives or sample aways as here? - sampler.onReceiveOrNull { - if(it != null){ - val value = lastValue ?: return@onReceiveOrNull - lastValue = null // Consume the value - emit(NullSurrogate.unbox(value)) - }else{ - isDone = true - } - } - } - } - } - } -} +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/test/flow/operators/SampleByTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/SampleByTest.kt index 64488fd11a..20b3086fc8 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/SampleByTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/SampleByTest.kt @@ -42,41 +42,6 @@ class SampleByTest : TestBase() { assertEquals(listOf("B", "D"), result) finish(8) } - - @Test - public fun testBasicWithChannel() = withVirtualTime { - expect(1) - val flow = flow { - expect(3) - delay(200) - emit("A") - expect(4) - delay(600) - emit("B") - delay(200) - emit("C") - delay(200) - expect(6) - emit("D") - delay(1000) - emit("E") - expect(7) - } - - - val samplerChannel = produce { - delay(1000) - expect(5) - send("AA") - delay(1000) - send("BB") - } - expect(2) - val result = flow.sampleBy(samplerChannel).toList() - assertEquals(listOf("B", "D"), result) - finish(8) - } - @Test fun testDelayedFirstWithFlow() = withVirtualTime { val flow = flow { @@ -94,24 +59,6 @@ class SampleByTest : TestBase() { finish(4) } - @Test - fun testDelayedFirstWithChannel() = withVirtualTime { - val channel = Channel(Channel.CONFLATED) - launch { - delay(100) - channel.send(4) - expect(2) - } - val flow = flow { - delay(60) - emit(1) - expect(1) - delay(60) - expect(3) - }.sampleBy(channel) - assertEquals(1, flow.singleOrNull()) - finish(4) - } @Test fun testBasicFlow2() = withVirtualTime { @@ -145,38 +92,6 @@ class SampleByTest : TestBase() { finish(5) } - @Test - fun testBasicChannel2() = withVirtualTime { - expect(1) - val flow = flow { - expect(3) - emit(1) - emit(2) - delay(501) - emit(3) - delay(100) - emit(4) - delay(100) - emit(5) - emit(6) - delay(301) - emit(7) - delay(501) - expect(4) - } - val channel = produce { - delay(500) - repeat(10) { - send(1) - delay(500) - } - } - expect(2) - val result = flow.sampleBy(channel).toList() - assertEquals(listOf(2, 6, 7), result) - finish(5) - } - @Test fun testFixedDelayWithFlow() = withVirtualTime { val flow = flow { @@ -194,24 +109,6 @@ class SampleByTest : TestBase() { finish(4) } - @Test - fun testFixedDelayWithChannel() = withVirtualTime { - val channel = produce { - delay(100) - send(1) - expect(2) - } - val flow = flow { - emit("A") - expect(1) - delay(150) - emit("B") - expect(3) - }.sampleBy(channel) - assertEquals("A", flow.single()) - finish(4) - } - @Test fun testSingleNullWithFlow() = withVirtualTime { val flow = flow { @@ -226,22 +123,6 @@ class SampleByTest : TestBase() { finish(2) } - - @Test - fun testSingleNullWithChannel() = withVirtualTime { - val channel = produce { - delay(1) - send(1) - } - val flow = flow { - emit(null) - delay(2) - expect(1) - }.sampleBy(channel) - assertNull(flow.single()) - finish(2) - } - @Test fun testBasicWithNullsWithFlow() = withVirtualTime { expect(1) @@ -274,38 +155,6 @@ class SampleByTest : TestBase() { finish(5) } - @Test - fun testBasicWithNullsWithChannel() = withVirtualTime { - expect(1) - val flow = flow { - expect(3) - emit("A") - delay(1500) - emit(null) - delay(500) - emit("C") - delay(250) - emit(null) - delay(2000) - emit("E") - expect(4) - } - - expect(2) - val channel = produce { - repeat(20) { - delay(1000) - repeat(10) { - send(1) - delay(1000) - } - } - } - val result = flow.sampleBy(channel).toList() - assertEquals(listOf("A", null, null), result) - finish(5) - } - @Test fun testEmptyWithFlow() = runTest { val flow = emptyFlow().sampleBy(flow { @@ -315,17 +164,6 @@ class SampleByTest : TestBase() { assertNull(flow.singleOrNull()) } - @Test - fun testEmptyWithChannel() = runTest { - val channel = produce { - delay(Long.MAX_VALUE) - send(1) - } - - val flow = emptyFlow().sampleBy(channel) - assertNull(flow.singleOrNull()) - } - @Test fun testScalarWithFlow() = runTest { val flow = flowOf(1, 2, 3).sampleBy(flow { @@ -335,15 +173,6 @@ class SampleByTest : TestBase() { assertNull(flow.singleOrNull()) } - @Test - fun testScalarWithChannel() = runTest { - val flow = flowOf(1, 2, 3).sampleBy(produce { - delay(Long.MAX_VALUE) - send(1) - }) - assertNull(flow.singleOrNull()) - } - @Test // note that this test depends on the sampling strategy -- when sampling time starts on a quiescent flow that suddenly emits fun testLongWaitWithFlow() = withVirtualTime { @@ -368,31 +197,6 @@ class SampleByTest : TestBase() { finish(3) } - - @Test - // note that this test depends on the sampling strategy -- when sampling time starts on a quiescent flow that suddenly emits - fun testLongWaitWithChannel() = withVirtualTime { - expect(1) - val flow = flow { - expect(2) - emit("A") - delay(3500) // long delay -- multiple sampling intervals - emit("B") - delay(900) // crosses time = 4000 barrier - emit("C") - delay(3000) // long wait again - - } - val result = flow.sampleBy(produce { - repeat(10){ - delay(1000) - send(1) - } - }).toList() - assertEquals(listOf("A", "B", "C"), result) - finish(3) - } - @Test fun testPaceWithFlow() = withVirtualTime { val flow = flow { @@ -419,31 +223,6 @@ class SampleByTest : TestBase() { } - @Test - fun testPaceWithChannel() = withVirtualTime { - val flow = flow { - expect(1) - repeat(4) { - emit(-it) - delay(50) - } - - repeat(4) { - emit(it) - delay(100) - } - expect(2) - }.sampleBy( produce { - repeat(10) { - delay(100) - send(1) - } - }) - assertEquals(listOf(-1, -3, 0, 1, 2, 3), flow.toList()) - finish(3) - - } - @Test fun testUpstreamErrorWithFlow() = runTest { val latch = Channel() @@ -467,31 +246,6 @@ class SampleByTest : TestBase() { finish(4) } - @Test - fun testUpstreamErrorWithChannel() = runTest { - val channel = Channel(Channel.CONFLATED) - launch { - repeat(10) { - delay(1) - channel.send(1) - } - } - val latch = Channel() - val flow = flow { - expect(1) - emit(1) - expect(2) - latch.receive() - throw TestException() - }.sampleBy(channel).map { - latch.send(Unit) - hang { expect(3) } - } - - assertFailsWith(flow) - finish(4) - } - @Test fun testUpstreamErrorIsolatedContextWithFlow() = runTest { val latch = Channel() @@ -516,32 +270,6 @@ class SampleByTest : TestBase() { finish(4) } - @Test - fun testUpstreamErrorIsolatedContextWithChannel() = withVirtualTime { - val sampler = Channel(Channel.CONFLATED) - launch { - repeat(10) { - delay(1) - sampler.send(1) - } - } - val latch = Channel() - val flow = flow { - assertEquals("upstream", NamedDispatchers.name()) - expect(1) - emit(1) - expect(2) - latch.receive() - throw TestException() - }.flowOn(NamedDispatchers("upstream")).sampleBy(sampler).map { - latch.send(Unit) - hang { expect(3) } - } - - assertFailsWith(flow) - finish(4) - } - @Test fun testUpstreamErrorSampleNotTriggeredWithFlow() = runTest { val flow = flow { @@ -559,26 +287,6 @@ class SampleByTest : TestBase() { finish(3) } - @Test - fun testUpstreamErrorSampleNotTriggeredWithChannel() = withVirtualTime { - - val channel = Channel(Channel.CONFLATED) - launch { - delay(Long.MAX_VALUE) - channel.send(1) - } - val flow = flow { - expect(1) - expect(2) - throw TestException() - }.sampleBy(channel).map { - expectUnreached() - } - assertFailsWith(flow) - finish(3) - } - - @Test fun testUpstreamErrorSampleNotTriggeredInIsolatedContextWithFlow() = runTest { val flow = flow { @@ -599,29 +307,6 @@ class SampleByTest : TestBase() { finish(3) } - @Test - fun testUpstreamErrorSampleNotTriggeredInIsolatedContextWithChannel() = withVirtualTime { - val channel = Channel(Channel.CONFLATED) - launch { - delay(Long.MAX_VALUE) - channel.send(1) - } - val flow = flow { - expect(1) - emit(1) - expect(2) - throw TestException() - }.flowWith(NamedDispatchers("unused")) { - sampleBy(channel).map { - expectUnreached() - } - } - - - assertFailsWith(flow) - finish(3) - } - @Test fun testDownstreamErrorWithFlow() = runTest { val flow = flow { @@ -644,30 +329,6 @@ class SampleByTest : TestBase() { finish(4) } - @Test - fun testDownstreamErrorWithChannel() = runTest { - val channel = Channel(Channel.CONFLATED) - launch { - repeat(100) { - delay(1) - channel.send(1) - } - } - val flow = flow { - expect(1) - emit(1) - hang { expect(3) } - }.sampleBy(channel).map { - expect(2) - yield() - throw TestException() - it - } - - assertFailsWith(flow) - finish(4) - } - @Test fun testDownstreamErrorIsolatedContextWithFlow() = runTest { val flow = flow { @@ -690,29 +351,4 @@ class SampleByTest : TestBase() { assertFailsWith(flow) finish(4) } - - @Test - fun testDownstreamErrorIsolatedContextWithChannel() = withVirtualTime { - val channel = Channel(Channel.CONFLATED) - launch { - repeat(100) { - delay(1) - channel.send(1) - } - } - val flow = flow { - assertEquals("upstream", NamedDispatchers.name()) - expect(1) - emit(1) - hang { expect(3) } - }.flowOn(NamedDispatchers("upstream")).sampleBy(channel).map { - expect(2) - yield() - throw TestException() - it - } - - assertFailsWith(flow) - finish(4) - } } From 329985779cda121fb924b210fcf75ae880b24762 Mon Sep 17 00:00:00 2001 From: Adib Faramarzi Date: Mon, 29 Apr 2019 00:57:49 +0430 Subject: [PATCH 05/12] Remove sampleBy with a ReceiveChannel since they can easily be converted to flows beforehand --- .../reference-public-api/kotlinx-coroutines-core.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index a1a5a6be81..533a187805 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -835,7 +835,6 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow; public static synthetic fun retry$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; public static final fun sample (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow; - public static final fun sampleBy (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlinx/coroutines/flow/Flow; public static final fun sampleBy (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; public static final fun single (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun singleOrNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; From 2aac5fc6cd52e209095d0d595e903eacd5d191fe Mon Sep 17 00:00:00 2001 From: Adib Faramarzi Date: Sat, 8 Jun 2019 19:07:03 +0430 Subject: [PATCH 06/12] Update sample usage to use `flowScope` - remove old sample implementation - remove old sample unit tests and add them as the new ones --- .../common/src/flow/operators/Delay.kt | 92 ++---- .../test/flow/operators/SampleByTest.kt | 81 ++--- .../common/test/flow/operators/SampleTest.kt | 276 ------------------ 3 files changed, 71 insertions(+), 378 deletions(-) delete mode 100644 kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt index 99ad9695c9..6be5fe6983 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt @@ -107,51 +107,19 @@ public fun Flow.debounce(timeoutMillis: Long): Flow { * }.sample(100) * ``` * produces `1, 3, 5, 7, 9`. - * + * * Note that the latest element is not emitted if it does not fit into the sampling window. */ @FlowPreview public fun Flow.sample(periodMillis: Long): Flow { - require(periodMillis > 0) { "Sample period should be positive" } - return scopedFlow { downstream -> - val values = produce(capacity = Channel.CONFLATED) { - // Actually Any, KT-30796 - collect { value -> send(value ?: NULL) } - } - - var isDone = false - var lastValue: Any? = null - val ticker = fixedPeriodTicker(periodMillis) - while (!isDone) { - select { - values.onReceiveOrNull { - if (it == null) { - ticker.cancel(ChildCancelledException()) - isDone = true - } else { - lastValue = it - } - } - - // todo: shall be start sampling only when an element arrives or sample aways as here? - ticker.onReceive { - val value = lastValue ?: return@onReceive - lastValue = null // Consume the value - downstream.emit(NULL.unbox(value)) - } - } - } - } -} -/* - return sampleBy(flow { + return sample(flow { delay(periodMillis) while (true) { emit(Unit) delay(periodMillis) } }) - */ +} /** * Returns a flow that emits only the latest value emitted by the original flow only when the [sampler] emits. @@ -176,38 +144,38 @@ public fun Flow.sample(periodMillis: Long): Flow { * Note that the latest element is not emitted if it does not fit into the sampling window. */ public fun Flow.sample(sampler: Flow): Flow { - return flow { - coroutineScope { - val values = produce(capacity = Channel.CONFLATED) { - // Actually Any, KT-30796 - collect { value -> send(value ?: NullSurrogate) } - } + return scopedFlow { downstream -> + val values = produce(capacity = Channel.CONFLATED) { + // Actually Any, KT-30796 + collect { value -> send(value ?: NULL) } + } - val otherChannel = Channel(Channel.CONFLATED) - val samplerJob = launch { - sampler.collect { - otherChannel.send(it) - } + val otherChannel = produce(capacity = 0) { + sampler.collect { + value -> send(value) } - var isDone = false - var lastValue: Any? = null - while (!isDone) { - select { - values.onReceiveOrNull { - if (it == null) { - otherChannel.cancel() - samplerJob.cancel() - isDone = true - } else { - lastValue = it - } + } + var isDone = false + var lastValue: Any? = null + while (!isDone) { + select { + values.onReceiveOrNull { + if (it == null) { + isDone = true + otherChannel.cancel(ChildCancelledException()) + } else { + lastValue = it } + } - // todo: shall be start sampling only when an element arrives or sample aways as here? - otherChannel.onReceive { - val value = lastValue ?: return@onReceive + // todo: shall be start sampling only when an element arrives or sample aways as here? + otherChannel.onReceiveOrNull { + if(it != null) { + val value = lastValue ?: return@onReceiveOrNull lastValue = null // Consume the value - emit(NullSurrogate.unbox(value)) + downstream.emit(NULL.unbox(value)) + }else{ + isDone = true } } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/SampleByTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/SampleByTest.kt index 20b3086fc8..78c8acf9d9 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/SampleByTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/SampleByTest.kt @@ -5,13 +5,15 @@ package kotlinx.coroutines.flow.operators import kotlinx.coroutines.* -import kotlinx.coroutines.channels.* +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.* -import kotlin.test.* +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertNull -class SampleByTest : TestBase() { +class SampleTest : TestBase() { @Test - public fun testBasicWithFlow() = withVirtualTime { + public fun testBasic() = withVirtualTime { expect(1) val flow = flow { expect(3) @@ -38,19 +40,20 @@ class SampleByTest : TestBase() { emit("BB") } expect(2) - val result = flow.sampleBy(samplerFlow).toList() + val result = flow.sample(samplerFlow).toList() assertEquals(listOf("B", "D"), result) finish(8) } + @Test - fun testDelayedFirstWithFlow() = withVirtualTime { + fun testDelayedFirst() = withVirtualTime { val flow = flow { delay(60) emit(1) expect(1) delay(60) expect(3) - }.sampleBy(flow { + }.sample(flow { delay(100) emit(4) expect(2) @@ -87,20 +90,20 @@ class SampleByTest : TestBase() { } } expect(2) - val result = flow.sampleBy(samplerFlow).toList() + val result = flow.sample(samplerFlow).toList() assertEquals(listOf(2, 6, 7), result) finish(5) } @Test - fun testFixedDelayWithFlow() = withVirtualTime { + fun testFixedDelay() = withVirtualTime { val flow = flow { emit("A") expect(1) delay(150) emit("B") expect(3) - }.sampleBy(flow { + }.sample(flow { delay(100) emit("A") expect(2) @@ -110,12 +113,12 @@ class SampleByTest : TestBase() { } @Test - fun testSingleNullWithFlow() = withVirtualTime { + fun testSingleNull() = withVirtualTime { val flow = flow { emit(null) delay(2) expect(1) - }.sampleBy(flow { + }.sample(flow { delay(1) emit(1) }) @@ -124,7 +127,7 @@ class SampleByTest : TestBase() { } @Test - fun testBasicWithNullsWithFlow() = withVirtualTime { + fun testBasicWithNulls() = withVirtualTime { expect(1) val flow = flow { expect(3) @@ -142,22 +145,20 @@ class SampleByTest : TestBase() { expect(2) val sampler = flow { - repeat(20) { + delay(1000) + repeat(10) { + emit(1) delay(1000) - repeat(10) { - emit(1) - delay(1000) - } } } - val result = flow.sampleBy(sampler).toList() + val result = flow.sample(sampler).toList() assertEquals(listOf("A", null, null), result) finish(5) } @Test - fun testEmptyWithFlow() = runTest { - val flow = emptyFlow().sampleBy(flow { + fun testEmpty() = runTest { + val flow = emptyFlow().sample(flow { delay(Long.MAX_VALUE) emit(1) }) @@ -165,8 +166,8 @@ class SampleByTest : TestBase() { } @Test - fun testScalarWithFlow() = runTest { - val flow = flowOf(1, 2, 3).sampleBy(flow { + fun testScalar() = runTest { + val flow = flowOf(1, 2, 3).sample(flow { delay(Long.MAX_VALUE) emit(1) }) @@ -175,7 +176,7 @@ class SampleByTest : TestBase() { @Test // note that this test depends on the sampling strategy -- when sampling time starts on a quiescent flow that suddenly emits - fun testLongWaitWithFlow() = withVirtualTime { + fun testLongWait() = withVirtualTime { expect(1) val flow = flow { expect(2) @@ -187,8 +188,8 @@ class SampleByTest : TestBase() { delay(3000) // long wait again } - val result = flow.sampleBy(flow { - repeat(10){ + val result = flow.sample(flow { + repeat(10) { delay(1000) emit(1) } @@ -198,7 +199,7 @@ class SampleByTest : TestBase() { } @Test - fun testPaceWithFlow() = withVirtualTime { + fun testPace() = withVirtualTime { val flow = flow { expect(1) repeat(4) { @@ -211,7 +212,7 @@ class SampleByTest : TestBase() { delay(100) } expect(2) - }.sampleBy( flow { + }.sample(flow { repeat(10) { delay(100) emit(1) @@ -224,7 +225,7 @@ class SampleByTest : TestBase() { } @Test - fun testUpstreamErrorWithFlow() = runTest { + fun testUpstreamError() = runTest { val latch = Channel() val flow = flow { expect(1) @@ -232,7 +233,7 @@ class SampleByTest : TestBase() { expect(2) latch.receive() throw TestException() - }.sampleBy(flow { + }.sample(flow { repeat(10) { delay(1) emit(1) @@ -247,7 +248,7 @@ class SampleByTest : TestBase() { } @Test - fun testUpstreamErrorIsolatedContextWithFlow() = runTest { + fun testUpstreamErrorIsolatedContext() = runTest { val latch = Channel() val flow = flow { assertEquals("upstream", NamedDispatchers.name()) @@ -256,7 +257,7 @@ class SampleByTest : TestBase() { expect(2) latch.receive() throw TestException() - }.flowOn(NamedDispatchers("upstream")).sampleBy(flow { + }.flowOn(NamedDispatchers("upstream")).sample(flow { repeat(10) { delay(1) emit(1) @@ -271,13 +272,13 @@ class SampleByTest : TestBase() { } @Test - fun testUpstreamErrorSampleNotTriggeredWithFlow() = runTest { + fun testUpstreamErrorSampleNotTriggered() = runTest { val flow = flow { expect(1) emit(1) expect(2) throw TestException() - }.sampleBy(flow{ + }.sample(flow { delay(Long.MAX_VALUE) emit(1) }).map { @@ -288,14 +289,14 @@ class SampleByTest : TestBase() { } @Test - fun testUpstreamErrorSampleNotTriggeredInIsolatedContextWithFlow() = runTest { + fun testUpstreamErrorSampleNotTriggeredInIsolatedContext() = runTest { val flow = flow { expect(1) emit(1) expect(2) throw TestException() }.flowWith(NamedDispatchers("unused")) { - sampleBy(flow{ + sample(flow { delay(Long.MAX_VALUE) emit(1) }).map { @@ -308,12 +309,12 @@ class SampleByTest : TestBase() { } @Test - fun testDownstreamErrorWithFlow() = runTest { + fun testDownstreamError() = runTest { val flow = flow { expect(1) emit(1) hang { expect(3) } - }.sampleBy(flow { + }.sample(flow { repeat(100) { delay(1) emit(1) @@ -330,13 +331,13 @@ class SampleByTest : TestBase() { } @Test - fun testDownstreamErrorIsolatedContextWithFlow() = runTest { + fun testDownstreamErrorIsolatedContext() = runTest { val flow = flow { assertEquals("upstream", NamedDispatchers.name()) expect(1) emit(1) hang { expect(3) } - }.flowOn(NamedDispatchers("upstream")).sampleBy(flow { + }.flowOn(NamedDispatchers("upstream")).sample(flow { repeat(100) { delay(1) emit(1) diff --git a/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt deleted file mode 100644 index 9c96352df2..0000000000 --- a/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt +++ /dev/null @@ -1,276 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.flow.operators - -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.* -import kotlinx.coroutines.flow.* -import kotlin.test.* - -class SampleTest : TestBase() { - @Test - public fun testBasic() = withVirtualTime { - expect(1) - val flow = flow { - expect(3) - emit("A") - delay(1500) - emit("B") - delay(500) - emit("C") - delay(250) - emit("D") - delay(2000) - emit("E") - expect(4) - } - - expect(2) - val result = flow.sample(1000).toList() - assertEquals(listOf("A", "B", "D"), result) - finish(5) - } - - @Test - fun testDelayedFirst() = withVirtualTime { - val flow = flow { - delay(60) - emit(1) - delay(60) - expect(1) - }.sample(100) - assertEquals(1, flow.singleOrNull()) - finish(2) - } - - @Test - fun testBasic2() = withVirtualTime { - expect(1) - val flow = flow { - expect(3) - emit(1) - emit(2) - delay(501) - emit(3) - delay(100) - emit(4) - delay(100) - emit(5) - emit(6) - delay(301) - emit(7) - delay(501) - expect(4) - } - - expect(2) - val result = flow.sample(500).toList() - assertEquals(listOf(2, 6, 7), result) - finish(5) - } - - @Test - fun testFixedDelay() = withVirtualTime { - val flow = flow { - emit("A") - delay(150) - emit("B") - expect(1) - }.sample(100) - assertEquals("A", flow.single()) - finish(2) - } - - @Test - fun testSingleNull() = withVirtualTime { - val flow = flow { - emit(null) - delay(2) - expect(1) - }.sample(1) - assertNull(flow.single()) - finish(2) - } - - @Test - fun testBasicWithNulls() = withVirtualTime { - expect(1) - val flow = flow { - expect(3) - emit("A") - delay(1500) - emit(null) - delay(500) - emit("C") - delay(250) - emit(null) - delay(2000) - emit("E") - expect(4) - } - - expect(2) - val result = flow.sample(1000).toList() - assertEquals(listOf("A", null, null), result) - finish(5) - } - - @Test - fun testEmpty() = runTest { - val flow = emptyFlow().sample(Long.MAX_VALUE) - assertNull(flow.singleOrNull()) - } - - @Test - fun testScalar() = runTest { - val flow = flowOf(1, 2, 3).sample(Long.MAX_VALUE) - assertNull(flow.singleOrNull()) - } - - @Test - // note that this test depends on the sampling strategy -- when sampling time starts on a quiescent flow that suddenly emits - fun testLongWait() = withVirtualTime { - expect(1) - val flow = flow { - expect(2) - emit("A") - delay(3500) // long delay -- multiple sampling intervals - emit("B") - delay(900) // crosses time = 4000 barrier - emit("C") - delay(3000) // long wait again - - } - val result = flow.sample(1000).toList() - assertEquals(listOf("A", "B", "C"), result) - finish(3) - } - - @Test - fun testPace() = withVirtualTime { - val flow = flow { - expect(1) - repeat(4) { - emit(-it) - delay(50) - } - - repeat(4) { - emit(it) - delay(100) - } - expect(2) - }.sample(100) - - assertEquals(listOf(-1, -3, 0, 1, 2, 3), flow.toList()) - finish(3) - } - - @Test - fun testUpstreamError() = testUpstreamError(TestException()) - - @Test - fun testUpstreamErrorCancellationException() = testUpstreamError(CancellationException("")) - - private inline fun testUpstreamError(cause: T) = runTest { - val latch = Channel() - val flow = flow { - expect(1) - emit(1) - expect(2) - latch.receive() - throw cause - }.sample(1).map { - latch.send(Unit) - hang { expect(3) } - } - - assertFailsWith(flow) - finish(4) - } - - @Test - fun testUpstreamErrorIsolatedContext() = runTest { - val latch = Channel() - val flow = flow { - assertEquals("upstream", NamedDispatchers.name()) - expect(1) - emit(1) - expect(2) - latch.receive() - throw TestException() - }.flowOn(NamedDispatchers("upstream")).sample(1).map { - latch.send(Unit) - hang { expect(3) } - } - - assertFailsWith(flow) - finish(4) - } - - @Test - fun testUpstreamErrorSampleNotTriggered() = runTest { - val flow = flow { - expect(1) - emit(1) - expect(2) - throw TestException() - }.sample(Long.MAX_VALUE).map { - expectUnreached() - } - assertFailsWith(flow) - finish(3) - } - - @Test - fun testUpstreamErrorSampleNotTriggeredInIsolatedContext() = runTest { - val flow = flow { - expect(1) - emit(1) - expect(2) - throw TestException() - }.flowOn(NamedDispatchers("unused")).sample(Long.MAX_VALUE).map { - expectUnreached() - } - - assertFailsWith(flow) - finish(3) - } - - @Test - fun testDownstreamError() = runTest { - val flow = flow { - expect(1) - emit(1) - hang { expect(3) } - }.sample(100).map { - expect(2) - yield() - throw TestException() - it - } - - assertFailsWith(flow) - finish(4) - } - - @Test - fun testDownstreamErrorIsolatedContext() = runTest { - val flow = flow { - assertEquals("upstream", NamedDispatchers.name()) - expect(1) - emit(1) - hang { expect(3) } - }.flowOn(NamedDispatchers("upstream")).sample(100).map { - expect(2) - yield() - throw TestException() - it - } - - assertFailsWith(flow) - finish(4) - } -} From d2ecd7db4c506e973046fae7a7c8147d280906a4 Mon Sep 17 00:00:00 2001 From: Adib Faramarzi Date: Sat, 8 Jun 2019 21:29:53 +0430 Subject: [PATCH 07/12] Update sample usage to use `flowScope` - remove old sample implementation - remove old sample unit tests and add them as the new ones --- .../common/test/flow/operators/{SampleByTest.kt => SampleTest.kt} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename kotlinx-coroutines-core/common/test/flow/operators/{SampleByTest.kt => SampleTest.kt} (100%) diff --git a/kotlinx-coroutines-core/common/test/flow/operators/SampleByTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt similarity index 100% rename from kotlinx-coroutines-core/common/test/flow/operators/SampleByTest.kt rename to kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt From c21baec23c851c5e26fe46292ec1cdaeecfffde8 Mon Sep 17 00:00:00 2001 From: Adib Faramarzi Date: Sun, 12 Jul 2020 09:49:07 +0430 Subject: [PATCH 08/12] Update unit tests --- .../common/src/flow/operators/Delay.kt | 7 ++----- .../common/test/flow/operators/SampleTest.kt | 10 +++++----- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt index e82a05c481..cc4d711d46 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt @@ -160,27 +160,24 @@ public fun Flow.sample(sampler: Flow): Flow { value -> send(value) } } - var isDone = false var lastValue: Any? = null while (lastValue !== DONE) { select { values.onReceiveOrNull { if (it == null) { - isDone = true + lastValue = DONE otherChannel.cancel(ChildCancelledException()) } else { lastValue = it } } - - // todo: shall be start sampling only when an element arrives or sample aways as here? otherChannel.onReceiveOrNull { if(it != null) { val value = lastValue ?: return@onReceiveOrNull lastValue = null // Consume the value downstream.emit(NULL.unbox(value)) }else{ - isDone = true + lastValue = DONE } } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt index 4203f5e5f1..74b291a9bd 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt @@ -10,10 +10,11 @@ import kotlinx.coroutines.flow.* import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertNull +import kotlin.time.* class SampleTest : TestBase() { @Test - public fun testBasic() = withVirtualTime { + fun testBasic() = withVirtualTime { expect(1) val flow = flow { expect(3) @@ -295,14 +296,13 @@ class SampleTest : TestBase() { emit(1) expect(2) throw TestException() - }.flowWith(NamedDispatchers("unused")) { - sample(flow { + }.flowOn(NamedDispatchers("unused")).sample(flow { delay(Long.MAX_VALUE) emit(1) }).map { expectUnreached() } - } + assertFailsWith(flow) finish(3) @@ -355,7 +355,7 @@ class SampleTest : TestBase() { @ExperimentalTime @Test - public fun testDurationBasic() = withVirtualTime { + fun testDurationBasic() = withVirtualTime { expect(1) val flow = flow { expect(3) From a45f5fdd4c1f711f052a4ae1e30acddfaea9ea93 Mon Sep 17 00:00:00 2001 From: Adib Faramarzi Date: Tue, 11 Jul 2023 18:35:32 +0330 Subject: [PATCH 09/12] Update logic with master branch --- .../common/src/flow/operators/Delay.kt | 38 ++++++++++++------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt index 7ce23bb874..4a241c019e 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt @@ -203,7 +203,7 @@ public fun Flow.debounce(timeout: (T) -> Duration): Flow = timeout(emittedItem).toDelayMillis() } -private fun Flow.debounceInternal(timeoutMillisSelector: (T) -> Long) : Flow = +private fun Flow.debounceInternal(timeoutMillisSelector: (T) -> Long): Flow = scopedFlow { downstream -> // Produce the values using the default (rendezvous) channel val values = produce { @@ -310,9 +310,9 @@ public fun Flow.sample(sampler: Flow): Flow { collect { value -> send(value ?: NULL) } } - val otherChannel = produce(capacity = 0) { - sampler.collect { - value -> send(value) + val samplerProducer = produce(capacity = 0) { + sampler.collect { value -> + send(value) } } var lastValue: Any? = null @@ -323,19 +323,26 @@ public fun Flow.sample(sampler: Flow): Flow { .onSuccess { lastValue = it } .onFailure { it?.let { throw it } - ticker.cancel(ChildCancelledException()) + samplerProducer.cancel(ChildCancelledException()) lastValue = DONE } } - otherChannel.onReceiveOrNull { - if(it != null) { - val value = lastValue ?: return@onReceiveOrNull - lastValue = null // Consume the value - downstream.emit(NULL.unbox(value)) - }else{ - lastValue = DONE - } + samplerProducer.onReceiveCatching { samplerResult -> + samplerResult + .onSuccess { sampledValue -> + if (sampledValue != null) { + val value = lastValue ?: return@onSuccess + lastValue = null // Consume the value + downstream.emit(NULL.unbox(value)) + } else { + lastValue = DONE + } + } + .onFailure { + lastValue = DONE + } + } } } @@ -345,7 +352,10 @@ public fun Flow.sample(sampler: Flow): Flow { /* * TODO this design (and design of the corresponding operator) depends on #540 */ -internal fun CoroutineScope.fixedPeriodTicker(delayMillis: Long, initialDelayMillis: Long = delayMillis): ReceiveChannel { +internal fun CoroutineScope.fixedPeriodTicker( + delayMillis: Long, + initialDelayMillis: Long = delayMillis +): ReceiveChannel { require(delayMillis >= 0) { "Expected non-negative delay, but has $delayMillis ms" } require(initialDelayMillis >= 0) { "Expected non-negative initial delay, but has $initialDelayMillis ms" } return produce(capacity = 0) { From 7b2bfee553267f752e9c01de4114abc60940bfa3 Mon Sep 17 00:00:00 2001 From: Adib Faramarzi Date: Tue, 11 Jul 2023 18:35:32 +0330 Subject: [PATCH 10/12] Update logic with master branch --- .../common/src/flow/operators/Delay.kt | 38 ++++++++++++------- .../common/test/flow/operators/SampleTest.kt | 2 +- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt index 7ce23bb874..4a241c019e 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt @@ -203,7 +203,7 @@ public fun Flow.debounce(timeout: (T) -> Duration): Flow = timeout(emittedItem).toDelayMillis() } -private fun Flow.debounceInternal(timeoutMillisSelector: (T) -> Long) : Flow = +private fun Flow.debounceInternal(timeoutMillisSelector: (T) -> Long): Flow = scopedFlow { downstream -> // Produce the values using the default (rendezvous) channel val values = produce { @@ -310,9 +310,9 @@ public fun Flow.sample(sampler: Flow): Flow { collect { value -> send(value ?: NULL) } } - val otherChannel = produce(capacity = 0) { - sampler.collect { - value -> send(value) + val samplerProducer = produce(capacity = 0) { + sampler.collect { value -> + send(value) } } var lastValue: Any? = null @@ -323,19 +323,26 @@ public fun Flow.sample(sampler: Flow): Flow { .onSuccess { lastValue = it } .onFailure { it?.let { throw it } - ticker.cancel(ChildCancelledException()) + samplerProducer.cancel(ChildCancelledException()) lastValue = DONE } } - otherChannel.onReceiveOrNull { - if(it != null) { - val value = lastValue ?: return@onReceiveOrNull - lastValue = null // Consume the value - downstream.emit(NULL.unbox(value)) - }else{ - lastValue = DONE - } + samplerProducer.onReceiveCatching { samplerResult -> + samplerResult + .onSuccess { sampledValue -> + if (sampledValue != null) { + val value = lastValue ?: return@onSuccess + lastValue = null // Consume the value + downstream.emit(NULL.unbox(value)) + } else { + lastValue = DONE + } + } + .onFailure { + lastValue = DONE + } + } } } @@ -345,7 +352,10 @@ public fun Flow.sample(sampler: Flow): Flow { /* * TODO this design (and design of the corresponding operator) depends on #540 */ -internal fun CoroutineScope.fixedPeriodTicker(delayMillis: Long, initialDelayMillis: Long = delayMillis): ReceiveChannel { +internal fun CoroutineScope.fixedPeriodTicker( + delayMillis: Long, + initialDelayMillis: Long = delayMillis +): ReceiveChannel { require(delayMillis >= 0) { "Expected non-negative delay, but has $delayMillis ms" } require(initialDelayMillis >= 0) { "Expected non-negative initial delay, but has $initialDelayMillis ms" } return produce(capacity = 0) { diff --git a/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt index 3c9787cecc..1381ad37e0 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt @@ -10,7 +10,6 @@ import kotlinx.coroutines.flow.* import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertNull -import kotlin.time.* import kotlin.time.Duration.Companion.milliseconds class SampleTest : TestBase() { @@ -196,6 +195,7 @@ class SampleTest : TestBase() { emit(1) } }).toList() + assertEquals(listOf("A", "B", "C"), result) finish(3) } From 5d23c0d63a48ddc59665bca81d0f24d4da04d21b Mon Sep 17 00:00:00 2001 From: Adib Faramarzi Date: Tue, 11 Jul 2023 18:53:55 +0330 Subject: [PATCH 11/12] Update logic with master branch --- .../common/test/flow/operators/SampleTest.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt index 1381ad37e0..6ef62416de 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt @@ -36,9 +36,9 @@ class SampleTest : TestBase() { val samplerFlow = flow { delay(1000) expect(5) - emit("AA") + emit("A1") delay(1000) - emit("BB") + emit("B1") } expect(2) val result = flow.sample(samplerFlow).toList() From 4a0107ac67b3f846cbea6aca8bf5993da4208e10 Mon Sep 17 00:00:00 2001 From: Adib Date: Tue, 11 Jul 2023 19:15:31 +0330 Subject: [PATCH 12/12] Update logic with master --- kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt index 6ef62416de..bcdba46ee8 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt @@ -41,6 +41,7 @@ class SampleTest : TestBase() { emit("B1") } expect(2) + val result = flow.sample(samplerFlow).toList() assertEquals(listOf("B", "D"), result) finish(8)