diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Racing.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Racing.kt index 0d0590e01a..ce1c2ccf63 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Racing.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Racing.kt @@ -7,6 +7,7 @@ import arrow.core.identity import arrow.core.nonFatalOrThrow import arrow.core.prependTo import arrow.core.raise.DelicateRaiseApi +import arrow.core.raise.Raise import arrow.core.raise.RaiseCancellationException import kotlinx.coroutines.CoroutineExceptionHandler import kotlinx.coroutines.CoroutineScope @@ -23,6 +24,36 @@ import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.cancellation.CancellationException +public interface RaiseRacingScope : Raise { + public suspend fun race( + context: CoroutineContext = EmptyCoroutineContext, + block: suspend Raise.() -> A + ) + + public suspend fun raceOrRaise( + context: CoroutineContext = EmptyCoroutineContext, + block: suspend Raise.() -> A + ) + + public suspend fun raceOrThrow( + context: CoroutineContext = EmptyCoroutineContext, + block: suspend Raise.() -> A + ) + + public suspend fun raceOrFail( + context: CoroutineContext = EmptyCoroutineContext, + block: suspend Raise.() -> A + ) +} + +public suspend fun Raise.racing( + dropped: suspend (Error) -> Unit, + handleException: (suspend (context: CoroutineContext, exception: Throwable) -> Unit)? = null, + block: RacingScope.() -> Unit, +): A { + TODO() +} + /** * A DSL that allows racing many `suspend` functions in parallel against each-other, * it yields a final result of [A] based on the first function that yields a result. @@ -49,21 +80,19 @@ import kotlin.coroutines.cancellation.CancellationException * suspend fun never(): Nothing = racing { } * ``` * - * @param handleException handle any exception that occurred in [RacingScope.race], - * by default it [Throwable.printStackTrace]. + * @param handleException handle any exception that occurred in [RacingScope.race]. * @param block the body of the DSL that describes the racing logic * @return the winning value of [A]. */ public suspend fun racing( - handleException: ((context: CoroutineContext, exception: Throwable) -> Unit)? = null, + handleException: (suspend (context: CoroutineContext, exception: Throwable) -> Unit)? = null, block: RacingScope.() -> Unit, ): A = coroutineScope { - val exceptionHandler = handleException ?: defaultExceptionHandler()::handleException + val exceptionHandler = handleException ?: { _, _ -> } select { val scope = SelectRacingScope(this@select, this@coroutineScope, exceptionHandler) block(scope) - // TODO add this check?? - require(scope.racers.value.isNotEmpty()) { "A racing program with no racers can never yield a result." } + require(scope.racers.get().isNotEmpty()) { "A racing program with no racers can never yield a result." } } } @@ -82,7 +111,7 @@ public interface RacingScope { private class SelectRacingScope( private val select: SelectBuilder, private val scope: CoroutineScope, - private val handleException: (context: CoroutineContext, exception: Throwable) -> Unit + private val handleException: suspend (context: CoroutineContext, exception: Throwable) -> Unit ) : RacingScope { val racers: Atomic>> = Atomic(emptyList()) @@ -109,15 +138,12 @@ private class SelectRacingScope( } } - // To not fail the entire race, we allow users to handle the exceptions but do not cancel the race. - @OptIn(DelicateRaiseApi::class) override suspend fun race(context: CoroutineContext, block: suspend CoroutineScope.() -> A) = raceOrFail { try { block() - } catch (e: RaiseCancellationException) { - // `Raise` error is ignored... Can we do better here? - awaitCancellation() + } catch (e: CancellationException) { + throw e } catch (e: Throwable) { handleException(currentCoroutineContext(), e.nonFatalOrThrow()) awaitCancellation()