Skip to content

Commit 5d81d9f

Browse files
committed
Extended PromisesValues with more tests and handling more edge cases
Expanded the read me text a lot
1 parent 7e1ac35 commit 5d81d9f

File tree

9 files changed

+450
-88
lines changed

9 files changed

+450
-88
lines changed

README.md

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,12 @@ and Nicholas Schrock (@schrockn) from [Facebook](https://www.facebook.com/), the
2626
## Table of contents
2727

2828
- [Features](#features)
29+
- [Examples](#examples)
2930
- [Differences to reference implementation](#differences-to-reference-implementation)
3031
- [Manual dispatching](#manual-dispatching)
3132
- [Let's get started!](#lets-get-started)
3233
- [Installing](#installing)
3334
- [Building](#building)
34-
- [Project plans](#project-plans)
35-
- [Current releases](#current-releases)
36-
- [Known issues](#known-issues)
37-
- [Future ideas](#future-ideas)
3835
- [Other information sources](#other-information-sources)
3936
- [Contributing](#contributing)
4037
- [Acknowledgements](#acknowledgements)
@@ -184,16 +181,6 @@ To build from source use the Gradle wrapper:
184181
./gradlew clean build
185182
```
186183

187-
## Project plans
188-
189-
### Current releases
190-
191-
- `1.0.0` Initial release
192-
193-
### Known issues
194-
195-
- Tests on job queue ordering need refactoring to Futures, one test currently omitted
196-
197184

198185
## Other information sources
199186

@@ -219,21 +206,22 @@ This library was originally written for use within a [VertX world](http://vertx.
219206
itself. All the heavy lifting has been done by this project : [vertx-dataloader](https://github.com/engagingspaces/vertx-dataloader)
220207
including the extensive testing.
221208

209+
This particular port was done to reduce the dependency on Vertx and to write a pure Java 8 implementation with no dependencies and also
210+
to use the more normative Java CompletableFuture. [vertx-core](http://vertx.io/docs/vertx-core/java/) is not a lightweight library by any means
211+
so having a pure Java 8 implementation is very desirable.
212+
222213

223214
This library is entirely inspired by the great works of [Lee Byron](https://github.com/leebyron) and
224215
[Nicholas Schrock](https://github.com/schrockn) from [Facebook](https://www.facebook.com/) whom we would like to thank, and
225216
especially @leebyron for taking the time and effort to provide 100% coverage on the codebase. A set of tests which
226217
were also ported.
227218

228-
This particular port was done to reduce the dependency on Vertx and to write a pure Java 8 implementation with no dependencies and also
229-
to use the more normative Java CompletableFuture. [vertx-core](http://vertx.io/docs/vertx-core/java/) is not a lightweight library by any means
230-
so having a pure Java 8 implementation is very desirable.
231219

232220

233221
## Licensing
234222

235223
This project is licensed under the
236224
[Apache Commons v2.0](https://www.apache.org/licenses/LICENSE-2.0) license.
237225

238-
Copyright © 2016 Arnold Schrijver and others
226+
Copyright © 2016 Arnold Schrijver, 2017 Brad Baker and others
239227
[contributors](https://github.com/bbakerman/java-dataloader/graphs/contributors)

src/main/java/org/dataloader/DataLoader.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,37 @@ public PromisedValues<V> dispatch() {
192192
return batchOfPromisedValues;
193193
}
194194

195+
/**
196+
* Normally {@link #dispatch()} is an asynchronous operation but this version will 'join' on the
197+
* results if dispatch and wait for them to complete. If the {@link CompletableFuture} callbacks make more
198+
* calls to this data loader then the {@link #dispatchDepth()} will be &gt; 0 and this method will loop
199+
* around and wait for any other extra batch loads to occur.
200+
*
201+
* @return the list of all results when the {@link #dispatchDepth()} reached 0
202+
*/
203+
public List<V> dispatchAndJoin() {
204+
List<V> results = new ArrayList<>();
205+
206+
List<V> joinedResults = dispatch().toCompletableFuture().join();
207+
results.addAll(joinedResults);
208+
while (this.dispatchDepth() > 0) {
209+
joinedResults = dispatch().toCompletableFuture().join();
210+
results.addAll(joinedResults);
211+
}
212+
return results;
213+
}
214+
215+
216+
/**
217+
* @return the depth of the batched key loads that need to be dispatched
218+
*/
219+
public int dispatchDepth() {
220+
synchronized (loaderQueue) {
221+
return loaderQueue.size();
222+
}
223+
}
224+
225+
195226
/**
196227
* Clears the future with the specified key from the cache, if caching is enabled, so it will be re-fetched
197228
* on the next load request.

src/main/java/org/dataloader/PromisedValues.java

Lines changed: 78 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
* also compose a {@link CompletableFuture} of that list of values via {@link #toCompletableFuture()}
2323
*/
2424
public interface PromisedValues<T> {
25+
2526
/**
2627
* Returns a new {@link PromisedValues} that is completed when all of
2728
* the given {@link CompletableFuture}s complete. If any of the given
@@ -31,7 +32,7 @@ public interface PromisedValues<T> {
3132
* @param cfs the {@link CompletableFuture}s to combine
3233
* @param <T> the type of values
3334
*
34-
* @return a new CombinedFutures
35+
* @return a new PromisedValues
3536
*/
3637
static <T> PromisedValues<T> allOf(List<CompletableFuture<T>> cfs) {
3738
return PromisedValuesImpl.combineAllOf(cfs);
@@ -47,7 +48,7 @@ static <T> PromisedValues<T> allOf(List<CompletableFuture<T>> cfs) {
4748
* @param f2 the 2nd completable future
4849
* @param <T> the type of values
4950
*
50-
* @return a new CombinedFutures
51+
* @return a new PromisedValues
5152
*/
5253
static <T> PromisedValues<T> allOf(CompletableFuture<T> f1, CompletableFuture<T> f2) {
5354
return PromisedValuesImpl.combineAllOf(asList(f1, f2));
@@ -64,12 +65,13 @@ static <T> PromisedValues<T> allOf(CompletableFuture<T> f1, CompletableFuture<T>
6465
* @param f3 the 3rd completable future
6566
* @param <T> the type of values
6667
*
67-
* @return a new CombinedFutures
68+
* @return a new PromisedValues
6869
*/
6970
static <T> PromisedValues allOf(CompletableFuture<T> f1, CompletableFuture<T> f2, CompletableFuture<T> f3) {
7071
return PromisedValuesImpl.combineAllOf(asList(f1, f2, f3));
7172
}
7273

74+
7375
/**
7476
* Returns a new {@link PromisedValues} that is completed when all of
7577
* the given {@link CompletableFuture}s complete. If any of the given
@@ -82,12 +84,80 @@ static <T> PromisedValues allOf(CompletableFuture<T> f1, CompletableFuture<T> f2
8284
* @param f4 the 4th completable future
8385
* @param <T> the type of values
8486
*
85-
* @return a new CombinedFutures
87+
* @return a new PromisedValues
8688
*/
8789
static <T> PromisedValues allOf(CompletableFuture<T> f1, CompletableFuture<T> f2, CompletableFuture<T> f3, CompletableFuture<T> f4) {
8890
return PromisedValuesImpl.combineAllOf(asList(f1, f2, f3, f4));
8991
}
9092

93+
94+
/**
95+
* Returns a new {@link PromisedValues} that is completed when all of
96+
* the given {@link PromisedValues}s complete. If any of the given
97+
* {@link PromisedValues}s complete exceptionally, then the returned
98+
* {@link PromisedValues} also does so.
99+
*
100+
* @param cfs the list to combine
101+
* @param <T> the type of values
102+
*
103+
* @return a new PromisedValues
104+
*/
105+
static <T> PromisedValues<T> allPromisedValues(List<PromisedValues<T>> cfs) {
106+
return PromisedValuesImpl.combinePromisedValues(cfs);
107+
}
108+
109+
/**
110+
* Returns a new {@link PromisedValues} that is completed when all of
111+
* the given {@link PromisedValues}s complete. If any of the given
112+
* {@link PromisedValues}s complete exceptionally, then the returned
113+
* {@link PromisedValues} also does so.
114+
*
115+
* @param pv1 the 1st promised value
116+
* @param pv2 the 2nd promised value
117+
* @param <T> the type of values
118+
*
119+
* @return a new PromisedValues
120+
*/
121+
static <T> PromisedValues<T> allPromisedValues(PromisedValues<T> pv1, PromisedValues<T> pv2) {
122+
return PromisedValuesImpl.combinePromisedValues(asList(pv1, pv2));
123+
}
124+
125+
/**
126+
* Returns a new {@link PromisedValues} that is completed when all of
127+
* the given {@link PromisedValues}s complete. If any of the given
128+
* {@link PromisedValues}s complete exceptionally, then the returned
129+
* {@link PromisedValues} also does so.
130+
*
131+
* @param pv1 the 1st promised value
132+
* @param pv2 the 2nd promised value
133+
* @param pv3 the 3rd promised value
134+
* @param <T> the type of values
135+
*
136+
* @return a new PromisedValues
137+
*/
138+
static <T> PromisedValues<T> allPromisedValues(PromisedValues<T> pv1, PromisedValues<T> pv2, PromisedValues<T> pv3) {
139+
return PromisedValuesImpl.combinePromisedValues(asList(pv1, pv2, pv3));
140+
}
141+
142+
/**
143+
* Returns a new {@link PromisedValues} that is completed when all of
144+
* the given {@link PromisedValues}s complete. If any of the given
145+
* {@link PromisedValues}s complete exceptionally, then the returned
146+
* {@link PromisedValues} also does so.
147+
*
148+
* @param pv1 the 1st promised value
149+
* @param pv2 the 2nd promised value
150+
* @param pv3 the 3rd promised value
151+
* @param pv4 the 4th promised value
152+
* @param <T> the type of values
153+
*
154+
* @return a new PromisedValues
155+
*/
156+
static <T> PromisedValues<T> allPromisedValues(PromisedValues<T> pv1, PromisedValues<T> pv2, PromisedValues<T> pv3, PromisedValues<T> pv4) {
157+
return PromisedValuesImpl.combinePromisedValues(asList(pv1, pv2, pv3, pv4));
158+
}
159+
160+
91161
/**
92162
* When the all the futures complete, this call back will be invoked with this {@link PromisedValues} as a parameter
93163
*
@@ -169,11 +239,14 @@ static <T> PromisedValues allOf(CompletableFuture<T> f1, CompletableFuture<T> f2
169239
* (unchecked) {@link CompletionException} with the underlying
170240
* exception as its cause.
171241
*
242+
* @return the list of completed values similar to {@link #toList()}
243+
*
172244
* @throws CancellationException if the computation was cancelled
173245
* @throws CompletionException if this future completed
174246
* exceptionally or a completion computation threw an exception
247+
*
175248
*/
176-
void join();
249+
List<T> join();
177250

178251
/**
179252
* @return this as a {@link CompletableFuture} that returns the list of underlying values

src/main/java/org/dataloader/impl/PromisedValuesImpl.java

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@
55
import java.util.ArrayList;
66
import java.util.List;
77
import java.util.concurrent.CompletableFuture;
8+
import java.util.concurrent.CompletionException;
89
import java.util.concurrent.ExecutionException;
910
import java.util.concurrent.atomic.AtomicReference;
1011
import java.util.function.Consumer;
12+
import java.util.stream.Collectors;
1113

1214
import static org.dataloader.impl.Assertions.assertState;
1315
import static org.dataloader.impl.Assertions.nonNull;
@@ -22,7 +24,10 @@ private PromisedValuesImpl(List<CompletableFuture<T>> cfs) {
2224
this.futures = nonNull(cfs);
2325
this.cause = new AtomicReference<>();
2426
CompletableFuture[] futuresArray = cfs.toArray(new CompletableFuture[cfs.size()]);
25-
this.controller = CompletableFuture.allOf(futuresArray);
27+
this.controller = CompletableFuture.allOf(futuresArray).handle((result, throwable) -> {
28+
setCause(throwable);
29+
return null;
30+
});
2631
}
2732

2833
private PromisedValuesImpl(PromisedValuesImpl<T> other, CompletableFuture<Void> controller) {
@@ -35,30 +40,44 @@ public static <T> PromisedValues<T> combineAllOf(List<CompletableFuture<T>> cfs)
3540
return new PromisedValuesImpl<>(nonNull(cfs));
3641
}
3742

43+
public static <T> PromisedValues<T> combinePromisedValues(List<PromisedValues<T>> promisedValues) {
44+
List<CompletableFuture<T>> cfs = promisedValues.stream()
45+
.map(pv -> (PromisedValuesImpl<T>) pv)
46+
.flatMap(pv -> pv.futures.stream())
47+
.collect(Collectors.toList());
48+
return new PromisedValuesImpl<>(cfs);
49+
}
50+
51+
private void setCause(Throwable throwable) {
52+
if (throwable != null) {
53+
if (throwable instanceof CompletionException && throwable.getCause() != null) {
54+
cause.set(throwable.getCause());
55+
} else {
56+
cause.set(throwable);
57+
}
58+
}
59+
}
3860

3961
@Override
4062
public PromisedValues<T> thenAccept(Consumer<PromisedValues<T>> handler) {
63+
nonNull(handler);
4164
CompletableFuture<Void> newController = controller.handle((result, throwable) -> {
42-
if (throwable != null) {
43-
cause.set(throwable);
44-
}
45-
if (handler != null) {
46-
handler.accept(this);
47-
}
48-
return null;
65+
setCause(throwable);
66+
handler.accept(this);
67+
return result;
4968
});
5069
return new PromisedValuesImpl<>(this, newController);
5170
}
5271

5372

5473
@Override
5574
public boolean succeeded() {
56-
return CompletableFutureKit.succeeded(controller);
75+
return isDone() && cause.get() == null;
5776
}
5877

5978
@Override
6079
public boolean failed() {
61-
return CompletableFutureKit.failed(controller);
80+
return isDone() && cause.get() != null;
6281
}
6382

6483
@Override
@@ -110,12 +129,14 @@ public int size() {
110129
}
111130

112131
@Override
113-
public void join() {
132+
public List<T> join() {
114133
controller.join();
134+
return toList();
115135
}
116136

117137
@Override
118138
public CompletableFuture<List<T>> toCompletableFuture() {
119139
return controller.thenApply(v -> toList());
120140
}
141+
121142
}

0 commit comments

Comments
 (0)