|
1 | 1 | # tascalate-concurrent |
2 | | -Implementation of blocking (IO-Bound) cancellable java.util.concurrent.CompletionStage and related extensions to java.util.concurrent.ExecutorService-s |
| 2 | +The library provides an implementation of the CompletionStage interface and related classes these are designed to support long-running blocking tasks (typically, I/O bound) - unlike Java 8 built-in implementation, [CompletableFuture](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html), that is primarily supports computational tasks. |
| 3 | +# Why a CompletableFuture is not enough? |
| 4 | +There are several shortcomings associated with [CompletableFuture](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html) implementation that complicate its usage for blocking tasks: |
| 5 | +1. `CompletableFuture.cancel()` [method](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html#cancel-boolean-) does not interrupt underlying thread; it merely puts future to exceptionally completed state. So if you use any blocking calls inside functions passed to `thenApplyAsync` / `thenAcceptAsync` / etc these functions will run till the end and never will be interrupted. Please see [CompletableFuture can't be interrupted](http://www.nurkiewicz.com/2015/03/completablefuture-cant-be-interrupted.html) by Tomasz Nurkiewicz |
| 6 | +2. By default, all `*Async` composition methods use `ForkJoinPool.commonPool()` ([see here](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html#commonPool--)) unless explicit [Executor](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executor.html) is specified. This thread pool shared between all [CompletableFuture](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html)-s, all parallel streams and all applications deployed on the same JVM. This hard-coded, unconfigurable thread pool is completely outside of our control, hard to monitor and scale. Therefore you should always specify your own Executor. |
| 7 | +3. Additionally, built-in Java 8 concurrency classes provides pretty inconvenient API to combine several [CompletionStage](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html)-s. `CompletableFuture.allOf` / `CompletableFuture.anyOf` methods accept only [CompletableFuture](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html) as arguments; you have no mechanism to combine arbitrary [CompletionStage](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html)-s without converting them to [CompletableFuture](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html) first. Also, the return type of the aforementioned `CompletableFuture.allOf` is declared as `CompletableFuture<Void>` - hence you are unable to extract conveniently individual results of the each future supplied. `CompletableFuture.anyOf` is even worse in this regard; for more details please read on here: [CompletableFuture in Action](http://www.nurkiewicz.com/2013/05/java-8-completablefuture-in-action.html) (see Shortcomings) by Tomasz Nurkiewicz |
| 8 | + |
| 9 | +# How to use? |
| 10 | +Add Maven dependency |
| 11 | +``` |
| 12 | +<dependency> |
| 13 | + <groupId>net.tascalate.concurrent</groupId> |
| 14 | + <artifactId>net.tascalate.concurrent.lib</artifactId> |
| 15 | + <version>0.5</version> |
| 16 | +</dependency> |
| 17 | +``` |
| 18 | + |
| 19 | +# What is inside? |
| 20 | +## 1. Promise interface |
| 21 | +The interface may be best described by the formula: |
| 22 | +``` |
| 23 | +Promise == CompletionStage + Future |
| 24 | +``` |
| 25 | + |
| 26 | +I.e., it combines both blocking [Future](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html)’s API (including `cancel()` [method](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html#cancel-boolean-) AND composition capabilities of [CompletionStage](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html)’s API. Importantly, all composition methods of [CompletionStage](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html) API (`thenAccept`, `thenCombine`, `whenComplete` etc.) are re-declared to return `Promise` as well. |
| 27 | + |
| 28 | +## 2. CompletableTask |
| 29 | +This is why this project was ever started. `CompletableTask` is the implementation of the `Promise` API for long-running blocking tasks. There are 2 unit operations to create a `CompletableTask`: |
| 30 | + |
| 31 | +a. `CompletableTask.asyncOn(Executor executor)` |
| 32 | + |
| 33 | +Returns a resolved no-value `Promise` that is “bound” to specified executor. I.e. any function passed to composition methods of `Promise` (like `thenApplyAsync` / `thenAcceptAsync` / `whenCompleteAsync` etc.) will be executed using this executor unless executor is overridden via explicit composition method parameter. Moreover, any nested composition calls will use same executor, if it’s not redefined via explicit composition method parameter: |
| 34 | +``` |
| 35 | +CompletableTask |
| 36 | + .asyncOn(myExecutor) |
| 37 | + .thenApplyAsync(myValueGenerator) |
| 38 | + .thenAcceptAsync(myConsumer) |
| 39 | + .thenRunAsync(myAction); |
| 40 | +``` |
| 41 | +All of `myValueGenerator`, `myConsumer`, `myActtion` will be executed using `myExecutor` |
| 42 | + |
| 43 | +Most importantly, all composed promises support true cancellation (incl. interrupting thread) for the functions supplied as arguments: |
| 44 | +``` |
| 45 | +Promise<?> p1 = |
| 46 | +CompletableTask |
| 47 | + .asyncOn(myExecutor) |
| 48 | + .thenApplyAsync(myValueGenerator) |
| 49 | + .thenAcceptAsync(myConsumer); |
| 50 | + |
| 51 | +Promise<?> p2 = p1.thenRunAsync(myAction); |
| 52 | +... |
| 53 | +p.cancel(); |
| 54 | +``` |
| 55 | +In the example above `myConsumer` will be interrupted when already running. Both p1 and p1 will be resolved faulty with [CancellationException](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CancellationException.html). |
| 56 | + |
| 57 | +b. `CompletableTask.complete(T value, Executor executor)` |
| 58 | + |
| 59 | +Same as above, but the starting point is a resolved Promise with a specified value: |
| 60 | +``` |
| 61 | +CompletableTask |
| 62 | + .complete("Hello!", myExecutor) |
| 63 | + .thenApplyAsync(myMapper) |
| 64 | + .thenApplyAsync(myTransformer) |
| 65 | + .thenAcceptAsync(myConsumer) |
| 66 | + .thenRunAsync(myAction); |
| 67 | +``` |
| 68 | +All of `myMapper`, `myTransformer`, `myConsumer`, `myActtion` will be executed using `myExecutor` |
| 69 | + |
| 70 | +## 3. Helper class Promises |
| 71 | +The class |
| 72 | +provides convenient methods to combine several `CompletionStage`-s: |
| 73 | + |
| 74 | +`public static <T> Promise<List<T>> all(CompletionStage<? extends T>... promises)` |
| 75 | + |
| 76 | +Returns a promise that is completed normally when all `CompletionStage`-s passed as parameters are completed normally; if any promise completed exceptionally, then resulting promise is completed exceptionally as well |
| 77 | + |
| 78 | +`public static <T> Promise<T> any(final CompletionStage<? extends T>... promises)` |
| 79 | + |
| 80 | +Returns a promise that is completed normally when any CompletionStage passed as parameters is completed normally (race is possible); if all promises completed exceptionally, then resulting promise is completed exceptionally as well |
| 81 | + |
| 82 | +`public static <T> Promise<T> anyStrict(CompletionStage<? extends T>... promises)` |
| 83 | + |
| 84 | +Returns a promise that is completed normally when any CompletionStage passed as parameters is completed normally (race is possible); if any promise completed exceptionally before first result is available, then resulting promise is completed exceptionally as well (unlike non-Strict variant, where exceptions are ignored if result is available at all) |
| 85 | + |
| 86 | +`public static <T> Promise<List<T>> atLeast(int minResultsCount, CompletionStage<? extends T>... promises)` |
| 87 | +Generalization of the any method. Returns a promise that is completed normally when at least minResultCount |
| 88 | +of CompletionStage-s passed as parameters are completed normally (race is possible); if less than minResultCount of promises completed normally, then resulting promise is completed exceptionally |
| 89 | + |
| 90 | +`public static <T> Promise<List<T>> atLeastStrict(int minResultsCount, CompletionStage<? extends T>... promises)` |
| 91 | +Generalization of the anyStrict method. Returns a promise that is completed normally when at least minResultCount of CompletionStage-s passed as parameters are completed normally (race is possible); if any promise completed exceptionally before minResultCount of results are available, then resulting promise is completed exceptionally as well (unlike non-Strict variant, where exceptions are ignored if minResultsCount of results are available at all) |
| 92 | + |
| 93 | +Additionally, it's possible to convert to `Promise` API ready value: |
| 94 | + |
| 95 | +`public static <T> Promise<T> success(T value)` |
| 96 | + |
| 97 | +...exception: |
| 98 | + |
| 99 | +`public static Promise<?> failure(Throwable exception)` |
| 100 | + |
| 101 | +...or arbitrary `CompletionStage` implementation: |
| 102 | + |
| 103 | +`public static <T> Promise<T> from(CompletionStage<T> stage)` |
| 104 | + |
| 105 | +## 3. Extensions to ExecutorService API |
| 106 | + |
| 107 | +It’s not mandatory to use any specific subclasses of `Executor` with `CompletableTask` – you may use any implementation. However, someone may find beneficial to have a `Promise`-aware `ExecutorService` API. Below is a list of related classes/interfaces: |
| 108 | + |
| 109 | +a. Interface `TaskExecutorService` |
| 110 | + |
| 111 | +Specialization of ExecutorService that uses `Promise` as a result of `submit(...)` methods. |
| 112 | + |
| 113 | +b. Class `ThreadPoolTaskExecutor` |
| 114 | +A subclass of standard [ThreadPoolExecutor](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html) that implements `TaskExecutorService` interface. |
| 115 | + |
| 116 | +c. Class `TaskExecutors` |
| 117 | + |
| 118 | +A drop-in replacement for [Executors](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html) utility class that returns various useful implementations of `TaskExecutorService` instead of standard `ExecutorService`. |
| 119 | + |
| 120 | +# Acknowledgements |
| 121 | + |
| 122 | +Internal implementation details are greatly inspired [by the work](https://github.com/lukas-krecan/completion-stage) done by [Lukáš Křečan](https://github.com/lukas-krecan) |
| 123 | + |
0 commit comments