Skip to content

Commit 55bc2b8

Browse files
committed
Make ConfigurableDependentPromise and "onCancel" true decorstors - completes (incl/ cancellation) when delegate is completed, not only when cancel is invoked from the decorator
1 parent 5a31e30 commit 55bc2b8

File tree

6 files changed

+69
-40
lines changed

6 files changed

+69
-40
lines changed

src/main/java/net/tascalate/concurrent/AggregatingPromise.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,12 +181,13 @@ void onComplete(int idx, T result, Throwable error) {
181181
}
182182
}
183183

184-
void start() {
184+
Promise<List<R>> postConstruct() {
185185
int i = 0;
186186
for (CompletionStage<? extends T> promise : promises) {
187187
int idx = i++;
188188
promise.whenComplete((r, e) -> onComplete(idx, r, e));
189189
}
190+
return this;
190191
}
191192

192193
private void markRemainingCancelled() {

src/main/java/net/tascalate/concurrent/ConfigurableDependentPromise.java

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,28 @@ protected ConfigurableDependentPromise(Promise<T> delegate,
8888
this.cancellableOrigins = cancellableOrigins;
8989
}
9090

91+
private DependentPromise<T> postConstruct() {
92+
if (isEmptyArray(cancellableOrigins)) {
93+
// Nothing to do
94+
}
95+
if (isCancelled()) {
96+
// Wrapped over already cancelled Promise
97+
// So result.cancel() has no effect
98+
// and we have to cancel origins explicitly
99+
// right after construction
100+
cancelPromises(cancellableOrigins, true);
101+
} else if (isDone()) {
102+
// nothing to do
103+
} else {
104+
delegate.whenComplete((r, e) -> {
105+
if (isCancelled()) {
106+
cancelPromises(cancellableOrigins, true);
107+
}
108+
});
109+
}
110+
return this;
111+
}
112+
91113
public static <U> DependentPromise<U> from(Promise<U> source) {
92114
return from(source, PromiseOrigin.NONE);
93115
}
@@ -103,7 +125,7 @@ protected <U> DependentPromise<U> wrap(Promise<U> original, CompletionStage<?>[]
103125
private static <U> DependentPromise<U> doWrap(Promise<U> original,
104126
Set<PromiseOrigin> defaultEnlistOptions,
105127
CompletionStage<?>[] cancellableOrigins) {
106-
if (null == cancellableOrigins || cancellableOrigins.length == 0) {
128+
if (isEmptyArray(cancellableOrigins)) {
107129
// Nothing to enlist additionally for this "original" instance
108130
if (original instanceof ConfigurableDependentPromise) {
109131
ConfigurableDependentPromise<U> ioriginal = (ConfigurableDependentPromise<U>)original;
@@ -113,22 +135,15 @@ private static <U> DependentPromise<U> doWrap(Promise<U> original,
113135
}
114136
}
115137
}
116-
ConfigurableDependentPromise<U> result =
117-
new ConfigurableDependentPromise<>(original, defaultEnlistOptions, cancellableOrigins);
118138

119-
if (result.isCancelled()) {
120-
// Wrapped over already cancelled Promise
121-
// So result.cancel() has no effect
122-
// and we have to cancel origins explicitly
123-
// right after construction
124-
cancelPromises(result.cancellableOrigins, true);
125-
}
126-
return result;
139+
return new ConfigurableDependentPromise<>(
140+
original, defaultEnlistOptions, cancellableOrigins
141+
).postConstruct();
127142
}
128143

129144
@Override
130145
public DependentPromise<T> onCancel(Runnable code) {
131-
return new ExtraCancellationDependentPromise<>(this, code);
146+
return new ExtraCancellationDependentPromise<>(this, code).postConstruct();
132147
}
133148

134149
// All delay overloads delegate to these methods
@@ -743,12 +758,8 @@ public DependentPromise<T> dependent(Set<PromiseOrigin> defaultEnlistOptions) {
743758

744759
@Override
745760
public boolean cancel(boolean mayInterruptIfRunning) {
746-
if (delegate.cancel(mayInterruptIfRunning)) {
747-
cancelPromises(cancellableOrigins, mayInterruptIfRunning);
748-
return true;
749-
} else {
750-
return false;
751-
}
761+
// See postConstruct -- handling is done in delegate.whenComplete
762+
return delegate.cancel(mayInterruptIfRunning);
752763
}
753764

754765
@Override
@@ -864,7 +875,7 @@ private boolean defaultEnlistOrigin() {
864875
}
865876

866877
static void cancelPromises(CompletionStage<?>[] promises, boolean mayInterruptIfRunning) {
867-
if (null != promises) {
878+
if (!isEmptyArray(promises)) {
868879
Arrays.stream(promises)
869880
.filter(p -> p != null)
870881
.forEach(p -> cancelPromise(p, mayInterruptIfRunning));
@@ -874,6 +885,10 @@ static void cancelPromises(CompletionStage<?>[] promises, boolean mayInterruptIf
874885
private static boolean identicalSets(Set<?> a, Set<?> b) {
875886
return a.containsAll(b) && b.containsAll(a);
876887
}
888+
889+
private static boolean isEmptyArray(Object[] array) {
890+
return null == array || array.length == 0;
891+
}
877892

878893
static class UndecoratedCancellationPromise<T> extends AbstractPromiseDecorator<T, Promise<T>> {
879894
private final CompletionStage<?>[] dependent;

src/main/java/net/tascalate/concurrent/ExtraCancellationDependentPromise.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,19 @@ class ExtraCancellationDependentPromise<T> extends AbstractDependentPromiseDecor
2727
this.code = code;
2828
}
2929

30-
@Override
31-
public boolean cancel(boolean mayInterruptIfRunning) {
32-
if (super.cancel(mayInterruptIfRunning)) {
30+
DependentPromise<T> postConstruct() {
31+
if (isCancelled()) {
3332
code.run();
34-
return true;
33+
} else if (isDone()) {
34+
//
3535
} else {
36-
return false;
36+
delegate.whenComplete((r, e) -> {
37+
if (isCancelled()) {
38+
code.run();
39+
}
40+
});
3741
}
42+
return this;
3843
}
3944

4045
@Override
@@ -44,7 +49,7 @@ protected <U> DependentPromise<U> wrap(CompletionStage<U> original) {
4449

4550
@Override
4651
public Promise<T> unwrap() {
47-
return new Unwrapped<>(delegate, code);
52+
return new Unwrapped<>(delegate, code).postConstruct();
4853
}
4954

5055
@Override
@@ -57,7 +62,7 @@ Promise<T> unwrap(Function<DependentPromise<T>, Promise<T>> fn) {
5762
if (delegate == unwrapped) {
5863
return this;
5964
} else {
60-
return new ExtraCancellationPromise.Unwrapped<>(unwrapped, code);
65+
return new ExtraCancellationPromise.Unwrapped<>(unwrapped, code).postConstruct();
6166
}
6267
}
6368

src/main/java/net/tascalate/concurrent/ExtraCancellationPromise.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,19 @@ class ExtraCancellationPromise<T> extends AbstractPromiseDecorator<T, Promise<T>
2828
this.code = code;
2929
}
3030

31-
@Override
32-
public boolean cancel(boolean mayInterruptIfRunning) {
33-
if (super.cancel(mayInterruptIfRunning)) {
31+
Promise<T> postConstruct() {
32+
if (isCancelled()) {
3433
code.run();
35-
return true;
34+
} else if (isDone()) {
35+
//
3636
} else {
37-
return false;
37+
delegate.whenComplete((r, e) -> {
38+
if (isCancelled()) {
39+
code.run();
40+
}
41+
});
3842
}
43+
return this;
3944
}
4045

4146
@Override
@@ -45,7 +50,7 @@ protected <U> Promise<U> wrap(CompletionStage<U> original) {
4550

4651
@Override
4752
public Promise<T> unwrap() {
48-
return new Unwrapped<>(delegate, code);
53+
return new Unwrapped<>(delegate, code).postConstruct();
4954
}
5055

5156
@Override
@@ -58,7 +63,7 @@ Promise<T> unwrap(Function<Promise<T>, Promise<T>> fn) {
5863
if (unwrapped == delegate) {
5964
return this;
6065
} else {
61-
return new Unwrapped<>(unwrapped, code);
66+
return new Unwrapped<>(unwrapped, code).postConstruct();
6267
}
6368
}
6469

src/main/java/net/tascalate/concurrent/Promise.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,9 @@ default boolean isCompletedExceptionally() {
9999
}
100100
}
101101

102+
// @Decorator
102103
default Promise<T> onCancel(Runnable code) {
103-
return new ExtraCancellationPromise<>(this, code);
104+
return new ExtraCancellationPromise<>(this, code).postConstruct();
104105
}
105106

106107
default Promise<T> delay(long timeout, TimeUnit unit) {
@@ -221,6 +222,7 @@ default Promise<T> onTimeout(Supplier<? extends T> supplier, Duration duration,
221222
* @return
222223
* created DependentPromise
223224
*/
225+
// @Decorator
224226
default DependentPromise<T> dependent() {
225227
return ConfigurableDependentPromise.from(this);
226228
}
@@ -236,10 +238,12 @@ default DependentPromise<T> dependent() {
236238
* @return
237239
* created DependentPromise
238240
*/
241+
// @Decorator
239242
default DependentPromise<T> dependent(Set<PromiseOrigin> defaultEnlistOptions) {
240243
return ConfigurableDependentPromise.from(this, defaultEnlistOptions);
241244
}
242245

246+
// @Decorator
243247
default Promise<T> defaultAsyncOn(Executor executor) {
244248
return new ExecutorBoundPromise<>(this, executor);
245249
}
@@ -258,7 +262,7 @@ default <D> D as(Function<? super Promise<T>, D> decoratorFactory) {
258262
}
259263

260264
/**
261-
* Unwraps underlying {@link Promise} if it was decorated
265+
* Unwraps underlying {@link Promise} if it was decorated (removes one level of decorators)
262266
* @return
263267
* the underlying un-decorated {@link Promise} or self if not decorated
264268
*/
@@ -267,7 +271,7 @@ default Promise<T> unwrap() {
267271
}
268272

269273
/**
270-
* Fully unwraps underlying {@link Promise}
274+
* Fully unwraps underlying {@link Promise} (removes all decoration layers)
271275
* @return
272276
* the underlying un-decorated {@link Promise} or self if not decorated
273277
*/

src/main/java/net/tascalate/concurrent/Promises.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -989,9 +989,8 @@ private static <T, R> Promise<List<R>> atLeast(int minResultsCount, int maxError
989989
CompletionStage<? extends T> stage = promises.get(0);
990990
return transform(stage, singleResultMapper, Promises::wrapMultitargetException);
991991
} else {
992-
AggregatingPromise<T, R> result = ctr.create(minResultsCount, maxErrorsCount, cancelRemaining, promises);
993-
result.start();
994-
return result;
992+
return ctr.create(minResultsCount, maxErrorsCount, cancelRemaining, promises)
993+
.postConstruct();
995994
}
996995
}
997996

0 commit comments

Comments
 (0)