From 225701d4d6885c61f3694c23fa959b10fcf60de5 Mon Sep 17 00:00:00 2001 From: AndyChen Date: Sun, 1 Oct 2023 01:11:48 +0800 Subject: [PATCH] fix: confusion deadletter while ask timeout #668 (#664) * fix: keep typed ask deadletter same as classic * fix: compile on scala3 * fix: a more convincing test * chore: remove unnecessary header * chore: use raw apache license * chore: grammar and sort imports * chore: grammar fix * chore: copyright * fix: immutable object domain * fix: make it simple --- .../apache/pekko/actor/typed/AskSpec.scala | 68 +++++++++++++++++++ .../org/apache/pekko/pattern/AskSupport.scala | 2 +- 2 files changed, 69 insertions(+), 1 deletion(-) diff --git a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/AskSpec.scala b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/AskSpec.scala index b2355e302e7..5dc0dd7697e 100644 --- a/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/AskSpec.scala +++ b/actor-typed-tests/src/test/scala/org/apache/pekko/actor/typed/AskSpec.scala @@ -32,10 +32,16 @@ import pekko.pattern.StatusReply import pekko.testkit.TestException import pekko.util.Timeout +import scala.util.Failure + object AskSpec { sealed trait Msg final case class Foo(s: String, replyTo: ActorRef[String]) extends Msg + final case class Bar(s: String, duration: FiniteDuration, replyTo: ActorRef[String]) extends Msg final case class Stop(replyTo: ActorRef[Unit]) extends Msg + sealed trait Proxy + final case class ProxyMsg(s: String) extends Proxy + final case class ProxyReply(s: String) extends Proxy } class AskSpec extends ScalaTestWithActorTestKit(""" @@ -52,6 +58,9 @@ class AskSpec extends ScalaTestWithActorTestKit(""" case (_, foo: Foo) => foo.replyTo ! "foo" Behaviors.same + case (ctx, bar: Bar) => + ctx.scheduleOnce(bar.duration, bar.replyTo, "bar") + Behaviors.same case (_, Stop(r)) => r ! (()) Behaviors.stopped @@ -119,6 +128,65 @@ class AskSpec extends ScalaTestWithActorTestKit(""" } } + "publish dead-letter if the context.ask has completed on timeout" in { + import pekko.actor.typed.internal.adapter.ActorRefAdapter._ + implicit val timeout: Timeout = 1.millis + + val actor: ActorRef[Msg] = spawn(behavior) + val mockActor: ActorRef[Proxy] = spawn(Behaviors.receive[Proxy]((context, msg) => + msg match { + case ProxyMsg(s) => + context.ask[Msg, String](actor, Bar(s, 10.millis, _)) { + case Success(result) => ProxyReply(result) + case Failure(ex) => throw ex + } + Behaviors.same + case ProxyReply(s) => + throw new IllegalArgumentException(s"unexpected reply: $s") + })) + + mockActor ! ProxyMsg("foo") + + val deadLetterProbe = createDeadLetterProbe() + + val deadLetter = deadLetterProbe.receiveMessage() + deadLetter.message match { + case s: String => s should ===("bar") + case _ => fail(s"unexpected DeadLetter: $deadLetter") + } + + val deadLettersRef = system.classicSystem.deadLetters + deadLetter.recipient shouldNot equal(deadLettersRef) + deadLetter.recipient shouldNot equal(toClassic(actor)) + deadLetter.recipient shouldNot equal(toClassic(mockActor)) + } + "publish dead-letter if the AskPattern.ask has completed on timeout" in { + implicit val timeout: Timeout = 1.millis + + val deadLetterProbe = createDeadLetterProbe() + val mockProbe = createTestProbe[Msg]() + val mockBusyRef = mockProbe.ref + // this will not completed unit worker reply. + val askResult: Future[String] = mockBusyRef.ask(replyTo => Foo("foo", replyTo)) + val request = mockProbe.expectMessageType[Foo](1.seconds) + // waiting for temporary ask actor terminated with timeout + mockProbe.expectTerminated(request.replyTo) + // verify ask timeout + val result = askResult.failed.futureValue + result shouldBe a[TimeoutException] + result.getMessage should startWith("Ask timed out on") + // mock reply manually + request match { + case Foo(s, replyTo) => replyTo ! s + } + + val deadLetter = deadLetterProbe.receiveMessage() + deadLetter.message shouldBe a[String] + val deadLettersRef = system.classicSystem.deadLetters + // that should be not equals, otherwise, it may raise confusion, perform like a dead letter sent to the deadLetterActor. + deadLetter.recipient shouldNot equal(deadLettersRef) + } + "transform a replied org.apache.pekko.actor.Status.Failure to a failed future" in { // It's unlikely but possible that this happens, since the receiving actor would // have to accept a message with an actoref that accepts AnyRef or be doing crazy casting diff --git a/actor/src/main/scala/org/apache/pekko/pattern/AskSupport.scala b/actor/src/main/scala/org/apache/pekko/pattern/AskSupport.scala index 8154a4ee034..e2ac3da4fc6 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/AskSupport.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/AskSupport.scala @@ -617,7 +617,7 @@ private[pekko] final class PromiseActorRef( override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = state match { case Stopped | _: StoppedWithPath => - provider.deadLetters ! message + provider.deadLetters ! DeadLetter(message, if (sender eq Actor.noSender) provider.deadLetters else sender, this) onComplete(message, alreadyCompleted = true) case _ => if (message == null) throw InvalidMessageException("Message is null")