19
19
import org .dataloader .impl .CompletableFutureKit ;
20
20
21
21
import java .util .ArrayList ;
22
- import java .util .Collections ;
22
+ import java .util .Collection ;
23
23
import java .util .LinkedHashMap ;
24
24
import java .util .List ;
25
25
import java .util .Map ;
26
26
import java .util .concurrent .CompletableFuture ;
27
27
import java .util .stream .Collectors ;
28
28
29
29
import static java .util .Collections .emptyList ;
30
+ import static java .util .Collections .singletonList ;
30
31
import static org .dataloader .impl .Assertions .assertState ;
31
32
import static org .dataloader .impl .Assertions .nonNull ;
32
33
@@ -98,8 +99,10 @@ private CacheMap<Object, CompletableFuture<V>> determineCacheMap(DataLoaderOptio
98
99
*/
99
100
public CompletableFuture <V > load (K key ) {
100
101
Object cacheKey = getCacheKey (nonNull (key ));
101
- if (loaderOptions .cachingEnabled () && futureCache .containsKey (cacheKey )) {
102
- return futureCache .get (cacheKey );
102
+ synchronized (futureCache ) {
103
+ if (loaderOptions .cachingEnabled () && futureCache .containsKey (cacheKey )) {
104
+ return futureCache .get (cacheKey );
105
+ }
103
106
}
104
107
105
108
CompletableFuture <V > future = new CompletableFuture <>();
@@ -108,15 +111,17 @@ public CompletableFuture<V> load(K key) {
108
111
loaderQueue .put (key , future );
109
112
}
110
113
} else {
111
- // immediate execution of batch function (but not promise itself)
114
+ // immediate execution of batch function
112
115
CompletableFuture <List <V >> batchedLoad = batchLoadFunction
113
- .load (Collections . singletonList (key ))
116
+ .load (singletonList (key ))
114
117
.toCompletableFuture ();
115
118
future = batchedLoad
116
119
.thenApply (list -> list .get (0 ));
117
120
}
118
121
if (loaderOptions .cachingEnabled ()) {
119
- futureCache .set (cacheKey , future );
122
+ synchronized (futureCache ) {
123
+ futureCache .set (cacheKey , future );
124
+ }
120
125
}
121
126
return future ;
122
127
}
@@ -177,25 +182,64 @@ public CompletableFuture<List<V>> dispatch() {
177
182
// the previously cached future objects that the client already has been given
178
183
// via calls to load("foo") and loadMany(["foo","bar"])
179
184
//
185
+ int maxBatchSize = loaderOptions .maxBatchSize ();
186
+ if (maxBatchSize > 0 && maxBatchSize < keys .size ()) {
187
+ return sliceIntoBatchesOfBatches (keys , queuedFutures , maxBatchSize );
188
+ } else {
189
+ return dispatchQueueBatch (keys , queuedFutures );
190
+ }
191
+ }
192
+
193
+ private CompletableFuture <List <V >> sliceIntoBatchesOfBatches (List <K > keys , List <CompletableFuture <V >> queuedFutures , int maxBatchSize ) {
194
+ // the number of keys is > than what the batch loader function can accept
195
+ // so make multiple calls to the loader
196
+ List <CompletableFuture <List <V >>> allBatches = new ArrayList <>();
197
+ int len = keys .size ();
198
+ int batchCount = (int ) Math .ceil (len / (double ) maxBatchSize );
199
+ for (int i = 0 ; i < batchCount ; i ++) {
200
+
201
+ int fromIndex = i * maxBatchSize ;
202
+ int toIndex = Math .min ((i + 1 ) * maxBatchSize , len );
203
+
204
+ List <K > subKeys = keys .subList (fromIndex , toIndex );
205
+ List <CompletableFuture <V >> subFutures = queuedFutures .subList (fromIndex , toIndex );
206
+
207
+ allBatches .add (dispatchQueueBatch (subKeys , subFutures ));
208
+ }
209
+ //
210
+ // now reassemble all the futures into one that is the complete set of results
211
+ return CompletableFuture .allOf (allBatches .toArray (new CompletableFuture [allBatches .size ()]))
212
+ .thenApply (v -> allBatches .stream ()
213
+ .map (CompletableFuture ::join )
214
+ .flatMap (Collection ::stream )
215
+ .collect (Collectors .toList ()));
216
+ }
217
+
218
+ private CompletableFuture <List <V >> dispatchQueueBatch (List <K > keys , List <CompletableFuture <V >> queuedFutures ) {
180
219
return batchLoadFunction .load (keys )
181
220
.toCompletableFuture ()
182
221
.thenApply (values -> {
183
222
assertState (keys .size () == values .size (), "The size of the promised values MUST be the same size as the key list" );
184
223
185
224
for (int idx = 0 ; idx < queuedFutures .size (); idx ++) {
186
- V value = values .get (idx );
225
+ Object value = values .get (idx );
187
226
CompletableFuture <V > future = queuedFutures .get (idx );
188
- future .complete (value );
227
+ if (value instanceof Throwable ) {
228
+ future .completeExceptionally ((Throwable ) value );
229
+ } else {
230
+ @ SuppressWarnings ("unchecked" )
231
+ V val = (V ) value ;
232
+ future .complete (val );
233
+ }
189
234
}
190
235
return values ;
191
236
}).exceptionally (ex -> {
192
237
for (int idx = 0 ; idx < queuedFutures .size (); idx ++) {
193
238
K key = keys .get (idx );
194
239
CompletableFuture <V > future = queuedFutures .get (idx );
195
- // clear any cached view of this key
196
- futureCache .delete (key );
197
240
future .completeExceptionally (ex );
198
-
241
+ // clear any cached view of this key
242
+ clear (key );
199
243
}
200
244
return emptyList ();
201
245
});
@@ -242,7 +286,9 @@ public int dispatchDepth() {
242
286
*/
243
287
public DataLoader <K , V > clear (K key ) {
244
288
Object cacheKey = getCacheKey (key );
245
- futureCache .delete (cacheKey );
289
+ synchronized (futureCache ) {
290
+ futureCache .delete (cacheKey );
291
+ }
246
292
return this ;
247
293
}
248
294
@@ -252,7 +298,9 @@ public DataLoader<K, V> clear(K key) {
252
298
* @return the data loader for fluent coding
253
299
*/
254
300
public DataLoader <K , V > clearAll () {
255
- futureCache .clear ();
301
+ synchronized (futureCache ) {
302
+ futureCache .clear ();
303
+ }
256
304
return this ;
257
305
}
258
306
@@ -266,8 +314,10 @@ public DataLoader<K, V> clearAll() {
266
314
*/
267
315
public DataLoader <K , V > prime (K key , V value ) {
268
316
Object cacheKey = getCacheKey (key );
269
- if (!futureCache .containsKey (cacheKey )) {
270
- futureCache .set (cacheKey , CompletableFuture .completedFuture (value ));
317
+ synchronized (futureCache ) {
318
+ if (!futureCache .containsKey (cacheKey )) {
319
+ futureCache .set (cacheKey , CompletableFuture .completedFuture (value ));
320
+ }
271
321
}
272
322
return this ;
273
323
}
0 commit comments