@@ -236,13 +236,9 @@ public <U> Promise<U> thenComposeAsync(Function<? super T, ? extends CompletionS
236236 // of nextStage.task when this nextStage is
237237 // exposed to the client, even in a "trivial" case:
238238 // Success path, just return value
239+ Consumer <? super U > onResult = nextStage .runTransition (Function .identity ());
239240 // Failure path, just re-throw exception
240- BiConsumer <? super U , ? super Throwable > moveToNextStage = (r , e ) -> {
241- if (null == e )
242- runDirectly (nextStage , Function .identity (), r , executor );
243- else
244- runDirectly (nextStage , AbstractCompletableTask ::forwardException , e , executor );
245- };
241+ Consumer <? super Throwable > onError = nextStage .runTransition (AbstractCompletableTask ::forwardException );
246242
247243 // Important -- tempStage is the target here
248244 addCallbacks (
@@ -276,23 +272,29 @@ public <U> Promise<U> thenComposeAsync(Function<? super T, ? extends CompletionS
276272 if (nextStage .isCancelled ()) {
277273 nextStage .cancelOrigins (true );
278274 } else {
279- returned .whenComplete (moveToNextStage );
275+ // Synchronous, while transition to tempStage is asynchronous already
276+ returned .whenComplete (biConsumer (onResult , onError ));
280277 }
281278 } catch (Throwable ex ) {
282279 // must-have if fn.apply above failed
283280 nextStage .resetCancellableOrigins ((CompletionStage <U >)null );
284281 // no need to check nextStage.isCancelled()
285282 // while there are no origins to cancel
286- // propagate error immediately
287- moveToNextStage .accept (null , ex );
283+ // propagate error immediately
284+ // synchronous, while transition to tempStage is asynchronous already
285+ onError .accept (ex );
288286 }
289287 }),
290- consumerAsFunction (e -> moveToNextStage . accept ( null , e ) ),
288+ consumerAsFunction (onError ),
291289 executor
292290 );
293291
294292 return nextStage ;
295293 }
294+
295+ private <U > Consumer <? super U > runTransition (Function <? super U , ? extends T > converter ) {
296+ return u -> setupTransition (() -> converter .apply (u )).run ();
297+ }
296298
297299 @ Override
298300 public Promise <T > exceptionally (Function <Throwable , ? extends T > fn ) {
@@ -442,6 +444,16 @@ private static <R> Function<R, Void> runnableAsFunction(Runnable action) {
442444 return null ;
443445 };
444446 }
447+
448+ private static <U , V > BiConsumer <U , V > biConsumer (Consumer <? super U > onResult , Consumer <? super V > onError ) {
449+ return (u , v ) -> {
450+ if (null == v ) {
451+ onResult .accept (u );
452+ } else {
453+ onError .accept (v );
454+ }
455+ };
456+ }
445457
446458 private static <U > U forwardException (Throwable e ) {
447459 throw wrapCompletionException (e );
@@ -474,11 +486,4 @@ private <U> void addCallbacks(Function<? super Callable<U>, ? extends Runnable>
474486 callbackRegistry .addCallbacks (targetSetup , successCallback , failureCallback , executor );
475487 }
476488
477- private static <S , U > void runDirectly (AbstractCompletableTask <U > targetStage ,
478- Function <? super S , ? extends U > callback ,
479- S value ,
480- Executor executor ) {
481-
482- CallbackRegistry .callCallback (targetStage ::setupTransition , callback , value , executor );
483- }
484489}
0 commit comments