Skip to content

Commit 204cf06

Browse files
committed
Adding DelayPolicy.withCustomizer to allow chaining calls for custom classes;
Making RetryPolicy.exceptionClassRetryable overridable; Adding DependentPromise.as_ with more specific function arg type; In AggregatingPromise cancel remaining before firing event; Fix std. CompletableFuture.thenCompose[Async] behavior for cancellation -- now the Promise returned from fn is cancelled as well
1 parent 68ec7ea commit 204cf06

File tree

12 files changed

+215
-24
lines changed

12 files changed

+215
-24
lines changed

src/main/java/net/tascalate/concurrent/AggregatingPromise.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ abstract class AggregatingPromise<T, R> extends CompletablePromise<List<R>> {
4242
private final boolean cancelRemaining;
4343
private final List<? extends CompletionStage<? extends T>> promises;
4444

45+
@FunctionalInterface
4546
interface Constructor<T, R> {
4647
AggregatingPromise<T, R> create(int minResultsCount, int maxErrorsCount, boolean cancelRemaining,
4748
List<? extends CompletionStage<? extends T>> promises);
@@ -143,10 +144,14 @@ void onComplete(int idx, T result, Throwable error) {
143144
// Synchronized around done
144145
markRemainingCancelled();
145146
// Now no other thread can modify results array.
146-
onSuccess(collectResults(minResultsCount, completions));
147+
148+
// Cancel before firing events:
149+
// this helps with releasing thread pools, throttling etc.
147150
if (cancelRemaining) {
148151
cancelPromises();
149152
}
153+
154+
onSuccess(collectResults(minResultsCount, completions));
150155
}
151156
}
152157
} else {
@@ -163,11 +168,14 @@ void onComplete(int idx, T result, Throwable error) {
163168
// Synchronized around done
164169
markRemainingCancelled();
165170
// Now no other thread can modify errors array.
166-
onFailure(new MultitargetException(errors));
167-
171+
172+
// Cancel before firing events:
173+
// this helps with releasing thread pools, throttling etc.
168174
if (cancelRemaining) {
169175
cancelPromises();
170176
}
177+
178+
onFailure(new MultitargetException(errors));
171179
}
172180
}
173181
}

src/main/java/net/tascalate/concurrent/CallbackRegistry.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -284,9 +284,9 @@ void callFailureCallback(Throwable failure) {
284284
}
285285

286286
private static <S, U> void callCallback(Consumer<? super Callable<?>> stageTransition,
287-
Function<? super S, ? extends U> callback,
288-
S value,
289-
Executor executor) {
287+
Function<? super S, ? extends U> callback,
288+
S value,
289+
Executor executor) {
290290

291291
Callable<U> callable = () -> callback.apply(value);
292292
try {

src/main/java/net/tascalate/concurrent/CompletableFutureWrapper.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import java.util.concurrent.CompletableFuture;
2020
import java.util.concurrent.CompletionException;
2121
import java.util.concurrent.CompletionStage;
22+
import java.util.concurrent.Executor;
23+
import java.util.function.Function;
2224

2325
import net.tascalate.concurrent.decorators.AbstractFutureDecorator;
2426

@@ -30,7 +32,6 @@ protected CompletableFutureWrapper() {
3032
this(new CompletableFuture<T>());
3133
}
3234

33-
3435
protected CompletableFutureWrapper(CompletableFuture<T> delegate) {
3536
super(delegate);
3637
}
@@ -49,4 +50,24 @@ public T join() throws CancellationException, CompletionException {
4950
protected <U> Promise<U> wrap(CompletionStage<U> original) {
5051
return new CompletableFutureWrapper<>((CompletableFuture<U>)original);
5152
}
53+
54+
// By default CompletableFuture doesn't interrupt a promise from thenCompose(fn)!
55+
@Override
56+
public <U> Promise<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
57+
ComposedFutureRef<U> ref = new ComposedFutureRef<>();
58+
return super.thenCompose(ref.captureResult(fn)).onCancel(ref.cancelCaptured);
59+
}
60+
61+
@Override
62+
public <U> Promise<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) {
63+
ComposedFutureRef<U> ref = new ComposedFutureRef<>();
64+
return super.thenComposeAsync(ref.captureResult(fn)).onCancel(ref.cancelCaptured);
65+
}
66+
67+
@Override
68+
public <U> Promise<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,
69+
Executor executor) {
70+
ComposedFutureRef<U> ref = new ComposedFutureRef<>();
71+
return super.thenComposeAsync(ref.captureResult(fn), executor).onCancel(ref.cancelCaptured);
72+
}
5273
}

src/main/java/net/tascalate/concurrent/CompletionStageWrapper.java

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@
2525
import java.util.concurrent.CompletionStage;
2626
import java.util.concurrent.CountDownLatch;
2727
import java.util.concurrent.ExecutionException;
28+
import java.util.concurrent.Executor;
2829
import java.util.concurrent.Future;
2930
import java.util.concurrent.TimeUnit;
3031
import java.util.concurrent.TimeoutException;
32+
import java.util.function.Function;
3133

3234
import net.tascalate.concurrent.decorators.AbstractCompletionStageDecorator;
35+
import net.tascalate.concurrent.decorators.AbstractPromiseDecorator;
3336
import net.tascalate.concurrent.decorators.BlockingCompletionStageDecorator;
3437

3538
public class CompletionStageWrapper<T>
@@ -117,20 +120,57 @@ public T join() {
117120
throw wrapCompletionException(fault);
118121
}
119122
}
120-
121123

122124
@Override
123125
protected <U> Promise<U> wrap(CompletionStage<U> original) {
124126
return Promises.from(original);
125127
}
126128

127129
public static <T> Promise<T> from(CompletionStage<T> stage) {
130+
return from(stage, true);
131+
}
132+
133+
public static <T> Promise<T> from(CompletionStage<T> stage, boolean composeSafe) {
134+
Promise<T> result;
128135
if (stage instanceof Future) {
129136
// If we can delegate blocking Future API...
130-
return BlockingCompletionStageDecorator.from(stage);
137+
result = BlockingCompletionStageDecorator.from(stage);
131138
} else {
132139
// Otherwise fallback to own implementation
133-
return new CompletionStageWrapper<>(stage);
140+
result = new CompletionStageWrapper<>(stage);
134141
}
142+
return composeSafe ? new ComposeSafePromise<>(result) : result;
143+
}
144+
145+
// By default CompletableFuture doesn't interrupt a promise from thenCompose(fn)!
146+
// Pessimistically assume this "feature" for all CompletionStage impls
147+
static class ComposeSafePromise<T> extends AbstractPromiseDecorator<T, Promise<T>> {
148+
protected ComposeSafePromise(Promise<T> delegate) {
149+
super(delegate);
150+
}
151+
152+
@Override
153+
protected <U> Promise<U> wrap(CompletionStage<U> original) {
154+
return new ComposeSafePromise<>((Promise<U>)original);
155+
}
156+
157+
@Override
158+
public <U> Promise<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
159+
ComposedFutureRef<U> ref = new ComposedFutureRef<>();
160+
return super.thenCompose(ref.captureResult(fn)).onCancel(ref.cancelCaptured);
161+
}
162+
163+
@Override
164+
public <U> Promise<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) {
165+
ComposedFutureRef<U> ref = new ComposedFutureRef<>();
166+
return super.thenComposeAsync(ref.captureResult(fn)).onCancel(ref.cancelCaptured);
167+
}
168+
169+
@Override
170+
public <U> Promise<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,
171+
Executor executor) {
172+
ComposedFutureRef<U> ref = new ComposedFutureRef<>();
173+
return super.thenComposeAsync(ref.captureResult(fn), executor).onCancel(ref.cancelCaptured);
174+
}
135175
}
136176
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/**
2+
* Copyright 2015-2020 Valery Silaev (http://vsilaev.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package net.tascalate.concurrent;
17+
18+
import java.util.concurrent.CompletionStage;
19+
import java.util.concurrent.atomic.AtomicReference;
20+
import java.util.function.Function;
21+
22+
class ComposedFutureRef<U> extends AtomicReference<CompletionStage<U>> {
23+
private static final long serialVersionUID = 1L;
24+
25+
<T> Function<? super T, ? extends CompletionStage<U>> captureResult(Function<? super T, ? extends CompletionStage<U>> fn) {
26+
return v -> {
27+
CompletionStage<U> result = fn.apply(v);
28+
set(result);
29+
return result;
30+
};
31+
}
32+
33+
Runnable cancelCaptured = () -> {
34+
CompletionStage<U> stage = get();
35+
if (null != stage) {
36+
SharedFunctions.cancelPromise(stage, true);
37+
}
38+
};
39+
}

0 commit comments

Comments
 (0)