Skip to content

Commit

Permalink
Allow specifying a more concrete Throwable type for retrying, alterna…
Browse files Browse the repository at this point in the history
…tive (#3418)
  • Loading branch information
serras authored May 16, 2024
1 parent dc7baf1 commit 51ace56
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,11 @@ public final class arrow/resilience/Schedule$Decision$Done : arrow/resilience/Sc
}

public final class arrow/resilience/ScheduleKt {
public static final fun retry-4AuOtiA (Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun retry-YL6hcnA (Larrow/core/raise/Raise;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun retry-aZo8_V4 (Lkotlin/jvm/functions/Function2;Lkotlin/reflect/KClass;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun retryEither-4AuOtiA (Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun retryOrElse-aZo8_V4 (Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun retryOrElseEither-aZo8_V4 (Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun retryOrElse-quv6EbI (Lkotlin/jvm/functions/Function2;Lkotlin/reflect/KClass;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun retryOrElseEither-quv6EbI (Lkotlin/jvm/functions/Function2;Lkotlin/reflect/KClass;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun retryRaise-4AuOtiA (Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,16 @@ final fun <#A: kotlin/Any?, #B: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).
final fun <#A: kotlin/Any?> arrow.resilience/saga(kotlin.coroutines/SuspendFunction1<arrow.resilience/SagaActionStep, #A>, kotlin.coroutines/SuspendFunction1<#A, kotlin/Unit>): kotlin.coroutines/SuspendFunction1<arrow.resilience/SagaScope, #A> // arrow.resilience/saga|saga(kotlin.coroutines.SuspendFunction1<arrow.resilience.SagaActionStep,0:0>;kotlin.coroutines.SuspendFunction1<0:0,kotlin.Unit>){0§<kotlin.Any?>}[0]
final inline fun <#A: kotlin/Any?> arrow.resilience/saga(noinline kotlin.coroutines/SuspendFunction1<arrow.resilience/SagaScope, #A>): kotlin.coroutines/SuspendFunction1<arrow.resilience/SagaScope, #A> // arrow.resilience/saga|saga(kotlin.coroutines.SuspendFunction1<arrow.resilience.SagaScope,0:0>){0§<kotlin.Any?>}[0]
final object arrow.resilience/SagaActionStep // arrow.resilience/SagaActionStep|null[0]
final suspend fun <#A: kotlin/Any?, #B: kotlin/Any?, #C: kotlin/Any?> (arrow.resilience/Schedule<kotlin/Throwable, #B>).arrow.resilience/retryOrElseEither(kotlin.coroutines/SuspendFunction0<#A>, kotlin.coroutines/SuspendFunction2<kotlin/Throwable, #B, #C>): arrow.core/Either<#C, #A> // arrow.resilience/retryOrElseEither|[email protected]<kotlin.Throwable,0:1>(kotlin.coroutines.SuspendFunction0<0:0>;kotlin.coroutines.SuspendFunction2<kotlin.Throwable,0:1,0:2>){0§<kotlin.Any?>;1§<kotlin.Any?>;2§<kotlin.Any?>}[0]
final suspend fun <#A: kotlin/Any?, #B: kotlin/Any?> (arrow.resilience/Schedule<kotlin/Throwable, #B>).arrow.resilience/retryOrElse(kotlin.coroutines/SuspendFunction0<#A>, kotlin.coroutines/SuspendFunction2<kotlin/Throwable, #B, #A>): #A // arrow.resilience/retryOrElse|[email protected]<kotlin.Throwable,0:1>(kotlin.coroutines.SuspendFunction0<0:0>;kotlin.coroutines.SuspendFunction2<kotlin.Throwable,0:1,0:0>){0§<kotlin.Any?>;1§<kotlin.Any?>}[0]
final suspend fun <#A: kotlin/Any?> (arrow.resilience/Schedule<kotlin/Throwable, *>).arrow.resilience/retry(kotlin.coroutines/SuspendFunction0<#A>): #A // arrow.resilience/retry|[email protected]<kotlin.Throwable,*>(kotlin.coroutines.SuspendFunction0<0:0>){0§<kotlin.Any?>}[0]
final suspend fun <#A: kotlin/Any?> (kotlin.coroutines/SuspendFunction1<arrow.resilience/SagaScope, #A>).arrow.resilience/transact(): #A // arrow.resilience/transact|[email protected]<arrow.resilience.SagaScope,0:0>(){0§<kotlin.Any?>}[0]
final suspend fun <#A: kotlin/Throwable, #B: kotlin/Any?, #C: kotlin/Any?, #D: kotlin/Any?> (arrow.resilience/Schedule<#A, #C>).arrow.resilience/retryOrElseEither(kotlin.reflect/KClass<#A>, kotlin.coroutines/SuspendFunction0<#B>, kotlin.coroutines/SuspendFunction2<#A, #C, #D>): arrow.core/Either<#D, #B> // arrow.resilience/retryOrElseEither|[email protected]<0:0,0:2>(kotlin.reflect.KClass<0:0>;kotlin.coroutines.SuspendFunction0<0:1>;kotlin.coroutines.SuspendFunction2<0:0,0:2,0:3>){0§<kotlin.Throwable>;1§<kotlin.Any?>;2§<kotlin.Any?>;3§<kotlin.Any?>}[0]
final suspend fun <#A: kotlin/Throwable, #B: kotlin/Any?, #C: kotlin/Any?> (arrow.resilience/Schedule<#A, #C>).arrow.resilience/retryOrElse(kotlin.reflect/KClass<#A>, kotlin.coroutines/SuspendFunction0<#B>, kotlin.coroutines/SuspendFunction2<#A, #C, #B>): #B // arrow.resilience/retryOrElse|[email protected]<0:0,0:2>(kotlin.reflect.KClass<0:0>;kotlin.coroutines.SuspendFunction0<0:1>;kotlin.coroutines.SuspendFunction2<0:0,0:2,0:1>){0§<kotlin.Throwable>;1§<kotlin.Any?>;2§<kotlin.Any?>}[0]
final suspend fun <#A: kotlin/Throwable, #B: kotlin/Any?> (arrow.resilience/Schedule<#A, *>).arrow.resilience/retry(kotlin.reflect/KClass<#A>, kotlin.coroutines/SuspendFunction0<#B>): #B // arrow.resilience/retry|[email protected]<0:0,*>(kotlin.reflect.KClass<0:0>;kotlin.coroutines.SuspendFunction0<0:1>){0§<kotlin.Throwable>;1§<kotlin.Any?>}[0]
final suspend inline fun <#A: kotlin/Any?, #B: kotlin/Any?, #C: kotlin/Any?> (arrow.core.raise/Raise<#A>).arrow.resilience/retry(arrow.resilience/Schedule<#A, #C>, kotlin/Function1<arrow.core.raise/Raise<#A>, #B>): #B // arrow.resilience/retry|[email protected]<0:0>(arrow.resilience.Schedule<0:0,0:2>;kotlin.Function1<arrow.core.raise.Raise<0:0>,0:1>){0§<kotlin.Any?>;1§<kotlin.Any?>;2§<kotlin.Any?>}[0]
final suspend inline fun <#A: kotlin/Any?, #B: kotlin/Any?, #C: kotlin/Any?> (arrow.resilience/Schedule<#A, #C>).arrow.resilience/retryEither(kotlin/Function0<arrow.core/Either<#A, #B>>): arrow.core/Either<#A, #B> // arrow.resilience/retryEither|[email protected]<0:0,0:2>(kotlin.Function0<arrow.core.Either<0:0,0:1>>){0§<kotlin.Any?>;1§<kotlin.Any?>;2§<kotlin.Any?>}[0]
final suspend inline fun <#A: kotlin/Any?, #B: kotlin/Any?, #C: kotlin/Any?> (arrow.resilience/Schedule<#A, #C>).arrow.resilience/retryRaise(kotlin/Function1<arrow.core.raise/Raise<#A>, #B>): arrow.core/Either<#A, #B> // arrow.resilience/retryRaise|[email protected]<0:0,0:2>(kotlin.Function1<arrow.core.raise.Raise<0:0>,0:1>){0§<kotlin.Any?>;1§<kotlin.Any?>;2§<kotlin.Any?>}[0]
final suspend inline fun <#A: reified kotlin/Throwable, #B: kotlin/Any?, #C: kotlin/Any?, #D: kotlin/Any?> (arrow.resilience/Schedule<#A, #C>).arrow.resilience/retryOrElseEither(noinline kotlin.coroutines/SuspendFunction0<#B>, noinline kotlin.coroutines/SuspendFunction2<#A, #C, #D>): arrow.core/Either<#D, #B> // arrow.resilience/retryOrElseEither|[email protected]<0:0,0:2>(kotlin.coroutines.SuspendFunction0<0:1>;kotlin.coroutines.SuspendFunction2<0:0,0:2,0:3>){0§<kotlin.Throwable>;1§<kotlin.Any?>;2§<kotlin.Any?>;3§<kotlin.Any?>}[0]
final suspend inline fun <#A: reified kotlin/Throwable, #B: kotlin/Any?, #C: kotlin/Any?> (arrow.resilience/Schedule<#A, #C>).arrow.resilience/retryOrElse(noinline kotlin.coroutines/SuspendFunction0<#B>, noinline kotlin.coroutines/SuspendFunction2<kotlin/Throwable, #C, #B>): #B // arrow.resilience/retryOrElse|[email protected]<0:0,0:2>(kotlin.coroutines.SuspendFunction0<0:1>;kotlin.coroutines.SuspendFunction2<kotlin.Throwable,0:2,0:1>){0§<kotlin.Throwable>;1§<kotlin.Any?>;2§<kotlin.Any?>}[0]
final suspend inline fun <#A: reified kotlin/Throwable, #B: kotlin/Any?> (arrow.resilience/Schedule<#A, *>).arrow.resilience/retry(noinline kotlin.coroutines/SuspendFunction0<#B>): #B // arrow.resilience/retry|[email protected]<0:0,*>(kotlin.coroutines.SuspendFunction0<0:1>){0§<kotlin.Throwable>;1§<kotlin.Any?>}[0]
final val arrow.resilience.common/platform // arrow.resilience.common/platform|{}platform[0]
final fun <get-platform>(): arrow.resilience.common/Platform // arrow.resilience.common/platform.<get-platform>|<get-platform>(){}[0]
final value class <#A: in kotlin/Any?, #B: out kotlin/Any?> arrow.resilience/Schedule { // arrow.resilience/Schedule|null[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package arrow.resilience

import arrow.core.Either
import arrow.core.NonFatal
import arrow.core.None
import arrow.core.Option
import arrow.core.identity
Expand All @@ -26,12 +27,14 @@ import kotlin.experimental.ExperimentalTypeInference
import kotlin.jvm.JvmInline
import kotlin.math.pow
import kotlin.random.Random
import kotlin.reflect.KClass
import kotlin.reflect.cast
import kotlin.time.Duration
import kotlin.time.Duration.Companion.ZERO
import kotlin.time.Duration.Companion.nanoseconds

public typealias ScheduleStep<Input, Output> =
suspend (Input) -> Schedule.Decision<Input, Output>
suspend (Input) -> Decision<Input, Output>

/**
* A [Schedule] describes how a `suspend fun` should [retry] or [repeat].
Expand Down Expand Up @@ -406,46 +409,86 @@ public value class Schedule<in Input, out Output>(public val step: ScheduleStep<
}

/**
* Retries [action] using any [Throwable] that occurred as the input to the [Schedule].
* Retries [action] using any [E] that occurred as the input to the [Schedule].
* It will throw the last exception if the [Schedule] is exhausted, and ignores the output of the [Schedule].
*/
public suspend fun <A> Schedule<Throwable, *>.retry(action: suspend () -> A): A =
retryOrElse(action) { e, _ -> throw e }
public suspend inline fun <reified E: Throwable, A> Schedule<E, *>.retry(
noinline action: suspend () -> A
): A = retry(E::class, action)

/**
* Retries [action] using any [Throwable] that occurred as the input to the [Schedule].
* Retries [action] using any [E] that occurred as the input to the [Schedule].
* It will throw the last exception if the [Schedule] is exhausted, and ignores the output of the [Schedule].
*/
public suspend fun <E: Throwable, A> Schedule<E, *>.retry(
exceptionClass: KClass<E>,
action: suspend () -> A
): A = retryOrElse(exceptionClass, action) { e, _ -> throw e }

/**
* Retries [action] using any [E] that occurred as the input to the [Schedule].
* If the [Schedule] is exhausted,
* it will invoke [orElse] with the last exception and the output of the [Schedule] to produce a fallback [Input] value.
*/
public suspend inline fun <reified E: Throwable, Input, Output> Schedule<E, Output>.retryOrElse(
noinline action: suspend () -> Input,
noinline orElse: suspend (Throwable, Output) -> Input
): Input = retryOrElse(E::class, action, orElse)

/**
* Retries [action] using any [E] that occurred as the input to the [Schedule].
* If the [Schedule] is exhausted,
* it will invoke [orElse] with the last exception and the output of the [Schedule] to produce a fallback [Input] value.
*/
public suspend fun <Input, Output> Schedule<Throwable, Output>.retryOrElse(
public suspend fun <E: Throwable, Input, Output> Schedule<E, Output>.retryOrElse(
exceptionClass: KClass<E>,
action: suspend () -> Input,
orElse: suspend (Throwable, Output) -> Input
): Input = retryOrElseEither(action, orElse).merge()
orElse: suspend (E, Output) -> Input
): Input = retryOrElseEither(exceptionClass, action, orElse).merge()

/**
* Retries [action] using any [Throwable] that occurred as the input to the [Schedule].
* Retries [action] using any [E] that occurred as the input to the [Schedule].
* If the [Schedule] is exhausted,
* it will invoke [orElse] with the last exception and the output of the [Schedule] to produce a fallback value of [A].
* Returns [Either] with the fallback value if the [Schedule] is exhausted, or the successful result of [action].
*/
public suspend fun <Input, Output, A> Schedule<Throwable, Output>.retryOrElseEither(
public suspend inline fun <reified E: Throwable, Input, Output, A> Schedule<E, Output>.retryOrElseEither(
noinline action: suspend () -> Input,
noinline orElse: suspend (E, Output) -> A
): Either<A, Input> = retryOrElseEither(E::class, action, orElse)

/**
* Retries [action] using any [E] that occurred as the input to the [Schedule].
* If the [Schedule] is exhausted,
* it will invoke [orElse] with the last exception and the output of the [Schedule] to produce a fallback value of [A].
* Returns [Either] with the fallback value if the [Schedule] is exhausted, or the successful result of [action].
*/
public suspend fun <E: Throwable, Input, Output, A> Schedule<E, Output>.retryOrElseEither(
exceptionClass: KClass<E>,
action: suspend () -> Input,
orElse: suspend (Throwable, Output) -> A
orElse: suspend (E, Output) -> A
): Either<A, Input> {
var step: ScheduleStep<Throwable, Output> = step
var step: ScheduleStep<E, Output> = step

while (true) {
currentCoroutineContext().ensureActive()
try {
return Either.Right(action.invoke())
} catch (e: Throwable) {
@Suppress("NAME_SHADOWING") val e = when {
exceptionClass.isInstance(e) -> exceptionClass.cast(e)
else -> throw e
}

when (val decision = step(e)) {
is Continue -> {
if (decision.delay != ZERO) delay(decision.delay)
step = decision.step
}

is Done -> return Either.Left(orElse(e.nonFatalOrThrow(), decision.output))
is Done ->
if (NonFatal(e)) return Either.Left(orElse(e, decision.output))
else throw e
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
import kotlin.test.fail
import kotlin.time.Duration.Companion.milliseconds

@OptIn(ExperimentalCoroutinesApi::class)
Expand Down Expand Up @@ -79,12 +78,5 @@ class FlowTest {
}
}

inline fun <A> assertThrowable(executable: () -> A): Throwable {
val a = try {
executable.invoke()
} catch (e: Throwable) {
e
}

return if (a is Throwable) a else fail("Expected an exception but found: $a")
}
inline fun assertThrowable(executable: () -> Unit): Throwable =
assertThrows<Throwable>(executable)
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import arrow.resilience.Schedule.Decision.Continue
import arrow.resilience.Schedule.Decision.Done
import kotlinx.coroutines.test.TestResult
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withTimeout
import kotlin.math.pow
import kotlin.test.Test
import kotlin.test.assertContentEquals
Expand Down Expand Up @@ -269,15 +268,99 @@ class ScheduleTest {
.doUntil { _, output -> output > 50.0.milliseconds }
.doUntil { input, _ -> input is IllegalStateException }

val result: Either<Throwable, Unit> = withTimeout(10.seconds) {
schedule.retryOrElseEither({
throw ex
}, { t, _ -> t })
}
val result: Either<Throwable, Unit> = schedule.retryOrElseEither({
throw ex
}, { t, _ -> t })

result.fold({ assertEquals(ex, it) }, { fail("The impossible happened") })
}

@Test
fun rethrowsUnmatchedException(): TestResult = runTest {
val ex = Throwable("Hello")

val e = assertThrowable {
Schedule.forever<IllegalStateException>()
.retry { throw ex }
}
assertEquals(ex, e)
}

@Test
fun captureException(): TestResult = runTest {
val ex = IllegalStateException("Hello")
val buffer = mutableListOf<IllegalStateException>()

assertThrows<IllegalStateException> {
Schedule.recurs<IllegalStateException>(2)
.log { e, _ -> buffer.add(e) }
.retry { throw ex }
}

assertEquals(listOf(ex, ex), buffer)
}

@Test
fun captureExceptionDifferentInput(): TestResult = runTest {
val ex = IllegalStateException("Hello")
val buffer = mutableListOf<Throwable>()

assertThrows<IllegalStateException> {
Schedule.recurs<Throwable>(2)
.log { e, _ -> buffer.add(e) }
.retry<IllegalStateException, _> { throw ex }
}

assertEquals<List<Throwable>>(listOf(ex, ex), buffer)
}

@Test
fun notCaptureExceptionDifferentInput(): TestResult = runTest {
val ex = IllegalStateException("Hello")
val buffer = mutableListOf<Throwable>()

assertThrows<IllegalStateException> {
Schedule.recurs<Throwable>(2)
.log { e, _ -> buffer.add(e) }
.retry<IllegalArgumentException, _> { throw ex }
}

assertEquals(emptyList(), buffer)
}

@Test
fun retriesMatchedException(): TestResult = runTest {
val ex = IllegalStateException("Hello")
var count = 0

val i = Schedule.forever<IllegalStateException>()
.retry { if (count++ == 0) throw ex else 1 }

assertEquals(1, i)
}

@Test
fun retriesMatchedExceptionDifferentInput(): TestResult = runTest {
val ex = IllegalStateException("Hello")
var count = 0

val i = Schedule.forever<Throwable>()
.retry<IllegalStateException, _> { if (count++ == 0) throw ex else 1 }

assertEquals(1, i)
}

@Test
fun notRetriesMatchedExceptionDifferentInput(): TestResult = runTest {
val ex = IllegalStateException("Hello")
var count = 0

assertThrows<IllegalStateException> {
Schedule.forever<Throwable>()
.retry<IllegalArgumentException, _> { if (count++ == 0) throw ex else 1 }
}
}

@Test
fun retryRaiseIsStackSafe(): TestResult = runTest {
val count = AtomicLong(0)
Expand Down Expand Up @@ -401,3 +484,13 @@ private suspend fun <B> checkRepeat(schedule: Schedule<Long, List<B>>, expected:
}

private object CustomError

inline fun <reified A : Throwable> assertThrows(executable: () -> Unit): A {
val a = try {
executable.invoke()
} catch (e: Throwable) {
e
}

return if (a is A) a else fail("Expected an exception but found: $a")
}

0 comments on commit 51ace56

Please sign in to comment.