diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Unfold.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Unfold.scala index ea6a525ec4d..bd12267f8e7 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Unfold.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Unfold.scala @@ -21,6 +21,7 @@ import pekko.stream.impl.Stages.DefaultAttributes import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } import java.util.Optional +import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletionStage import scala.concurrent.Future import scala.util.{ Failure, Success, Try } @@ -123,20 +124,21 @@ import scala.util.{ Failure, Success, Try } } def onPull(): Unit = { - val future = f.apply(state).toCompletableFuture - if (future.isDone && !future.isCompletedExceptionally) { - handle(future.getNow(null)) - } else { - future.handle((r, ex) => { - if (ex != null) { - asyncHandler(Failure(ex)) - } else { - asyncHandler(Success(r)) - } - null - }) + f.apply(state) match { + case cf: CompletableFuture[Optional[Pair[S, E]] @unchecked] if cf.isDone && !cf.isCompletedExceptionally => + handle(cf.join()) + case future => + future.handle((r, ex) => { + if (ex != null) { + asyncHandler(Failure(ex)) + } else { + asyncHandler(Success(r)) + } + null + }) } } + setHandler(out, this) } }