Skip to content

Commit 25a4f87

Browse files
committed
Log exceptions cause by bad Observer implementations
Ensure any exceptions from userland code are logged if they cannot be processed by onError. JAVA-3266
1 parent 2b2a85a commit 25a4f87

File tree

4 files changed

+226
-12
lines changed

4 files changed

+226
-12
lines changed

driver-async/src/main/com/mongodb/async/client/AbstractSubscription.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,16 @@
1616

1717
package com.mongodb.async.client;
1818

19+
import com.mongodb.MongoException;
20+
import com.mongodb.diagnostics.logging.Logger;
21+
import com.mongodb.diagnostics.logging.Loggers;
1922
import com.mongodb.lang.Nullable;
2023

2124
import java.util.List;
2225
import java.util.concurrent.ConcurrentLinkedQueue;
2326

2427
abstract class AbstractSubscription<TResult> implements Subscription {
25-
28+
private static final Logger LOGGER = Loggers.getLogger("client");
2629
private final Observer<? super TResult> observer;
2730

2831
/* protected by `this` */
@@ -120,20 +123,23 @@ void addToQueue(@Nullable final List<TResult> results) {
120123
void onError(final Throwable t) {
121124
if (terminalAction()) {
122125
postTerminate();
123-
observer.onError(t);
126+
try {
127+
observer.onError(t);
128+
} catch (Throwable t1) {
129+
LOGGER.error("Calling onError threw an exception", t1);
130+
throw MongoException.fromThrowableNonNull(t1);
131+
}
132+
} else {
133+
throw MongoException.fromThrowableNonNull(t);
124134
}
125135
}
126136

127137
void onNext(final TResult next) {
128-
boolean isTerminated;
129-
synchronized (this) {
130-
isTerminated = this.isTerminated;
131-
}
132-
133-
if (!isTerminated) {
138+
if (!isTerminated()) {
134139
try {
135140
observer.onNext(next);
136141
} catch (Throwable t) {
142+
LOGGER.error("Calling onNext threw an exception", t);
137143
onError(t);
138144
}
139145
}
@@ -142,7 +148,12 @@ void onNext(final TResult next) {
142148
void onComplete() {
143149
if (terminalAction()) {
144150
postTerminate();
145-
observer.onComplete();
151+
try {
152+
observer.onComplete();
153+
} catch (Throwable t) {
154+
LOGGER.error("Calling onComplete threw an exception", t);
155+
throw MongoException.fromThrowableNonNull(t);
156+
}
146157
}
147158
}
148159

driver-async/src/test/unit/com/mongodb/async/client/FlatteningSingleResultCallbackSubscriptionSpecification.groovy

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,6 @@ class FlatteningSingleResultCallbackSubscriptionSpecification extends Specificat
223223

224224
def 'should call onError if onNext causes an Error'() {
225225
given:
226-
def block = getBlock()
227226
def observer = new TestObserver(new Observer() {
228227
@Override
229228
void onSubscribe(final Subscription subscription) {
@@ -242,13 +241,80 @@ class FlatteningSingleResultCallbackSubscriptionSpecification extends Specificat
242241
void onComplete() {
243242
}
244243
})
245-
observeAndFlatten(block).subscribe(observer)
244+
observeAndFlatten(getBlock()).subscribe(observer)
246245

247246
when:
248247
observer.requestMore(1)
249248

250249
then:
250+
notThrown(MongoException)
251+
observer.assertTerminalEvent()
252+
observer.assertErrored()
253+
}
254+
255+
def 'should throw the exception if calling onComplete raises one'() {
256+
given:
257+
def observer = new TestObserver(new Observer() {
258+
@Override
259+
void onSubscribe(final Subscription subscription) {
260+
}
261+
262+
@Override
263+
void onNext(final Object result) {
264+
}
265+
266+
@Override
267+
void onError(final Throwable e) {
268+
}
269+
270+
@Override
271+
void onComplete() {
272+
throw new MongoException('exception calling onComplete')
273+
}
274+
})
275+
observeAndFlatten(getBlock()).subscribe(observer)
276+
277+
when:
278+
observer.requestMore(100)
279+
280+
then:
281+
def ex = thrown(MongoException)
282+
ex.message == 'exception calling onComplete'
251283
observer.assertTerminalEvent()
284+
observer.assertNoErrors()
285+
}
286+
287+
def 'should throw the exception if calling onError raises one'() {
288+
given:
289+
def observer = new TestObserver(new Observer() {
290+
@Override
291+
void onSubscribe(final Subscription subscription) {
292+
}
293+
294+
@Override
295+
void onNext(final Object result) {
296+
throw new MongoException('fail')
297+
}
298+
299+
@Override
300+
void onError(final Throwable e) {
301+
throw new MongoException('exception calling onError')
302+
}
303+
304+
@Override
305+
void onComplete() {
306+
}
307+
})
308+
observeAndFlatten(getBlock()).subscribe(observer)
309+
310+
when:
311+
observer.requestMore(100)
312+
313+
then:
314+
def ex = thrown(MongoException)
315+
ex.message == 'exception calling onError'
316+
observer.assertTerminalEvent()
317+
observer.assertErrored()
252318
}
253319

254320
def 'should call onError if the passed block errors'() {
@@ -257,7 +323,7 @@ class FlatteningSingleResultCallbackSubscriptionSpecification extends Specificat
257323
observeAndFlatten(new Block<SingleResultCallback<List<Integer>>>() {
258324
@Override
259325
void apply(final SingleResultCallback<List<Integer>> callback) {
260-
throw new MongoException('failed');
326+
throw new MongoException('failed')
261327
}
262328
}).subscribe(observer)
263329

driver-async/src/test/unit/com/mongodb/async/client/MongoIterableSubscriptionSpecification.groovy

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,72 @@ class MongoIterableSubscriptionSpecification extends Specification {
432432
observer.requestMore(1)
433433

434434
then:
435+
notThrown(MongoException)
436+
observer.assertTerminalEvent()
437+
observer.assertErrored()
438+
}
439+
440+
def 'should throw the exception if calling onComplete raises one'() {
441+
given:
442+
def observer = new TestObserver(new Observer() {
443+
@Override
444+
void onSubscribe(final Subscription subscription) {
445+
}
446+
447+
@Override
448+
void onNext(final Object result) {
449+
}
450+
451+
@Override
452+
void onError(final Throwable e) {
453+
}
454+
455+
@Override
456+
void onComplete() {
457+
throw new MongoException('exception calling onComplete')
458+
}
459+
})
460+
observe(getMongoIterable()).subscribe(observer)
461+
462+
when:
463+
observer.requestMore(100)
464+
465+
then:
466+
def ex = thrown(MongoException)
467+
ex.message == 'exception calling onComplete'
468+
observer.assertTerminalEvent()
469+
observer.assertNoErrors()
470+
}
471+
472+
def 'should throw the exception if calling onError raises one'() {
473+
given:
474+
def observer = new TestObserver(new Observer() {
475+
@Override
476+
void onSubscribe(final Subscription subscription) {
477+
}
478+
479+
@Override
480+
void onNext(final Object result) {
481+
throw new MongoException('fail')
482+
}
483+
484+
@Override
485+
void onError(final Throwable e) {
486+
throw new MongoException('exception calling onError')
487+
}
488+
489+
@Override
490+
void onComplete() {
491+
}
492+
})
493+
observe(getMongoIterable()).subscribe(observer)
494+
495+
when:
496+
observer.requestMore(1)
497+
498+
then:
499+
def ex = thrown(MongoException)
500+
ex.message == 'exception calling onError'
435501
observer.assertTerminalEvent()
436502
observer.assertErrored()
437503
}

driver-async/src/test/unit/com/mongodb/async/client/SingleResultCallbackSubscriptionSpecification.groovy

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ class SingleResultCallbackSubscriptionSpecification extends Specification {
228228
observer.requestMore(1)
229229

230230
then:
231+
thrown(MongoException)
231232
observer.assertReceivedOnNext([1])
232233
observer.assertUnsubscribed()
233234
observer.assertNoTerminalEvent()
@@ -283,6 +284,76 @@ class SingleResultCallbackSubscriptionSpecification extends Specification {
283284
observer.requestMore(1)
284285

285286
then:
287+
notThrown(MongoException)
288+
observer.assertTerminalEvent()
289+
observer.assertErrored()
290+
}
291+
292+
def 'should throw the exception if calling onComplete raises one'() {
293+
given:
294+
def observer = new TestObserver(new Observer(){
295+
@Override
296+
void onSubscribe(final Subscription subscription) {
297+
}
298+
299+
@Override
300+
void onNext(final Object o) {
301+
}
302+
303+
@Override
304+
void onError(final Throwable e) {
305+
}
306+
307+
@Override
308+
void onComplete() {
309+
throw new MongoException('exception calling onComplete')
310+
}
311+
})
312+
observe(getBlock()).subscribe(observer)
313+
314+
when:
315+
observer.requestMore(100)
316+
317+
then:
318+
def ex = thrown(MongoException)
319+
observer.assertNoErrors()
320+
observer.assertTerminalEvent()
321+
ex.message == 'exception calling onComplete'
322+
}
323+
324+
def 'should throw the exception if calling onError raises one'() {
325+
given:
326+
def observer = new TestObserver(new Observer(){
327+
@Override
328+
void onSubscribe(final Subscription subscription) {
329+
}
330+
331+
@Override
332+
void onNext(final Object o) {
333+
}
334+
335+
@Override
336+
void onError(final Throwable e) {
337+
throw new MongoException('exception calling onError')
338+
}
339+
340+
@Override
341+
void onComplete() {
342+
}
343+
})
344+
observe(new Block<SingleResultCallback<Integer>>() {
345+
@Override
346+
void apply(final SingleResultCallback<Integer> callback) {
347+
throw new MongoException('fail')
348+
}
349+
}).subscribe(observer)
350+
351+
when:
352+
observer.requestMore(1)
353+
354+
then:
355+
def ex = thrown(MongoException)
356+
ex.message == 'exception calling onError'
286357
observer.assertErrored()
287358
observer.assertTerminalEvent()
288359
}

0 commit comments

Comments
 (0)