Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

=str Switch the type parameter order of UnfoldResourceSourceAsync #616

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,139 +13,145 @@

package org.apache.pekko.stream.impl

import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }
import scala.util.control.NonFatal

import org.apache.pekko
import pekko.Done
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts.parasitic
import pekko.stream.ActorAttributes.SupervisionStrategy
import pekko.stream._
import pekko.stream.ActorAttributes.SupervisionStrategy
import pekko.stream.Attributes.SourceLocation
import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.stage._

import scala.concurrent.{ ExecutionContext, Future }
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }
import pekko.util.OptionVal

/**
* INTERNAL API
*/
@InternalApi private[pekko] final class UnfoldResourceSourceAsync[T, S](
create: () => Future[S],
readData: (S) => Future[Option[T]],
close: (S) => Future[Done])
@InternalApi private[pekko] final class UnfoldResourceSourceAsync[R, T](
create: () => Future[R],
readData: R => Future[Option[T]],
close: R => Future[Done])
extends GraphStage[SourceShape[T]] {
val out = Outlet[T]("UnfoldResourceSourceAsync.out")
override val shape = SourceShape(out)
override def initialAttributes: Attributes = DefaultAttributes.unfoldResourceSourceAsync

def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler {
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
private implicit def ec: ExecutionContext = materializer.executionContext
private var state: Option[S] = None

private val createdCallback = getAsyncCallback[Try[S]] {
case Success(resource) =>
state = Some(resource)
if (isAvailable(out)) onPull()
case Failure(t) => failStage(t)
}.invokeWithFeedback _

private val errorHandler: PartialFunction[Throwable, Unit] = {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop =>
failStage(ex)
case Supervision.Restart =>
try {
restartResource()
} catch {
case NonFatal(ex) => failStage(ex)
}
case Supervision.Resume => onPull()
}
}

private val readCallback = getAsyncCallback[Try[Option[T]]](handle).invoke _

private def handle(result: Try[Option[T]]): Unit = result match {
case Success(data) =>
data match {
case Some(d) => push(out, d)
case None =>
// end of resource reached, lets close it
state match {
case Some(resource) =>
close(resource).onComplete(getAsyncCallback[Try[Done]] {
case Success(Done) => completeStage()
case Failure(ex) => failStage(ex)
}.invoke)(parasitic)
state = None

case None =>
// cannot happen, but for good measure
throw new IllegalStateException("Reached end of data but there is no open resource")
}
}
case Failure(t) => errorHandler(t)
}
override def initialAttributes: Attributes =
DefaultAttributes.unfoldResourceSourceAsync and SourceLocation.forLambda(create)

def createLogic(inheritedAttributes: Attributes) =
new GraphStageLogic(shape) with OutHandler {
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
private implicit def ec: ExecutionContext = materializer.executionContext
private var maybeResource: OptionVal[R] = OptionVal.none

private val createdCallback = getAsyncCallback[Try[R]] {
case Success(resource) =>
require(resource != null, "`create` method must not return a null resource.")
maybeResource = OptionVal(resource)
if (isAvailable(out)) onPull()
case Failure(t) => failStage(t)
}.invokeWithFeedback _

private val errorHandler: PartialFunction[Throwable, Unit] = {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop =>
failStage(ex)
case Supervision.Restart =>
try {
restartResource()
} catch {
case NonFatal(ex) => failStage(ex)
}
case Supervision.Resume => onPull()
}
}

override def preStart(): Unit = createResource()

override def onPull(): Unit =
state match {
case Some(resource) =>
try {
val future = readData(resource)
future.value match {
case Some(value) => handle(value)
case None => future.onComplete(readCallback)(parasitic)
}
} catch errorHandler
case None =>
// we got a pull but there is no open resource, we are either
// currently creating/restarting then the read will be triggered when creating the
// resource completes, or shutting down and then the pull does not matter anyway
private val readCallback = getAsyncCallback[Try[Option[T]]](handle).invoke _

private def handle(result: Try[Option[T]]): Unit = result match {
case Success(data) =>
data match {
case Some(d) => push(out, d)
case None =>
// end of resource reached, lets close it
maybeResource match {
case OptionVal.Some(resource) =>
close(resource).onComplete(getAsyncCallback[Try[Done]] {
case Success(Done) => completeStage()
case Failure(ex) => failStage(ex)
}.invoke)(parasitic)
maybeResource = OptionVal.none

case _ =>
// cannot happen, but for good measure
throw new IllegalStateException("Reached end of data but there is no open resource")
}
}
case Failure(t) => errorHandler(t)
}

override def postStop(): Unit = {
state.foreach(r => close(r))
}
override def preStart(): Unit = createResource()

override def onPull(): Unit =
maybeResource match {
case OptionVal.Some(resource) =>
try {
val future = readData(resource)
future.value match {
case Some(value) => handle(value)
case None => future.onComplete(readCallback)(parasitic)
}
} catch errorHandler
case _ =>
// we got a pull but there is no open resource, we are either
// currently creating/restarting then the read will be triggered when creating the
// resource completes, or shutting down and then the pull does not matter anyway
}

private def restartResource(): Unit = {
state match {
case Some(resource) =>
// wait for the resource to close before restarting
close(resource).onComplete(getAsyncCallback[Try[Done]] {
case Success(Done) =>
createResource()
case Failure(ex) => failStage(ex)
}.invoke)(parasitic)
state = None
case None =>
createResource()
override def postStop(): Unit = maybeResource match {
case OptionVal.Some(resource) => close(resource)
case _ => // do nothing
}
}

private def createResource(): Unit = {
create().onComplete { resource =>
createdCallback(resource).failed.foreach {
case _: StreamDetachedException =>
// stream stopped before created callback could be invoked, we need
// to close the resource if it is was opened, to not leak it
resource match {
case Success(r) =>
close(r)
case Failure(ex) =>
throw ex // failed to open but stream is stopped already
}
case _ => // we don't care here
private def restartResource(): Unit = {
maybeResource match {
case OptionVal.Some(resource) =>
// wait for the resource to close before restarting
close(resource).onComplete(getAsyncCallback[Try[Done]] {
case Success(Done) =>
createResource()
case Failure(ex) => failStage(ex)
}.invoke)(parasitic)
maybeResource = OptionVal.none
case _ =>
createResource()
}
}(parasitic)
}
}

setHandler(out, this)
private def createResource(): Unit = {
create().onComplete { resource =>
createdCallback(resource).failed.foreach {
case _: StreamDetachedException =>
// stream stopped before created callback could be invoked, we need
// to close the resource if it is was opened, to not leak it
resource match {
case Success(r) =>
close(r)
case Failure(ex) =>
throw ex // failed to open but stream is stopped already
}
case _ => // we don't care here
}
}(parasitic)
}

setHandler(out, this)

}
}
override def toString = "UnfoldResourceSourceAsync"

}
20 changes: 11 additions & 9 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag

import org.reactivestreams.{ Publisher, Subscriber }

import org.apache.pekko
import pekko.{ Done, NotUsed }
import pekko.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider }
Expand All @@ -43,6 +41,8 @@ import pekko.util.JavaDurationConverters._
import pekko.util.OptionConverters._
import pekko.util.ccompat.JavaConverters._

import org.reactivestreams.{ Publisher, Subscriber }

/** Java API */
object Source {
private[this] val _empty = new Source[Any, NotUsed](scaladsl.Source.empty)
Expand Down Expand Up @@ -851,16 +851,18 @@ object Source {
* @param read - function that reads data from opened resource. It is called each time backpressure signal
* is received. Stream calls close and completes when `CompletionStage` from read function returns None.
* @param close - function that closes resource
* @tparam T the element type
* @tparam R the resource type
*/
def unfoldResourceAsync[T, S](
create: function.Creator[CompletionStage[S]],
read: function.Function[S, CompletionStage[Optional[T]]],
close: function.Function[S, CompletionStage[Done]]): javadsl.Source[T, NotUsed] =
def unfoldResourceAsync[T, R](
create: function.Creator[CompletionStage[R]],
read: function.Function[R, CompletionStage[Optional[T]]],
close: function.Function[R, CompletionStage[Done]]): javadsl.Source[T, NotUsed] =
new Source(
scaladsl.Source.unfoldResourceAsync[T, S](
scaladsl.Source.unfoldResourceAsync[T, R](
() => create.create().asScala,
(s: S) => read.apply(s).asScala.map(_.toScala)(pekko.dispatch.ExecutionContexts.parasitic),
(s: S) => close.apply(s).asScala))
(resource: R) => read.apply(resource).asScala.map(_.toScala)(pekko.dispatch.ExecutionContexts.parasitic),
(resource: R) => close.apply(resource).asScala))

/**
* Upcast a stream of elements to a stream of supertypes of that element. Useful in combination with
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -961,11 +961,13 @@ object Source {
* @param read - function that reads data from opened resource. It is called each time backpressure signal
* is received. Stream calls close and completes when `Future` from read function returns None.
* @param close - function that closes resource
* @tparam T the element type
* @tparam R the resource type
*/
def unfoldResourceAsync[T, S](
create: () => Future[S],
read: (S) => Future[Option[T]],
close: (S) => Future[Done]): Source[T, NotUsed] =
def unfoldResourceAsync[T, R](
create: () => Future[R],
read: (R) => Future[Option[T]],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
read: (R) => Future[Option[T]],
read: R => Future[Option[T]],

close: (R) => Future[Done]): Source[T, NotUsed] =
Source.fromGraph(new UnfoldResourceSourceAsync(create, read, close))

/**
Expand Down
Loading