diff --git a/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/javadsl/ActorCompile.java b/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/javadsl/ActorCompile.java index 847ed5aba58..81126cf52cb 100644 --- a/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/javadsl/ActorCompile.java +++ b/actor-typed-tests/src/test/java/org/apache/pekko/actor/typed/javadsl/ActorCompile.java @@ -151,6 +151,16 @@ public Behavior aroundSignal( }); } + { + Behavior b = + Behaviors.withTimersSetup( + (timers, ctx) -> { + timers.startSingleTimer("key", new MyMsgB("tick"), Duration.ofSeconds(1)); + ctx.scheduleOnce(Duration.ofSeconds(1), ctx.getSelf(), new MyMsgB("tick")); + return Behaviors.ignore(); + }); + } + static class MyBehavior extends ExtensibleBehavior { @Override @@ -233,4 +243,20 @@ public Behavior receive(TypedActorContext context, MyMsg message) return Behaviors.empty(); }); } + // stash buffer with setup + { + Behavior behavior = + Behaviors.withStashSetup( + 5, + (stash, ctx) -> { + stash.forEach( + msg -> { + // checked is ok + throw new Exception("checked"); + }); + + ctx.scheduleOnce(Duration.ofSeconds(1), ctx.getSelf(), "tick"); + return Behaviors.empty(); + }); + } } diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/javadsl/Behaviors.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/javadsl/Behaviors.scala index c9d4facd350..f520d436e0e 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/javadsl/Behaviors.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/javadsl/Behaviors.scala @@ -58,6 +58,17 @@ object Behaviors { factory(StashBufferImpl[T](ctx.asScala, capacity)) }) + /** + * Support for stashing messages to unstash at a later time. + * + * @since 1.1.0 + */ + def withStashSetup[T](capacity: Int, + factory: java.util.function.BiFunction[StashBuffer[T], ActorContext[T], Behavior[T]]): Behavior[T] = + setup(ctx => { + factory(StashBufferImpl[T](ctx.asScala, capacity), ctx.asJava) + }) + /** * Return this behavior from message processing in order to advise the * system to reuse the previous behavior. This is provided in order to @@ -345,6 +356,18 @@ object Behaviors { def withTimers[T](factory: pekko.japi.function.Function[TimerScheduler[T], Behavior[T]]): Behavior[T] = TimerSchedulerImpl.withTimers(timers => factory.apply(timers)) + /** + * Support for scheduled `self` messages in an actor. + * It takes care of the lifecycle of the timers such as cancelling them when the actor + * is restarted or stopped. + * + * @see [[TimerScheduler]] + * @since 1.1.0 + */ + def withTimersSetup[T]( + factory: pekko.japi.function.Function2[TimerScheduler[T], ActorContext[T], Behavior[T]]): Behavior[T] = + setup(ctx => TimerSchedulerImpl.wrapWithTimers(timer => factory.apply(timer, ctx.asJava))(ctx.asScala)) + /** * Per message MDC (Mapped Diagnostic Context) logging. * diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/scaladsl/Behaviors.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/scaladsl/Behaviors.scala index 93c71616c78..2439079d09c 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/scaladsl/Behaviors.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/scaladsl/Behaviors.scala @@ -48,6 +48,17 @@ object Behaviors { factory(stash) }) + /** + * Support for stashing messages to unstash at a later time. + * + * @since 1.1.0 + */ + def withStashSetup[T](capacity: Int)(factory: (StashBuffer[T], ActorContext[T]) => Behavior[T]): Behavior[T] = + setup(ctx => { + val stash = StashBuffer[T](ctx, capacity) + factory(stash, ctx) + }) + /** * Return this behavior from message processing in order to advise the * system to reuse the previous behavior. This is provided in order to @@ -269,6 +280,19 @@ object Behaviors { def withTimers[T](factory: TimerScheduler[T] => Behavior[T]): Behavior[T] = TimerSchedulerImpl.withTimers(factory) + /** + * Support for scheduled `self` messages in an actor. + * It takes care of the lifecycle of the timers such as cancelling them when the actor + * is restarted or stopped. + * + * @see [[TimerScheduler]] + * @since 1.1.0 + */ + def withTimersSetup[T](factory: (TimerScheduler[T], ActorContext[T]) => Behavior[T]): Behavior[T] = + setup(ctx => { + TimerSchedulerImpl.wrapWithTimers(factory(_, ctx))(ctx) + }) + /** * Per message MDC (Mapped Diagnostic Context) logging. *