Skip to content

Commit 2dacec6

Browse files
rozzajyemin
authored andcommitted
Made checkCompleted abstract in AbstractSubscriptions
There is no guarantee that there would be single calls to request before data arrives. Therefore checkCompleted should not default to true. Tracking completed state fixes a potential race conditions for Subscriptions to SingleResultCallbacks. JAVA-1973
1 parent d8c4702 commit 2dacec6

File tree

5 files changed

+59
-10
lines changed

5 files changed

+59
-10
lines changed

driver-async/src/main/com/mongodb/async/client/AbstractSubscription.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,7 @@ void requestMoreData() {
9393
void postTerminate() {
9494
}
9595

96-
boolean checkCompleted() {
97-
return true;
98-
}
96+
abstract boolean checkCompleted();
9997

10098
boolean isTerminated() {
10199
return isTerminated;

driver-async/src/main/com/mongodb/async/client/FlatteningSingleResultCallbackSubscription.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ class FlatteningSingleResultCallbackSubscription<TResult> extends AbstractSubscr
2525

2626
private final Block<SingleResultCallback<List<TResult>>> block;
2727

28+
/* protected by `this` */
29+
private boolean completed;
30+
/* protected by `this` */
31+
2832
public FlatteningSingleResultCallbackSubscription(final Block<SingleResultCallback<List<TResult>>> block,
2933
final Observer<? super TResult> observer) {
3034
super(observer);
@@ -40,9 +44,17 @@ public void onResult(final List<TResult> result, final Throwable t) {
4044
if (t != null) {
4145
onError(t);
4246
} else {
47+
synchronized (FlatteningSingleResultCallbackSubscription.this) {
48+
completed = true;
49+
}
4350
addToQueue(result);
4451
}
4552
}
4653
});
4754
}
55+
56+
@Override
57+
boolean checkCompleted() {
58+
return completed;
59+
}
4860
}

driver-async/src/main/com/mongodb/async/client/SingleResultCallbackSubscription.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ class SingleResultCallbackSubscription<TResult> extends AbstractSubscription<TRe
2323

2424
private final Block<SingleResultCallback<TResult>> block;
2525

26+
/* protected by `this` */
27+
private boolean completed;
28+
/* protected by `this` */
29+
2630
public SingleResultCallbackSubscription(final Block<SingleResultCallback<TResult>> block,
2731
final Observer<? super TResult> observer) {
2832
super(observer);
@@ -38,9 +42,17 @@ public void onResult(final TResult result, final Throwable t) {
3842
if (t != null) {
3943
onError(t);
4044
} else {
45+
synchronized (SingleResultCallbackSubscription.this) {
46+
completed = true;
47+
}
4148
addToQueue(result);
4249
}
4350
}
4451
});
4552
}
53+
54+
@Override
55+
boolean checkCompleted() {
56+
return completed;
57+
}
4658
}

driver-async/src/test/unit/com/mongodb/async/client/FlatteningSingleResultCallbackSubscriptionSpecification.groovy

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,28 @@ class FlatteningSingleResultCallbackSubscriptionSpecification extends Specificat
4343
1 * block.apply(_)
4444
}
4545

46-
4746
def 'should call onComplete after all data has been consumed'() {
4847
given:
49-
def block = getBlock()
48+
SingleResultCallback<List> listSingleResultCallback
5049
def observer = new TestObserver()
51-
observeAndFlatten(block).subscribe(observer)
50+
observeAndFlatten(new Block<SingleResultCallback<List>>() {
51+
@Override
52+
void apply(final SingleResultCallback<List> callback) {
53+
listSingleResultCallback = callback
54+
}
55+
}).subscribe(observer)
56+
57+
when:
58+
observer.requestMore(5)
59+
observer.requestMore(5)
60+
61+
then:
62+
observer.assertNoErrors()
63+
observer.assertReceivedOnNext([])
64+
observer.assertNoTerminalEvent()
5265

5366
when:
54-
observer.requestMore(10)
67+
listSingleResultCallback.onResult([1, 2, 3, 4], null)
5568

5669
then:
5770
observer.assertNoErrors()

driver-async/src/test/unit/com/mongodb/async/client/SingleResultCallbackSubscriptionSpecification.groovy

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,26 @@ class SingleResultCallbackSubscriptionSpecification extends Specification {
4646

4747
def 'should call onComplete after all data has been consumed'() {
4848
given:
49-
def block = getBlock()
49+
SingleResultCallback<Integer> singleResultCallback
5050
def observer = new TestObserver()
51-
observe(block).subscribe(observer)
51+
observe(new Block<SingleResultCallback<Integer>>() {
52+
@Override
53+
void apply(final SingleResultCallback<Integer> callback) {
54+
singleResultCallback = callback
55+
}
56+
}).subscribe(observer)
5257

5358
when:
54-
observer.requestMore(10)
59+
observer.requestMore(5)
60+
observer.requestMore(5)
61+
62+
then:
63+
observer.assertNoErrors()
64+
observer.assertReceivedOnNext([])
65+
observer.assertNoTerminalEvent()
66+
67+
when:
68+
singleResultCallback.onResult(1, null)
5569

5670
then:
5771
observer.assertNoErrors()

0 commit comments

Comments
 (0)