Skip to content

Commit

Permalink
Refactor ParZip2Test from Kotest Plugin to Kotlin-test runtime #3192 (#…
Browse files Browse the repository at this point in the history
…3222)

Co-authored-by: Alejandro Serrano <[email protected]>
  • Loading branch information
asmadsen and serras authored Oct 26, 2023
1 parent 6daa91a commit 335bd90
Showing 1 changed file with 71 additions and 71 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package arrow.fx.coroutines.parZip

import arrow.atomic.Atomic
import arrow.atomic.AtomicInt
import arrow.atomic.update
import arrow.atomic.value
import arrow.core.Either
Expand All @@ -10,7 +10,6 @@ import arrow.fx.coroutines.guaranteeCase
import arrow.fx.coroutines.leftException
import arrow.fx.coroutines.parZip
import arrow.fx.coroutines.throwable
import io.kotest.core.spec.style.StringSpec
import io.kotest.matchers.should
import io.kotest.matchers.shouldBe
import io.kotest.matchers.types.shouldBeTypeOf
Expand All @@ -25,93 +24,94 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.test.runTest
import kotlin.test.Test

class ParZip2Test : StringSpec({
"parZip 2 runs in parallel" {
checkAll(Arb.int(), Arb.int()) { a, b ->
val r = Atomic("")
val modifyGate = CompletableDeferred<Int>()
class ParZip2Test {
@Test fun parZip2RunsInParallel() = runTest {
checkAll(Arb.int(), Arb.int()) { a, b ->
val r = AtomicInt(0)
val modifyGate = CompletableDeferred<Int>()

parZip(
{
modifyGate.await()
r.update { i -> "$i$a" }
},
{
r.value = "$b"
modifyGate.complete(0)
}
) { _a, _b ->
Pair(_a, _b)
parZip(
{
modifyGate.await()
r.update { i -> i + a }
},
{
r.value = b
modifyGate.complete(0)
}

r.value shouldBe "$b$a"
) { _a, _b ->
Pair(_a, _b)
}

r.value shouldBe b + a
}
}

"Cancelling parZip 2 cancels all participants" {
checkAll(Arb.int(), Arb.int()) { a, b ->
val s = Channel<Unit>()
val pa = CompletableDeferred<Pair<Int, ExitCase>>()
val pb = CompletableDeferred<Pair<Int, ExitCase>>()
@Test fun cancellingParZip2CancelsAllParticipants() = runTest {
checkAll(Arb.int(), Arb.int()) { a, b ->
val s = Channel<Unit>()
val pa = CompletableDeferred<Pair<Int, ExitCase>>()
val pb = CompletableDeferred<Pair<Int, ExitCase>>()

val loserA: suspend CoroutineScope.() -> Int =
{ guaranteeCase({ s.receive(); awaitCancellation() }) { ex -> pa.complete(Pair(a, ex)) } }
val loserB: suspend CoroutineScope.() -> Int =
{ guaranteeCase({ s.receive(); awaitCancellation() }) { ex -> pb.complete(Pair(b, ex)) } }
val loserA: suspend CoroutineScope.() -> Int =
{ guaranteeCase({ s.receive(); awaitCancellation() }) { ex -> pa.complete(Pair(a, ex)) } }
val loserB: suspend CoroutineScope.() -> Int =
{ guaranteeCase({ s.receive(); awaitCancellation() }) { ex -> pb.complete(Pair(b, ex)) } }

val f = async { parZip(loserA, loserB) { _a, _b -> Pair(_a, _b) } }
val f = async { parZip(loserA, loserB) { _a, _b -> Pair(_a, _b) } }

s.send(Unit) // Suspend until all racers started
s.send(Unit)
f.cancel()
s.send(Unit) // Suspend until all racers started
s.send(Unit)
f.cancel()

pa.await().let { (res, exit) ->
res shouldBe a
exit.shouldBeTypeOf<ExitCase.Cancelled>()
}
pb.await().let { (res, exit) ->
res shouldBe b
exit.shouldBeTypeOf<ExitCase.Cancelled>()
}
pa.await().let { (res, exit) ->
res shouldBe a
exit.shouldBeTypeOf<ExitCase.Cancelled>()
}
pb.await().let { (res, exit) ->
res shouldBe b
exit.shouldBeTypeOf<ExitCase.Cancelled>()
}
}
}

"parZip 2 cancels losers if a failure occurs in one of the tasks" {
checkAll(Arb.throwable(), Arb.boolean()) { e, leftWinner ->
val s = Channel<Unit>()
val pa = CompletableDeferred<ExitCase>()

val winner: suspend CoroutineScope.() -> Unit = { s.send(Unit); throw e }
val loserA: suspend CoroutineScope.() -> Int =
{ guaranteeCase({ s.receive(); awaitCancellation() }) { ex -> pa.complete(ex) } }
@Test fun parZip2CancelsLosersIfAFailtureOccursInOneOfTheTasts() = runTest {
checkAll(Arb.throwable(), Arb.boolean()) { e, leftWinner ->
val s = Channel<Unit>()
val pa = CompletableDeferred<ExitCase>()

val r = Either.catch {
if (leftWinner) parZip(winner, loserA) { _, _ -> Unit }
else parZip(loserA, winner) { _, _ -> Unit }
}
val winner: suspend CoroutineScope.() -> Unit = { s.send(Unit); throw e }
val loserA: suspend CoroutineScope.() -> Int =
{ guaranteeCase({ s.receive(); awaitCancellation() }) { ex -> pa.complete(ex) } }

pa.await().shouldBeTypeOf<ExitCase.Cancelled>()
r should leftException(e)
val r = Either.catch {
if (leftWinner) parZip(winner, loserA) { _, _ -> Unit }
else parZip(loserA, winner) { _, _ -> Unit }
}

pa.await().shouldBeTypeOf<ExitCase.Cancelled>()
r should leftException(e)
}
}

"parZip CancellationException on right can cancel rest" {
checkAll(Arb.string()) { msg ->
val exit = CompletableDeferred<ExitCase>()
val start = CompletableDeferred<Unit>()
try {
parZip<Unit, Unit, Unit>({
awaitExitCase(start, exit)
}, {
start.await()
throw CancellationException(msg)
}) { _, _ -> }
} catch (e: CancellationException) {
e.message shouldBe msg
}
exit.await().shouldBeTypeOf<ExitCase.Cancelled>()
@Test fun parZipCancellationExceptionOnRightCanCancelRest() = runTest {
checkAll(Arb.string()) { msg ->
val exit = CompletableDeferred<ExitCase>()
val start = CompletableDeferred<Unit>()
try {
parZip<Unit, Unit, Unit>({
awaitExitCase(start, exit)
}, {
start.await()
throw CancellationException(msg)
}) { _, _ -> }
} catch (e: CancellationException) {
e.message shouldBe msg
}
exit.await().shouldBeTypeOf<ExitCase.Cancelled>()
}
}
)
}

0 comments on commit 335bd90

Please sign in to comment.