Skip to content

Commit 1044d9c

Browse files
committed
This adds a ticker mode to ScheduledDataLoaderRegistry
1 parent db61b14 commit 1044d9c

File tree

3 files changed

+143
-12
lines changed

3 files changed

+143
-12
lines changed

src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java

Lines changed: 52 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,34 @@
1515

1616
/**
1717
* This {@link DataLoaderRegistry} will use a {@link DispatchPredicate} when {@link #dispatchAll()} is called
18-
* to test (for each {@link DataLoader} in the registry) if a dispatch should proceed. If the predicate returns false, then a task is scheduled
19-
* to perform that predicate dispatch again via the {@link ScheduledExecutorService}.
18+
* to test (for each {@link DataLoader} in the registry) if a dispatch should proceed. If the predicate returns false,
19+
* then a task is scheduled to perform that predicate dispatch again via the {@link ScheduledExecutorService}.
2020
* <p>
21-
* This will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case
22-
* no rescheduling will occur and you will need to call dispatch again to restart the process.
21+
* In the default mode, when {@link #tickerMode} is false, the registry will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case
22+
* no rescheduling will occur, and you will need to call dispatch again to restart the process.
23+
* <p>
24+
* However, when {@link #tickerMode} is true, the registry will always reschedule continuously after the first ever call to {@link #dispatchAll()}.
25+
* <p>
26+
* This will allow you to chain together {@link DataLoader} load calls like this :
27+
* <pre>{@code
28+
* CompletableFuture<String> future = dataLoaderA.load("A")
29+
* .thenCompose(value -> dataLoaderB.load(value));
30+
* }</pre>
31+
* <p>
32+
* However, it may mean your batching will not be as efficient as it might be. In environments
33+
* like graphql this might mean you are too eager in fetching. The {@link DispatchPredicate} still runs to decide if
34+
* dispatch should happen however in ticker mode it will be continuously rescheduled.
35+
* <p>
36+
* When {@link #tickerMode} is true, you really SHOULD close the registry say at the end of a request otherwise you will leave a job
37+
* on the {@link ScheduledExecutorService} that is continuously dispatching.
2338
* <p>
2439
* If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and
2540
* call {@link #rescheduleNow()}.
2641
* <p>
42+
* By default, it uses a {@link Executors#newSingleThreadScheduledExecutor()}} to schedule the tasks. However, if you
43+
* are creating a {@link ScheduledDataLoaderRegistry} per request you will want to look at sharing this {@link ScheduledExecutorService}
44+
* to avoid creating a new thread per registry created.
45+
* <p>
2746
* This code is currently marked as {@link ExperimentalApi}
2847
*/
2948
@ExperimentalApi
@@ -32,13 +51,15 @@ public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements A
3251
private final ScheduledExecutorService scheduledExecutorService;
3352
private final DispatchPredicate dispatchPredicate;
3453
private final Duration schedule;
54+
private final boolean tickerMode;
3555
private volatile boolean closed;
3656

3757
private ScheduledDataLoaderRegistry(Builder builder) {
3858
this.dataLoaders.putAll(builder.dataLoaders);
3959
this.scheduledExecutorService = builder.scheduledExecutorService;
4060
this.dispatchPredicate = builder.dispatchPredicate;
4161
this.schedule = builder.schedule;
62+
this.tickerMode = builder.tickerMode;
4263
this.closed = false;
4364
}
4465

@@ -57,6 +78,13 @@ public Duration getScheduleDuration() {
5778
return schedule;
5879
}
5980

81+
/**
82+
* @return true of the registry is in ticker mode or false otherwise
83+
*/
84+
public boolean isTickerMode() {
85+
return tickerMode;
86+
}
87+
6088
@Override
6189
public void dispatchAll() {
6290
dispatchAllWithCount();
@@ -68,11 +96,7 @@ public int dispatchAllWithCount() {
6896
for (Map.Entry<String, DataLoader<?, ?>> entry : dataLoaders.entrySet()) {
6997
DataLoader<?, ?> dataLoader = entry.getValue();
7098
String key = entry.getKey();
71-
if (dispatchPredicate.test(key, dataLoader)) {
72-
sum += dataLoader.dispatchWithCounts().getKeysCount();
73-
} else {
74-
reschedule(key, dataLoader);
75-
}
99+
dispatchOrReschedule(key, dataLoader);
76100
}
77101
return sum;
78102
}
@@ -111,9 +135,11 @@ private void reschedule(String key, DataLoader<?, ?> dataLoader) {
111135
}
112136

113137
private void dispatchOrReschedule(String key, DataLoader<?, ?> dataLoader) {
114-
if (dispatchPredicate.test(key, dataLoader)) {
138+
boolean shouldDispatch = dispatchPredicate.test(key, dataLoader);
139+
if (shouldDispatch) {
115140
dataLoader.dispatch();
116-
} else {
141+
}
142+
if (tickerMode || !shouldDispatch) {
117143
reschedule(key, dataLoader);
118144
}
119145
}
@@ -134,6 +160,7 @@ public static class Builder {
134160
private DispatchPredicate dispatchPredicate = (key, dl) -> true;
135161
private Duration schedule = Duration.ofMillis(10);
136162
private final Map<String, DataLoader<?, ?>> dataLoaders = new HashMap<>();
163+
private boolean tickerMode = false;
137164

138165
public Builder scheduledExecutorService(ScheduledExecutorService executorService) {
139166
this.scheduledExecutorService = nonNull(executorService);
@@ -176,6 +203,20 @@ public Builder registerAll(DataLoaderRegistry otherRegistry) {
176203
return this;
177204
}
178205

206+
/**
207+
* This sets ticker mode on the registry. When ticker mode is true the registry will
208+
* continuously reschedule the data loaders for possible dispatching after the first call
209+
* to dispatchAll.
210+
*
211+
* @param tickerMode true or false
212+
*
213+
* @return this builder for a fluent pattern
214+
*/
215+
public Builder tickerMode(boolean tickerMode) {
216+
this.tickerMode = tickerMode;
217+
return this;
218+
}
219+
179220
/**
180221
* @return the newly built {@link ScheduledDataLoaderRegistry}
181222
*/

src/test/java/org/dataloader/fixtures/TestKit.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import org.dataloader.DataLoaderFactory;
66
import org.dataloader.DataLoaderOptions;
77

8+
import java.time.Duration;
89
import java.util.ArrayList;
910
import java.util.Collection;
1011
import java.util.List;
@@ -31,6 +32,23 @@ public static <K, V> BatchLoader<K, V> keysAsValues(List<List<K>> loadCalls) {
3132
};
3233
}
3334

35+
public static <K, V> BatchLoader<K, V> keysAsValuesAsync(Duration delay) {
36+
return keysAsValuesAsync(new ArrayList<>(), delay);
37+
}
38+
39+
public static <K, V> BatchLoader<K, V> keysAsValuesAsync(List<List<K>> loadCalls, Duration delay) {
40+
return keys -> CompletableFuture.supplyAsync(() -> {
41+
snooze(delay.toMillis());
42+
List<K> ks = new ArrayList<>(keys);
43+
loadCalls.add(ks);
44+
@SuppressWarnings("unchecked")
45+
List<V> values = keys.stream()
46+
.map(k -> (V) k)
47+
.collect(toList());
48+
return values;
49+
});
50+
}
51+
3452
public static <K, V> DataLoader<K, V> idLoader() {
3553
return idLoader(null, new ArrayList<>());
3654
}
@@ -43,6 +61,14 @@ public static <K, V> DataLoader<K, V> idLoader(DataLoaderOptions options, List<L
4361
return DataLoaderFactory.newDataLoader(keysAsValues(loadCalls), options);
4462
}
4563

64+
public static <K, V> DataLoader<K, V> idLoaderAsync(Duration delay) {
65+
return idLoaderAsync(null, new ArrayList<>(), delay);
66+
}
67+
68+
public static <K, V> DataLoader<K, V> idLoaderAsync(DataLoaderOptions options, List<List<K>> loadCalls, Duration delay) {
69+
return DataLoaderFactory.newDataLoader(keysAsValuesAsync(loadCalls, delay), options);
70+
}
71+
4672
public static Collection<Integer> listFrom(int i, int max) {
4773
List<Integer> ints = new ArrayList<>();
4874
for (int j = i; j < max; j++) {
@@ -55,7 +81,7 @@ public static <V> CompletableFuture<V> futureError() {
5581
return failedFuture(new IllegalStateException("Error"));
5682
}
5783

58-
public static void snooze(int millis) {
84+
public static void snooze(long millis) {
5985
try {
6086
Thread.sleep(millis);
6187
} catch (InterruptedException e) {

src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.dataloader.registries;
22

33
import junit.framework.TestCase;
4+
import org.awaitility.core.ConditionTimeoutException;
45
import org.dataloader.DataLoader;
56
import org.dataloader.DataLoaderFactory;
67
import org.dataloader.DataLoaderRegistry;
@@ -11,13 +12,17 @@
1112
import java.util.List;
1213
import java.util.concurrent.CompletableFuture;
1314
import java.util.concurrent.Executors;
15+
import java.util.concurrent.atomic.AtomicBoolean;
1416
import java.util.concurrent.atomic.AtomicInteger;
1517

1618
import static java.util.Arrays.asList;
1719
import static java.util.Collections.singletonList;
20+
import static org.awaitility.Awaitility.await;
21+
import static org.awaitility.Duration.TWO_SECONDS;
1822
import static org.dataloader.fixtures.TestKit.keysAsValues;
1923
import static org.dataloader.fixtures.TestKit.snooze;
2024
import static org.hamcrest.Matchers.equalTo;
25+
import static org.hamcrest.Matchers.is;
2126
import static org.junit.Assert.assertThat;
2227

2328
public class ScheduledDataLoaderRegistryTest extends TestCase {
@@ -257,4 +262,63 @@ public void test_close_is_a_one_way_door() {
257262
snooze(200);
258263
assertEquals(counter.get(), countThen + 1);
259264
}
265+
266+
public void test_can_tick_after_first_dispatch_for_chain_data_loaders() {
267+
268+
// delays much bigger than the tick rate will mean multiple calls to dispatch
269+
DataLoader<String, String> dlA = TestKit.idLoaderAsync(Duration.ofMillis(100));
270+
DataLoader<String, String> dlB = TestKit.idLoaderAsync(Duration.ofMillis(200));
271+
272+
CompletableFuture<String> chainedCF = dlA.load("AK1").thenCompose(dlB::load);
273+
274+
AtomicBoolean done = new AtomicBoolean();
275+
chainedCF.whenComplete((v, t) -> done.set(true));
276+
277+
ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry()
278+
.register("a", dlA)
279+
.register("b", dlB)
280+
.dispatchPredicate(alwaysDispatch)
281+
.schedule(Duration.ofMillis(10))
282+
.tickerMode(true)
283+
.build();
284+
285+
assertThat(registry.isTickerMode(), equalTo(true));
286+
287+
registry.dispatchAll();
288+
289+
await().atMost(TWO_SECONDS).untilAtomic(done, is(true));
290+
291+
registry.close();
292+
}
293+
294+
public void test_chain_data_loaders_will_hang_if_not_in_ticker_mode() {
295+
296+
// delays much bigger than the tick rate will mean multiple calls to dispatch
297+
DataLoader<String, String> dlA = TestKit.idLoaderAsync(Duration.ofMillis(100));
298+
DataLoader<String, String> dlB = TestKit.idLoaderAsync(Duration.ofMillis(200));
299+
300+
CompletableFuture<String> chainedCF = dlA.load("AK1").thenCompose(dlB::load);
301+
302+
AtomicBoolean done = new AtomicBoolean();
303+
chainedCF.whenComplete((v, t) -> done.set(true));
304+
305+
ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry()
306+
.register("a", dlA)
307+
.register("b", dlB)
308+
.dispatchPredicate(alwaysDispatch)
309+
.schedule(Duration.ofMillis(10))
310+
.tickerMode(false)
311+
.build();
312+
313+
assertThat(registry.isTickerMode(), equalTo(false));
314+
315+
registry.dispatchAll();
316+
317+
try {
318+
await().atMost(TWO_SECONDS).untilAtomic(done, is(true));
319+
fail("This should not have completed but rather timed out");
320+
} catch (ConditionTimeoutException expected) {
321+
}
322+
registry.close();
323+
}
260324
}

0 commit comments

Comments
 (0)