Skip to content

Commit 599601a

Browse files
stevenshandagnir
andauthored
Fix race in FlatteningSubscriber where onClose called before items pa… (#3735)
* Fix race in FlatteningSubscriber where onClose called before items passed to onNext * Add stochastic unit test for race condition in FlatteningSubscriber --------- Co-authored-by: Dongie Agnir <[email protected]>
1 parent 7ad6ef2 commit 599601a

File tree

3 files changed

+72
-2
lines changed

3 files changed

+72
-2
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "stevenshan",
5+
"description": "Fix race in FlatteningSubscriber where onClose called before items passed to onNext. See [#3734](https://github.com/aws/aws-sdk-java-v2/issues/3734)"
6+
}

utils/src/main/java/software/amazon/awssdk/utils/async/FlatteningSubscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ private void handleOnCompleteState() {
235235
* result is subject to change.
236236
*/
237237
private boolean onCompleteNeeded() {
238-
return allItems.isEmpty() && onCompleteCalledByUpstream && !terminalCallMadeDownstream;
238+
return onCompleteCalledByUpstream && allItems.isEmpty() && !terminalCallMadeDownstream;
239239
}
240240

241241
/**

utils/src/test/java/software/amazon/awssdk/utils/async/FlatteningSubscriberTest.java

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,17 @@
1818
import static org.mockito.Mockito.times;
1919

2020
import java.util.Arrays;
21+
import java.util.Collections;
22+
import java.util.List;
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.ExecutorService;
25+
import java.util.concurrent.Executors;
26+
import java.util.concurrent.atomic.AtomicInteger;
2127
import org.junit.jupiter.api.BeforeEach;
2228
import org.junit.jupiter.api.Test;
2329
import org.mockito.ArgumentCaptor;
2430
import org.mockito.Mockito;
31+
import org.reactivestreams.Publisher;
2532
import org.reactivestreams.Subscriber;
2633
import org.reactivestreams.Subscription;
2734

@@ -195,10 +202,67 @@ public void requestsFromDownstreamDoNothingAfterOnError() {
195202
Mockito.verifyNoMoreInteractions(mockUpstream, mockDelegate);
196203
}
197204

205+
@Test
206+
public void stochastic_dataFlushedBeforeOnComplete() {
207+
ExecutorService exec = Executors.newSingleThreadExecutor();
208+
try {
209+
for (int i = 0; i < 30_000_000; ++i) {
210+
Publisher<List<String>> iterablePublisher = subscriber -> subscriber.onSubscribe(new Subscription() {
211+
@Override
212+
public void request(long l) {
213+
exec.submit(() -> {
214+
subscriber.onNext(Collections.singletonList("data"));
215+
subscriber.onComplete();
216+
});
217+
}
218+
219+
@Override
220+
public void cancel() {
221+
}
222+
});
223+
224+
AtomicInteger seen = new AtomicInteger(0);
225+
CompletableFuture<Void> finished = new CompletableFuture<>();
226+
FlatteningSubscriber<String> elementSubscriber = new FlatteningSubscriber<>(new Subscriber<String>() {
227+
@Override
228+
public void onSubscribe(Subscription subscription) {
229+
subscription.request(1);
230+
}
231+
232+
@Override
233+
public void onNext(String s) {
234+
seen.incrementAndGet();
235+
}
236+
237+
@Override
238+
public void onError(Throwable e) {
239+
finished.completeExceptionally(e);
240+
}
241+
242+
@Override
243+
public void onComplete() {
244+
if (seen.get() != 1) {
245+
finished.completeExceptionally(
246+
new RuntimeException("Should have gotten 1 element before onComplete"));
247+
} else {
248+
finished.complete(null);
249+
}
250+
}
251+
});
252+
253+
iterablePublisher.subscribe(elementSubscriber);
254+
255+
finished.join();
256+
}
257+
} finally {
258+
exec.shutdown();
259+
}
260+
}
261+
198262
private Subscription getDownstreamFromDelegate() {
199263
ArgumentCaptor<Subscription> subscriptionCaptor = ArgumentCaptor.forClass(Subscription.class);
200264
Mockito.verify(mockDelegate).onSubscribe(subscriptionCaptor.capture());
201265
return subscriptionCaptor.getValue();
202266
}
203267

204-
}
268+
}

0 commit comments

Comments
 (0)