Skip to content

Commit

Permalink
=str Fix CompletionStage#toCompletableFuture may throw UnsupportedO…
Browse files Browse the repository at this point in the history
…perationException.
  • Loading branch information
He-Pin committed Aug 29, 2023
1 parent 99cc4e7 commit e8a1c3f
Showing 1 changed file with 14 additions and 12 deletions.
26 changes: 14 additions & 12 deletions stream/src/main/scala/org/apache/pekko/stream/impl/Unfold.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }

import java.util.Optional
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import scala.concurrent.Future
import scala.util.{ Failure, Success, Try }
Expand Down Expand Up @@ -123,20 +124,21 @@ import scala.util.{ Failure, Success, Try }
}

def onPull(): Unit = {
val future = f.apply(state).toCompletableFuture
if (future.isDone && !future.isCompletedExceptionally) {
handle(future.getNow(null))
} else {
future.handle((r, ex) => {
if (ex != null) {
asyncHandler(Failure(ex))
} else {
asyncHandler(Success(r))
}
null
})
f.apply(state) match {
case cf: CompletableFuture[Optional[Pair[S, E]] @unchecked] if cf.isDone && !cf.isCompletedExceptionally =>
handle(cf.join())
case future =>
future.handle((r, ex) => {
if (ex != null) {
asyncHandler(Failure(ex))
} else {
asyncHandler(Success(r))
}
null
})
}
}

setHandler(out, this)
}
}

0 comments on commit e8a1c3f

Please sign in to comment.