1616package net .tascalate .concurrent ;
1717
1818import static net .tascalate .concurrent .SharedFunctions .cancelPromise ;
19+ import static net .tascalate .concurrent .SharedFunctions .wrapCompletionException ;
1920import static net .tascalate .concurrent .SharedFunctions .iif ;
2021
2122import java .time .Duration ;
@@ -162,8 +163,8 @@ public static <T, R extends AsyncCloseable> Promise<T> tryApplyEx(Promise<R> p,
162163 // So resource.close() is never cancellable
163164 return resource .close ().thenCompose (__ -> failure (onAction ));
164165 } catch (Throwable onClose ) {
165- onClose .addSuppressed (onAction );
166- return failure (onClose );
166+ onAction .addSuppressed (onClose );
167+ return failure (onAction );
167168 }
168169 }
169170
@@ -193,8 +194,8 @@ public static <T, R extends AutoCloseable> Promise<T> tryCompose(Promise<R> p,
193194 resource .close ();
194195 return failure (onAction );
195196 } catch (Exception onClose ) {
196- onClose .addSuppressed (onAction );
197- return failure (onClose );
197+ onAction .addSuppressed (onClose );
198+ return failure (onAction );
198199 }
199200 }
200201
@@ -204,10 +205,7 @@ public static <T, R extends AutoCloseable> Promise<T> tryCompose(Promise<R> p,
204205 resource .close ();
205206 result .complete (actionResult , actionException );
206207 } catch (Throwable onClose ) {
207- if (null != actionException ) {
208- onClose .addSuppressed (actionException );
209- }
210- result .failure (onClose );
208+ completeExceptionally (result , actionException , onClose );
211209 }
212210 });
213211 return result .onCancel (() -> cancelPromise (action , true ));
@@ -231,8 +229,8 @@ public static <T, R extends AsyncCloseable> Promise<T> tryComposeEx(Promise<R> p
231229 // So resource.close() is never cancellable
232230 return resource .close ().thenCompose (__ -> failure (onAction ));
233231 } catch (Throwable onClose ) {
234- onClose .addSuppressed (onAction );
235- return failure (onClose );
232+ onAction .addSuppressed (onClose );
233+ return failure (onAction );
236234 }
237235 }
238236
@@ -244,17 +242,11 @@ public static <T, R extends AsyncCloseable> Promise<T> tryComposeEx(Promise<R> p
244242 if (null == onClose ) {
245243 result .complete (actionResult , actionException );
246244 } else {
247- if (null != actionException ) {
248- onClose .addSuppressed (actionException );
249- }
250- result .failure (onClose );
245+ completeExceptionally (result , actionException , onClose );
251246 }
252247 });
253248 } catch (Throwable onClose ) {
254- if (null != actionException ) {
255- onClose .addSuppressed (actionException );
256- }
257- result .failure (onClose );
249+ completeExceptionally (result , actionException , onClose );
258250 }
259251 });
260252 return result .onCancel (() -> cancelPromise (action , true ));
@@ -300,7 +292,7 @@ private static <T, A, R> Promise<R> partitioned1(Iterator<? extends T> values,
300292 parallelStep1 (values , batchSize , spawner , downstream )
301293 .dependent ()
302294 .thenApply (downstream .finisher ().compose (IndexedStep ::payload ), true )
303- .as ( onCloseSource (null != source ? source : values ))
295+ .asʹ ( maybeClosingSource (null != source ? source : values ))
304296 .unwrap ();
305297 }
306298
@@ -314,7 +306,7 @@ private static <T, A, R> Promise<R> partitioned2(Iterator<? extends T> values,
314306 parallelStep2 (values , batchSize , spawner , downstream , downstreamExecutor )
315307 .dependent ()
316308 .thenApplyAsync (downstream .finisher ().compose (IndexedStep ::payload ), downstreamExecutor , true )
317- .as ( onCloseSource (null != source ? source : values ))
309+ .asʹ ( maybeClosingSource (null != source ? source : values ))
318310 .unwrap ();
319311 }
320312
@@ -416,21 +408,17 @@ private static <T, A, R> A accumulate(List<T> vals, boolean initial, A current,
416408 return initial ? insertion : downstream .combiner ().apply (current , insertion );
417409 }
418410
419- private static <T > Function <Promise <T >, Promise <T >> onCloseSource (Object source ) {
411+ private static <T > Function <DependentPromise <T >, DependentPromise <T >> maybeClosingSource (Object source ) {
420412 if (source instanceof AutoCloseable ) {
421- return p -> p .dependent ().whenComplete ((r , e ) -> {
422- try (AutoCloseable o = (AutoCloseable )source ) {
423-
424- } catch (RuntimeException | Error ex ) {
425- if (null != e ) {
426- ex .addSuppressed (e );
427- }
428- throw ex ;
413+ return p -> p .whenComplete ((r , e ) -> {
414+ try {
415+ ((AutoCloseable )source ).close ();
429416 } catch (Exception ex ) {
430417 if (null != e ) {
431- ex .addSuppressed (e );
418+ e .addSuppressed (ex );
419+ } else {
420+ throw wrapCompletionException (ex );
432421 }
433- throw new CompletionException (ex );
434422 }
435423 }, true );
436424 } else {
@@ -1205,6 +1193,15 @@ private static <T> Promise<T> insufficientNumberOfArguments(int minResultCount,
12051193 */
12061194 }
12071195
1196+ private static <T > void completeExceptionally (CompletableFutureWrapper <T > result , Throwable actionException , Throwable onClose ) {
1197+ if (null != actionException ) {
1198+ actionException .addSuppressed (onClose );
1199+ result .failure (actionException );
1200+ } else {
1201+ result .failure (onClose );
1202+ }
1203+ }
1204+
12081205 private static <K , T > List <? extends CompletionStage <? extends T >>
12091206 collectKeyedResults (Map <K , T > result ,
12101207 Map <? extends K , ? extends CompletionStage <? extends T >> promises ) {
0 commit comments