Skip to content

Commit b11316e

Browse files
committed
Removed the dispatcher queue (wasnt needed) and made it a smidge more thread safe
1 parent 43cca0c commit b11316e

File tree

6 files changed

+170
-89
lines changed

6 files changed

+170
-89
lines changed

src/main/java/org/dataloader/DataLoader.java

Lines changed: 56 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,18 @@
1616

1717
package org.dataloader;
1818

19-
import org.dataloader.impl.FutureKit;
19+
import org.dataloader.impl.CompletableFutureKit;
2020

2121
import java.util.ArrayList;
2222
import java.util.Collections;
2323
import java.util.LinkedHashMap;
2424
import java.util.List;
25-
import java.util.Objects;
25+
import java.util.Map;
2626
import java.util.concurrent.CompletableFuture;
27-
import java.util.concurrent.atomic.AtomicInteger;
2827
import java.util.stream.Collectors;
2928

3029
import static org.dataloader.impl.Assertions.assertState;
30+
import static org.dataloader.impl.Assertions.nonNull;
3131

3232
/**
3333
* Data loader is a utility class that allows batch loading of data that is identified by a set of unique keys. For
@@ -54,8 +54,7 @@ public class DataLoader<K, V> {
5454
private final BatchLoader<K, V> batchLoadFunction;
5555
private final DataLoaderOptions loaderOptions;
5656
private final CacheMap<Object, CompletableFuture<V>> futureCache;
57-
private final LinkedHashMap<K, CompletableFuture<V>> loaderQueue;
58-
private final LinkedHashMap<PromisedValues, LinkedHashMap<K, CompletableFuture<V>>> dispatchedQueues;
57+
private final Map<K, CompletableFuture<V>> loaderQueue;
5958

6059
/**
6160
* Creates a new data loader with the provided batch load function, and default options.
@@ -72,14 +71,17 @@ public DataLoader(BatchLoader<K, V> batchLoadFunction) {
7271
* @param batchLoadFunction the batch load function to use
7372
* @param options the batch load options
7473
*/
75-
@SuppressWarnings("unchecked")
7674
public DataLoader(BatchLoader<K, V> batchLoadFunction, DataLoaderOptions options) {
77-
Objects.requireNonNull(batchLoadFunction, "Batch load function cannot be null");
78-
this.batchLoadFunction = batchLoadFunction;
75+
this.batchLoadFunction = nonNull(batchLoadFunction);
7976
this.loaderOptions = options == null ? new DataLoaderOptions() : options;
80-
this.futureCache = loaderOptions.cacheMap().isPresent() ? (CacheMap<Object, CompletableFuture<V>>) loaderOptions.cacheMap().get() : CacheMap.simpleMap();
81-
this.loaderQueue = new LinkedHashMap<>();
82-
this.dispatchedQueues = new LinkedHashMap<>();
77+
this.futureCache = determineCacheMap(loaderOptions);
78+
// order of keys matter in data loader
79+
this.loaderQueue = Collections.synchronizedMap(new LinkedHashMap<>());
80+
}
81+
82+
@SuppressWarnings("unchecked")
83+
private CacheMap<Object, CompletableFuture<V>> determineCacheMap(DataLoaderOptions loaderOptions) {
84+
return loaderOptions.cacheMap().isPresent() ? (CacheMap<Object, CompletableFuture<V>>) loaderOptions.cacheMap().get() : CacheMap.simpleMap();
8385
}
8486

8587
/**
@@ -94,13 +96,12 @@ public DataLoader(BatchLoader<K, V> batchLoadFunction, DataLoaderOptions options
9496
* @return the future of the value
9597
*/
9698
public CompletableFuture<V> load(K key) {
97-
Objects.requireNonNull(key, "Key cannot be null");
98-
Object cacheKey = getCacheKey(key);
99+
Object cacheKey = getCacheKey(nonNull(key));
99100
if (loaderOptions.cachingEnabled() && futureCache.containsKey(cacheKey)) {
100101
return futureCache.get(cacheKey);
101102
}
102103

103-
CompletableFuture<V> future = FutureKit.future();
104+
CompletableFuture<V> future = new CompletableFuture<>();
104105
if (loaderOptions.batchingEnabled()) {
105106
loaderQueue.put(key, future);
106107
} else {
@@ -130,7 +131,9 @@ public CompletableFuture<V> load(K key) {
130131
* @return the composite future of the list of values
131132
*/
132133
public PromisedValues<V> loadMany(List<K> keys) {
133-
return PromisedValues.allOf(keys.stream().map(this::load).collect(Collectors.toList()));
134+
return PromisedValues.allOf(keys.stream()
135+
.map(this::load)
136+
.collect(Collectors.toList()));
134137
}
135138

136139
/**
@@ -141,30 +144,45 @@ public PromisedValues<V> loadMany(List<K> keys) {
141144
* @return the composite future of the queued load requests
142145
*/
143146
public PromisedValues<V> dispatch() {
144-
if (!loaderOptions.batchingEnabled() || loaderQueue.size() == 0) {
145-
return PromisedValues.allOf(Collections.emptyList());
146-
}
147-
List<K> keys = new ArrayList<>(loaderQueue.keySet());
148-
PromisedValues<V> batch = batchLoadFunction.load(keys);
149-
150-
assertState(keys.size() == batch.size(), "The size of the promised values MUST be the same size as the key list");
151-
152-
dispatchedQueues.put(batch, new LinkedHashMap<>(loaderQueue));
153-
batch.thenAccept(rh -> {
154-
AtomicInteger index = new AtomicInteger(0);
155-
dispatchedQueues.get(batch).forEach((key, future) -> {
156-
int idx = index.get();
157-
if (batch.succeeded(idx)) {
158-
future.complete(batch.get(idx));
159-
} else {
160-
future.completeExceptionally(batch.cause(idx));
147+
synchronized (loaderQueue) {
148+
if (!loaderOptions.batchingEnabled() || loaderQueue.size() == 0) {
149+
return PromisedValues.allOf(Collections.emptyList());
150+
}
151+
//
152+
// order of keys -> values matter in data loader hence the use of linked hash map
153+
//
154+
// See https://github.com/facebook/dataloader/blob/master/README.md for more details
155+
//
156+
List<K> keys = new ArrayList<>(loaderQueue.keySet());
157+
List<CompletableFuture<V>> futureList = keys.stream()
158+
.map(loaderQueue::get)
159+
.collect(Collectors.toList());
160+
161+
PromisedValues<V> batchOfPromisedValues = batchLoadFunction.load(keys);
162+
163+
assertState(keys.size() == batchOfPromisedValues.size(), "The size of the promised values MUST be the same size as the key list");
164+
165+
//
166+
// when the promised list of values completes, we transfer the values into
167+
// the previously cached future objects that client already has been given
168+
// via calls to load("foo") and loadMany("foo")
169+
//
170+
batchOfPromisedValues.thenAccept(promisedValues -> {
171+
for (int idx = 0; idx < futureList.size(); idx++) {
172+
CompletableFuture<V> future = futureList.get(idx);
173+
if (promisedValues.succeeded(idx)) {
174+
V value = promisedValues.get(idx);
175+
future.complete(value);
176+
} else {
177+
Throwable cause = promisedValues.cause(idx);
178+
future.completeExceptionally(cause);
179+
}
161180
}
162-
index.incrementAndGet();
163181
});
164-
dispatchedQueues.remove(batch);
165-
});
166-
loaderQueue.clear();
167-
return batch;
182+
183+
loaderQueue.clear();
184+
return batchOfPromisedValues;
185+
}
168186
}
169187

170188
/**
@@ -218,7 +236,7 @@ public DataLoader<K, V> prime(K key, V value) {
218236
public DataLoader<K, V> prime(K key, Exception error) {
219237
Object cacheKey = getCacheKey(key);
220238
if (!futureCache.containsKey(cacheKey)) {
221-
futureCache.set(cacheKey, FutureKit.failedFuture(error));
239+
futureCache.set(cacheKey, CompletableFutureKit.failedFuture(error));
222240
}
223241
return this;
224242
}

src/main/java/org/dataloader/DataLoaderOptions.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616

1717
package org.dataloader;
1818

19-
import java.util.Objects;
19+
import org.dataloader.impl.Assertions;
20+
2021
import java.util.Optional;
2122

2223
/**
@@ -39,23 +40,23 @@ public DataLoaderOptions() {
3940
cachingEnabled = true;
4041
}
4142

42-
public static DataLoaderOptions create() {
43-
return new DataLoaderOptions();
44-
}
45-
4643
/**
4744
* Clones the provided data loader options.
4845
*
4946
* @param other the other options instance
5047
*/
5148
public DataLoaderOptions(DataLoaderOptions other) {
52-
Objects.requireNonNull(other, "Other data loader options cannot be null");
49+
Assertions.nonNull(other);
5350
this.batchingEnabled = other.batchingEnabled;
5451
this.cachingEnabled = other.cachingEnabled;
5552
this.cacheKeyFunction = other.cacheKeyFunction;
5653
this.cacheMap = other.cacheMap;
5754
}
5855

56+
public static DataLoaderOptions create() {
57+
return new DataLoaderOptions();
58+
}
59+
5960
/**
6061
* Option that determines whether to use batching (the default), or not.
6162
*

src/main/java/org/dataloader/impl/FutureKit.java renamed to src/main/java/org/dataloader/impl/CompletableFutureKit.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,17 @@
33
import java.util.concurrent.CompletableFuture;
44
import java.util.concurrent.ExecutionException;
55

6-
public class FutureKit {
7-
public static <V> CompletableFuture<V> future() {
8-
return new CompletableFuture<>();
9-
}
6+
/**
7+
* Some really basic helpers when working with CompletableFutures
8+
*/
9+
public class CompletableFutureKit {
1010

1111
public static <V> CompletableFuture<V> failedFuture(Exception e) {
12-
CompletableFuture<V> future = future();
12+
CompletableFuture<V> future = new CompletableFuture<>();
1313
future.completeExceptionally(e);
1414
return future;
1515
}
1616

17-
1817
public static Throwable cause(CompletableFuture completableFuture) {
1918
if (!completableFuture.isCompletedExceptionally()) {
2019
return null;

src/main/java/org/dataloader/impl/PromisedValuesImpl.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,12 @@ public PromisedValues<T> thenAccept(Consumer<PromisedValues<T>> handler) {
5353

5454
@Override
5555
public boolean succeeded() {
56-
return FutureKit.succeeded(controller);
56+
return CompletableFutureKit.succeeded(controller);
5757
}
5858

5959
@Override
6060
public boolean failed() {
61-
return FutureKit.failed(controller);
61+
return CompletableFutureKit.failed(controller);
6262
}
6363

6464
@Override
@@ -73,18 +73,18 @@ public Throwable cause() {
7373

7474
@Override
7575
public boolean succeeded(int index) {
76-
return FutureKit.succeeded(futures.get(index));
76+
return CompletableFutureKit.succeeded(futures.get(index));
7777
}
7878

7979
@Override
8080
public Throwable cause(int index) {
81-
return FutureKit.cause(futures.get(index));
81+
return CompletableFutureKit.cause(futures.get(index));
8282
}
8383

8484
@Override
8585
@SuppressWarnings("unchecked")
8686
public T get(int index) {
87-
assertState(isDone(),"The PromisedValues MUST be complete before calling the get() method");
87+
assertState(isDone(), "The PromisedValues MUST be complete before calling the get() method");
8888
try {
8989
CompletableFuture<T> future = futures.get(index);
9090
return future.get();
@@ -95,7 +95,7 @@ public T get(int index) {
9595

9696
@Override
9797
public List<T> toList() {
98-
assertState(isDone(),"The PromisedValues MUST be complete before calling the toList() method");
98+
assertState(isDone(), "The PromisedValues MUST be complete before calling the toList() method");
9999
int size = size();
100100
List<T> list = new ArrayList<>(size);
101101
for (int index = 0; index < size; index++) {

0 commit comments

Comments
 (0)