Skip to content

Commit 1151ccb

Browse files
committed
Adds a predicate to DataLoaderRegistry and a per dataloader map of predicates is also possible
1 parent db61b14 commit 1151ccb

File tree

3 files changed

+142
-71
lines changed

3 files changed

+142
-71
lines changed

src/main/java/org/dataloader/DataLoaderRegistry.java

Lines changed: 129 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.dataloader;
22

33
import org.dataloader.annotations.PublicApi;
4+
import org.dataloader.registries.DispatchPredicate;
45
import org.dataloader.stats.Statistics;
56

67
import java.util.ArrayList;
@@ -21,12 +22,17 @@
2122
@PublicApi
2223
public class DataLoaderRegistry {
2324
protected final Map<String, DataLoader<?, ?>> dataLoaders = new ConcurrentHashMap<>();
25+
protected final Map<DataLoader<?, ?>, DispatchPredicate> dataLoaderPredicates = new ConcurrentHashMap<>();
26+
protected final DispatchPredicate dispatchPredicate;
27+
2428

2529
public DataLoaderRegistry() {
30+
this.dispatchPredicate = DispatchPredicate.DISPATCH_ALWAYS;
2631
}
2732

28-
private DataLoaderRegistry(Builder builder) {
33+
protected DataLoaderRegistry(Builder<?> builder) {
2934
this.dataLoaders.putAll(builder.dataLoaders);
35+
this.dispatchPredicate = builder.dispatchPredicate;
3036
}
3137

3238

@@ -43,6 +49,21 @@ public DataLoaderRegistry register(String key, DataLoader<?, ?> dataLoader) {
4349
return this;
4450
}
4551

52+
/**
53+
* This will register a new dataloader and dispatch predicate associated with that data loader
54+
*
55+
* @param key the key to put the data loader under
56+
* @param dataLoader the data loader to register
57+
* @param dispatchPredicate the dispatch predicate to associate with this data loader
58+
*
59+
* @return this registry
60+
*/
61+
public DataLoaderRegistry register(String key, DataLoader<?, ?> dataLoader, DispatchPredicate dispatchPredicate) {
62+
dataLoaders.put(key, dataLoader);
63+
dataLoaderPredicates.put(dataLoader, dispatchPredicate);
64+
return this;
65+
}
66+
4667
/**
4768
* Computes a data loader if absent or return it if it was
4869
* already registered at that key.
@@ -76,6 +97,8 @@ public DataLoaderRegistry combine(DataLoaderRegistry registry) {
7697

7798
this.dataLoaders.forEach(combined::register);
7899
registry.dataLoaders.forEach(combined::register);
100+
combined.dataLoaderPredicates.putAll(this.dataLoaderPredicates);
101+
combined.dataLoaderPredicates.putAll(registry.dataLoaderPredicates);
79102
return combined;
80103
}
81104

@@ -101,7 +124,10 @@ public DataLoaderRegistry combine(DataLoaderRegistry registry) {
101124
* @return this registry
102125
*/
103126
public DataLoaderRegistry unregister(String key) {
104-
dataLoaders.remove(key);
127+
DataLoader<?, ?> dataLoader = dataLoaders.remove(key);
128+
if (dataLoader != null) {
129+
dataLoaderPredicates.remove(dataLoader);
130+
}
105131
return this;
106132
}
107133

@@ -131,7 +157,7 @@ public Set<String> getKeys() {
131157
* {@link org.dataloader.DataLoader}s
132158
*/
133159
public void dispatchAll() {
134-
getDataLoaders().forEach(DataLoader::dispatch);
160+
dispatchAllWithCount();
135161
}
136162

137163
/**
@@ -142,8 +168,12 @@ public void dispatchAll() {
142168
*/
143169
public int dispatchAllWithCount() {
144170
int sum = 0;
145-
for (DataLoader<?, ?> dataLoader : getDataLoaders()) {
146-
sum += dataLoader.dispatchWithCounts().getKeysCount();
171+
for (Map.Entry<String, DataLoader<?, ?>> entry : dataLoaders.entrySet()) {
172+
DataLoader<?, ?> dataLoader = entry.getValue();
173+
String key = entry.getKey();
174+
if (shouldDispatch(key, dataLoader)) {
175+
sum += dataLoader.dispatchWithCounts().getKeysCount();
176+
}
147177
}
148178
return sum;
149179
}
@@ -154,12 +184,59 @@ public int dispatchAllWithCount() {
154184
*/
155185
public int dispatchDepth() {
156186
int totalDispatchDepth = 0;
157-
for (DataLoader<?, ?> dataLoader : getDataLoaders()) {
158-
totalDispatchDepth += dataLoader.dispatchDepth();
187+
for (Map.Entry<String, DataLoader<?, ?>> entry : dataLoaders.entrySet()) {
188+
DataLoader<?, ?> dataLoader = entry.getValue();
189+
String key = entry.getKey();
190+
if (shouldDispatch(key, dataLoader)) {
191+
totalDispatchDepth += dataLoader.dispatchDepth();
192+
}
159193
}
160194
return totalDispatchDepth;
161195
}
162196

197+
/**
198+
* This will immediately dispatch the {@link DataLoader}s in the registry
199+
* without testing the predicate
200+
*/
201+
public void dispatchAllImmediately() {
202+
dispatchAllWithCountImmediately();
203+
}
204+
205+
/**
206+
* This will immediately dispatch the {@link DataLoader}s in the registry
207+
* without testing the predicate
208+
*
209+
* @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s.
210+
*/
211+
public int dispatchAllWithCountImmediately() {
212+
int sum = 0;
213+
for (Map.Entry<String, DataLoader<?, ?>> entry : dataLoaders.entrySet()) {
214+
DataLoader<?, ?> dataLoader = entry.getValue();
215+
sum += dataLoader.dispatchWithCounts().getKeysCount();
216+
}
217+
return sum;
218+
}
219+
220+
221+
/**
222+
* Returns true if the dataloader has a predicate which returned true, OR the overall
223+
* registry predicate returned true.
224+
*
225+
* @param dataLoaderKey the key in the dataloader map
226+
* @param dataLoader the dataloader
227+
*
228+
* @return true if it should dispatch
229+
*/
230+
protected boolean shouldDispatch(String dataLoaderKey, DataLoader<?, ?> dataLoader) {
231+
DispatchPredicate dispatchPredicate = dataLoaderPredicates.get(dataLoader);
232+
if (dispatchPredicate != null) {
233+
if (dispatchPredicate.test(dataLoaderKey, dataLoader)) {
234+
return true;
235+
}
236+
}
237+
return this.dispatchPredicate.test(dataLoaderKey, dataLoader);
238+
}
239+
163240
/**
164241
* @return a combined set of statistics for all data loaders in this registry presented
165242
* as the sum of all their statistics
@@ -175,13 +252,22 @@ public Statistics getStatistics() {
175252
/**
176253
* @return A builder of {@link DataLoaderRegistry}s
177254
*/
178-
public static Builder newRegistry() {
255+
public static Builder<?> newRegistry() {
256+
//noinspection rawtypes
179257
return new Builder();
180258
}
181259

182-
public static class Builder {
260+
public static class Builder<B extends Builder<B>> {
183261

184262
private final Map<String, DataLoader<?, ?>> dataLoaders = new HashMap<>();
263+
private final Map<DataLoader<?, ?>, DispatchPredicate> dataLoaderPredicates = new ConcurrentHashMap<>();
264+
265+
private DispatchPredicate dispatchPredicate = DispatchPredicate.DISPATCH_ALWAYS;
266+
267+
private B self() {
268+
//noinspection unchecked
269+
return (B) this;
270+
}
185271

186272
/**
187273
* This will register a new dataloader
@@ -191,22 +277,51 @@ public static class Builder {
191277
*
192278
* @return this builder for a fluent pattern
193279
*/
194-
public Builder register(String key, DataLoader<?, ?> dataLoader) {
280+
public B register(String key, DataLoader<?, ?> dataLoader) {
195281
dataLoaders.put(key, dataLoader);
196-
return this;
282+
return self();
197283
}
198284

199285
/**
200-
* This will combine together the data loaders in this builder with the ones
286+
* This will register a new dataloader with a specific {@link DispatchPredicate}
287+
*
288+
* @param key the key to put the data loader under
289+
* @param dataLoader the data loader to register
290+
* @param dispatchPredicate the dispatch predicate
291+
*
292+
* @return this builder for a fluent pattern
293+
*/
294+
public B register(String key, DataLoader<?, ?> dataLoader, DispatchPredicate dispatchPredicate) {
295+
register(key, dataLoader);
296+
dataLoaderPredicates.put(dataLoader, dispatchPredicate);
297+
return self();
298+
}
299+
300+
/**
301+
* This will combine the data loaders in this builder with the ones
201302
* from a previous {@link DataLoaderRegistry}
202303
*
203304
* @param otherRegistry the previous {@link DataLoaderRegistry}
204305
*
205306
* @return this builder for a fluent pattern
206307
*/
207-
public Builder registerAll(DataLoaderRegistry otherRegistry) {
308+
public B registerAll(DataLoaderRegistry otherRegistry) {
208309
dataLoaders.putAll(otherRegistry.dataLoaders);
209-
return this;
310+
dataLoaderPredicates.putAll(otherRegistry.dataLoaderPredicates);
311+
return self();
312+
}
313+
314+
/**
315+
* This sets a predicate on the {@link DataLoaderRegistry} that will control
316+
* whether all {@link DataLoader}s in the {@link DataLoaderRegistry }should be dispatched.
317+
*
318+
* @param dispatchPredicate the predicate
319+
*
320+
* @return this builder for a fluent pattern
321+
*/
322+
public B dispatchPredicate(DispatchPredicate dispatchPredicate) {
323+
this.dispatchPredicate = dispatchPredicate;
324+
return self();
210325
}
211326

212327
/**

src/main/java/org/dataloader/registries/DispatchPredicate.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,16 @@
1010
*/
1111
@FunctionalInterface
1212
public interface DispatchPredicate {
13+
14+
/**
15+
* A predicate that always returns true
16+
*/
17+
DispatchPredicate DISPATCH_ALWAYS = (dataLoaderKey, dataLoader) -> true;
18+
/**
19+
* A predicate that always returns false
20+
*/
21+
DispatchPredicate DISPATCH_NEVER = (dataLoaderKey, dataLoader) -> true;
22+
1323
/**
1424
* This predicate tests whether the data loader should be dispatched or not.
1525
*

src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java

Lines changed: 3 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import org.dataloader.annotations.ExperimentalApi;
66

77
import java.time.Duration;
8-
import java.util.HashMap;
98
import java.util.Map;
109
import java.util.concurrent.Executors;
1110
import java.util.concurrent.ScheduledExecutorService;
@@ -30,14 +29,12 @@
3029
public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements AutoCloseable {
3130

3231
private final ScheduledExecutorService scheduledExecutorService;
33-
private final DispatchPredicate dispatchPredicate;
3432
private final Duration schedule;
3533
private volatile boolean closed;
3634

3735
private ScheduledDataLoaderRegistry(Builder builder) {
38-
this.dataLoaders.putAll(builder.dataLoaders);
36+
super(builder);
3937
this.scheduledExecutorService = builder.scheduledExecutorService;
40-
this.dispatchPredicate = builder.dispatchPredicate;
4138
this.schedule = builder.schedule;
4239
this.closed = false;
4340
}
@@ -77,24 +74,6 @@ public int dispatchAllWithCount() {
7774
return sum;
7875
}
7976

80-
/**
81-
* This will immediately dispatch the {@link DataLoader}s in the registry
82-
* without testing the predicate
83-
*/
84-
public void dispatchAllImmediately() {
85-
super.dispatchAll();
86-
}
87-
88-
/**
89-
* This will immediately dispatch the {@link DataLoader}s in the registry
90-
* without testing the predicate
91-
*
92-
* @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s.
93-
*/
94-
public int dispatchAllWithCountImmediately() {
95-
return super.dispatchAllWithCount();
96-
}
97-
9877
/**
9978
* This will schedule a task to check the predicate and dispatch if true right now. It will not do
10079
* a pre check of the preodicate like {@link #dispatchAll()} would
@@ -111,7 +90,7 @@ private void reschedule(String key, DataLoader<?, ?> dataLoader) {
11190
}
11291

11392
private void dispatchOrReschedule(String key, DataLoader<?, ?> dataLoader) {
114-
if (dispatchPredicate.test(key, dataLoader)) {
93+
if (shouldDispatch(key, dataLoader)) {
11594
dataLoader.dispatch();
11695
} else {
11796
reschedule(key, dataLoader);
@@ -128,12 +107,10 @@ public static Builder newScheduledRegistry() {
128107
return new Builder();
129108
}
130109

131-
public static class Builder {
110+
public static class Builder extends DataLoaderRegistry.Builder<ScheduledDataLoaderRegistry.Builder> {
132111

133112
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
134-
private DispatchPredicate dispatchPredicate = (key, dl) -> true;
135113
private Duration schedule = Duration.ofMillis(10);
136-
private final Map<String, DataLoader<?, ?>> dataLoaders = new HashMap<>();
137114

138115
public Builder scheduledExecutorService(ScheduledExecutorService executorService) {
139116
this.scheduledExecutorService = nonNull(executorService);
@@ -145,37 +122,6 @@ public Builder schedule(Duration schedule) {
145122
return this;
146123
}
147124

148-
public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) {
149-
this.dispatchPredicate = nonNull(dispatchPredicate);
150-
return this;
151-
}
152-
153-
/**
154-
* This will register a new dataloader
155-
*
156-
* @param key the key to put the data loader under
157-
* @param dataLoader the data loader to register
158-
*
159-
* @return this builder for a fluent pattern
160-
*/
161-
public Builder register(String key, DataLoader<?, ?> dataLoader) {
162-
dataLoaders.put(key, dataLoader);
163-
return this;
164-
}
165-
166-
/**
167-
* This will combine together the data loaders in this builder with the ones
168-
* from a previous {@link DataLoaderRegistry}
169-
*
170-
* @param otherRegistry the previous {@link DataLoaderRegistry}
171-
*
172-
* @return this builder for a fluent pattern
173-
*/
174-
public Builder registerAll(DataLoaderRegistry otherRegistry) {
175-
dataLoaders.putAll(otherRegistry.getDataLoadersMap());
176-
return this;
177-
}
178-
179125
/**
180126
* @return the newly built {@link ScheduledDataLoaderRegistry}
181127
*/

0 commit comments

Comments
 (0)