From 658d53fba31148e28b0d3c38230812d0b4990c4f Mon Sep 17 00:00:00 2001 From: He-Pin Date: Mon, 5 Aug 2024 01:02:00 +0800 Subject: [PATCH] feat: Add Pattern timeout support --- .../pekko/pattern/FutureTimeoutSupport.scala | 63 ++++++++++++++++++- .../org/apache/pekko/pattern/Patterns.scala | 11 ++++ 2 files changed, 72 insertions(+), 2 deletions(-) diff --git a/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala b/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala index 2be1ad59b69..3a7dfe04e0e 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala @@ -13,8 +13,7 @@ package org.apache.pekko.pattern -import java.util.concurrent.CompletableFuture -import java.util.concurrent.CompletionStage +import java.util.concurrent.{ CompletableFuture, CompletionStage, TimeoutException } import scala.concurrent.{ ExecutionContext, Future, Promise } import scala.concurrent.duration.FiniteDuration @@ -87,4 +86,64 @@ trait FutureTimeoutSupport { } p } + + /** + * Returns a [[scala.concurrent.Future]] that will be completed with a [[TimeoutException]] + * if the provided value is not completed within the specified duration. + */ + def timeout[T](duration: FiniteDuration, using: Scheduler)(value: => Future[T])( + implicit ec: ExecutionContext): Future[T] = { + val future = + try value + catch { + case NonFatal(t) => Future.failed(t) + } + future.value match { + case Some(_) => future + case None => // not completed yet + val p = Promise[T]() + val timeout = using.scheduleOnce(duration) { + p.tryFailure(new TimeoutException(s"Timeout of $duration expired")) + if (future.isInstanceOf[CompletableFuture[T]]) { + future.asInstanceOf[CompletableFuture[T]] + .toCompletableFuture + .cancel(true) + } + } + future.onComplete { result => + timeout.cancel() + p.tryComplete(result) + } + p.future + } + } + + /** + * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with a [[TimeoutException]] + * if the provided value is not completed within the specified duration. + */ + def timeoutCompletionStage[T](duration: FiniteDuration, using: Scheduler)(value: => CompletionStage[T])( + implicit ec: ExecutionContext): CompletionStage[T] = { + val stage: CompletionStage[T] = + try value + catch { + case NonFatal(t) => Futures.failedCompletionStage(t) + } + if (stage.toCompletableFuture.isDone) { + stage + } else { + val p = new CompletableFuture[T] + val timeout = using.scheduleOnce(duration) { + p.completeExceptionally(new TimeoutException(s"Timeout of $duration expired")) + stage.toCompletableFuture.cancel(true) + } + stage.handle[Unit]((v: T, ex: Throwable) => { + timeout.cancel() + if (v != null) p.complete(v) + if (ex != null) p.completeExceptionally(ex) + }) + p + } + } + } diff --git a/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala b/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala index 2b8759e0263..9691a1b5ca2 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala @@ -465,6 +465,17 @@ object Patterns { value: Callable[CompletionStage[T]]): CompletionStage[T] = afterCompletionStage(duration.asScala, scheduler)(value.call())(context) + /** + * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with a [[java.util.concurrent.TimeoutException]] + * if the provided value is not completed within the specified duration. + */ + def timeout[T]( + duration: java.time.Duration, + scheduler: Scheduler, + context: ExecutionContext, + value: Callable[CompletionStage[T]]): CompletionStage[T] = + timeoutCompletionStage(duration.asScala, scheduler)(value.call())(context) + /** * Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided Callable * after the specified duration.