2222 */
2323package net .tascalate .concurrent ;
2424
25+ import java .util .Collection ;
2526import java .util .LinkedList ;
2627import java .util .Objects ;
27- import java .util .Queue ;
2828import java .util .concurrent .Callable ;
2929import java .util .concurrent .CompletableFuture ;
3030import java .util .concurrent .Executor ;
3131import java .util .concurrent .RejectedExecutionException ;
32- import java .util .function .Consumer ;
32+ import java .util .function .BiConsumer ;
3333import java .util .function .Function ;
3434
3535class CallbackRegistry <T > {
36- private State <T > state = InitialState .instance ();
37-
3836 private final Object mutex = new Object ();
39-
37+ private State <T > state = InitialState .instance ();
38+
4039 /**
4140 * Adds the given callbacks to this registry.
4241 */
43- <U > void addCallbacks (Consumer <? super Callable < U >> stageTransition ,
42+ <U > void addCallbacks (AbstractCompletableTask < U > target ,
4443 Function <? super T , ? extends U > successCallback ,
4544 Function <Throwable , ? extends U > failureCallback ,
4645 Executor executor ) {
@@ -49,11 +48,8 @@ <U> void addCallbacks(Consumer<? super Callable<U>> stageTransition,
4948 Objects .requireNonNull (failureCallback , "'failureCallback' must not be null" );
5049 Objects .requireNonNull (executor , "'executor' must not be null" );
5150
52- @ SuppressWarnings ("unchecked" )
53- Consumer <? super Callable <?>> typedTransition = (Consumer <? super Callable <?>>)stageTransition ;
54-
5551 synchronized (mutex ) {
56- state = state .addCallbacks (typedTransition , successCallback , failureCallback , executor );
52+ state = state .addCallbacks (target , successCallback , failureCallback , executor );
5753 }
5854 }
5955
@@ -114,11 +110,11 @@ boolean isCompleted() {
114110 * State of the registry. All subclasses are meant to be used form a
115111 * synchronized block and are NOT thread safe on their own.
116112 */
117- private static abstract class State <S > {
118- protected abstract State <S > addCallbacks (Consumer <? super Callable <?>> stageTransition ,
119- Function <? super S , ?> successCallback ,
120- Function <Throwable , ?> failureCallback ,
121- Executor executor );
113+ static abstract class State <S > {
114+ protected abstract < U > State <S > addCallbacks (AbstractCompletableTask < U > target ,
115+ Function <? super S , ? extends U > successCallback ,
116+ Function <Throwable , ? extends U > failureCallback ,
117+ Executor executor );
122118
123119 protected State <S > getSuccessState (S result ) {
124120 throw new IllegalStateException ("success method should not be called multiple times" );
@@ -147,17 +143,17 @@ protected boolean isFailure() {
147143 * Result is not known yet and no callbacks registered. Using shared
148144 * instance so we do not allocate instance where it may not be needed.
149145 */
150- private static class InitialState <S > extends State <S > {
146+ static class InitialState <S > extends State <S > {
151147 private static final InitialState <Object > instance = new InitialState <>();
152148
153149 @ Override
154- protected State <S > addCallbacks (Consumer <? super Callable <?>> stageTransition ,
155- Function <? super S , ?> successCallback ,
156- Function <Throwable , ?> failureCallback ,
157- Executor executor ) {
150+ protected < U > State <S > addCallbacks (AbstractCompletableTask < U > target ,
151+ Function <? super S , ? extends U > successCallback ,
152+ Function <Throwable , ? extends U > failureCallback ,
153+ Executor executor ) {
158154
159155 IntermediateState <S > intermediateState = new IntermediateState <>();
160- intermediateState .addCallbacks (stageTransition , successCallback , failureCallback , executor );
156+ intermediateState .addCallbacks (target , successCallback , failureCallback , executor );
161157 return intermediateState ;
162158 }
163159
@@ -177,24 +173,30 @@ protected boolean isCompleted() {
177173 }
178174
179175 @ SuppressWarnings ("unchecked" )
180- private static <T > State <T > instance () {
176+ static <T > State <T > instance () {
181177 return (State <T >) instance ;
182178 }
183179 }
184180
185181 /**
186182 * Result is not known yet.
187183 */
188- private static class IntermediateState <S > extends State <S > {
189- private final Queue < CallbackHolder <? super S >> callbacks = new LinkedList <>();
184+ static class IntermediateState <S > extends State <S > {
185+ private final Collection < BiConsumer <? super S , ? super Throwable >> callbacks = new LinkedList <>();
190186
191187 @ Override
192- protected State <S > addCallbacks (Consumer <? super Callable <?>> stageTransition ,
193- Function <? super S , ?> successCallback ,
194- Function <Throwable , ?> failureCallback ,
195- Executor executor ) {
188+ protected < U > State <S > addCallbacks (AbstractCompletableTask < U > target ,
189+ Function <? super S , ? extends U > successCallback ,
190+ Function <Throwable , ? extends U > failureCallback ,
191+ Executor executor ) {
196192
197- callbacks .add (new CallbackHolder <>(stageTransition , successCallback , failureCallback , executor ));
193+ callbacks .add ((r , e ) -> {
194+ if (null == e ) {
195+ callCallback (target , successCallback , r , executor );
196+ } else {
197+ callCallback (target , failureCallback , e , executor );
198+ }
199+ });
198200 return this ;
199201 }
200202
@@ -205,10 +207,8 @@ protected State<S> getSuccessState(S result) {
205207
206208 @ Override
207209 protected void callSuccessCallbacks (S result ) {
208- // no need to remove callbacks from the queue, this instance will be
209- // thrown away at once
210- for (CallbackHolder <? super S > callback : callbacks ) {
211- callback .callSuccessCallback (result );
210+ for (BiConsumer <? super S , ? super Throwable > callback : callbacks ) {
211+ callback .accept (result , null );
212212 }
213213 }
214214
@@ -219,10 +219,8 @@ protected State<S> getFailureState(Throwable failure) {
219219
220220 @ Override
221221 protected void callFailureCallbacks (Throwable failure ) {
222- // no need to remove callbacks from the queue, this instance will be
223- // thrown away at once
224- for (CallbackHolder <? super S > callback : callbacks ) {
225- callback .callFailureCallback (failure );
222+ for (BiConsumer <? super S , ? super Throwable > callback : callbacks ) {
223+ callback .accept (null , failure );
226224 }
227225 }
228226
@@ -235,41 +233,39 @@ protected boolean isCompleted() {
235233 /**
236234 * Holds the result.
237235 */
238- private static final class SuccessState <S > extends State <S > {
236+ static final class SuccessState <S > extends State <S > {
239237 private final S result ;
240238
241- private SuccessState (S result ) {
239+ SuccessState (S result ) {
242240 this .result = result ;
243241 }
244242
245243 @ Override
246- protected State <S > addCallbacks (Consumer <? super Callable <?>> stageTransition ,
247- Function <? super S , ?> successCallback ,
248- Function <Throwable , ?> failureCallback ,
249- Executor executor ) {
250-
251- callCallback (stageTransition , successCallback , result , executor );
244+ protected <U > State <S > addCallbacks (AbstractCompletableTask <U > target ,
245+ Function <? super S , ? extends U > successCallback ,
246+ Function <Throwable , ? extends U > failureCallback ,
247+ Executor executor ) {
248+ callCallback (target , successCallback , result , executor );
252249 return this ;
253250 }
254251 }
255252
256253 /**
257254 * Holds the failure.
258255 */
259- private static final class FailureState <S > extends State <S > {
256+ static final class FailureState <S > extends State <S > {
260257 private final Throwable failure ;
261258
262- private FailureState (Throwable failure ) {
259+ FailureState (Throwable failure ) {
263260 this .failure = failure ;
264261 }
265262
266263 @ Override
267- protected State <S > addCallbacks (Consumer <? super Callable <?>> stageTransition ,
268- Function <? super S , ?> successCallback ,
269- Function <Throwable , ?> failureCallback ,
270- Executor executor ) {
271-
272- callCallback (stageTransition , failureCallback , failure , executor );
264+ protected <U > State <S > addCallbacks (AbstractCompletableTask <U > target ,
265+ Function <? super S , ? extends U > successCallback ,
266+ Function <Throwable , ? extends U > failureCallback ,
267+ Executor executor ) {
268+ callCallback (target , failureCallback , failure , executor );
273269 return this ;
274270 }
275271
@@ -278,47 +274,21 @@ protected boolean isFailure() {
278274 }
279275 }
280276
281- private static final class CallbackHolder <S > {
282- private final Consumer <? super Callable <?>> stageTransition ;
283- private final Function <? super S , ?> successCallback ;
284- private final Function <Throwable , ?> failureCallback ;
285- private final Executor executor ;
286-
287- private CallbackHolder (Consumer <? super Callable <?>> stageTransition ,
288- Function <? super S , ?> successCallback ,
289- Function <Throwable , ?> failureCallback ,
290- Executor executor ) {
291-
292- this .stageTransition = stageTransition ;
293- this .successCallback = successCallback ;
294- this .failureCallback = failureCallback ;
295- this .executor = executor ;
296- }
297-
298- void callSuccessCallback (S result ) {
299- callCallback (stageTransition , successCallback , result , executor );
300- }
301-
302- void callFailureCallback (Throwable failure ) {
303- callCallback (stageTransition , failureCallback , failure , executor );
304- }
305- }
306-
307- private static <S , U > void callCallback (Consumer <? super Callable <?>> stageTransition ,
308- Function <? super S , ? extends U > callback ,
309- S value ,
310- Executor executor ) {
277+ static <T , U > void callCallback (AbstractCompletableTask <U > target ,
278+ Function <? super T , ? extends U > callback ,
279+ T value ,
280+ Executor executor ) {
311281
312282 Callable <U > callable = () -> callback .apply (value );
313283 try {
314- executor .execute ( (AsyncTask )() -> stageTransition . accept (callable ) );
284+ executor .execute ( (AsyncTask )() -> target . fireTransition (callable ) );
315285 } catch (RejectedExecutionException ex ) {
316286 // Propagate error in-place
317287 Callable <U > propagateError = () -> { throw ex ; };
318- stageTransition . accept (propagateError );
288+ target . fireTransition (propagateError );
319289 }
320290 }
321-
291+
322292 @ FunctionalInterface
323293 static interface AsyncTask extends Runnable , CompletableFuture .AsynchronousCompletionTask {}
324294
0 commit comments