Skip to content

Commit

Permalink
Raise Racing WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
nomisRev committed May 13, 2024
1 parent a81952b commit 4a11caf
Showing 1 changed file with 38 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import arrow.core.identity
import arrow.core.nonFatalOrThrow
import arrow.core.prependTo
import arrow.core.raise.DelicateRaiseApi
import arrow.core.raise.Raise
import arrow.core.raise.RaiseCancellationException
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
Expand All @@ -23,6 +24,36 @@ import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.cancellation.CancellationException

public interface RaiseRacingScope<Error, A> : Raise<Error> {
public suspend fun race(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend Raise<Error>.() -> A
)

public suspend fun raceOrRaise(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend Raise<Error>.() -> A
)

public suspend fun raceOrThrow(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend Raise<Error>.() -> A
)

public suspend fun raceOrFail(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend Raise<Error>.() -> A
)
}

public suspend fun <Error, A> Raise<Error>.racing(
dropped: suspend (Error) -> Unit,
handleException: (suspend (context: CoroutineContext, exception: Throwable) -> Unit)? = null,
block: RacingScope<A>.() -> Unit,
): A {
TODO()
}

/**
* A DSL that allows racing many `suspend` functions in parallel against each-other,
* it yields a final result of [A] based on the first function that yields a result.
Expand All @@ -49,21 +80,19 @@ import kotlin.coroutines.cancellation.CancellationException
* suspend fun never(): Nothing = racing { }
* ```
*
* @param handleException handle any exception that occurred in [RacingScope.race],
* by default it [Throwable.printStackTrace].
* @param handleException handle any exception that occurred in [RacingScope.race].
* @param block the body of the DSL that describes the racing logic
* @return the winning value of [A].
*/
public suspend fun <A> racing(
handleException: ((context: CoroutineContext, exception: Throwable) -> Unit)? = null,
handleException: (suspend (context: CoroutineContext, exception: Throwable) -> Unit)? = null,
block: RacingScope<A>.() -> Unit,
): A = coroutineScope {
val exceptionHandler = handleException ?: defaultExceptionHandler()::handleException
val exceptionHandler = handleException ?: { _, _ -> }
select {
val scope = SelectRacingScope(this@select, this@coroutineScope, exceptionHandler)
block(scope)
// TODO add this check??
require(scope.racers.value.isNotEmpty()) { "A racing program with no racers can never yield a result." }
require(scope.racers.get().isNotEmpty()) { "A racing program with no racers can never yield a result." }
}
}

Expand All @@ -82,7 +111,7 @@ public interface RacingScope<A> {
private class SelectRacingScope<A>(
private val select: SelectBuilder<A>,
private val scope: CoroutineScope,
private val handleException: (context: CoroutineContext, exception: Throwable) -> Unit
private val handleException: suspend (context: CoroutineContext, exception: Throwable) -> Unit
) : RacingScope<A> {
val racers: Atomic<List<Deferred<A>>> = Atomic(emptyList())

Expand All @@ -109,15 +138,12 @@ private class SelectRacingScope<A>(
}
}

// To not fail the entire race, we allow users to handle the exceptions but do not cancel the race.
@OptIn(DelicateRaiseApi::class)
override suspend fun race(context: CoroutineContext, block: suspend CoroutineScope.() -> A) =
raceOrFail {
try {
block()
} catch (e: RaiseCancellationException) {
// `Raise<E>` error is ignored... Can we do better here?
awaitCancellation()
} catch (e: CancellationException) {
throw e
} catch (e: Throwable) {
handleException(currentCoroutineContext(), e.nonFatalOrThrow())
awaitCancellation()
Expand Down

0 comments on commit 4a11caf

Please sign in to comment.