Skip to content

Commit cc1be4d

Browse files
committed
Handle null results for reactive applications
JAVA-3279
1 parent 81a763b commit cc1be4d

File tree

2 files changed

+37
-9
lines changed

2 files changed

+37
-9
lines changed

driver-async/src/main/com/mongodb/async/client/AbstractSubscription.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
abstract class AbstractSubscription<TResult> implements Subscription {
2828
private static final Logger LOGGER = Loggers.getLogger("client");
29+
private static final Object NULL_PLACEHOLDER = new Object();
2930
private final Observer<? super TResult> observer;
3031

3132
/* protected by `this` */
@@ -36,7 +37,7 @@ abstract class AbstractSubscription<TResult> implements Subscription {
3637
private boolean isTerminated = false;
3738
/* protected by `this` */
3839

39-
private final ConcurrentLinkedQueue<TResult> resultsQueue = new ConcurrentLinkedQueue<TResult>();
40+
private final ConcurrentLinkedQueue<Object> resultsQueue = new ConcurrentLinkedQueue<Object>();
4041

4142
AbstractSubscription(final Observer<? super TResult> observer) {
4243
this.observer = observer;
@@ -109,14 +110,18 @@ synchronized long getRequested() {
109110
}
110111

111112
void addToQueue(@Nullable final TResult result) {
112-
if (result != null) {
113+
if (result == null) {
114+
resultsQueue.add(NULL_PLACEHOLDER);
115+
} else {
113116
resultsQueue.add(result);
114117
}
115118
}
116119

117120
void addToQueue(@Nullable final List<TResult> results) {
118121
if (results != null) {
119-
resultsQueue.addAll(results);
122+
for (TResult cur : results) {
123+
addToQueue(cur);
124+
}
120125
}
121126
}
122127

@@ -134,8 +139,11 @@ void onError(final Throwable t) {
134139
}
135140
}
136141

137-
void onNext(final TResult next) {
142+
private void onNext(@Nullable final TResult next) {
138143
if (!isTerminated()) {
144+
if (next == null) {
145+
throw new NullPointerException();
146+
}
139147
try {
140148
observer.onNext(next);
141149
} catch (Throwable t) {
@@ -145,7 +153,7 @@ void onNext(final TResult next) {
145153
}
146154
}
147155

148-
void onComplete() {
156+
private void onComplete() {
149157
if (terminalAction()) {
150158
postTerminate();
151159
try {
@@ -173,6 +181,7 @@ void tryProcessResultsQueue() {
173181
}
174182
}
175183

184+
@SuppressWarnings("unchecked")
176185
private void processResultsQueue() {
177186
boolean mustProcess = false;
178187

@@ -207,11 +216,11 @@ private void processResultsQueue() {
207216
processedCount = 0;
208217

209218
while (localWanted > 0) {
210-
TResult item = resultsQueue.poll();
219+
Object item = resultsQueue.poll();
211220
if (item == null) {
212221
break;
213222
} else {
214-
onNext(item);
223+
onNext(item == NULL_PLACEHOLDER ? null : (TResult) item);
215224
localWanted -= 1;
216225
processedCount += 1;
217226
}

driver-async/src/test/unit/com/mongodb/async/client/SingleResultCallbackSubscriptionSpecification.groovy

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ class SingleResultCallbackSubscriptionSpecification extends Specification {
257257
observe(new Block<SingleResultCallback<Void>>(){
258258
@Override
259259
void apply(final SingleResultCallback<Void> callback) {
260-
callback.onResult(null, null)
260+
callback.onResult(1, null)
261261
}
262262
}).subscribe(observer)
263263

@@ -266,7 +266,7 @@ class SingleResultCallbackSubscriptionSpecification extends Specification {
266266

267267
then:
268268
observer.assertNoErrors()
269-
observer.assertReceivedOnNext([])
269+
observer.assertReceivedOnNext([1])
270270
observer.assertTerminalEvent()
271271
}
272272

@@ -289,6 +289,25 @@ class SingleResultCallbackSubscriptionSpecification extends Specification {
289289
observer.assertErrored()
290290
}
291291

292+
def 'should call onError on a null result'() {
293+
given:
294+
def observer = new TestObserver()
295+
observe(new Block<SingleResultCallback<Void>>() {
296+
@Override
297+
void apply(final SingleResultCallback<Void> callback) {
298+
callback.onResult(null, null);
299+
}
300+
}).subscribe(observer)
301+
302+
when:
303+
observer.requestMore(1)
304+
305+
then:
306+
observer.assertTerminalEvent()
307+
observer.assertErrored()
308+
observer.getOnErrorEvents().first() instanceof NullPointerException
309+
}
310+
292311
def 'should throw the exception if calling onComplete raises one'() {
293312
given:
294313
def observer = new TestObserver(new Observer(){

0 commit comments

Comments
 (0)