diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniAndCombination.java b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniAndCombination.java index 9a1a8c88d..8bf85ac0d 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniAndCombination.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniAndCombination.java @@ -5,7 +5,7 @@ import java.util.concurrent.Flow.Subscription; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Function; import java.util.stream.Collectors; @@ -58,14 +58,15 @@ private class AndSupervisor implements UniSubscription { private final List handlers = new ArrayList<>(); private final UniSubscriber subscriber; - AtomicBoolean cancelled = new AtomicBoolean(); - AtomicInteger nextIndex = new AtomicInteger(); + final AtomicBoolean cancelled = new AtomicBoolean(); + final AtomicInteger nextIndex = new AtomicInteger(); + final AtomicInteger wip = new AtomicInteger(); AndSupervisor(UniSubscriber sub) { subscriber = sub; Context context = subscriber.context(); - for (Uni uni : unis) { + for (Uni uni : unis) { UniHandler result = new UniHandler(this, uni, context); handlers.add(result); } @@ -78,7 +79,7 @@ private void run() { upperBound = handlers.size(); } else { upperBound = Math.min(handlers.size(), concurrency); - nextIndex = new AtomicInteger(upperBound); + nextIndex.set(upperBound); } for (int i = 0; i < upperBound; i++) { if (cancelled.get()) { @@ -103,40 +104,47 @@ public void cancel() { * @param failed whether the {@code res} just fired a failure */ void check(UniHandler res, boolean failed) { - int incomplete = unis.size(); - - // One of the uni failed, and we can fire a failure immediately. - if (failed && !collectAllFailureBeforeFiring) { - if (cancelled.compareAndSet(false, true)) { - // Cancel all subscriptions - handlers.forEach(UniHandler::cancel); - // Invoke observer - subscriber.onFailure(res.failure); - } + if (wip.getAndIncrement() > 0) { return; } - for (UniHandler result : handlers) { - if (result.failure != null || result.item != SENTINEL) { - incomplete = incomplete - 1; + int incomplete; + do { + incomplete = unis.size(); + + // One of the uni failed, and we can fire a failure immediately. + if (failed && !collectAllFailureBeforeFiring) { + if (cancelled.compareAndSet(false, true)) { + // Cancel all subscriptions + handlers.forEach(UniHandler::cancel); + // Invoke observer + subscriber.onFailure(res.failure); + } + return; } - } - if (incomplete == 0) { - // All unis has fired an event, check the outcome - if (cancelled.compareAndSet(false, true)) { - List failures = getFailures(); - List items = getItems(); - computeAndFireTheOutcome(failures, items); + for (UniHandler result : handlers) { + if (result.failure != null || result.item != SENTINEL) { + incomplete = incomplete - 1; + } } - } - if (concurrency != -1 && !cancelled.get()) { - int nextIndex = this.nextIndex.getAndIncrement(); - if (nextIndex < unis.size()) { - handlers.get(nextIndex).subscribe(); + if (incomplete == 0) { + // All unis has fired an event, check the outcome + if (cancelled.compareAndSet(false, true)) { + List failures = getFailures(); + List items = getItems(); + computeAndFireTheOutcome(failures, items); + } } - } + + if (concurrency != -1 && !cancelled.get()) { + int nextIndex = this.nextIndex.getAndIncrement(); + if (nextIndex < unis.size()) { + handlers.get(nextIndex).subscribe(); + } + } + } while (wip.decrementAndGet() > 0 && incomplete > 0); } private void computeAndFireTheOutcome(List failures, List items) { @@ -172,14 +180,18 @@ private List getFailures() { private class UniHandler implements UniSubscription, UniSubscriber { - final AtomicReference subscription = new AtomicReference<>(); - private final AndSupervisor supervisor; - private final Uni uni; - private final Context context; + final AndSupervisor supervisor; + final Uni uni; + final Context context; + + volatile UniSubscription subscription; Object item = SENTINEL; Throwable failure; - UniHandler(AndSupervisor supervisor, Uni observed, Context context) { + private static final AtomicReferenceFieldUpdater SUBSCRIPTION_UPDATER = AtomicReferenceFieldUpdater + .newUpdater(UniAndCombination.UniHandler.class, UniSubscription.class, "subscription"); + + UniHandler(AndSupervisor supervisor, Uni observed, Context context) { this.supervisor = supervisor; this.uni = observed; this.context = context; @@ -192,7 +204,7 @@ public Context context() { @Override public final void onSubscribe(UniSubscription sub) { - if (!subscription.compareAndSet(null, sub)) { + if (!SUBSCRIPTION_UPDATER.compareAndSet(this, null, sub)) { // cancelling this second subscription // because we already add a subscription (most probably CANCELLED) sub.cancel(); @@ -201,7 +213,7 @@ public final void onSubscribe(UniSubscription sub) { @Override public final void onFailure(Throwable t) { - if (subscription.getAndSet(EmptyUniSubscription.CANCELLED) == EmptyUniSubscription.CANCELLED) { + if (SUBSCRIPTION_UPDATER.getAndSet(this, EmptyUniSubscription.CANCELLED) == EmptyUniSubscription.CANCELLED) { // Already cancelled, do nothing Infrastructure.handleDroppedException(t); return; @@ -212,7 +224,7 @@ public final void onFailure(Throwable t) { @Override public final void onItem(Object x) { - if (subscription.getAndSet(EmptyUniSubscription.CANCELLED) == EmptyUniSubscription.CANCELLED) { + if (SUBSCRIPTION_UPDATER.getAndSet(this, EmptyUniSubscription.CANCELLED) == EmptyUniSubscription.CANCELLED) { // Already cancelled, do nothing return; } @@ -222,14 +234,14 @@ public final void onItem(Object x) { @Override public void cancel() { - Subscription sub = subscription.getAndSet(EmptyUniSubscription.CANCELLED); + Subscription sub = SUBSCRIPTION_UPDATER.getAndSet(this, EmptyUniSubscription.CANCELLED); if (sub != null) { sub.cancel(); } } + @SuppressWarnings("unchecked") public void subscribe() { - //noinspection unchecked AbstractUni.subscribe(uni, this); } } diff --git a/implementation/src/test/java/io/smallrye/mutiny/BugReproducersTest.java b/implementation/src/test/java/io/smallrye/mutiny/BugReproducersTest.java index cc5386067..8c6947284 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/BugReproducersTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/BugReproducersTest.java @@ -8,10 +8,7 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -23,8 +20,10 @@ import org.junit.jupiter.api.parallel.ResourceLock; import io.smallrye.mutiny.helpers.test.AssertSubscriber; +import io.smallrye.mutiny.helpers.test.UniAssertSubscriber; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.smallrye.mutiny.tuples.Tuple2; +import io.smallrye.mutiny.tuples.Tuple3; import junit5.support.InfrastructureResource; @ResourceLock(value = InfrastructureResource.NAME, mode = ResourceAccessMode.READ) @@ -133,4 +132,17 @@ void reproducer_1891() { assertThat(failure.get()).describedAs("No failure must have been emitted").isNull(); assertThat(completed.get()).isTrue(); } + + @RepeatedTest(1_000) + void reproducer_1993() { + // Race condition in UniAndCombination, spotted in https://github.com/smallrye/smallrye-mutiny/issues/1993 + Uni a = Uni.createFrom().completionStage(() -> CompletableFuture.supplyAsync(() -> "A")); + Uni b = Uni.createFrom().completionStage(() -> CompletableFuture.supplyAsync(() -> "B")); + Uni c = Uni.createFrom().completionStage(() -> CompletableFuture.supplyAsync(() -> "C")); + Uni> uni = Uni.combine().all().unis(a, b, c).usingConcurrencyOf(3).asTuple(); + + UniAssertSubscriber> sub = uni.subscribe().withSubscriber(UniAssertSubscriber.create()); + sub.awaitItem(Duration.ofSeconds(5)) + .assertItem(Tuple3.of("A", "B", "C")); + } } diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/UniAndTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/UniAndTest.java index 6ca9c6c80..54bda1f24 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/UniAndTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/UniAndTest.java @@ -297,5 +297,4 @@ public void testWithSetOfUnis() { .assertCompleted() .assertItem(1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9); } - }