Skip to content

Commit e632c2e

Browse files
committed
AsyncCompletions throws CompletionException (like CompletableFuture.join) instead of sneaky throw; less shared state in Promises.partitioned impl.; fix docs
1 parent b66dc5e commit e632c2e

File tree

4 files changed

+88
-48
lines changed

4 files changed

+88
-48
lines changed

src/main/java/net/tascalate/concurrent/Promises.java

Lines changed: 73 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -292,71 +292,71 @@ public static <T, A, R> Promise<R> partitioned(Iterable<? extends T> values,
292292
int batchSize,
293293
Function<? super T, CompletionStage<? extends T>> spawner,
294294
Collector<T, A, R> downstream) {
295-
return partitioned1(values.iterator(), batchSize, spawner, downstream);
295+
return partitioned1(values.iterator(), null, batchSize, spawner, downstream);
296296
}
297297

298298
public static <T, A, R> Promise<R> partitioned(Iterable<? extends T> values,
299299
int batchSize,
300300
Function<? super T, CompletionStage<? extends T>> spawner,
301301
Collector<T, A, R> downstream,
302302
Executor downstreamExecutor) {
303-
return partitioned2(values.iterator(), batchSize, spawner, downstream, downstreamExecutor);
303+
return partitioned2(values.iterator(), null, batchSize, spawner, downstream, downstreamExecutor);
304304
}
305305

306306
public static <T, A, R> Promise<R> partitioned(Stream<? extends T> values,
307307
int batchSize,
308308
Function<? super T, CompletionStage<? extends T>> spawner,
309309
Collector<T, A, R> downstream) {
310-
return partitioned1(values.iterator(), batchSize, spawner, downstream);
310+
return partitioned1(values.iterator(), values, batchSize, spawner, downstream);
311311
}
312312

313313
public static <T, A, R> Promise<R> partitioned(Stream<? extends T> values,
314314
int batchSize,
315315
Function<? super T, CompletionStage<? extends T>> spawner,
316316
Collector<T, A, R> downstream,
317317
Executor downstreamExecutor) {
318-
return partitioned2(values.iterator(), batchSize, spawner, downstream, downstreamExecutor);
318+
return partitioned2(values.iterator(), values, batchSize, spawner, downstream, downstreamExecutor);
319319
}
320320

321321

322322
private static <T, A, R> Promise<R> partitioned1(Iterator<? extends T> values,
323-
int batchSize,
324-
Function<? super T, CompletionStage<? extends T>> spawner,
325-
Collector<T, A, R> downstream) {
323+
Object source,
324+
int batchSize,
325+
Function<? super T, CompletionStage<? extends T>> spawner,
326+
Collector<T, A, R> downstream) {
326327
return
327328
parallelStep1(values, batchSize, spawner, downstream)
328329
.dependent()
329-
.thenApply(downstream.finisher(), true)
330-
.as(onCloseSource(values))
330+
.thenApply(downstream.finisher().compose(IndexedStep::payload), true)
331+
.as(onCloseSource(null != source? source : values))
331332
.unwrap();
332333
}
333334

334335

335336
private static <T, A, R> Promise<R> partitioned2(Iterator<? extends T> values,
336-
int batchSize,
337-
Function<? super T, CompletionStage<? extends T>> spawner,
338-
Collector<T, A, R> downstream,
339-
Executor downstreamExecutor) {
337+
Object source,
338+
int batchSize,
339+
Function<? super T, CompletionStage<? extends T>> spawner,
340+
Collector<T, A, R> downstream,
341+
Executor downstreamExecutor) {
340342
return
341343
parallelStep2(values, batchSize, spawner, downstream, downstreamExecutor)
342344
.dependent()
343-
.thenApplyAsync(downstream.finisher(), downstreamExecutor, true)
344-
.as(onCloseSource(values))
345+
.thenApplyAsync(downstream.finisher().compose(IndexedStep::payload), downstreamExecutor, true)
346+
.as(onCloseSource(null != source? source : values))
345347
.unwrap();
346348
}
347349

348-
private static <T, A, R> Promise<A> parallelStep1(Iterator<? extends T> values,
349-
int batchSize,
350-
Function<? super T, CompletionStage<? extends T>> spawner,
351-
Collector<T, A, R> downstream) {
350+
private static <T, A, R> Promise<IndexedStep<A>> parallelStep1(
351+
Iterator<? extends T> values, int batchSize,
352+
Function<? super T, CompletionStage<? extends T>> spawner,
353+
Collector<T, A, R> downstream) {
352354

353-
int[] step = new int[1];
354-
return loop(null, __ -> step[0] == 0 || values.hasNext(), current -> {
355+
return loop(new IndexedStep<>(), step -> step.initial() || values.hasNext(), step -> {
355356
List<T> valuesBatch = drainBatch(values, batchSize);
356-
boolean initial = step[0]++ == 0;
357357
if (valuesBatch.isEmpty()) {
358358
// Over
359-
return Promises.success(initial ? downstream.supplier().get() : current);
359+
return Promises.success(step.initial() ? step.next(downstream.supplier().get()) : step);
360360
} else {
361361
List<CompletionStage<? extends T>> promisesBatch =
362362
valuesBatch.stream()
@@ -366,27 +366,27 @@ private static <T, A, R> Promise<A> parallelStep1(Iterator<? extends T> values,
366366
return
367367
Promises.all(promisesBatch)
368368
.dependent()
369-
.thenApply(vals -> accumulate(vals, initial, current, downstream), true);
369+
.thenApply(vals -> step.next(
370+
accumulate(vals, step.initial(), step.payload(), downstream)
371+
), true);
370372
}
371373
});
372374
}
373375

374-
private static <T, A, R> Promise<A> parallelStep2(Iterator<? extends T> values,
375-
int batchSize,
376-
Function<? super T, CompletionStage<? extends T>> spawner,
377-
Collector<T, A, R> downstream,
378-
Executor downstreamExecutor) {
376+
private static <T, A, R> Promise<IndexedStep<A>> parallelStep2(
377+
Iterator<? extends T> values, int batchSize,
378+
Function<? super T, CompletionStage<? extends T>> spawner,
379+
Collector<T, A, R> downstream,
380+
Executor downstreamExecutor) {
379381

380-
int[] step = new int[1];
381-
return loop(null, __ -> step[0] == 0 || values.hasNext(), current -> {
382+
return loop(new IndexedStep<>(), step -> step.initial() || values.hasNext(), step -> {
382383
List<T> valuesBatch = drainBatch(values, batchSize);
383-
boolean initial = step[0]++ == 0;
384384
if (valuesBatch.isEmpty()) {
385385
// Over
386-
return initial ?
387-
CompletableTask.supplyAsync(downstream.supplier(), downstreamExecutor)
386+
return step.initial() ?
387+
CompletableTask.supplyAsync(() -> step.next(downstream.supplier().get()), downstreamExecutor)
388388
:
389-
Promises.success(current);
389+
Promises.success(step);
390390
} else {
391391
List<CompletionStage<? extends T>> promisesBatch =
392392
valuesBatch.stream()
@@ -396,11 +396,40 @@ private static <T, A, R> Promise<A> parallelStep2(Iterator<? extends T> values,
396396
return
397397
Promises.all(promisesBatch)
398398
.dependent()
399-
.thenApplyAsync(vals -> accumulate(vals, initial, current, downstream), downstreamExecutor, true);
399+
.thenApplyAsync(vals -> step.next(
400+
accumulate(vals, step.initial(), step.payload(), downstream)
401+
), downstreamExecutor, true);
400402
}
401403
});
402404
}
403405

406+
407+
private static class IndexedStep<T> {
408+
private final int idx;
409+
private final T payload;
410+
411+
IndexedStep() {
412+
this(0, null);
413+
}
414+
415+
private IndexedStep(int idx, T payload) {
416+
this.idx = idx;
417+
this.payload = payload;
418+
}
419+
420+
IndexedStep<T> next(T payload) {
421+
return new IndexedStep<>(idx + 1, payload);
422+
}
423+
424+
boolean initial() {
425+
return idx == 0;
426+
}
427+
428+
T payload() {
429+
return payload;
430+
}
431+
}
432+
404433
private static <T> List<T> drainBatch(Iterator<? extends T> values, int batchSize) {
405434
List<T> valuesBatch = new ArrayList<>(batchSize);
406435
for (int count = 0; values.hasNext() && count < batchSize; count++) {
@@ -1060,13 +1089,13 @@ private static <T extends C, C> Promise<T> retryImpl(Function<? super RetryConte
10601089

10611090
class State {
10621091
private final boolean isDone;
1063-
private final Promise<?> prevAsync;
1092+
private final DependentPromise<?> prevAsync;
10641093

10651094
final RetryContext<C> ctx;
10661095
final RetryPolicy.Verdict verdict;
10671096

10681097

1069-
private State(RetryContext<C> ctx, Promise<?> prevAsync, boolean isDone) {
1098+
private State(RetryContext<C> ctx, DependentPromise<?> prevAsync, boolean isDone) {
10701099
this.ctx = ctx;
10711100
this.prevAsync = prevAsync;
10721101
this.isDone = isDone;
@@ -1079,11 +1108,11 @@ private State(RetryContext<C> ctx, Promise<?> prevAsync, boolean isDone) {
10791108
}
10801109

10811110
// Transition to intermediate
1082-
State next(Throwable error, Duration d, Promise<?> prevAsync) {
1111+
State next(Throwable error, Duration d, DependentPromise<?> prevAsync) {
10831112
return new State(ctx.nextRetry(d, unwrapCompletionException(error)), useAsync(prevAsync), false);
10841113
}
10851114

1086-
State next(T result, Duration d, Promise<?> prevAsync) {
1115+
State next(T result, Duration d, DependentPromise<?> prevAsync) {
10871116
return new State(ctx.nextRetry(d, result), useAsync(prevAsync), false);
10881117
}
10891118

@@ -1102,7 +1131,7 @@ Promise<T> toPromise() {
11021131
return isDone ? success((T)ctx.getLastResult()) : failure(ctx.asFailure());
11031132
}
11041133

1105-
Promise<?> useAsync(Promise<?> prevAsync) {
1134+
private DependentPromise<?> useAsync(DependentPromise<?> prevAsync) {
11061135
return prevAsync != null ? prevAsync : this.prevAsync;
11071136
}
11081137

@@ -1111,7 +1140,6 @@ DependentPromise<?> makeDelay(Duration delay) {
11111140
Timeouts.delay(delay).dependent()
11121141
:
11131142
prevAsync.exceptionally(SharedFunctions.nullify()) // Don't propagate own error
1114-
.dependent()
11151143
.delay(delay, true, true);
11161144
}
11171145
}
@@ -1123,14 +1151,14 @@ DependentPromise<?> makeDelay(Duration delay) {
11231151
Supplier<Promise<State>> callSupplier = () -> {
11241152
long startTime = System.nanoTime();
11251153

1126-
Promise<T> target = futureFactory.apply(ctx);
1154+
DependentPromise<T> target = futureFactory.apply(ctx).dependent();
11271155
DependentPromise<T> withTimeout;
11281156

11291157
Duration timeout = verdict.timeout();
11301158
if (DelayPolicy.isValid(timeout)) {
1131-
withTimeout = target.dependent().orTimeout(timeout, true, true);
1159+
withTimeout = target.orTimeout(timeout, true, true);
11321160
} else {
1133-
withTimeout = target.dependent();
1161+
withTimeout = target;
11341162
}
11351163

11361164
return withTimeout.handle((value, ex) -> {

src/main/java/net/tascalate/concurrent/Try.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package net.tascalate.concurrent;
1717

18+
import java.util.concurrent.CancellationException;
1819
import java.util.function.BiFunction;
1920

2021
abstract class Try<R> {
@@ -57,7 +58,16 @@ static final class Failure<R> extends Try<R> {
5758

5859
@Override
5960
R done() {
61+
/*
6062
return sneakyThrow(error);
63+
*/
64+
if (error instanceof Error) {
65+
throw (Error)error;
66+
} else if (error instanceof CancellationException) {
67+
throw (CancellationException)error;
68+
} else {
69+
throw SharedFunctions.wrapCompletionException(error);
70+
}
6171
}
6272

6373
@Override
@@ -88,10 +98,12 @@ static <R> Try<R> nothing() {
8898
return (Try<R>)NOTHING;
8999
}
90100

101+
/*
91102
@SuppressWarnings("unchecked")
92103
static <T, E extends Throwable> T sneakyThrow(Throwable e) throws E {
93104
throw (E) e;
94105
}
106+
*/
95107

96108

97109
private static final Try<Object> NOTHING = success(null);

src/main/java/net/tascalate/concurrent/locks/AbstractAsyncLock.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,15 @@ public interface AbstractAsyncLock<T extends AbstractAsyncLock.Token> {
3939
* lock after it has been acquired and the lock-protected action has
4040
* completed.
4141
*
42-
* @return A {@link CompletionStage} which will complete with a
42+
* @return A {@link Promise} which will complete with a
4343
* {@link Token} when the lock has been exclusively acquired
4444
*/
4545
Promise<T> acquire();
4646

4747
/**
4848
* A lock token indicating that the associated lock has been exclusively
4949
* acquired. Once the protected action is completed, the lock may be
50-
* released by calling {@link Token#releaseLock()}
50+
* released by calling {@link Token#release()}
5151
*/
5252
interface Token extends AutoCloseable {
5353
/** Releases this lock, allowing others to acquire it. */

src/main/java/net/tascalate/concurrent/locks/AsyncSemaphoreLock.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ default public Promise<Token> acquire() {
4242
/**
4343
* A semaphore token indicating that necessary permit has been exclusively acquired. Once the
4444
* protected action is completed, permits may be released by calling
45-
* {@link Token#releaseLock()}
45+
* {@link Token#release()}
4646
*/
4747
interface Token extends AsyncLock.Token {
4848
long permits();

0 commit comments

Comments
 (0)