From 2d231f6c907f5856d7d82caac100a83ef1398a2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=CC=87brahim=20Yilmaz?= Date: Mon, 6 Apr 2020 21:27:16 +0200 Subject: [PATCH 1/5] Flow based Ticker implemented. --- .../jvm/src/flow/TickerFlows.kt | 33 +++++++ .../jvm/test/flow/TickerFlowsTest.kt | 92 +++++++++++++++++++ 2 files changed, 125 insertions(+) create mode 100644 kotlinx-coroutines-core/jvm/src/flow/TickerFlows.kt create mode 100644 kotlinx-coroutines-core/jvm/test/flow/TickerFlowsTest.kt diff --git a/kotlinx-coroutines-core/jvm/src/flow/TickerFlows.kt b/kotlinx-coroutines-core/jvm/src/flow/TickerFlows.kt new file mode 100644 index 0000000000..d66c698804 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/src/flow/TickerFlows.kt @@ -0,0 +1,33 @@ +package kotlinx.coroutines.flow + +import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.awaitClose +import java.util.* +import kotlin.concurrent.schedule + +/** + * Creates a flow that produces the first item after the given initial delay and subsequent items with the + * given delay between them. + * + * The resulting flow is a callback flow, which basically listens @see [Timer.schedule] + * + * This Flow stops producing elements immediately after [Job.cancel] invocation. + * + * @param period period between each element in milliseconds. + * @param initialDelay delay after which the first element will be produced (it is equal to [period] by default) in milliseconds. + */ +public fun ticker( + period: Long, + initialDelay: Long = period +): Flow = callbackFlow { + require(period > 0) + require(initialDelay > -1) + + val timer = Timer() + timer.schedule(initialDelay, period) { + offer(Unit) + } + + awaitClose { timer.cancel() } +} + diff --git a/kotlinx-coroutines-core/jvm/test/flow/TickerFlowsTest.kt b/kotlinx-coroutines-core/jvm/test/flow/TickerFlowsTest.kt new file mode 100644 index 0000000000..0377bc6b3f --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/flow/TickerFlowsTest.kt @@ -0,0 +1,92 @@ +package flow + +import kotlinx.coroutines.TestBase +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.ticker +import java.util.concurrent.CancellationException +import kotlin.test.Test +import kotlin.test.assertEquals + + +class TickerFlowsTest : TestBase() { + + @Test(expected = IllegalArgumentException::class) + fun testNegativePeriod() = runTest { + // WHEN + ticker(-1).launchIn(this) + } + + @Test(expected = IllegalArgumentException::class) + fun testZeroNegativePeriod() = runTest { + // WHEN + ticker(0).launchIn(this) + } + + @Test(expected = IllegalArgumentException::class) + fun testNegativeInitialDelay() = runTest { + // WHEN + ticker(100, -1).launchIn(this) + } + + @Test + fun testInitialDelay() = runTest { + // GIVEN + val inbox = mutableListOf() + + // WHEN + val periodicTicker = + ticker(100, 100).onEach { + inbox.add(Unit) + }.launchIn(this) + + delay(500) + + // THEN + assertEquals(4, inbox.size) + + periodicTicker.cancelAndJoin() + } + + + @Test + fun testReceive() = runTest { + // GIVEN + val inbox = mutableListOf() + + // WHEN + val periodicTicker = + ticker(100).onEach { + inbox.add(Unit) + }.launchIn(this) + + delay(500) + + // THEN + assertEquals(4, inbox.size) + + periodicTicker.cancelAndJoin() + } + + @Test + fun testDoNotReceiveAfterCancel() = runTest { + // GIVEN + val inbox = mutableListOf() + + // WHEN + val periodicTicker = + ticker(100).onEach { + inbox.add(Unit) + }.launchIn(this) + + delay(50) + + // THEN + assertEquals(0, inbox.size) + periodicTicker.cancel(CancellationException()) + } + + +} \ No newline at end of file From 70410db90c3ed5758d2ddc9cd77fc62f9dcc3b42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=CC=87brahim=20Yilmaz?= Date: Mon, 6 Apr 2020 21:34:10 +0200 Subject: [PATCH 2/5] ticker renamed to tickerFlow to be more specific. --- .../src/flow/{TickerFlows.kt => TickerFlow.kt} | 2 +- .../{TickerFlowsTest.kt => TickerFlowTest.kt} | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) rename kotlinx-coroutines-core/jvm/src/flow/{TickerFlows.kt => TickerFlow.kt} (97%) rename kotlinx-coroutines-core/jvm/test/flow/{TickerFlowsTest.kt => TickerFlowTest.kt} (84%) diff --git a/kotlinx-coroutines-core/jvm/src/flow/TickerFlows.kt b/kotlinx-coroutines-core/jvm/src/flow/TickerFlow.kt similarity index 97% rename from kotlinx-coroutines-core/jvm/src/flow/TickerFlows.kt rename to kotlinx-coroutines-core/jvm/src/flow/TickerFlow.kt index d66c698804..f843ead2dc 100644 --- a/kotlinx-coroutines-core/jvm/src/flow/TickerFlows.kt +++ b/kotlinx-coroutines-core/jvm/src/flow/TickerFlow.kt @@ -16,7 +16,7 @@ import kotlin.concurrent.schedule * @param period period between each element in milliseconds. * @param initialDelay delay after which the first element will be produced (it is equal to [period] by default) in milliseconds. */ -public fun ticker( +public fun tickerFlow( period: Long, initialDelay: Long = period ): Flow = callbackFlow { diff --git a/kotlinx-coroutines-core/jvm/test/flow/TickerFlowsTest.kt b/kotlinx-coroutines-core/jvm/test/flow/TickerFlowTest.kt similarity index 84% rename from kotlinx-coroutines-core/jvm/test/flow/TickerFlowsTest.kt rename to kotlinx-coroutines-core/jvm/test/flow/TickerFlowTest.kt index 0377bc6b3f..56bd712374 100644 --- a/kotlinx-coroutines-core/jvm/test/flow/TickerFlowsTest.kt +++ b/kotlinx-coroutines-core/jvm/test/flow/TickerFlowTest.kt @@ -5,30 +5,30 @@ import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.delay import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.flow.ticker +import kotlinx.coroutines.flow.tickerFlow import java.util.concurrent.CancellationException import kotlin.test.Test import kotlin.test.assertEquals -class TickerFlowsTest : TestBase() { +class TickerFlowTest : TestBase() { @Test(expected = IllegalArgumentException::class) fun testNegativePeriod() = runTest { // WHEN - ticker(-1).launchIn(this) + tickerFlow(-1).launchIn(this) } @Test(expected = IllegalArgumentException::class) fun testZeroNegativePeriod() = runTest { // WHEN - ticker(0).launchIn(this) + tickerFlow(0).launchIn(this) } @Test(expected = IllegalArgumentException::class) fun testNegativeInitialDelay() = runTest { // WHEN - ticker(100, -1).launchIn(this) + tickerFlow(100, -1).launchIn(this) } @Test @@ -38,7 +38,7 @@ class TickerFlowsTest : TestBase() { // WHEN val periodicTicker = - ticker(100, 100).onEach { + tickerFlow(100, 100).onEach { inbox.add(Unit) }.launchIn(this) @@ -58,7 +58,7 @@ class TickerFlowsTest : TestBase() { // WHEN val periodicTicker = - ticker(100).onEach { + tickerFlow(100).onEach { inbox.add(Unit) }.launchIn(this) @@ -77,7 +77,7 @@ class TickerFlowsTest : TestBase() { // WHEN val periodicTicker = - ticker(100).onEach { + tickerFlow(100).onEach { inbox.add(Unit) }.launchIn(this) From 7e15913dd0b93880f280e8a4ab4a9927cee88bc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=CC=87brahim=20Yilmaz?= Date: Tue, 7 Apr 2020 10:18:53 +0200 Subject: [PATCH 3/5] testZeroNegativePeriod renamed to testZeroPeriod testZeroInitialDelay added to test 0 initialDelay. testDoNotReceiveAfterCancel testcase fixed. --- .../jvm/test/flow/TickerFlowTest.kt | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/test/flow/TickerFlowTest.kt b/kotlinx-coroutines-core/jvm/test/flow/TickerFlowTest.kt index 56bd712374..ca0e8fa0f4 100644 --- a/kotlinx-coroutines-core/jvm/test/flow/TickerFlowTest.kt +++ b/kotlinx-coroutines-core/jvm/test/flow/TickerFlowTest.kt @@ -20,7 +20,7 @@ class TickerFlowTest : TestBase() { } @Test(expected = IllegalArgumentException::class) - fun testZeroNegativePeriod() = runTest { + fun testZeroPeriod() = runTest { // WHEN tickerFlow(0).launchIn(this) } @@ -50,6 +50,25 @@ class TickerFlowTest : TestBase() { periodicTicker.cancelAndJoin() } + @Test + fun testZeroInitialDelay() = runTest { + // GIVEN + val inbox = mutableListOf() + + // WHEN + val periodicTicker = + tickerFlow(100, 0).onEach { + inbox.add(Unit) + }.launchIn(this) + + delay(500) + + // THEN + assertEquals(5, inbox.size) + + periodicTicker.cancelAndJoin() + } + @Test fun testReceive() = runTest { @@ -82,10 +101,10 @@ class TickerFlowTest : TestBase() { }.launchIn(this) delay(50) + periodicTicker.cancel(CancellationException()) // THEN assertEquals(0, inbox.size) - periodicTicker.cancel(CancellationException()) } From e9e737d33c8f861b1bcd891c89c954b045011442 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=CC=87brahim=20Yilmaz?= Date: Wed, 13 May 2020 23:40:29 +0200 Subject: [PATCH 4/5] Instead of using Jvm bounded Timer, current TimerChannel is used with receiveAsFlow extension function. --- .../jvm/src/flow/TickerFlow.kt | 34 +++++++++---------- .../jvm/test/flow/TickerFlowTest.kt | 34 ++++++------------- 2 files changed, 27 insertions(+), 41 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/src/flow/TickerFlow.kt b/kotlinx-coroutines-core/jvm/src/flow/TickerFlow.kt index f843ead2dc..d7a2d4847b 100644 --- a/kotlinx-coroutines-core/jvm/src/flow/TickerFlow.kt +++ b/kotlinx-coroutines-core/jvm/src/flow/TickerFlow.kt @@ -1,9 +1,11 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.Job -import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.channels.TickerMode +import kotlinx.coroutines.channels.ticker import java.util.* -import kotlin.concurrent.schedule +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext /** * Creates a flow that produces the first item after the given initial delay and subsequent items with the @@ -13,21 +15,17 @@ import kotlin.concurrent.schedule * * This Flow stops producing elements immediately after [Job.cancel] invocation. * - * @param period period between each element in milliseconds. - * @param initialDelay delay after which the first element will be produced (it is equal to [period] by default) in milliseconds. + * @param delayMillis delay between each element in milliseconds. + * @param initialDelayMillis delay after which the first element will be produced (it is equal to [delayMillis] by default) in milliseconds. + * @param context context of the producing coroutine. + * @param mode specifies behavior when elements are not received ([FIXED_PERIOD][TickerMode.FIXED_PERIOD] by default). */ public fun tickerFlow( - period: Long, - initialDelay: Long = period -): Flow = callbackFlow { - require(period > 0) - require(initialDelay > -1) - - val timer = Timer() - timer.schedule(initialDelay, period) { - offer(Unit) - } - - awaitClose { timer.cancel() } -} - + delayMillis: Long, + initialDelayMillis: Long = delayMillis, + context: CoroutineContext = EmptyCoroutineContext, + mode: TickerMode = TickerMode.FIXED_PERIOD +): Flow { + require(delayMillis > 0) + return ticker(delayMillis, initialDelayMillis, context, mode).receiveAsFlow() +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/flow/TickerFlowTest.kt b/kotlinx-coroutines-core/jvm/test/flow/TickerFlowTest.kt index ca0e8fa0f4..02db27d6f6 100644 --- a/kotlinx-coroutines-core/jvm/test/flow/TickerFlowTest.kt +++ b/kotlinx-coroutines-core/jvm/test/flow/TickerFlowTest.kt @@ -1,7 +1,6 @@ package flow import kotlinx.coroutines.TestBase -import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.delay import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach @@ -37,17 +36,14 @@ class TickerFlowTest : TestBase() { val inbox = mutableListOf() // WHEN - val periodicTicker = - tickerFlow(100, 100).onEach { - inbox.add(Unit) - }.launchIn(this) + tickerFlow(100, 200).onEach { + inbox.add(Unit) + }.launchIn(this) delay(500) // THEN assertEquals(4, inbox.size) - - periodicTicker.cancelAndJoin() } @Test @@ -56,17 +52,14 @@ class TickerFlowTest : TestBase() { val inbox = mutableListOf() // WHEN - val periodicTicker = - tickerFlow(100, 0).onEach { - inbox.add(Unit) - }.launchIn(this) + tickerFlow(100, 0).onEach { + inbox.add(Unit) + }.launchIn(this) delay(500) // THEN - assertEquals(5, inbox.size) - - periodicTicker.cancelAndJoin() + assertEquals(6, inbox.size) } @@ -76,17 +69,14 @@ class TickerFlowTest : TestBase() { val inbox = mutableListOf() // WHEN - val periodicTicker = - tickerFlow(100).onEach { - inbox.add(Unit) - }.launchIn(this) + tickerFlow(100).onEach { + inbox.add(Unit) + }.launchIn(this) delay(500) // THEN - assertEquals(4, inbox.size) - - periodicTicker.cancelAndJoin() + assertEquals(5, inbox.size) } @Test @@ -106,6 +96,4 @@ class TickerFlowTest : TestBase() { // THEN assertEquals(0, inbox.size) } - - } \ No newline at end of file From 1c7f015b724bd610351d12bd80c0eb4d822071d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?I=CC=87brahim=20Yilmaz?= Date: Thu, 14 May 2020 07:52:01 +0200 Subject: [PATCH 5/5] Documentation is updated according to current implementation. --- kotlinx-coroutines-core/jvm/src/flow/TickerFlow.kt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/src/flow/TickerFlow.kt b/kotlinx-coroutines-core/jvm/src/flow/TickerFlow.kt index d7a2d4847b..d2a554d12c 100644 --- a/kotlinx-coroutines-core/jvm/src/flow/TickerFlow.kt +++ b/kotlinx-coroutines-core/jvm/src/flow/TickerFlow.kt @@ -3,7 +3,6 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.Job import kotlinx.coroutines.channels.TickerMode import kotlinx.coroutines.channels.ticker -import java.util.* import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext @@ -11,7 +10,7 @@ import kotlin.coroutines.EmptyCoroutineContext * Creates a flow that produces the first item after the given initial delay and subsequent items with the * given delay between them. * - * The resulting flow is a callback flow, which basically listens @see [Timer.schedule] + * The resulting flow is basically using [ticker] * * This Flow stops producing elements immediately after [Job.cancel] invocation. *