Skip to content

Commit

Permalink
fix: confusion deadletter while ask timeout #668 (#664)
Browse files Browse the repository at this point in the history
* fix: keep typed ask deadletter same as classic

* fix: compile on scala3

* fix: a more convincing test

* chore: remove unnecessary header

* chore: use raw apache license

* chore: grammar and sort imports

* chore: grammar fix

* chore: copyright

* fix: immutable object domain

* fix: make it simple
  • Loading branch information
Roiocam authored Sep 30, 2023
1 parent 6f52c85 commit 225701d
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,16 @@ import pekko.pattern.StatusReply
import pekko.testkit.TestException
import pekko.util.Timeout

import scala.util.Failure

object AskSpec {
sealed trait Msg
final case class Foo(s: String, replyTo: ActorRef[String]) extends Msg
final case class Bar(s: String, duration: FiniteDuration, replyTo: ActorRef[String]) extends Msg
final case class Stop(replyTo: ActorRef[Unit]) extends Msg
sealed trait Proxy
final case class ProxyMsg(s: String) extends Proxy
final case class ProxyReply(s: String) extends Proxy
}

class AskSpec extends ScalaTestWithActorTestKit("""
Expand All @@ -52,6 +58,9 @@ class AskSpec extends ScalaTestWithActorTestKit("""
case (_, foo: Foo) =>
foo.replyTo ! "foo"
Behaviors.same
case (ctx, bar: Bar) =>
ctx.scheduleOnce(bar.duration, bar.replyTo, "bar")
Behaviors.same
case (_, Stop(r)) =>
r ! (())
Behaviors.stopped
Expand Down Expand Up @@ -119,6 +128,65 @@ class AskSpec extends ScalaTestWithActorTestKit("""
}
}

"publish dead-letter if the context.ask has completed on timeout" in {
import pekko.actor.typed.internal.adapter.ActorRefAdapter._
implicit val timeout: Timeout = 1.millis

val actor: ActorRef[Msg] = spawn(behavior)
val mockActor: ActorRef[Proxy] = spawn(Behaviors.receive[Proxy]((context, msg) =>
msg match {
case ProxyMsg(s) =>
context.ask[Msg, String](actor, Bar(s, 10.millis, _)) {
case Success(result) => ProxyReply(result)
case Failure(ex) => throw ex
}
Behaviors.same
case ProxyReply(s) =>
throw new IllegalArgumentException(s"unexpected reply: $s")
}))

mockActor ! ProxyMsg("foo")

val deadLetterProbe = createDeadLetterProbe()

val deadLetter = deadLetterProbe.receiveMessage()
deadLetter.message match {
case s: String => s should ===("bar")
case _ => fail(s"unexpected DeadLetter: $deadLetter")
}

val deadLettersRef = system.classicSystem.deadLetters
deadLetter.recipient shouldNot equal(deadLettersRef)
deadLetter.recipient shouldNot equal(toClassic(actor))
deadLetter.recipient shouldNot equal(toClassic(mockActor))
}
"publish dead-letter if the AskPattern.ask has completed on timeout" in {
implicit val timeout: Timeout = 1.millis

val deadLetterProbe = createDeadLetterProbe()
val mockProbe = createTestProbe[Msg]()
val mockBusyRef = mockProbe.ref
// this will not completed unit worker reply.
val askResult: Future[String] = mockBusyRef.ask(replyTo => Foo("foo", replyTo))
val request = mockProbe.expectMessageType[Foo](1.seconds)
// waiting for temporary ask actor terminated with timeout
mockProbe.expectTerminated(request.replyTo)
// verify ask timeout
val result = askResult.failed.futureValue
result shouldBe a[TimeoutException]
result.getMessage should startWith("Ask timed out on")
// mock reply manually
request match {
case Foo(s, replyTo) => replyTo ! s
}

val deadLetter = deadLetterProbe.receiveMessage()
deadLetter.message shouldBe a[String]
val deadLettersRef = system.classicSystem.deadLetters
// that should be not equals, otherwise, it may raise confusion, perform like a dead letter sent to the deadLetterActor.
deadLetter.recipient shouldNot equal(deadLettersRef)
}

"transform a replied org.apache.pekko.actor.Status.Failure to a failed future" in {
// It's unlikely but possible that this happens, since the receiving actor would
// have to accept a message with an actoref that accepts AnyRef or be doing crazy casting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ private[pekko] final class PromiseActorRef(

override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = state match {
case Stopped | _: StoppedWithPath =>
provider.deadLetters ! message
provider.deadLetters ! DeadLetter(message, if (sender eq Actor.noSender) provider.deadLetters else sender, this)
onComplete(message, alreadyCompleted = true)
case _ =>
if (message == null) throw InvalidMessageException("Message is null")
Expand Down

0 comments on commit 225701d

Please sign in to comment.