Skip to content

Commit 894fe90

Browse files
authored
Merge pull request #1923 from smallrye/fix/uni-memoize-forwarding-race
fix: race condition in UniMemoizeOp on awaiters forwarding
2 parents 175991b + 826ee87 commit 894fe90

File tree

3 files changed

+91
-41
lines changed

3 files changed

+91
-41
lines changed

context-propagation/src/test/java/io/smallrye/mutiny/context/UniContextPropagationTest.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,24 @@
55
import static org.junit.jupiter.api.Assertions.assertNull;
66

77
import java.time.Duration;
8-
import java.util.concurrent.*;
8+
import java.util.concurrent.CompletableFuture;
9+
import java.util.concurrent.CountDownLatch;
10+
import java.util.concurrent.ExecutionException;
11+
import java.util.concurrent.Executor;
12+
import java.util.concurrent.ExecutorService;
13+
import java.util.concurrent.Executors;
14+
import java.util.concurrent.ForkJoinPool;
15+
import java.util.concurrent.TimeUnit;
916
import java.util.concurrent.atomic.AtomicInteger;
1017
import java.util.concurrent.atomic.AtomicReference;
1118
import java.util.function.Consumer;
1219

1320
import org.eclipse.microprofile.context.ThreadContext;
14-
import org.junit.jupiter.api.*;
21+
import org.junit.jupiter.api.AfterAll;
22+
import org.junit.jupiter.api.AfterEach;
23+
import org.junit.jupiter.api.BeforeAll;
24+
import org.junit.jupiter.api.BeforeEach;
25+
import org.junit.jupiter.api.Test;
1526

1627
import io.smallrye.context.CleanAutoCloseable;
1728
import io.smallrye.context.SmallRyeThreadContext;

implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniMemoizeOp.java

Lines changed: 60 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull;
44

5+
import java.util.ArrayList;
6+
import java.util.List;
57
import java.util.concurrent.ConcurrentLinkedQueue;
68
import java.util.concurrent.locks.ReentrantLock;
79
import java.util.function.BooleanSupplier;
@@ -47,27 +49,39 @@ public UniMemoizeOp(Uni<? extends I> upstream, BooleanSupplier invalidationReque
4749
@Override
4850
public void subscribe(UniSubscriber<? super I> subscriber) {
4951
nonNull(subscriber, "subscriber");
52+
53+
boolean shouldSubscribeUpstream = false;
54+
Object cached = null;
55+
boolean wasInCachingState = false;
56+
5057
internalLock.lock();
51-
checkForInvalidation();
52-
switch (state) {
53-
case INIT:
54-
state = State.WAITING_FOR_UPSTREAM;
55-
awaiters.add(subscriber);
56-
currentContext = subscriber.context();
57-
internalLock.unlock();
58-
subscriber.onSubscribe(new MemoizedSubscription(subscriber));
59-
upstream().subscribe().withSubscriber(this);
60-
break;
61-
case WAITING_FOR_UPSTREAM:
62-
awaiters.add(subscriber);
63-
internalLock.unlock();
64-
subscriber.onSubscribe(new MemoizedSubscription(subscriber));
65-
break;
66-
case CACHING:
67-
internalLock.unlock();
68-
subscriber.onSubscribe(new MemoizedSubscription(subscriber));
69-
forwardTo(subscriber);
70-
break;
58+
try {
59+
checkForInvalidation(); // May throw an exception
60+
switch (state) {
61+
case INIT:
62+
state = State.WAITING_FOR_UPSTREAM;
63+
awaiters.add(subscriber);
64+
currentContext = subscriber.context();
65+
shouldSubscribeUpstream = true;
66+
break;
67+
case WAITING_FOR_UPSTREAM:
68+
awaiters.add(subscriber);
69+
break;
70+
case CACHING:
71+
cached = cachedResult;
72+
wasInCachingState = true;
73+
break;
74+
}
75+
} finally {
76+
internalLock.unlock();
77+
}
78+
79+
subscriber.onSubscribe(new MemoizedSubscription(subscriber));
80+
81+
if (shouldSubscribeUpstream) {
82+
upstream().subscribe().withSubscriber(this);
83+
} else if (wasInCachingState) {
84+
forwardTo(subscriber, cached);
7185
}
7286
}
7387

@@ -91,35 +105,49 @@ public void onSubscribe(UniSubscription subscription) {
91105
@Override
92106
public void onItem(I item) {
93107
internalLock.lock();
108+
List<UniSubscriber<? super I>> toNotify = null;
94109
if (state == State.WAITING_FOR_UPSTREAM) {
95110
state = State.CACHING;
96111
cachedResult = item;
97-
internalLock.unlock();
98-
notifyAwaiters();
99-
} else {
100-
internalLock.unlock();
112+
toNotify = gatherAwaiters();
113+
}
114+
internalLock.unlock();
115+
if (toNotify != null) {
116+
notifyAwaiters(toNotify, item);
117+
}
118+
}
119+
120+
private List<UniSubscriber<? super I>> gatherAwaiters() {
121+
return new ArrayList<>(awaiters);
122+
}
123+
124+
private void notifyAwaiters(List<UniSubscriber<? super I>> toNotify, Object result) {
125+
for (UniSubscriber<? super I> awaiter : toNotify) {
126+
forwardTo(awaiter, result);
101127
}
102128
}
103129

104130
@Override
105131
public void onFailure(Throwable failure) {
106132
internalLock.lock();
133+
List<UniSubscriber<? super I>> toNotify = null;
107134
if (state == State.WAITING_FOR_UPSTREAM) {
108135
state = State.CACHING;
109136
cachedResult = failure;
110-
internalLock.unlock();
111-
notifyAwaiters();
112-
} else {
113-
internalLock.unlock();
137+
toNotify = gatherAwaiters();
138+
}
139+
internalLock.unlock();
140+
if (toNotify != null) {
141+
notifyAwaiters(toNotify, failure);
114142
}
115143
}
116144

117145
@SuppressWarnings("unchecked")
118-
private void forwardTo(UniSubscriber<? super I> subscriber) {
119-
if (cachedResult instanceof Throwable) {
120-
subscriber.onFailure((Throwable) cachedResult);
146+
private void forwardTo(UniSubscriber<? super I> subscriber, Object result) {
147+
if (result instanceof Throwable) {
148+
subscriber.onFailure((Throwable) result);
121149
} else {
122-
subscriber.onItem((I) cachedResult);
150+
subscriber.onItem((I) result);
123151
}
124152
}
125153

@@ -128,13 +156,6 @@ public Context context() {
128156
return currentContext;
129157
}
130158

131-
private void notifyAwaiters() {
132-
UniSubscriber<? super I> awaiter;
133-
while ((awaiter = awaiters.poll()) != null) {
134-
forwardTo(awaiter);
135-
}
136-
}
137-
138159
private class MemoizedSubscription implements UniSubscription {
139160

140161
private final UniSubscriber<? super I> subscriber;

implementation/src/test/java/io/smallrye/mutiny/groups/UniMemoizeTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -590,4 +590,22 @@ void reproducer_1910() {
590590
resultFuture.cancel(true);
591591
}
592592
}
593+
594+
@RepeatedTest(1000)
595+
public void testCachingRaceInNotification() {
596+
AtomicInteger sub = new AtomicInteger();
597+
AtomicInteger count = new AtomicInteger();
598+
599+
ExecutorService pool = ForkJoinPool.commonPool();
600+
Uni<Integer> uni = Uni.createFrom().item(count::incrementAndGet)
601+
.emitOn(pool)
602+
.runSubscriptionOn(pool)
603+
.memoize().until(() -> sub.incrementAndGet() > 2);
604+
605+
assertThat(uni.await().atMost(Duration.ofMillis(100))).isEqualTo(1);
606+
assertThat(uni.await().atMost(Duration.ofMillis(100))).isEqualTo(1);
607+
assertThat(uni.await().atMost(Duration.ofMillis(100))).isEqualTo(2);
608+
assertThat(uni.await().atMost(Duration.ofMillis(100))).isEqualTo(3);
609+
assertThat(uni.await().atMost(Duration.ofMillis(100))).isEqualTo(4);
610+
}
593611
}

0 commit comments

Comments
 (0)