Skip to content

Commit 9da37d1

Browse files
committed
Ensure Observables catch errors when requesting data
JAVA-2048
1 parent 755cfc9 commit 9da37d1

File tree

4 files changed

+90
-5
lines changed

4 files changed

+90
-5
lines changed

driver-async/src/main/com/mongodb/async/client/AbstractSubscription.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,9 @@ public void request(final long n) {
7979
}
8080

8181
if (requestData) {
82-
requestInitialData();
82+
tryRequestInitialData();
8383
} else {
84-
processResultsQueue();
84+
tryProcessResultsQueue();
8585
}
8686
}
8787

@@ -107,14 +107,14 @@ void addToQueue(final TResult result) {
107107
if (result != null) {
108108
resultsQueue.add(result);
109109
}
110-
processResultsQueue();
110+
tryProcessResultsQueue();
111111
}
112112

113113
void addToQueue(final List<TResult> results) {
114114
if (results != null) {
115115
resultsQueue.addAll(results);
116116
}
117-
processResultsQueue();
117+
tryProcessResultsQueue();
118118
}
119119

120120
void onError(final Throwable t) {
@@ -146,6 +146,22 @@ void onComplete() {
146146
}
147147
}
148148

149+
private void tryRequestInitialData() {
150+
try {
151+
requestInitialData();
152+
} catch (Throwable t) {
153+
onError(t);
154+
}
155+
}
156+
157+
private void tryProcessResultsQueue() {
158+
try {
159+
processResultsQueue();
160+
} catch (Throwable t) {
161+
onError(t);
162+
}
163+
}
164+
149165
private void processResultsQueue() {
150166
boolean mustProcess = false;
151167

driver-async/src/test/unit/com/mongodb/async/client/FlatteningSingleResultCallbackSubscriptionSpecification.groovy

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,24 @@ class FlatteningSingleResultCallbackSubscriptionSpecification extends Specificat
242242
observer.assertTerminalEvent()
243243
}
244244

245+
def 'should call onError if the passed block errors'() {
246+
given:
247+
def observer = new TestObserver()
248+
observeAndFlatten(new Block<SingleResultCallback<List<Integer>>>() {
249+
@Override
250+
void apply(final SingleResultCallback<List<Integer>> callback) {
251+
throw new MongoException('failed');
252+
}
253+
}).subscribe(observer)
254+
255+
when:
256+
observer.requestMore(1)
257+
258+
then:
259+
observer.assertErrored()
260+
observer.assertTerminalEvent()
261+
}
262+
245263
def getBlock() {
246264
new Block<SingleResultCallback<List<Integer>>>() {
247265

driver-async/src/test/unit/com/mongodb/async/client/MongoIterableSubscriptionSpecification.groovy

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,23 @@ class MongoIterableSubscriptionSpecification extends Specification {
372372

373373
then:
374374
observer.assertTerminalEvent()
375+
observer.assertErrored()
376+
}
377+
378+
def 'should call onError if MongoIterable errors'() {
379+
given:
380+
def observer = new TestObserver()
381+
observe(getMongoIterable(getFailingCursor(failImmediately))).subscribe(observer)
382+
383+
when:
384+
observer.requestMore(3)
385+
386+
then:
387+
observer.assertTerminalEvent()
388+
observer.assertErrored()
389+
390+
where:
391+
failImmediately << [true, false]
375392
}
376393

377394
def getMongoIterable() {
@@ -394,4 +411,21 @@ class MongoIterableSubscriptionSpecification extends Specification {
394411
}
395412
}
396413
}
414+
415+
def getFailingCursor(boolean failImmediately) {
416+
Mock(AsyncBatchCursor) {
417+
def cursorResults = [[1, 2]]
418+
def hasSetBatchSize = failImmediately
419+
setBatchSize(_) >> {
420+
if (!hasSetBatchSize) {
421+
hasSetBatchSize = true
422+
} else {
423+
throw new MongoException("Failure")
424+
}
425+
}
426+
next(_) >> {
427+
it[0].onResult(cursorResults.isEmpty() ? null : cursorResults.remove(0), null)
428+
}
429+
}
430+
}
397431
}

driver-async/src/test/unit/com/mongodb/async/client/SingleResultCallbackSubscriptionSpecification.groovy

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ class SingleResultCallbackSubscriptionSpecification extends Specification {
4343
1 * block.apply(_)
4444
}
4545

46-
4746
def 'should call onComplete after all data has been consumed'() {
4847
given:
4948
SingleResultCallback<Integer> singleResultCallback
@@ -241,6 +240,24 @@ class SingleResultCallbackSubscriptionSpecification extends Specification {
241240
observer.assertTerminalEvent()
242241
}
243242

243+
def 'should call onError if the passed block errors'() {
244+
given:
245+
def observer = new TestObserver()
246+
observe(new Block<SingleResultCallback<Integer>>() {
247+
@Override
248+
void apply(final SingleResultCallback<Integer> callback) {
249+
throw new MongoException('failed');
250+
}
251+
}).subscribe(observer)
252+
253+
when:
254+
observer.requestMore(1)
255+
256+
then:
257+
observer.assertErrored()
258+
observer.assertTerminalEvent()
259+
}
260+
244261
def getBlock() {
245262
getBlock(1)
246263
}

0 commit comments

Comments
 (0)