@@ -120,31 +120,38 @@ public void testMultiThreadedWithNPE() {
120
120
121
121
@ Test
122
122
public void testMultiThreadedWithNPEinMiddle () {
123
- TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable ("one" , "two" , "three" , null , "four" , "five" , "six" , "seven" , "eight" , "nine" );
124
- Observable <String > w = Observable .create (onSubscribe );
125
-
126
- BusyObserver busyobserver = new BusyObserver ();
127
-
128
- w .serialize ().subscribe (busyobserver );
129
- onSubscribe .waitToFinish ();
130
-
131
- System .out .println ("maxConcurrentThreads: " + onSubscribe .maxConcurrentThreads .get ());
132
- // this should not be the full number of items since the error should stop it before it completes all 9
133
- System .out .println ("onNext count: " + busyobserver .onNextCount .get ());
134
- assertTrue (busyobserver .onNextCount .get () < 9 );
135
- assertTrue (busyobserver .onError );
136
- // no onCompleted because onError was invoked
137
- assertFalse (busyobserver .onCompleted );
138
- // non-deterministic because unsubscribe happens after 'waitToFinish' releases
139
- // so commenting out for now as this is not a critical thing to test here
140
- // verify(s, times(1)).unsubscribe();
141
-
142
- // we can have concurrency ...
143
- assertTrue (onSubscribe .maxConcurrentThreads .get () > 1 );
144
- // ... but the onNext execution should be single threaded
145
- assertEquals (1 , busyobserver .maxConcurrentThreads .get ());
123
+ boolean lessThan9 = false ;
124
+ for (int i = 0 ; i < 3 ; i ++) {
125
+ TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable ("one" , "two" , "three" , null , "four" , "five" , "six" , "seven" , "eight" , "nine" );
126
+ Observable <String > w = Observable .create (onSubscribe );
127
+
128
+ BusyObserver busyobserver = new BusyObserver ();
129
+
130
+ w .serialize ().subscribe (busyobserver );
131
+ onSubscribe .waitToFinish ();
132
+
133
+ System .out .println ("maxConcurrentThreads: " + onSubscribe .maxConcurrentThreads .get ());
134
+ // this should not always be the full number of items since the error should (very often)
135
+ // stop it before it completes all 9
136
+ System .out .println ("onNext count: " + busyobserver .onNextCount .get ());
137
+ if (busyobserver .onNextCount .get () < 9 ) {
138
+ lessThan9 = true ;
139
+ }
140
+ assertTrue (busyobserver .onError );
141
+ // no onCompleted because onError was invoked
142
+ assertFalse (busyobserver .onCompleted );
143
+ // non-deterministic because unsubscribe happens after 'waitToFinish' releases
144
+ // so commenting out for now as this is not a critical thing to test here
145
+ // verify(s, times(1)).unsubscribe();
146
+
147
+ // we can have concurrency ...
148
+ assertTrue (onSubscribe .maxConcurrentThreads .get () > 1 );
149
+ // ... but the onNext execution should be single threaded
150
+ assertEquals (1 , busyobserver .maxConcurrentThreads .get ());
151
+ }
152
+ assertTrue (lessThan9 );
146
153
}
147
-
154
+
148
155
/**
149
156
* A thread that will pass data to onNext
150
157
*/
@@ -276,6 +283,7 @@ public TestMultiThreadedObservable(String... values) {
276
283
@ Override
277
284
public void call (final Subscriber <? super String > observer ) {
278
285
System .out .println ("TestMultiThreadedObservable subscribed to ..." );
286
+ final NullPointerException npe = new NullPointerException ();
279
287
t = new Thread (new Runnable () {
280
288
281
289
@ Override
@@ -290,11 +298,12 @@ public void run() {
290
298
threadsRunning .incrementAndGet ();
291
299
try {
292
300
// perform onNext call
293
- System .out .println ("TestMultiThreadedObservable onNext: " + s );
294
301
if (s == null ) {
302
+ System .out .println ("TestMultiThreadedObservable onNext: null" );
295
303
// force an error
296
- throw new NullPointerException ();
297
- }
304
+ throw npe ;
305
+ } else
306
+ System .out .println ("TestMultiThreadedObservable onNext: " + s );
298
307
observer .onNext (s );
299
308
// capture 'maxThreads'
300
309
int concurrentThreads = threadsRunning .get ();
0 commit comments