Skip to content

Commit ee3d026

Browse files
committed
JavaDoc-s; minor refactoring of Promises methods' names
1 parent 00a81de commit ee3d026

File tree

10 files changed

+194
-12
lines changed

10 files changed

+194
-12
lines changed

src/main/java/net/tascalate/concurrent/AbstractCompletableTask.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,14 @@
3838
import java.util.function.Consumer;
3939
import java.util.function.Function;
4040

41+
/**
42+
* Base superclass for both root and intermediate {@link Promise}-s that
43+
* represent blocking long-running tasks
44+
*
45+
* @author vsilaev
46+
*
47+
* @param <T>
48+
*/
4149
abstract class AbstractCompletableTask<T> extends PromiseAdapter<T> implements Promise<T> {
4250

4351
private final CallbackRegistry<T> callbackRegistry = new CallbackRegistry<>();

src/main/java/net/tascalate/concurrent/AbstractDelegatingPromise.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,15 @@
2323
import java.util.function.Consumer;
2424
import java.util.function.Function;
2525

26+
/**
27+
* Helper class to create a concrete {@link Promise} subclass via delegation
28+
* to the wrapped {@link CompletionStage}
29+
*
30+
* @author vsilaev
31+
*
32+
* @param <T>
33+
* @param <D>
34+
*/
2635
abstract public class AbstractDelegatingPromise<T, D extends CompletionStage<T>> implements Promise<T> {
2736
final protected D completionStage;
2837

src/main/java/net/tascalate/concurrent/AggregatingPromise.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ void onComplete(int idx, T result, Throwable error) {
106106
// Synchronized around done
107107
markRemainingCancelled();
108108
// Now no other thread can modify errors array.
109-
onError(new MultitargetException(new ArrayList<>(Arrays.asList(errors))));
109+
onFailure(new MultitargetException(new ArrayList<>(Arrays.asList(errors))));
110110

111111
if (cancelRemaining) {
112112
cancelPromises();

src/main/java/net/tascalate/concurrent/CompletablePromise.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,13 @@
2424
import java.util.concurrent.TimeUnit;
2525
import java.util.concurrent.TimeoutException;
2626

27+
/**
28+
* The {@link CompletablePromise} is an adapter of a {@link CompletableFuture} to the {@link Promise} API
29+
*
30+
* @author vsilaev
31+
*
32+
* @param <T>
33+
*/
2734
public class CompletablePromise<T> extends AbstractDelegatingPromise<T, CompletableFuture<T>> implements Promise<T> {
2835

2936
public CompletablePromise() {
@@ -38,7 +45,7 @@ protected boolean onSuccess(T value) {
3845
return completionStage.complete(value);
3946
}
4047

41-
protected boolean onError(Throwable ex) {
48+
protected boolean onFailure(Throwable ex) {
4249
return completionStage.completeExceptionally(ex);
4350
}
4451

src/main/java/net/tascalate/concurrent/CompletableSubTask.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@
1919
import java.util.concurrent.Executor;
2020
import java.util.concurrent.atomic.AtomicBoolean;
2121

22+
/**
23+
* The {@link Promise} implementation for intermediate long-running blocking task
24+
* @author vsilaev
25+
*
26+
* @param <T>
27+
*/
2228
class CompletableSubTask<T> extends AbstractCompletableTask<T> {
2329

2430
static class DelegatingCallable<T> implements Callable<T> {

src/main/java/net/tascalate/concurrent/CompletableTask.java

Lines changed: 73 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,32 +19,100 @@
1919
import java.util.concurrent.Executor;
2020
import java.util.concurrent.RunnableFuture;
2121

22+
/**
23+
*
24+
* Concrete implementation of {@link Promise} interface for long-running blocking tasks
25+
*
26+
* @author vsilaev
27+
*
28+
* @param <T>
29+
*/
2230
public class CompletableTask<T> extends AbstractCompletableTask<T> implements RunnableFuture<T> {
2331

32+
/**
33+
* Creates a CompletableTask; for internal use only
34+
* @param executor
35+
* a default {@link Executor} to run functions passed to async composition methods
36+
* @param callable
37+
* a {@link Callable} that completes this task
38+
*/
2439
protected CompletableTask(final Executor executor, Callable<T> callable) {
2540
super(executor, callable);
2641
}
2742

43+
/**
44+
* Executes wrapped {@link Callable}; don't use explicitly
45+
*/
2846
@Override
2947
public void run() {
3048
task.run();
3149
}
3250

33-
public static <T> Promise<T> resolve(T value, Executor defaultExecutor) {
51+
/**
52+
* Returns a resolved {@link Promise} with specified value; the promise is "bound" to the specified executor.
53+
* I.e. any function passed to composition methods of Promise (like <code>thenApplyAsync</code>
54+
* / <code>thenAcceptAsync</code> / <code>whenCompleteAsync</code> etc.) will be executed using this executor
55+
* unless executor is overridden via explicit composition method parameter. Moreover, any nested
56+
* composition calls will use same executor, if it’s not redefined via explicit composition method parameter:
57+
* {@code}<pre>CompletableTask
58+
* .complete("Hello!", myExecutor)
59+
* .thenApplyAsync(myMapper)
60+
* .thenApplyAsync(myTransformer)
61+
* .thenAcceptAsync(myConsumer)
62+
* .thenRunAsync(myAction)
63+
* </pre>
64+
* All of <code>myMapper</code>, <code>myTransformer</code>, <code>myConsumer</code>, <code>myActtion</code> will be executed using <code>myExecutor</code>
65+
*
66+
* @param value
67+
* a resolved value of the promise
68+
* @param executor
69+
* a default {@link Executor} to run functions passed to async composition methods
70+
* (like <code>thenApplyAsync</code> / <code>thenAcceptAsync</code> / <code>whenCompleteAsync</code> etc.)
71+
* @return
72+
* resolved {@link Promise} with a value passed; the promise is bound to the specified executor
73+
*/
74+
public static <T> Promise<T> complete(T value, Executor defaultExecutor) {
3475
CompletableTask<T> result = new CompletableTask<T>(defaultExecutor, () -> value);
3576
SAME_THREAD_EXECUTOR.execute(result);
3677
return result;
3778
}
38-
39-
public static Promise<Void> asyncOn(Executor defaultExecutor) {
40-
return resolve(null, defaultExecutor);
79+
80+
/**
81+
* Returns a resolved no-value {@link Promise} that is "bound" to the specified executor.
82+
* I.e. any function passed to composition methods of Promise (like <code>thenApplyAsync</code>
83+
* / <code>thenAcceptAsync</code> / <code>whenCompleteAsync</code> etc.) will be executed using this executor
84+
* unless executor is overridden via explicit composition method parameter. Moreover, any nested
85+
* composition calls will use same executor, if it’s not redefined via explicit composition method parameter:
86+
* {@code}<pre>CompletableTask
87+
* .asyncOn(myExecutor)
88+
* .thenApplyAsync(myValueGenerator)
89+
* .thenAcceptAsync(myConsumer)
90+
* .thenRunAsync(myAction)
91+
* </pre>
92+
* All of <code>myValueGenerator</code>, <code>myConsumer</code>, <code>myActtion</code> will be executed using <code>myExecutor</code>
93+
*
94+
* @param executor
95+
* a default {@link Executor} to run functions passed to async composition methods
96+
* (like <code>thenApplyAsync</code> / <code>thenAcceptAsync</code> / <code>whenCompleteAsync</code> etc.)
97+
* @return
98+
* resolved non-value {@link Promise} bound to the specified executor
99+
*/
100+
public static Promise<Void> asyncOn(Executor executor) {
101+
return complete(null, executor);
41102
}
42103

43104
@Override
44105
Runnable setupTransition(Callable<T> code) {
45106
throw new UnsupportedOperationException();
46107
}
47-
108+
109+
/**
110+
* Creates a nested sub-task for a composition method
111+
* @param executor
112+
* a default executor for async composition methods of nested sub-task
113+
* @return
114+
* an instance of {@link CompletableSubTask} bound to the specified executor
115+
*/
48116
@Override
49117
protected <U> AbstractCompletableTask<U> createCompletionStage(Executor executor) {
50118
return new CompletableSubTask<U>(executor);

src/main/java/net/tascalate/concurrent/Promise.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,15 @@
2323
import java.util.function.Consumer;
2424
import java.util.function.Function;
2525

26+
/**
27+
* <p>{@link Promise} is a combination of the {@link CompletionStage} and {@link Future} contracts.
28+
* It provides both composition methods of the former and blocking access methods of the later.
29+
* <p>Every composition method derived from the {@link CompletionStage} interface is overridden to
30+
* return a new Promise;
31+
* @author vsilaev
32+
*
33+
* @param <T>
34+
*/
2635
public interface Promise<T> extends Future<T>, CompletionStage<T> {
2736

2837
public <U> Promise<U> thenApply(Function<? super T, ? extends U> fn);

src/main/java/net/tascalate/concurrent/PromiseAdapter.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@
2929
import java.util.function.Consumer;
3030
import java.util.function.Function;
3131

32+
/**
33+
* Helper class to create a concrete {@link Promise} subclass as an
34+
* implementation from scratch.
35+
* @author vsilaev
36+
*
37+
* @param <T>
38+
*/
3239
abstract public class PromiseAdapter<T> implements Promise<T> {
3340
protected static final Executor SAME_THREAD_EXECUTOR = new Executor() {
3441
public void execute(Runnable command) {

src/main/java/net/tascalate/concurrent/Promises.java

Lines changed: 73 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,49 @@
2323
import java.util.function.Consumer;
2424
import java.util.function.Function;
2525

26+
/**
27+
* Utility class to create a resolved (either successfully or faulty) {@link Promise}-s;
28+
* to wrap an arbitrary {@link CompletionStage} interface to the {@link Promise} API;
29+
* to combine several {@link CompletionStage}-s into aggregating promise.
30+
*
31+
* @author vsilaev
32+
*
33+
*/
2634
public class Promises {
2735

28-
public static <T> Promise<T> readyValue(T value) {
36+
/**
37+
* Method to create a successfully resolved {@link Promise} with a value provided
38+
* @param value
39+
* a value to wrap
40+
* @return
41+
* a successfully resolved {@link Promise} with a value provided
42+
*/
43+
public static <T> Promise<T> success(T value) {
2944
final CompletablePromise<T> result = new CompletablePromise<>();
3045
result.onSuccess(value);
3146
return result;
3247
}
48+
49+
/**
50+
* Method to create a faulty resolved {@link Promise} with an exception provided
51+
* @param exception
52+
* an exception to wrap
53+
* @return
54+
* a faulty resolved {@link Promise} with an exception provided
55+
*/
56+
public static Promise<?> failure(Throwable exception) {
57+
final CompletablePromise<?> result = new CompletablePromise<>();
58+
result.onFailure(exception);
59+
return result;
60+
}
3361

62+
/**
63+
* Adapts a stage passed to the {@link Promise} API
64+
* @param stage
65+
* a {@link CompletionStage} to be wrapped
66+
* @return
67+
* a {@link Promise}
68+
*/
3469
public static <T> Promise<T> from(CompletionStage<T> stage) {
3570
if (stage instanceof Promise) {
3671
return (Promise<T>) stage;
@@ -41,7 +76,7 @@ public static <T> Promise<T> from(CompletionStage<T> stage) {
4176
}
4277

4378
final CompletablePromise<T> result = createLinkedPromise(stage);
44-
stage.whenComplete(handler(result::onSuccess, result::onError));
79+
stage.whenComplete(handler(result::onSuccess, result::onFailure));
4580
return result;
4681
}
4782

@@ -52,7 +87,7 @@ static <T, R> Promise<R> from(CompletionStage<T> stage,
5287
final CompletablePromise<R> result = createLinkedPromise(stage);
5388
stage.whenComplete(handler(
5489
acceptConverted(result::onSuccess, resultConverter),
55-
acceptConverted(result::onError, errorConverter)
90+
acceptConverted(result::onFailure, errorConverter)
5691
));
5792
return result;
5893
}
@@ -70,17 +105,50 @@ public boolean cancel(boolean mayInterruptIfRunning) {
70105
}
71106
};
72107
}
73-
108+
109+
/**
110+
* <p>Returns a promise that is resolved successfully when all {@link CompletionStage}-s passed as parameters are completed normally;
111+
* if any promise completed exceptionally, then resulting promise is resolved exceptionally as well.
112+
* <p>The resolved result of this promise contains a list of the resolved results of the {@link CompletionStage}-s passed as an
113+
* argument at corresponding positions.
114+
*
115+
* @param promises
116+
* an array of {@link CompletionStage}-s to combine
117+
* @return
118+
* a combined promise
119+
*/
74120
@SafeVarargs
75121
public static <T> Promise<List<T>> all(final CompletionStage<? extends T>... promises) {
76122
return atLeast(promises.length, 0, true, promises);
77123
}
78124

125+
/**
126+
* <p>Returns a promise that is resolved successfully when any {@link CompletionStage} passed as parameters is completed normally (race is possible);
127+
* if all promises completed exceptionally, then resulting promise is resolved exceptionally as well.
128+
* <p>The resolved result of this promise contains a value of the first resolved result of the {@link CompletionStage}-s passed as an
129+
* argument.
130+
* @param promises
131+
* an array of {@link CompletionStage}-s to combine
132+
* @return
133+
* a combined promise
134+
*/
79135
@SafeVarargs
80136
public static <T> Promise<T> any(final CompletionStage<? extends T>... promises) {
81137
return unwrap(atLeast(1, promises.length - 1, true, promises), false);
82138
}
83139

140+
/**
141+
* <p>Returns a promise that is resolved successfully when any {@link CompletionStage} passed as parameters is completed normally (race is possible);
142+
* if any promise completed exceptionally before first result is available, then resulting promise is resolved exceptionally as well
143+
* (unlike non-Strict variant, where exceptions are ignored if result is available at all).
144+
* <p>The resolved result of this promise contains a value of the first resolved result of the {@link CompletionStage}-s passed as an
145+
* argument.
146+
*
147+
* @param promises
148+
* an array of {@link CompletionStage}-s to combine
149+
* @return
150+
* a combined promise
151+
*/
84152
@SafeVarargs
85153
public static <T> Promise<T> anyStrict(final CompletionStage<? extends T>... promises) {
86154
return unwrap(atLeast(1, 0, true, promises), true);
@@ -104,7 +172,7 @@ public static <T> Promise<List<T>> atLeast(final int minResultsCount, final int
104172
throw new IllegalArgumentException(
105173
"The number of futures supplied is less than a number of futures to await");
106174
} else if (minResultsCount == 0) {
107-
return readyValue(Collections.emptyList());
175+
return success(Collections.emptyList());
108176
} else if (promises.length == 1) {
109177
return from(promises[0], Collections::singletonList, Function.<Throwable> identity());
110178
} else {

0 commit comments

Comments
 (0)