diff --git a/README.md b/README.md index 4263dff..77d3fb6 100644 --- a/README.md +++ b/README.md @@ -478,6 +478,33 @@ and there are also gains to this different mode of operation: However, with batch execution control comes responsibility! If you forget to make the call to `dispatch()` then the futures in the load request queue will never be batched, and thus _will never complete_! So be careful when crafting your loader designs. +## Scheduled Dispatching + +`ScheduledDataLoaderRegistry` is a registry that allows for dispatching to be done on a schedule. It contains a +predicate that is evaluated (per data loader contained within) when `dispatchAll` is invoked. + +If that predicate is true, it will make a `dispatch` call on the data loader, otherwise is will schedule a task to +perform that check again. Once a predicate evaluated to true, it will not reschedule and another call to +`dispatchAll` is required to be made. + +This allows you to do things like "dispatch ONLY if the queue depth is > 10 deep or more than 200 millis have passed +since it was last dispatched". + +```java + + DispatchPredicate depthOrTimePredicate = DispatchPredicate + .dispatchIfDepthGreaterThan(10) + .or(DispatchPredicate.dispatchIfLongerThan(Duration.ofMillis(200))); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .dispatchPredicate(depthOrTimePredicate) + .schedule(Duration.ofMillis(10)) + .register("users",userDataLoader) + .build(); +``` + +The above acts as a kind of minimum batch depth, with a time overload. It won't dispatch if the loader depth is less +than or equal to 10 but if 200ms pass it will dispatch. ## Let's get started! diff --git a/src/main/java/org/dataloader/DataLoaderRegistry.java b/src/main/java/org/dataloader/DataLoaderRegistry.java index 6f8a695..9b19c29 100644 --- a/src/main/java/org/dataloader/DataLoaderRegistry.java +++ b/src/main/java/org/dataloader/DataLoaderRegistry.java @@ -6,6 +6,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -19,7 +20,7 @@ */ @PublicApi public class DataLoaderRegistry { - private final Map> dataLoaders = new ConcurrentHashMap<>(); + protected final Map> dataLoaders = new ConcurrentHashMap<>(); public DataLoaderRegistry() { } @@ -28,6 +29,7 @@ private DataLoaderRegistry(Builder builder) { this.dataLoaders.putAll(builder.dataLoaders); } + /** * This will register a new dataloader * @@ -84,6 +86,13 @@ public DataLoaderRegistry combine(DataLoaderRegistry registry) { return new ArrayList<>(dataLoaders.values()); } + /** + * @return the currently registered data loaders as a map + */ + public Map> getDataLoadersMap() { + return new LinkedHashMap<>(dataLoaders); + } + /** * This will unregister a new dataloader * diff --git a/src/main/java/org/dataloader/annotations/ExperimentalApi.java b/src/main/java/org/dataloader/annotations/ExperimentalApi.java new file mode 100644 index 0000000..6be889e --- /dev/null +++ b/src/main/java/org/dataloader/annotations/ExperimentalApi.java @@ -0,0 +1,23 @@ +package org.dataloader.annotations; + +import java.lang.annotation.Documented; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.CONSTRUCTOR; +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.TYPE; + +/** + * This represents code that the graphql-java project considers experimental API and while our intention is that it will + * progress to be {@link PublicApi}, its existence, signature of behavior may change between releases. + * + * In general unnecessary changes will be avoided but you should not depend on experimental classes being stable + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(value = {CONSTRUCTOR, METHOD, TYPE, FIELD}) +@Documented +public @interface ExperimentalApi { +} diff --git a/src/main/java/org/dataloader/registries/DispatchPredicate.java b/src/main/java/org/dataloader/registries/DispatchPredicate.java new file mode 100644 index 0000000..d5bd31b --- /dev/null +++ b/src/main/java/org/dataloader/registries/DispatchPredicate.java @@ -0,0 +1,92 @@ +package org.dataloader.registries; + +import org.dataloader.DataLoader; + +import java.time.Duration; +import java.util.Objects; + +/** + * A predicate class used by {@link ScheduledDataLoaderRegistry} to decide whether to dispatch or not + */ +@FunctionalInterface +public interface DispatchPredicate { + /** + * This predicate tests whether the data loader should be dispatched or not. + * + * @param dataLoaderKey the key of the data loader when registered + * @param dataLoader the dataloader to dispatch + * + * @return true if the data loader SHOULD be dispatched + */ + boolean test(String dataLoaderKey, DataLoader dataLoader); + + + /** + * Returns a composed predicate that represents a short-circuiting logical + * AND of this predicate and another. + * + * @param other a predicate that will be logically-ANDed with this + * predicate + * + * @return a composed predicate that represents the short-circuiting logical + * AND of this predicate and the {@code other} predicate + */ + default DispatchPredicate and(DispatchPredicate other) { + Objects.requireNonNull(other); + return (k, dl) -> test(k, dl) && other.test(k, dl); + } + + /** + * Returns a predicate that represents the logical negation of this + * predicate. + * + * @return a predicate that represents the logical negation of this + * predicate + */ + default DispatchPredicate negate() { + return (k, dl) -> !test(k, dl); + } + + /** + * Returns a composed predicate that represents a short-circuiting logical + * OR of this predicate and another. + * + * @param other a predicate that will be logically-ORed with this + * predicate + * + * @return a composed predicate that represents the short-circuiting logical + * OR of this predicate and the {@code other} predicate + */ + default DispatchPredicate or(DispatchPredicate other) { + Objects.requireNonNull(other); + return (k, dl) -> test(k, dl) || other.test(k, dl); + } + + /** + * This predicate will return true if the {@link DataLoader} has not be dispatched + * for at least the duration length of time. + * + * @param duration the length of time to check + * + * @return true if the data loader has not been dispatched in duration time + */ + static DispatchPredicate dispatchIfLongerThan(Duration duration) { + return (dataLoaderKey, dataLoader) -> { + int i = dataLoader.getTimeSinceDispatch().compareTo(duration); + return i > 0; + }; + } + + /** + * This predicate will return true if the {@link DataLoader#dispatchDepth()} is greater than the specified depth. + * + * This will act as minimum batch size. There must be more than `depth` items queued for the predicate to return true. + * + * @param depth the value to be greater than + * + * @return true if the {@link DataLoader#dispatchDepth()} is greater than the specified depth. + */ + static DispatchPredicate dispatchIfDepthGreaterThan(int depth) { + return (dataLoaderKey, dataLoader) -> dataLoader.dispatchDepth() > depth; + } +} diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java new file mode 100644 index 0000000..4be317e --- /dev/null +++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java @@ -0,0 +1,186 @@ +package org.dataloader.registries; + +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderRegistry; +import org.dataloader.annotations.ExperimentalApi; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.dataloader.impl.Assertions.nonNull; + +/** + * This {@link DataLoaderRegistry} will use a {@link DispatchPredicate} when {@link #dispatchAll()} is called + * to test (for each {@link DataLoader} in the registry) if a dispatch should proceed. If the predicate returns false, then a task is scheduled + * to perform that predicate dispatch again via the {@link ScheduledExecutorService}. + *

+ * This will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case + * no rescheduling will occur and you will need to call dispatch again to restart the process. + *

+ * If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and + * call {@link #rescheduleNow()}. + *

+ * This code is currently marked as {@link ExperimentalApi} + */ +@ExperimentalApi +public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements AutoCloseable { + + private final ScheduledExecutorService scheduledExecutorService; + private final DispatchPredicate dispatchPredicate; + private final Duration schedule; + private volatile boolean closed; + + private ScheduledDataLoaderRegistry(Builder builder) { + this.dataLoaders.putAll(builder.dataLoaders); + this.scheduledExecutorService = builder.scheduledExecutorService; + this.dispatchPredicate = builder.dispatchPredicate; + this.schedule = builder.schedule; + this.closed = false; + } + + /** + * Once closed this registry will never again reschedule checks + */ + @Override + public void close() { + closed = true; + } + + /** + * @return how long the {@link ScheduledExecutorService} task will wait before checking the predicate again + */ + public Duration getScheduleDuration() { + return schedule; + } + + @Override + public void dispatchAll() { + dispatchAllWithCount(); + } + + @Override + public int dispatchAllWithCount() { + int sum = 0; + for (Map.Entry> entry : dataLoaders.entrySet()) { + DataLoader dataLoader = entry.getValue(); + String key = entry.getKey(); + if (dispatchPredicate.test(key, dataLoader)) { + sum += dataLoader.dispatchWithCounts().getKeysCount(); + } else { + reschedule(key, dataLoader); + } + } + return sum; + } + + /** + * This will immediately dispatch the {@link DataLoader}s in the registry + * without testing the predicate + */ + public void dispatchAllImmediately() { + super.dispatchAll(); + } + + /** + * This will immediately dispatch the {@link DataLoader}s in the registry + * without testing the predicate + * + * @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s. + */ + public int dispatchAllWithCountImmediately() { + return super.dispatchAllWithCount(); + } + + /** + * This will schedule a task to check the predicate and dispatch if true right now. It will not do + * a pre check of the preodicate like {@link #dispatchAll()} would + */ + public void rescheduleNow() { + dataLoaders.forEach(this::reschedule); + } + + private void reschedule(String key, DataLoader dataLoader) { + if (!closed) { + Runnable runThis = () -> dispatchOrReschedule(key, dataLoader); + scheduledExecutorService.schedule(runThis, schedule.toMillis(), TimeUnit.MILLISECONDS); + } + } + + private void dispatchOrReschedule(String key, DataLoader dataLoader) { + if (dispatchPredicate.test(key, dataLoader)) { + dataLoader.dispatch(); + } else { + reschedule(key, dataLoader); + } + } + + /** + * By default this will create use a {@link Executors#newSingleThreadScheduledExecutor()} + * and a schedule duration of 10 milli seconds. + * + * @return A builder of {@link ScheduledDataLoaderRegistry}s + */ + public static Builder newScheduledRegistry() { + return new Builder(); + } + + public static class Builder { + + private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + private DispatchPredicate dispatchPredicate = (key, dl) -> true; + private Duration schedule = Duration.ofMillis(10); + private final Map> dataLoaders = new HashMap<>(); + + public Builder scheduledExecutorService(ScheduledExecutorService executorService) { + this.scheduledExecutorService = nonNull(executorService); + return this; + } + + public Builder schedule(Duration schedule) { + this.schedule = schedule; + return this; + } + + public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) { + this.dispatchPredicate = nonNull(dispatchPredicate); + return this; + } + + /** + * This will register a new dataloader + * + * @param key the key to put the data loader under + * @param dataLoader the data loader to register + * + * @return this builder for a fluent pattern + */ + public Builder register(String key, DataLoader dataLoader) { + dataLoaders.put(key, dataLoader); + return this; + } + + /** + * This will combine together the data loaders in this builder with the ones + * from a previous {@link DataLoaderRegistry} + * + * @param otherRegistry the previous {@link DataLoaderRegistry} + * + * @return this builder for a fluent pattern + */ + public Builder registerAll(DataLoaderRegistry otherRegistry) { + dataLoaders.putAll(otherRegistry.getDataLoadersMap()); + return this; + } + + /** + * @return the newly built {@link ScheduledDataLoaderRegistry} + */ + public ScheduledDataLoaderRegistry build() { + return new ScheduledDataLoaderRegistry(this); + } + } +} diff --git a/src/test/java/ReadmeExamples.java b/src/test/java/ReadmeExamples.java index 6cef375..33b1607 100644 --- a/src/test/java/ReadmeExamples.java +++ b/src/test/java/ReadmeExamples.java @@ -10,9 +10,12 @@ import org.dataloader.fixtures.SecurityCtx; import org.dataloader.fixtures.User; import org.dataloader.fixtures.UserManager; +import org.dataloader.registries.DispatchPredicate; +import org.dataloader.registries.ScheduledDataLoaderRegistry; import org.dataloader.stats.Statistics; import org.dataloader.stats.ThreadLocalStatisticsCollector; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -269,4 +272,14 @@ private void statsConfigExample() { DataLoader userDataLoader = DataLoaderFactory.newDataLoader(userBatchLoader, options); } + private void ScheduledDispatche() { + DispatchPredicate depthOrTimePredicate = DispatchPredicate.dispatchIfDepthGreaterThan(10) + .or(DispatchPredicate.dispatchIfLongerThan(Duration.ofMillis(200))); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .dispatchPredicate(depthOrTimePredicate) + .schedule(Duration.ofMillis(10)) + .register("users", userDataLoader) + .build(); + } } diff --git a/src/test/java/org/dataloader/ClockDataLoader.java b/src/test/java/org/dataloader/ClockDataLoader.java index 4b16e78..21faeea 100644 --- a/src/test/java/org/dataloader/ClockDataLoader.java +++ b/src/test/java/org/dataloader/ClockDataLoader.java @@ -4,11 +4,11 @@ public class ClockDataLoader extends DataLoader { - ClockDataLoader(Object batchLoadFunction, Clock clock) { + public ClockDataLoader(Object batchLoadFunction, Clock clock) { this(batchLoadFunction, null, clock); } - ClockDataLoader(Object batchLoadFunction, DataLoaderOptions options, Clock clock) { + public ClockDataLoader(Object batchLoadFunction, DataLoaderOptions options, Clock clock) { super(batchLoadFunction, options, clock); } diff --git a/src/test/java/org/dataloader/fixtures/TestKit.java b/src/test/java/org/dataloader/fixtures/TestKit.java index 1242114..2ea23a8 100644 --- a/src/test/java/org/dataloader/fixtures/TestKit.java +++ b/src/test/java/org/dataloader/fixtures/TestKit.java @@ -31,6 +31,10 @@ public static BatchLoader keysAsValues(List> loadCalls) { }; } + public static DataLoader idLoader() { + return idLoader(null, new ArrayList<>()); + } + public static DataLoader idLoader(List> loadCalls) { return idLoader(null, loadCalls); } diff --git a/src/test/java/org/dataloader/registries/DispatchPredicateTest.java b/src/test/java/org/dataloader/registries/DispatchPredicateTest.java new file mode 100644 index 0000000..f241c2f --- /dev/null +++ b/src/test/java/org/dataloader/registries/DispatchPredicateTest.java @@ -0,0 +1,105 @@ +package org.dataloader.registries; + +import org.dataloader.ClockDataLoader; +import org.dataloader.DataLoader; +import org.dataloader.fixtures.TestKit; +import org.dataloader.fixtures.TestingClock; +import org.junit.Test; + +import java.time.Duration; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class DispatchPredicateTest { + + @Test + public void default_logical_method() { + + String key = "k"; + DataLoader testDL = TestKit.idLoader(); + + DispatchPredicate alwaysTrue = (k, dl) -> true; + DispatchPredicate alwaysFalse = (k, dl) -> false; + + assertFalse(alwaysFalse.and(alwaysFalse).test(key, testDL)); + assertFalse(alwaysFalse.and(alwaysTrue).test(key, testDL)); + assertFalse(alwaysTrue.and(alwaysFalse).test(key, testDL)); + assertTrue(alwaysTrue.and(alwaysTrue).test(key, testDL)); + + assertTrue(alwaysFalse.negate().test(key, testDL)); + assertFalse(alwaysTrue.negate().test(key, testDL)); + + assertTrue(alwaysTrue.or(alwaysFalse).test(key, testDL)); + assertTrue(alwaysFalse.or(alwaysTrue).test(key, testDL)); + assertFalse(alwaysFalse.or(alwaysFalse).test(key, testDL)); + } + + @Test + public void dispatchIfLongerThan_test() { + TestingClock clock = new TestingClock(); + ClockDataLoader dlA = new ClockDataLoader<>(TestKit.keysAsValues(), clock); + + Duration ms200 = Duration.ofMillis(200); + DispatchPredicate dispatchPredicate = DispatchPredicate.dispatchIfLongerThan(ms200); + + assertFalse(dispatchPredicate.test("k", dlA)); + + clock.jump(199); + assertFalse(dispatchPredicate.test("k", dlA)); + + clock.jump(100); + assertTrue(dispatchPredicate.test("k", dlA)); + } + + @Test + public void dispatchIfDepthGreaterThan_test() { + DataLoader dlA = TestKit.idLoader(); + + DispatchPredicate dispatchPredicate = DispatchPredicate.dispatchIfDepthGreaterThan(4); + assertFalse(dispatchPredicate.test("k", dlA)); + + dlA.load("1"); + dlA.load("2"); + dlA.load("3"); + dlA.load("4"); + + assertFalse(dispatchPredicate.test("k", dlA)); + + + dlA.load("5"); + assertTrue(dispatchPredicate.test("k", dlA)); + + } + + @Test + public void combined_some_things() { + + TestingClock clock = new TestingClock(); + ClockDataLoader dlA = new ClockDataLoader<>(TestKit.keysAsValues(), clock); + + Duration ms200 = Duration.ofMillis(200); + + DispatchPredicate dispatchIfLongerThan = DispatchPredicate.dispatchIfLongerThan(ms200); + DispatchPredicate dispatchIfDepthGreaterThan = DispatchPredicate.dispatchIfDepthGreaterThan(4); + DispatchPredicate combinedPredicate = dispatchIfLongerThan.and(dispatchIfDepthGreaterThan); + + assertFalse(combinedPredicate.test("k", dlA)); + + clock.jump(500); // that's enough time for one condition + + assertFalse(combinedPredicate.test("k", dlA)); + + dlA.load("1"); + dlA.load("2"); + dlA.load("3"); + dlA.load("4"); + + assertFalse(combinedPredicate.test("k", dlA)); + + + dlA.load("5"); + assertTrue(combinedPredicate.test("k", dlA)); + + } +} \ No newline at end of file diff --git a/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java new file mode 100644 index 0000000..527f419 --- /dev/null +++ b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java @@ -0,0 +1,260 @@ +package org.dataloader.registries; + +import junit.framework.TestCase; +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderFactory; +import org.dataloader.DataLoaderRegistry; +import org.dataloader.fixtures.TestKit; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.dataloader.fixtures.TestKit.keysAsValues; +import static org.dataloader.fixtures.TestKit.snooze; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +public class ScheduledDataLoaderRegistryTest extends TestCase { + + DispatchPredicate alwaysDispatch = (key, dl) -> true; + DispatchPredicate neverDispatch = (key, dl) -> false; + + + public void test_basic_setup_works_like_a_normal_dlr() { + + List> aCalls = new ArrayList<>(); + List> bCalls = new ArrayList<>(); + + DataLoader dlA = TestKit.idLoader(aCalls); + dlA.load("AK1"); + dlA.load("AK2"); + + DataLoader dlB = TestKit.idLoader(bCalls); + dlB.load("BK1"); + dlB.load("BK2"); + + DataLoaderRegistry otherDLR = DataLoaderRegistry.newRegistry().register("b", dlB).build(); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA) + .registerAll(otherDLR) + .dispatchPredicate(alwaysDispatch) + .scheduledExecutorService(Executors.newSingleThreadScheduledExecutor()) + .schedule(Duration.ofMillis(100)) + .build(); + + assertThat(registry.getScheduleDuration(), equalTo(Duration.ofMillis(100))); + + int count = registry.dispatchAllWithCount(); + assertThat(count, equalTo(4)); + assertThat(aCalls, equalTo(singletonList(asList("AK1", "AK2")))); + assertThat(bCalls, equalTo(singletonList(asList("BK1", "BK2")))); + } + + public void test_predicate_always_false() { + + List> calls = new ArrayList<>(); + DataLoader dlA = DataLoaderFactory.newDataLoader(keysAsValues(calls)); + dlA.load("K1"); + dlA.load("K2"); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA) + .dispatchPredicate(neverDispatch) + .schedule(Duration.ofMillis(10)) + .build(); + + int count = registry.dispatchAllWithCount(); + assertThat(count, equalTo(0)); + assertThat(calls.size(), equalTo(0)); + + snooze(200); + + count = registry.dispatchAllWithCount(); + assertThat(count, equalTo(0)); + assertThat(calls.size(), equalTo(0)); + + snooze(200); + count = registry.dispatchAllWithCount(); + assertThat(count, equalTo(0)); + assertThat(calls.size(), equalTo(0)); + } + + public void test_predicate_that_eventually_returns_true() { + + + AtomicInteger counter = new AtomicInteger(); + DispatchPredicate neverDispatch = (key, dl) -> counter.incrementAndGet() > 5; + + List> calls = new ArrayList<>(); + DataLoader dlA = DataLoaderFactory.newDataLoader(keysAsValues(calls)); + CompletableFuture p1 = dlA.load("K1"); + CompletableFuture p2 = dlA.load("K2"); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA) + .dispatchPredicate(neverDispatch) + .schedule(Duration.ofMillis(10)) + .build(); + + + int count = registry.dispatchAllWithCount(); + assertThat(count, equalTo(0)); + assertThat(calls.size(), equalTo(0)); + assertFalse(p1.isDone()); + assertFalse(p2.isDone()); + + snooze(200); + + registry.dispatchAll(); + assertTrue(p1.isDone()); + assertTrue(p2.isDone()); + } + + public void test_dispatchAllWithCountImmediately() { + List> calls = new ArrayList<>(); + DataLoader dlA = DataLoaderFactory.newDataLoader(keysAsValues(calls)); + dlA.load("K1"); + dlA.load("K2"); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA) + .dispatchPredicate(neverDispatch) + .schedule(Duration.ofMillis(10)) + .build(); + + int count = registry.dispatchAllWithCountImmediately(); + assertThat(count, equalTo(2)); + assertThat(calls, equalTo(singletonList(asList("K1", "K2")))); + } + + public void test_dispatchAllImmediately() { + List> calls = new ArrayList<>(); + DataLoader dlA = DataLoaderFactory.newDataLoader(keysAsValues(calls)); + dlA.load("K1"); + dlA.load("K2"); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA) + .dispatchPredicate(neverDispatch) + .schedule(Duration.ofMillis(10)) + .build(); + + registry.dispatchAllImmediately(); + assertThat(calls, equalTo(singletonList(asList("K1", "K2")))); + } + + public void test_rescheduleNow() { + AtomicInteger i = new AtomicInteger(); + DispatchPredicate countingPredicate = (dataLoaderKey, dataLoader) -> i.incrementAndGet() > 5; + + List> calls = new ArrayList<>(); + DataLoader dlA = DataLoaderFactory.newDataLoader(keysAsValues(calls)); + dlA.load("K1"); + dlA.load("K2"); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA) + .dispatchPredicate(countingPredicate) + .schedule(Duration.ofMillis(100)) + .build(); + + // we never called dispatch per say - we started the scheduling direct + registry.rescheduleNow(); + assertTrue(calls.isEmpty()); + + snooze(2000); + assertThat(calls, equalTo(singletonList(asList("K1", "K2")))); + } + + public void test_it_will_take_out_the_schedule_once_it_dispatches() { + AtomicInteger counter = new AtomicInteger(); + DispatchPredicate countingPredicate = (dataLoaderKey, dataLoader) -> counter.incrementAndGet() > 5; + + List> calls = new ArrayList<>(); + DataLoader dlA = DataLoaderFactory.newDataLoader(keysAsValues(calls)); + dlA.load("K1"); + dlA.load("K2"); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA) + .dispatchPredicate(countingPredicate) + .schedule(Duration.ofMillis(100)) + .build(); + + registry.dispatchAll(); + // we have 5 * 100 mills to reach this line + assertTrue(calls.isEmpty()); + + snooze(2000); + assertThat(calls, equalTo(singletonList(asList("K1", "K2")))); + + // reset our counter state + counter.set(0); + + dlA.load("K3"); + dlA.load("K4"); + + // no one has called dispatch - there is no rescheduling + snooze(2000); + assertThat(calls, equalTo(singletonList(asList("K1", "K2")))); + + registry.dispatchAll(); + // we have 5 * 100 mills to reach this line + assertThat(calls, equalTo(singletonList(asList("K1", "K2")))); + + snooze(2000); + + assertThat(calls, equalTo(asList(asList("K1", "K2"), asList("K3", "K4")))); + } + + public void test_close_is_a_one_way_door() { + AtomicInteger counter = new AtomicInteger(); + DispatchPredicate countingPredicate = (dataLoaderKey, dataLoader) -> { + counter.incrementAndGet(); + return false; + }; + + DataLoader dlA = TestKit.idLoader(); + dlA.load("K1"); + dlA.load("K2"); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA) + .dispatchPredicate(countingPredicate) + .schedule(Duration.ofMillis(10)) + .build(); + + registry.rescheduleNow(); + + snooze(200); + + assertTrue(counter.get() > 0); + + registry.close(); + + snooze(100); + int countThen = counter.get(); + + registry.rescheduleNow(); + snooze(200); + assertEquals(counter.get(), countThen); + + registry.rescheduleNow(); + snooze(200); + assertEquals(counter.get(), countThen); + + registry.dispatchAll(); + snooze(200); + assertEquals(counter.get(), countThen + 1); // will have re-entered + + snooze(200); + assertEquals(counter.get(), countThen + 1); + } +} \ No newline at end of file