Skip to content

Commit 68084c3

Browse files
committed
Handle RejectExecutionException; as a result - refactor and simplify related code
1 parent 6303e0b commit 68084c3

File tree

5 files changed

+114
-29
lines changed

5 files changed

+114
-29
lines changed

src/main/java/net/tascalate/concurrent/AbstractCompletableTask.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ protected void cancelOrigins(boolean mayInterruptIfRunning) {
8383
}
8484
}
8585

86-
abstract Runnable setupTransition(Callable<T> code);
86+
abstract void fireTransition(Callable<T> code);
8787

8888
@Override
8989
public boolean cancel(boolean mayInterruptIfRunning) {
@@ -293,7 +293,7 @@ public <U> Promise<U> thenComposeAsync(Function<? super T, ? extends CompletionS
293293
}
294294

295295
private <U> Consumer<? super U> runTransition(Function<? super U, ? extends T> converter) {
296-
return u -> setupTransition(() -> converter.apply(u)).run();
296+
return u -> fireTransition(() -> converter.apply(u));
297297
}
298298

299299
@Override
@@ -372,7 +372,7 @@ public CompletableFuture<T> toCompletableFuture() {
372372
// nextStage is CompletableFuture rather than AbstractCompletableTask
373373
// so trigger completion on ad-hoc runnable rather than on
374374
// nextStage.task
375-
Function<Callable<T>, Runnable> setup = c -> () -> {
375+
Consumer<Callable<T>> setup = c -> {
376376
try {
377377
c.call();
378378
} catch (Throwable ex) {
@@ -474,15 +474,15 @@ private <U> void addCallbacks(AbstractCompletableTask<U> targetStage,
474474
Function<Throwable, ? extends U> failureCallback,
475475
Executor executor) {
476476

477-
addCallbacks(targetStage::setupTransition, successCallback, failureCallback, executor);
477+
addCallbacks(targetStage::fireTransition, successCallback, failureCallback, executor);
478478
}
479479

480-
private <U> void addCallbacks(Function<? super Callable<U>, ? extends Runnable> targetSetup,
480+
private <U> void addCallbacks(Consumer<? super Callable<U>> stageTransition,
481481
Function<? super T, ? extends U> successCallback,
482482
Function<Throwable, ? extends U> failureCallback,
483483
Executor executor) {
484484

485-
callbackRegistry.addCallbacks(targetSetup, successCallback, failureCallback, executor);
485+
callbackRegistry.addCallbacks(stageTransition, successCallback, failureCallback, executor);
486486
}
487487

488488
}

src/main/java/net/tascalate/concurrent/CallbackRegistry.java

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import java.util.Queue;
2828
import java.util.concurrent.Callable;
2929
import java.util.concurrent.Executor;
30+
import java.util.concurrent.RejectedExecutionException;
31+
import java.util.function.Consumer;
3032
import java.util.function.Function;
3133

3234
class CallbackRegistry<T> {
@@ -37,7 +39,7 @@ class CallbackRegistry<T> {
3739
/**
3840
* Adds the given callbacks to this registry.
3941
*/
40-
<U> void addCallbacks(Function<? super Callable<U>, ? extends Runnable> targetSetup,
42+
<U> void addCallbacks(Consumer<? super Callable<U>> stageTransition,
4143
Function<? super T, ? extends U> successCallback,
4244
Function<Throwable, ? extends U> failureCallback,
4345
Executor executor) {
@@ -47,11 +49,10 @@ <U> void addCallbacks(Function<? super Callable<U>, ? extends Runnable> targetSe
4749
Objects.requireNonNull(executor, "'executor' must not be null");
4850

4951
@SuppressWarnings("unchecked")
50-
Function<? super Callable<?>, ? extends Runnable> typedTargetSetup =
51-
(Function<? super Callable<?>, ? extends Runnable>) targetSetup;
52+
Consumer<? super Callable<?>> typedTransition = (Consumer<? super Callable<?>>)stageTransition;
5253

5354
synchronized (mutex) {
54-
state = state.addCallbacks(typedTargetSetup, successCallback, failureCallback, executor);
55+
state = state.addCallbacks(typedTransition, successCallback, failureCallback, executor);
5556
}
5657
}
5758

@@ -101,7 +102,7 @@ boolean failure(Throwable failure) {
101102
* synchronized block and are NOT thread safe on their own.
102103
*/
103104
private static abstract class State<S> {
104-
protected abstract State<S> addCallbacks(Function<? super Callable<?>, ? extends Runnable> targetSetup,
105+
protected abstract State<S> addCallbacks(Consumer<? super Callable<?>> stageTransition,
105106
Function<? super S, ?> successCallback,
106107
Function<Throwable, ?> failureCallback,
107108
Executor executor);
@@ -133,13 +134,13 @@ private static class InitialState<S> extends State<S> {
133134
private static final InitialState<Object> instance = new InitialState<>();
134135

135136
@Override
136-
protected State<S> addCallbacks(Function<? super Callable<?>, ? extends Runnable> targetSetup,
137+
protected State<S> addCallbacks(Consumer<? super Callable<?>> stageTransition,
137138
Function<? super S, ?> successCallback,
138139
Function<Throwable, ?> failureCallback,
139140
Executor executor) {
140141

141142
IntermediateState<S> intermediateState = new IntermediateState<>();
142-
intermediateState.addCallbacks(targetSetup, successCallback, failureCallback, executor);
143+
intermediateState.addCallbacks(stageTransition, successCallback, failureCallback, executor);
143144
return intermediateState;
144145
}
145146

@@ -171,12 +172,12 @@ private static class IntermediateState<S> extends State<S> {
171172
private final Queue<CallbackHolder<? super S>> callbacks = new LinkedList<>();
172173

173174
@Override
174-
protected State<S> addCallbacks(Function<? super Callable<?>, ? extends Runnable> targetSetup,
175+
protected State<S> addCallbacks(Consumer<? super Callable<?>> stageTransition,
175176
Function<? super S, ?> successCallback,
176177
Function<Throwable, ?> failureCallback,
177178
Executor executor) {
178179

179-
callbacks.add(new CallbackHolder<>(targetSetup, successCallback, failureCallback, executor));
180+
callbacks.add(new CallbackHolder<>(stageTransition, successCallback, failureCallback, executor));
180181
return this;
181182
}
182183

@@ -225,12 +226,12 @@ private SuccessState(S result) {
225226
}
226227

227228
@Override
228-
protected State<S> addCallbacks(Function<? super Callable<?>, ? extends Runnable> targetSetup,
229+
protected State<S> addCallbacks(Consumer<? super Callable<?>> stageTransition,
229230
Function<? super S, ?> successCallback,
230231
Function<Throwable, ?> failureCallback,
231232
Executor executor) {
232233

233-
callCallback(targetSetup, successCallback, result, executor);
234+
callCallback(stageTransition, successCallback, result, executor);
234235
return this;
235236
}
236237
}
@@ -246,49 +247,55 @@ private FailureState(Throwable failure) {
246247
}
247248

248249
@Override
249-
protected State<S> addCallbacks(Function<? super Callable<?>, ? extends Runnable> targetSetup,
250+
protected State<S> addCallbacks(Consumer<? super Callable<?>> stageTransition,
250251
Function<? super S, ?> successCallback,
251252
Function<Throwable, ?> failureCallback,
252253
Executor executor) {
253254

254-
callCallback(targetSetup, failureCallback, failure, executor);
255+
callCallback(stageTransition, failureCallback, failure, executor);
255256
return this;
256257
}
257258
}
258259

259260
private static final class CallbackHolder<S> {
260-
private final Function<? super Callable<?>, ? extends Runnable> targetSetup;
261+
private final Consumer<? super Callable<?>> stageTransition;
261262
private final Function<? super S, ?> successCallback;
262263
private final Function<Throwable, ?> failureCallback;
263264
private final Executor executor;
264265

265-
private CallbackHolder(Function<? super Callable<?>, ? extends Runnable> targetSetup,
266+
private CallbackHolder(Consumer<? super Callable<?>> stageTransition,
266267
Function<? super S, ?> successCallback,
267268
Function<Throwable, ?> failureCallback,
268269
Executor executor) {
269270

270-
this.targetSetup = targetSetup;
271+
this.stageTransition = stageTransition;
271272
this.successCallback = successCallback;
272273
this.failureCallback = failureCallback;
273274
this.executor = executor;
274275
}
275276

276277
void callSuccessCallback(S result) {
277-
callCallback(targetSetup, successCallback, result, executor);
278+
callCallback(stageTransition, successCallback, result, executor);
278279
}
279280

280281
void callFailureCallback(Throwable failure) {
281-
callCallback(targetSetup, failureCallback, failure, executor);
282+
callCallback(stageTransition, failureCallback, failure, executor);
282283
}
283284
}
284285

285-
private static <S, U> void callCallback(Function<? super Callable<U>, ? extends Runnable> targetSetup,
286+
private static <S, U> void callCallback(Consumer<? super Callable<?>> stageTransition,
286287
Function<? super S, ? extends U> callback,
287288
S value,
288289
Executor executor) {
289290

290291
Callable<U> callable = () -> callback.apply(value);
291-
executor.execute(targetSetup.apply(callable));
292+
try {
293+
executor.execute( () -> stageTransition.accept(callable) );
294+
} catch (RejectedExecutionException ex) {
295+
// Propagate error in-place
296+
Callable<U> propagateError = () -> { throw ex; };
297+
stageTransition.accept(propagateError);
298+
}
292299
}
293300

294301
}

src/main/java/net/tascalate/concurrent/CompletableSubTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,10 @@ public T call() throws Exception {
5656
}
5757

5858
@Override
59-
Runnable setupTransition(Callable<T> code) {
59+
void fireTransition(Callable<T> code) {
6060
DelegatingCallable<T> transitionCall = (DelegatingCallable<T>) action;
6161
transitionCall.setup(code);
62-
return task;
62+
task.run();
6363
}
6464

6565
@Override

src/main/java/net/tascalate/concurrent/CompletableTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ public static Promise<Duration> delay(Duration duration, Executor executor) {
167167
}
168168

169169
@Override
170-
Runnable setupTransition(Callable<T> code) {
170+
void fireTransition(Callable<T> code) {
171171
throw new UnsupportedOperationException();
172172
}
173173

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/**
2+
* Copyright 2015-2018 Valery Silaev (http://vsilaev.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package net.tascalate.concurrent;
17+
18+
import static org.junit.Assert.assertFalse;
19+
import static org.junit.Assert.assertTrue;
20+
21+
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.ExecutionException;
23+
import java.util.concurrent.ExecutorService;
24+
import java.util.concurrent.Executors;
25+
import java.util.concurrent.atomic.AtomicBoolean;
26+
27+
import org.junit.After;
28+
import org.junit.Before;
29+
import org.junit.Test;
30+
31+
/**
32+
* @author vsilaev
33+
*/
34+
public class StopExecutorTests {
35+
36+
private TaskExecutorService executor;
37+
38+
@Before
39+
public void setUp() {
40+
executor = TaskExecutors.newFixedThreadPool(4);
41+
}
42+
43+
@After
44+
public void tearDown() {
45+
executor.shutdown();
46+
}
47+
48+
@Test
49+
public void test_shutting_down_executor_will_not_hang_promise_due_to_pending_callback() throws ExecutionException, InterruptedException {
50+
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
51+
52+
CountDownLatch latch = new CountDownLatch(1);
53+
AtomicBoolean flag = new AtomicBoolean();
54+
Promise<Void> p = CompletableTask.asyncOn(executor)
55+
.thenApplyAsync(v -> {
56+
try {
57+
latch.await();
58+
} catch (InterruptedException ex) {
59+
}
60+
return v;
61+
}, singleThreadExecutor)
62+
.handleAsync((v, t) -> {
63+
flag.set(true);
64+
return null;
65+
});
66+
67+
singleThreadExecutor.shutdownNow();
68+
latch.countDown();
69+
//singleThreadExecutor.awaitTermination(10, TimeUnit.SECONDS);
70+
try {
71+
p.get();
72+
assertFalse("Promise should not be resolved susscessfully", true);
73+
} catch (Exception ex) {
74+
assertTrue("Exception is thrown: " + ex.getMessage(), true);
75+
}
76+
assertFalse(flag.get());
77+
}
78+
}

0 commit comments

Comments
 (0)