Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't use Scala Futures in Java APIs #1419

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import org.apache.pekko.testkit.PekkoSpec;

@SuppressWarnings("deprecation")
public class JavaFutureTests extends JUnitSuite {

@ClassRule
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.pekko.testkit.PekkoSpec;
import org.apache.pekko.testkit.TestProbe;
import org.apache.pekko.util.Timeout;
import org.apache.pekko.util.FutureConverters;
import org.junit.ClassRule;
import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;
Expand Down Expand Up @@ -219,15 +220,15 @@ public void testAskWithReplyToTimeout() throws Exception {
@Test
public void usePipe() throws Exception {
TestProbe probe = new TestProbe(system);
pipe(Futures.successful("ho!"), system.dispatcher()).to(probe.ref());
pipe(CompletableFuture.completedFuture("ho!"), system.dispatcher()).to(probe.ref());
probe.expectMsg("ho!");
}

@Test
public void usePipeWithActorSelection() throws Exception {
TestProbe probe = new TestProbe(system);
ActorSelection selection = system.actorSelection(probe.ref().path());
pipe(Futures.successful("hi!"), system.dispatcher()).to(selection);
pipe(CompletableFuture.completedFuture("hi!"), system.dispatcher()).to(selection);
probe.expectMsg("hi!");
}

Expand Down Expand Up @@ -291,15 +292,11 @@ public void testRetryCompletionStageRandomDelay() throws Exception {
public void testRetry() throws Exception {
final String expected = "hello";

Future<String> retriedFuture =
CompletionStage<String> retriedFuture =
Patterns.retry(
() -> Futures.successful(expected),
3,
scala.concurrent.duration.Duration.apply(200, "millis"),
system.scheduler(),
ec);
() -> CompletableFuture.completedFuture(expected), 3, Duration.ofMillis(200), system);

String actual = Await.result(retriedFuture, FiniteDuration.apply(3, SECONDS));
String actual = retriedFuture.toCompletableFuture().get(3, SECONDS);
assertEquals(expected, actual);
}

Expand All @@ -317,21 +314,21 @@ public void testCSRetry() throws Exception {
}

@Test(expected = IllegalStateException.class)
public void testAfterFailedCallable() throws Exception {
Callable<Future<String>> failedCallable =
() -> Futures.failed(new IllegalStateException("Illegal!"));
public void testAfterFailedCallable() throws Throwable {
Callable<CompletionStage<String>> failedCallable =
() -> Futures.failedCompletionStage(new IllegalStateException("Illegal!"));

Future<String> delayedFuture =
Patterns.after(
scala.concurrent.duration.Duration.create(200, "millis"),
system.scheduler(),
ec,
failedCallable);
CompletionStage<String> delayedFuture =
Patterns.after(Duration.ofMillis(200), system, failedCallable);

Future<String> resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture), ec);
Await.result(resultFuture, scala.concurrent.duration.FiniteDuration.apply(3, SECONDS));
try {
delayedFuture.toCompletableFuture().get(3, SECONDS);
} catch (ExecutionException e) {
throw e.getCause();
}
}

@SuppressWarnings("deprecation")
@Test(expected = IllegalStateException.class)
public void testAfterFailedFuture() throws Exception {

Expand All @@ -340,7 +337,9 @@ public void testAfterFailedFuture() throws Exception {
scala.concurrent.duration.Duration.create(200, "millis"),
system.scheduler(),
ec,
() -> Futures.failed(new IllegalStateException("Illegal!")));
() ->
FutureConverters.asScala(
Futures.failedCompletionStage(new IllegalStateException("Illegal!"))));

Future<String> resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture), ec);
Await.result(resultFuture, FiniteDuration.apply(3, SECONDS));
Expand All @@ -350,19 +349,16 @@ public void testAfterFailedFuture() throws Exception {
public void testAfterSuccessfulCallable() throws Exception {
final String expected = "Hello";

Future<String> delayedFuture =
CompletionStage<String> delayedFuture =
Patterns.after(
scala.concurrent.duration.Duration.create(200, "millis"),
system.scheduler(),
ec,
() -> Futures.successful(expected));
Duration.ofMillis(200), system, () -> CompletableFuture.completedFuture(expected));

Future<String> resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture), ec);
final String actual = Await.result(resultFuture, FiniteDuration.apply(3, SECONDS));
String actual = delayedFuture.toCompletableFuture().get(3, SECONDS);

assertEquals(expected, actual);
}

@SuppressWarnings("deprecation")
@Test
public void testAfterSuccessfulFuture() throws Exception {
final String expected = "Hello";
Expand All @@ -380,6 +376,7 @@ public void testAfterSuccessfulFuture() throws Exception {
assertEquals(expected, actual);
}

@SuppressWarnings("deprecation")
@Test
public void testAfterFiniteDuration() throws Exception {
final String expected = "Hello";
Expand All @@ -391,7 +388,8 @@ public void testAfterFiniteDuration() throws Exception {
ec,
() -> Futures.successful("world"));

Future<String> immediateFuture = Futures.future(() -> expected, ec);
Future<String> immediateFuture =
FutureConverters.asScala(CompletableFuture.completedFuture(expected));

Future<String> resultFuture =
Futures.firstCompletedOf(Arrays.asList(delayedFuture, immediateFuture), ec);
Expand Down
7 changes: 7 additions & 0 deletions actor/src/main/scala/org/apache/pekko/dispatch/Future.scala
Original file line number Diff line number Diff line change
Expand Up @@ -130,27 +130,33 @@ object Futures {
* @param executor the execution context on which the future is run
* @return the `Future` holding the result of the computation
*/
@deprecated("Use CompletionStage / CompletableFuture-based APIs instead", "1.1.0")
def future[T](body: Callable[T], executor: ExecutionContext): Future[T] = Future(body.call)(executor)

/**
* Creates a promise object which can be completed with a value.
*
* @return the newly created `Promise` object
*/
@deprecated("Use CompletableFuture instead", "1.1.0")
def promise[T](): Promise[T] = Promise[T]()

/**
* creates an already completed Promise with the specified exception
*/
@deprecated("Use CompletionStage / CompletableFuture-based APIs instead", "1.1.0")
def failed[T](exception: Throwable): Future[T] = Future.failed(exception)

/**
* Creates an already completed Promise with the specified result
*/
@deprecated("Use CompletionStage / CompletableFuture-based APIs instead", "1.1.0")
def successful[T](result: T): Future[T] = Future.successful(result)

/**
* Creates an already completed CompletionStage with the specified exception
*
* Note: prefer CompletableFuture.failedStage(ex) from Java 9 onwards
*/
def failedCompletionStage[T](ex: Throwable): CompletionStage[T] = {
val f = CompletableFuture.completedFuture[T](null.asInstanceOf[T])
Expand All @@ -172,6 +178,7 @@ object Futures {
/**
* Returns a Future to the result of the first future in the list that is completed
*/
@deprecated("Use CompletableFuture.anyOf instead", "1.1.0")
def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], executor: ExecutionContext): Future[T] =
Future.firstCompletedOf(futures.asScala)(executor)

Expand Down
5 changes: 3 additions & 2 deletions docs/src/test/java/jdocs/future/ActorWithFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@

// #context-dispatcher
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.dispatch.Futures;

import java.util.concurrent.CompletableFuture;

public class ActorWithFuture extends AbstractActor {
ActorWithFuture() {
Futures.future(() -> "hello", getContext().dispatcher());
CompletableFuture.supplyAsync(() -> "hello", getContext().dispatcher());
}

@Override
Expand Down
42 changes: 20 additions & 22 deletions docs/src/test/java/jdocs/future/FutureDocTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,18 @@
package jdocs.future;

import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.dispatch.Futures;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
import org.apache.pekko.testkit.PekkoSpec;
import org.apache.pekko.util.Timeout;
import org.apache.pekko.util.FutureConverters;
import jdocs.AbstractJavaTest;
import org.junit.ClassRule;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;

import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.*;

import static org.apache.pekko.actor.typed.javadsl.Adapter.toTyped;
import static org.apache.pekko.dispatch.Futures.future;
// #imports

// #imports
Expand All @@ -48,9 +39,9 @@ public class FutureDocTest extends AbstractJavaTest {

private final ActorSystem<Void> system = toTyped(actorSystemResource.getSystem());

@Test(expected = java.util.concurrent.CompletionException.class)
public void useAfter() throws Exception {
final ExecutionContext ec = system.executionContext();
@Test(expected = IllegalStateException.class)
public void useAfter() throws Throwable {
final Executor ex = system.executionContext();
// #after
CompletionStage<String> failWithException =
CompletableFuture.supplyAsync(
Expand All @@ -60,18 +51,25 @@ public void useAfter() throws Exception {
CompletionStage<String> delayed =
Patterns.after(Duration.ofMillis(200), system, () -> failWithException);
// #after
Future<String> future =
future(
CompletionStage<String> completionStage =
CompletableFuture.supplyAsync(
() -> {
Thread.sleep(1000);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "foo";
},
ec);
Future<String> result =
Futures.firstCompletedOf(
Arrays.<Future<String>>asList(future, FutureConverters.asScala(delayed)), ec);
Timeout timeout = Timeout.create(Duration.ofSeconds(2));
Await.result(result, timeout.duration());
ex);
CompletableFuture<Object> result =
CompletableFuture.anyOf(
completionStage.toCompletableFuture(), delayed.toCompletableFuture());
try {
result.toCompletableFuture().get(2, SECONDS);
} catch (ExecutionException e) {
throw e.getCause();
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import org.junit.runner.RunWith;
import org.scalatestplus.junit.JUnitRunner;
import scala.concurrent.Future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import org.iq80.leveldb.util.FileUtils;
import java.util.Optional;
Expand Down Expand Up @@ -81,45 +84,46 @@ public Receive createReceive() {

class MySnapshotStore extends SnapshotStore {
@Override
public Future<Optional<SelectedSnapshot>> doLoadAsync(
public CompletionStage<Optional<SelectedSnapshot>> doLoadAsync(
String persistenceId, SnapshotSelectionCriteria criteria) {
return null;
}

@Override
public Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot) {
public CompletionStage<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot) {
return null;
}

@Override
public Future<Void> doDeleteAsync(SnapshotMetadata metadata) {
return Futures.successful(null);
public CompletionStage<Void> doDeleteAsync(SnapshotMetadata metadata) {
return CompletableFuture.completedFuture(null);
}

@Override
public Future<Void> doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
return Futures.successful(null);
public CompletionStage<Void> doDeleteAsync(
String persistenceId, SnapshotSelectionCriteria criteria) {
return CompletableFuture.completedFuture(null);
}
}

class MyAsyncJournal extends AsyncWriteJournal {
// #sync-journal-plugin-api
@Override
public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(
public CompletionStage<Iterable<Optional<Exception>>> doAsyncWriteMessages(
Iterable<AtomicWrite> messages) {
try {
Iterable<Optional<Exception>> result = new ArrayList<Optional<Exception>>();
// blocking call here...
// result.add(..)
return Futures.successful(result);
return CompletableFuture.completedFuture(result);
} catch (Exception e) {
return Futures.failed(e);
return Futures.failedCompletionStage(e);
}
}
// #sync-journal-plugin-api

@Override
public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) {
public CompletionStage<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) {
return null;
}

Expand Down
5 changes: 2 additions & 3 deletions docs/src/test/java/jdocs/stream/FlowDocTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Cancellable;
import org.apache.pekko.dispatch.Futures;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.*;
import org.apache.pekko.stream.javadsl.*;
Expand Down Expand Up @@ -158,8 +157,8 @@ public void creatingSourcesSinks() throws Exception {
list.add(3);
Source.from(list);

// Create a source form a Future
Source.future(Futures.successful("Hello Streams!"));
// Create a source form a CompletionStage
Source.completionStage(CompletableFuture.completedFuture("Hello Streams!"));

// Create a source from a single element
Source.single("only one element");
Expand Down
Loading