Skip to content

Commit 92187d7

Browse files
committed
Adding bi-completion to CompletableFutureWrapper / CompletablePromise; using it in impls.
1 parent f3c2b43 commit 92187d7

File tree

5 files changed

+19
-22
lines changed

5 files changed

+19
-22
lines changed

pom.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,9 @@
173173
<configuration>
174174
<source>1.8</source>
175175
<target>1.8</target>
176+
<!--
177+
<debug>false</debug>
178+
-->
176179
</configuration>
177180
</plugin>
178181
<plugin>

src/main/java/net/tascalate/concurrent/CompletableFutureWrapper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ protected boolean onFailure(Throwable ex) {
5151
return delegate.completeExceptionally(ex);
5252
}
5353

54+
boolean complete(T value, Throwable ex) {
55+
return null == ex ? success(value) : failure(ex);
56+
}
57+
5458
@Override
5559
protected <U> Promise<U> wrap(CompletionStage<U> original) {
5660
return new CompletableFutureWrapper<>((CompletableFuture<U>)original);

src/main/java/net/tascalate/concurrent/CompletablePromise.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,15 @@ public boolean complete(T value) {
4848
return success(value);
4949
}
5050

51-
public boolean completeExceptionally(Throwable ex) {
51+
public boolean completeExceptionally(Throwable ex) {
5252
return failure(ex);
5353
}
5454

55+
@Override
56+
public boolean complete(T value, Throwable ex) {
57+
return super.complete(value, ex);
58+
}
59+
5560
public Promise<T> completeAsync(Supplier<? extends T> supplier) {
5661
CompletionStageAPI api = CompletionStageAPI.current();
5762
Promise<T> result = completeAsync(supplier, api.defaultExecutorOf(delegate));
@@ -82,7 +87,7 @@ public CompletablePromise<T> copy() {
8287
}
8388
return result;
8489
} else {
85-
whenComplete((r, e) -> iif(null == e ? result.complete(r) : result.completeExceptionally(e)));
90+
whenComplete(result::complete);
8691
return result;
8792
}
8893
}
@@ -97,7 +102,7 @@ public Promise<T> minimalPromise() {
97102
}
98103
return result;
99104
} else {
100-
whenComplete((r, e) -> iif(null == e ? result.success(r) : result.failure(e)));
105+
whenComplete(result::complete);
101106
return result;
102107
}
103108
}

src/main/java/net/tascalate/concurrent/ConfigurableDependentPromise.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import static net.tascalate.concurrent.SharedFunctions.NO_SUCH_ELEMENT;
1919
import static net.tascalate.concurrent.SharedFunctions.cancelPromise;
2020
import static net.tascalate.concurrent.SharedFunctions.failure;
21-
import static net.tascalate.concurrent.SharedFunctions.iif;
2221
import static net.tascalate.concurrent.SharedFunctions.selectFirst;
2322

2423
import java.time.Duration;
@@ -908,8 +907,8 @@ public boolean cancel(boolean mayInterruptIfRunning) {
908907
}
909908
}
910909
};
911-
whenComplete((r, e) -> iif(null == e ? result.success(r) : result.failure(e)));
912-
return result.toCompletableFuture();
910+
whenComplete(result::complete);
911+
return result.toCompletableFuture(); // Effectively result.delegate
913912
}
914913
}
915914

src/main/java/net/tascalate/concurrent/Promises.java

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,6 @@ public static <T> Promise<T> loop(T initialValue,
126126
asyncLoop.run(initialValue);
127127
return asyncLoop;
128128
}
129-
130129

131130
public static <T, R extends AutoCloseable> Promise<T> tryApply(CompletionStage<R> resourcePromise,
132131
Function<? super R, ? extends T> fn) {
@@ -148,7 +147,6 @@ public static <T, R extends AutoCloseable> Promise<T> tryApply(Promise<R> resour
148147
}, true)
149148
.unwrap();
150149
}
151-
152150

153151
public static <T, R extends AsyncCloseable> Promise<T> tryApplyEx(CompletionStage<R> resourcePromise,
154152
Function<? super R, ? extends T> fn) {
@@ -221,18 +219,13 @@ public static <T, R extends AutoCloseable> Promise<T> tryCompose(Promise<R> reso
221219
return;
222220
}
223221
// CLOSE OK
224-
if (null == actionException) {
225-
result.success(actionResult);
226-
} else {
227-
result.failure(actionException);
228-
}
222+
result.complete(actionResult, actionException);
229223
});
230224
return result.onCancel(() -> cancelPromise(action, true));
231225
}, true)
232226
.unwrap();
233227
}
234228

235-
236229
public static <T, R extends AsyncCloseable> Promise<T> tryComposeEx(Promise<R> resourcePromise,
237230
Function<? super R, ? extends CompletionStage<T>> fn) {
238231
return
@@ -273,10 +266,8 @@ public static <T, R extends AsyncCloseable> Promise<T> tryComposeEx(Promise<R> r
273266
actionException.addSuppressed(onClose);
274267
}
275268
result.failure(actionException);
276-
} else if (null != onClose) {
277-
result.failure(onClose);
278269
} else {
279-
result.success(actionResult);
270+
result.complete(actionResult, onClose);
280271
}
281272
});
282273
});
@@ -315,7 +306,6 @@ public static <T, A, R> Promise<R> partitioned(Stream<? extends T> values,
315306
return partitioned2(values.iterator(), values, batchSize, spawner, downstream, downstreamExecutor);
316307
}
317308

318-
319309
private static <T, A, R> Promise<R> partitioned1(Iterator<? extends T> values,
320310
Object source,
321311
int batchSize,
@@ -328,7 +318,6 @@ private static <T, A, R> Promise<R> partitioned1(Iterator<? extends T> values,
328318
.as(onCloseSource(null != source? source : values))
329319
.unwrap();
330320
}
331-
332321

333322
private static <T, A, R> Promise<R> partitioned2(Iterator<? extends T> values,
334323
Object source,
@@ -400,7 +389,6 @@ private static <T, A, R> Promise<IndexedStep<A>> parallelStep2(
400389
});
401390
}
402391

403-
404392
private static class IndexedStep<T> {
405393
private final int idx;
406394
private final T payload;
@@ -466,7 +454,6 @@ private static <T> Function<Promise<T>, Promise<T>> onCloseSource(Object source)
466454
return Function.identity();
467455
}
468456
}
469-
470457

471458
/**
472459
* <p>Returns a promise that is resolved successfully when all {@link CompletionStage}-s passed as parameters
@@ -488,7 +475,6 @@ public static <T> Promise<List<T>> all(CompletionStage<? extends T>... promises)
488475
return all(Arrays.asList(promises));
489476
}
490477

491-
492478
public static <T> Promise<List<T>> all(List<? extends CompletionStage<? extends T>> promises) {
493479
return all(true, promises);
494480
}

0 commit comments

Comments
 (0)