Skip to content

Commit

Permalink
=str Switch the type parameter order of UnfoldResourceSourceAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Sep 2, 2023
1 parent 8d2141a commit f25ef8b
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,139 +13,145 @@

package org.apache.pekko.stream.impl

import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }
import scala.util.control.NonFatal

import org.apache.pekko
import pekko.Done
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts.parasitic
import pekko.stream.ActorAttributes.SupervisionStrategy
import pekko.stream._
import pekko.stream.ActorAttributes.SupervisionStrategy
import pekko.stream.Attributes.SourceLocation
import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.stage._

import scala.concurrent.{ ExecutionContext, Future }
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }
import pekko.util.OptionVal

/**
* INTERNAL API
*/
@InternalApi private[pekko] final class UnfoldResourceSourceAsync[T, S](
create: () => Future[S],
readData: (S) => Future[Option[T]],
close: (S) => Future[Done])
@InternalApi private[pekko] final class UnfoldResourceSourceAsync[R, T](
create: () => Future[R],
readData: R => Future[Option[T]],
close: R => Future[Done])
extends GraphStage[SourceShape[T]] {
val out = Outlet[T]("UnfoldResourceSourceAsync.out")
override val shape = SourceShape(out)
override def initialAttributes: Attributes = DefaultAttributes.unfoldResourceSourceAsync

def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler {
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
private implicit def ec: ExecutionContext = materializer.executionContext
private var state: Option[S] = None

private val createdCallback = getAsyncCallback[Try[S]] {
case Success(resource) =>
state = Some(resource)
if (isAvailable(out)) onPull()
case Failure(t) => failStage(t)
}.invokeWithFeedback _

private val errorHandler: PartialFunction[Throwable, Unit] = {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop =>
failStage(ex)
case Supervision.Restart =>
try {
restartResource()
} catch {
case NonFatal(ex) => failStage(ex)
}
case Supervision.Resume => onPull()
}
}

private val readCallback = getAsyncCallback[Try[Option[T]]](handle).invoke _

private def handle(result: Try[Option[T]]): Unit = result match {
case Success(data) =>
data match {
case Some(d) => push(out, d)
case None =>
// end of resource reached, lets close it
state match {
case Some(resource) =>
close(resource).onComplete(getAsyncCallback[Try[Done]] {
case Success(Done) => completeStage()
case Failure(ex) => failStage(ex)
}.invoke)(parasitic)
state = None

case None =>
// cannot happen, but for good measure
throw new IllegalStateException("Reached end of data but there is no open resource")
}
}
case Failure(t) => errorHandler(t)
}
override def initialAttributes: Attributes =
DefaultAttributes.unfoldResourceSourceAsync and SourceLocation.forLambda(create)

def createLogic(inheritedAttributes: Attributes) =
new GraphStageLogic(shape) with OutHandler {
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
private implicit def ec: ExecutionContext = materializer.executionContext
private var maybeResource: OptionVal[R] = OptionVal.none

private val createdCallback = getAsyncCallback[Try[R]] {
case Success(resource) =>
require(resource != null, "`create` method must not return a null resource.")
maybeResource = OptionVal(resource)
if (isAvailable(out)) onPull()
case Failure(t) => failStage(t)
}.invokeWithFeedback _

private val errorHandler: PartialFunction[Throwable, Unit] = {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop =>
failStage(ex)
case Supervision.Restart =>
try {
restartResource()
} catch {
case NonFatal(ex) => failStage(ex)
}
case Supervision.Resume => onPull()
}
}

override def preStart(): Unit = createResource()

override def onPull(): Unit =
state match {
case Some(resource) =>
try {
val future = readData(resource)
future.value match {
case Some(value) => handle(value)
case None => future.onComplete(readCallback)(parasitic)
}
} catch errorHandler
case None =>
// we got a pull but there is no open resource, we are either
// currently creating/restarting then the read will be triggered when creating the
// resource completes, or shutting down and then the pull does not matter anyway
private val readCallback = getAsyncCallback[Try[Option[T]]](handle).invoke _

private def handle(result: Try[Option[T]]): Unit = result match {
case Success(data) =>
data match {
case Some(d) => push(out, d)
case None =>
// end of resource reached, lets close it
maybeResource match {
case OptionVal.Some(resource) =>
close(resource).onComplete(getAsyncCallback[Try[Done]] {
case Success(Done) => completeStage()
case Failure(ex) => failStage(ex)
}.invoke)(parasitic)
maybeResource = OptionVal.none

case _ =>
// cannot happen, but for good measure
throw new IllegalStateException("Reached end of data but there is no open resource")
}
}
case Failure(t) => errorHandler(t)
}

override def postStop(): Unit = {
state.foreach(r => close(r))
}
override def preStart(): Unit = createResource()

override def onPull(): Unit =
maybeResource match {
case OptionVal.Some(resource) =>
try {
val future = readData(resource)
future.value match {
case Some(value) => handle(value)
case None => future.onComplete(readCallback)(parasitic)
}
} catch errorHandler
case _ =>
// we got a pull but there is no open resource, we are either
// currently creating/restarting then the read will be triggered when creating the
// resource completes, or shutting down and then the pull does not matter anyway
}

private def restartResource(): Unit = {
state match {
case Some(resource) =>
// wait for the resource to close before restarting
close(resource).onComplete(getAsyncCallback[Try[Done]] {
case Success(Done) =>
createResource()
case Failure(ex) => failStage(ex)
}.invoke)(parasitic)
state = None
case None =>
createResource()
override def postStop(): Unit = maybeResource match {
case OptionVal.Some(resource) => close(resource)
case _ => // do nothing
}
}

private def createResource(): Unit = {
create().onComplete { resource =>
createdCallback(resource).failed.foreach {
case _: StreamDetachedException =>
// stream stopped before created callback could be invoked, we need
// to close the resource if it is was opened, to not leak it
resource match {
case Success(r) =>
close(r)
case Failure(ex) =>
throw ex // failed to open but stream is stopped already
}
case _ => // we don't care here
private def restartResource(): Unit = {
maybeResource match {
case OptionVal.Some(resource) =>
// wait for the resource to close before restarting
close(resource).onComplete(getAsyncCallback[Try[Done]] {
case Success(Done) =>
createResource()
case Failure(ex) => failStage(ex)
}.invoke)(parasitic)
maybeResource = OptionVal.none
case _ =>
createResource()
}
}(parasitic)
}
}

setHandler(out, this)
private def createResource(): Unit = {
create().onComplete { resource =>
createdCallback(resource).failed.foreach {
case _: StreamDetachedException =>
// stream stopped before created callback could be invoked, we need
// to close the resource if it is was opened, to not leak it
resource match {
case Success(r) =>
close(r)
case Failure(ex) =>
throw ex // failed to open but stream is stopped already
}
case _ => // we don't care here
}
}(parasitic)
}

setHandler(out, this)

}
}
override def toString = "UnfoldResourceSourceAsync"

}
20 changes: 11 additions & 9 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag

import org.reactivestreams.{ Publisher, Subscriber }

import org.apache.pekko
import pekko.{ Done, NotUsed }
import pekko.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider }
Expand All @@ -43,6 +41,8 @@ import pekko.util.JavaDurationConverters._
import pekko.util.OptionConverters._
import pekko.util.ccompat.JavaConverters._

import org.reactivestreams.{ Publisher, Subscriber }

/** Java API */
object Source {
private[this] val _empty = new Source[Any, NotUsed](scaladsl.Source.empty)
Expand Down Expand Up @@ -851,16 +851,18 @@ object Source {
* @param read - function that reads data from opened resource. It is called each time backpressure signal
* is received. Stream calls close and completes when `CompletionStage` from read function returns None.
* @param close - function that closes resource
* @tparam T the element type
* @tparam R the resource type
*/
def unfoldResourceAsync[T, S](
create: function.Creator[CompletionStage[S]],
read: function.Function[S, CompletionStage[Optional[T]]],
close: function.Function[S, CompletionStage[Done]]): javadsl.Source[T, NotUsed] =
def unfoldResourceAsync[T, R](
create: function.Creator[CompletionStage[R]],
read: function.Function[R, CompletionStage[Optional[T]]],
close: function.Function[R, CompletionStage[Done]]): javadsl.Source[T, NotUsed] =
new Source(
scaladsl.Source.unfoldResourceAsync[T, S](
scaladsl.Source.unfoldResourceAsync[T, R](
() => create.create().asScala,
(s: S) => read.apply(s).asScala.map(_.toScala)(pekko.dispatch.ExecutionContexts.parasitic),
(s: S) => close.apply(s).asScala))
(resource: R) => read.apply(resource).asScala.map(_.toScala)(pekko.dispatch.ExecutionContexts.parasitic),
(resource: R) => close.apply(resource).asScala))

/**
* Upcast a stream of elements to a stream of supertypes of that element. Useful in combination with
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -961,11 +961,13 @@ object Source {
* @param read - function that reads data from opened resource. It is called each time backpressure signal
* is received. Stream calls close and completes when `Future` from read function returns None.
* @param close - function that closes resource
* @tparam T the element type
* @tparam R the resource type
*/
def unfoldResourceAsync[T, S](
create: () => Future[S],
read: (S) => Future[Option[T]],
close: (S) => Future[Done]): Source[T, NotUsed] =
def unfoldResourceAsync[T, R](
create: () => Future[R],
read: (R) => Future[Option[T]],
close: (R) => Future[Done]): Source[T, NotUsed] =
Source.fromGraph(new UnfoldResourceSourceAsync(create, read, close))

/**
Expand Down

0 comments on commit f25ef8b

Please sign in to comment.