diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniSubscribeToCompletionStage.java b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniSubscribeToCompletionStage.java index 8c062a25f..ab397d6cd 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniSubscribeToCompletionStage.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniSubscribeToCompletionStage.java @@ -1,5 +1,6 @@ package io.smallrye.mutiny.operators.uni; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; @@ -13,21 +14,22 @@ public class UniSubscribeToCompletionStage { public static CompletableFuture subscribe(Uni uni, Context context) { final AtomicReference cancellable = new AtomicReference<>(); - CompletableFuture future = new CompletableFuture() { - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - boolean cancelled = super.cancel(mayInterruptIfRunning); - if (cancelled) { + CompletableFuture future = Infrastructure.wrapCompletableFuture(new CompletableFuture()); + future.whenComplete((val, x) -> { + if (x instanceof CancellationException) { + // forward the cancellation to the uni + if (future.isCancelled()) { Cancellable c = cancellable.get(); if (c != null) { c.cancel(); } } - return cancelled; } - }; - + }); cancellable.set(uni.subscribe().with(context, future::complete, future::completeExceptionally)); - return Infrastructure.wrapCompletableFuture(future); + // We return future here and not whatever is returned from future.whenComplete, because that + // new stage will wrap any exceptions into a CompletionException which we do not want, and + // is exposed by UniOrTest (at least) + return future; } } diff --git a/pom.xml b/pom.xml index ecce83b70..e74daff8b 100644 --- a/pom.xml +++ b/pom.xml @@ -89,7 +89,7 @@ 3.0.1 1.3 - 2.2.1 + 2.3.0 3.14.1 2.13.9 4.1.0