Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -58,14 +58,15 @@ private class AndSupervisor implements UniSubscription {
private final List<UniHandler> handlers = new ArrayList<>();
private final UniSubscriber<? super O> subscriber;

AtomicBoolean cancelled = new AtomicBoolean();
AtomicInteger nextIndex = new AtomicInteger();
final AtomicBoolean cancelled = new AtomicBoolean();
final AtomicInteger nextIndex = new AtomicInteger();
final AtomicInteger wip = new AtomicInteger();

AndSupervisor(UniSubscriber<? super O> sub) {
subscriber = sub;

Context context = subscriber.context();
for (Uni uni : unis) {
for (Uni<?> uni : unis) {
UniHandler result = new UniHandler(this, uni, context);
handlers.add(result);
}
Expand All @@ -78,7 +79,7 @@ private void run() {
upperBound = handlers.size();
} else {
upperBound = Math.min(handlers.size(), concurrency);
nextIndex = new AtomicInteger(upperBound);
nextIndex.set(upperBound);
}
for (int i = 0; i < upperBound; i++) {
if (cancelled.get()) {
Expand All @@ -103,40 +104,47 @@ public void cancel() {
* @param failed whether the {@code res} just fired a failure
*/
void check(UniHandler res, boolean failed) {
int incomplete = unis.size();

// One of the uni failed, and we can fire a failure immediately.
if (failed && !collectAllFailureBeforeFiring) {
if (cancelled.compareAndSet(false, true)) {
// Cancel all subscriptions
handlers.forEach(UniHandler::cancel);
// Invoke observer
subscriber.onFailure(res.failure);
}
if (wip.getAndIncrement() > 0) {
return;
}

for (UniHandler result : handlers) {
if (result.failure != null || result.item != SENTINEL) {
incomplete = incomplete - 1;
int incomplete;
do {
incomplete = unis.size();

// One of the uni failed, and we can fire a failure immediately.
if (failed && !collectAllFailureBeforeFiring) {
if (cancelled.compareAndSet(false, true)) {
// Cancel all subscriptions
handlers.forEach(UniHandler::cancel);
// Invoke observer
subscriber.onFailure(res.failure);
}
return;
}
}

if (incomplete == 0) {
// All unis has fired an event, check the outcome
if (cancelled.compareAndSet(false, true)) {
List<Throwable> failures = getFailures();
List<Object> items = getItems();
computeAndFireTheOutcome(failures, items);
for (UniHandler result : handlers) {
if (result.failure != null || result.item != SENTINEL) {
incomplete = incomplete - 1;
}
}
}

if (concurrency != -1 && !cancelled.get()) {
int nextIndex = this.nextIndex.getAndIncrement();
if (nextIndex < unis.size()) {
handlers.get(nextIndex).subscribe();
if (incomplete == 0) {
// All unis has fired an event, check the outcome
if (cancelled.compareAndSet(false, true)) {
List<Throwable> failures = getFailures();
List<Object> items = getItems();
computeAndFireTheOutcome(failures, items);
}
}
}

if (concurrency != -1 && !cancelled.get()) {
int nextIndex = this.nextIndex.getAndIncrement();
if (nextIndex < unis.size()) {
handlers.get(nextIndex).subscribe();
}
}
} while (wip.decrementAndGet() > 0 && incomplete > 0);
}

private void computeAndFireTheOutcome(List<Throwable> failures, List<Object> items) {
Expand Down Expand Up @@ -172,14 +180,18 @@ private List<Throwable> getFailures() {

private class UniHandler implements UniSubscription, UniSubscriber {

final AtomicReference<UniSubscription> subscription = new AtomicReference<>();
private final AndSupervisor supervisor;
private final Uni uni;
private final Context context;
final AndSupervisor supervisor;
final Uni<?> uni;
final Context context;

volatile UniSubscription subscription;
Object item = SENTINEL;
Throwable failure;

UniHandler(AndSupervisor supervisor, Uni observed, Context context) {
private static final AtomicReferenceFieldUpdater<UniAndCombination.UniHandler, UniSubscription> SUBSCRIPTION_UPDATER = AtomicReferenceFieldUpdater
.newUpdater(UniAndCombination.UniHandler.class, UniSubscription.class, "subscription");

UniHandler(AndSupervisor supervisor, Uni<?> observed, Context context) {
this.supervisor = supervisor;
this.uni = observed;
this.context = context;
Expand All @@ -192,7 +204,7 @@ public Context context() {

@Override
public final void onSubscribe(UniSubscription sub) {
if (!subscription.compareAndSet(null, sub)) {
if (!SUBSCRIPTION_UPDATER.compareAndSet(this, null, sub)) {
// cancelling this second subscription
// because we already add a subscription (most probably CANCELLED)
sub.cancel();
Expand All @@ -201,7 +213,7 @@ public final void onSubscribe(UniSubscription sub) {

@Override
public final void onFailure(Throwable t) {
if (subscription.getAndSet(EmptyUniSubscription.CANCELLED) == EmptyUniSubscription.CANCELLED) {
if (SUBSCRIPTION_UPDATER.getAndSet(this, EmptyUniSubscription.CANCELLED) == EmptyUniSubscription.CANCELLED) {
// Already cancelled, do nothing
Infrastructure.handleDroppedException(t);
return;
Expand All @@ -212,7 +224,7 @@ public final void onFailure(Throwable t) {

@Override
public final void onItem(Object x) {
if (subscription.getAndSet(EmptyUniSubscription.CANCELLED) == EmptyUniSubscription.CANCELLED) {
if (SUBSCRIPTION_UPDATER.getAndSet(this, EmptyUniSubscription.CANCELLED) == EmptyUniSubscription.CANCELLED) {
// Already cancelled, do nothing
return;
}
Expand All @@ -222,14 +234,14 @@ public final void onItem(Object x) {

@Override
public void cancel() {
Subscription sub = subscription.getAndSet(EmptyUniSubscription.CANCELLED);
Subscription sub = SUBSCRIPTION_UPDATER.getAndSet(this, EmptyUniSubscription.CANCELLED);
if (sub != null) {
sub.cancel();
}
}

@SuppressWarnings("unchecked")
public void subscribe() {
//noinspection unchecked
AbstractUni.subscribe(uni, this);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -23,8 +20,10 @@
import org.junit.jupiter.api.parallel.ResourceLock;

import io.smallrye.mutiny.helpers.test.AssertSubscriber;
import io.smallrye.mutiny.helpers.test.UniAssertSubscriber;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.tuples.Tuple2;
import io.smallrye.mutiny.tuples.Tuple3;
import junit5.support.InfrastructureResource;

@ResourceLock(value = InfrastructureResource.NAME, mode = ResourceAccessMode.READ)
Expand Down Expand Up @@ -133,4 +132,17 @@ void reproducer_1891() {
assertThat(failure.get()).describedAs("No failure must have been emitted").isNull();
assertThat(completed.get()).isTrue();
}

@RepeatedTest(1_000)
void reproducer_1993() {
// Race condition in UniAndCombination, spotted in https://github.com/smallrye/smallrye-mutiny/issues/1993
Uni<String> a = Uni.createFrom().completionStage(() -> CompletableFuture.supplyAsync(() -> "A"));
Uni<String> b = Uni.createFrom().completionStage(() -> CompletableFuture.supplyAsync(() -> "B"));
Uni<String> c = Uni.createFrom().completionStage(() -> CompletableFuture.supplyAsync(() -> "C"));
Uni<Tuple3<String, String, String>> uni = Uni.combine().all().unis(a, b, c).usingConcurrencyOf(3).asTuple();

UniAssertSubscriber<Tuple3<String, String, String>> sub = uni.subscribe().withSubscriber(UniAssertSubscriber.create());
sub.awaitItem(Duration.ofSeconds(5))
.assertItem(Tuple3.of("A", "B", "C"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -297,5 +297,4 @@ public void testWithSetOfUnis() {
.assertCompleted()
.assertItem(1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9);
}

}