Skip to content

Commit d2cfac1

Browse files
Operator Tests Retrofitted to use TestObserver
- work around inability of Mockito to correctly mock an abstract class - 15 of 590 tests still failing
1 parent 6d9d540 commit d2cfac1

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+288
-307
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@
104104
import rx.operators.OperatorMerge;
105105
import rx.operators.OperatorParallel;
106106
import rx.operators.OperatorTake;
107-
import rx.operators.OperatorTakeTimed;
107+
import rx.operators.OperationTakeTimed;
108108
import rx.operators.OperatorTimestamp;
109109
import rx.operators.OperatorToObservableList;
110110
import rx.operators.OperatorToObservableSortedList;
@@ -212,7 +212,10 @@ public final static <T> Observable<T> create(final OnSubscribeFunc<T> f) {
212212

213213
@Override
214214
public void call(Observer<? super T> observer) {
215-
f.onSubscribe(observer);
215+
Subscription s = f.onSubscribe(observer);
216+
if (s != null) {
217+
observer.add(s);
218+
}
216219
}
217220

218221
});
@@ -7134,7 +7137,7 @@ public final Observable<T> take(long time, TimeUnit unit) {
71347137
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#take">RxJava Wiki: take()</a>
71357138
*/
71367139
public final Observable<T> take(long time, TimeUnit unit, Scheduler scheduler) {
7137-
return create(new OperatorTakeTimed.TakeTimed<T>(this, time, unit, scheduler));
7140+
return create(new OperationTakeTimed.TakeTimed<T>(this, time, unit, scheduler));
71387141
}
71397142

71407143
/**
@@ -8465,7 +8468,7 @@ private boolean isInternalImplementation(Object o) {
84658468
} else {
84668469
// we treat the following package as "internal" and don't wrap it
84678470
Package p = o.getClass().getPackage(); // it can be null
8468-
Boolean isInternal = (p != null && p.getName().startsWith("rx.Observers"));
8471+
Boolean isInternal = (p != null && p.getName().startsWith("rx.operators"));
84698472
internalClassMap.put(clazz, isInternal);
84708473
return isInternal;
84718474
}

rxjava-core/src/main/java/rx/operators/OperatorTakeTimed.java renamed to rxjava-core/src/main/java/rx/operators/OperationTakeTimed.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
* subscribing Observer's <code>onNext</code> function a maximum of <code>num</code> times before
3939
* invoking <code>onCompleted</code>.
4040
*/
41-
public final class OperatorTakeTimed {
41+
public final class OperationTakeTimed {
4242

4343
//TODO this has not been migrated to use bind yet
4444

rxjava-core/src/test/java/rx/operators/OperationAmbTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import rx.Observable.OnSubscribeFunc;
3030
import rx.Observer;
3131
import rx.Subscription;
32+
import rx.observers.TestObserver;
3233
import rx.schedulers.TestScheduler;
3334
import rx.subscriptions.CompositeSubscription;
3435
import rx.util.functions.Action0;
@@ -90,7 +91,7 @@ public void testAmb() {
9091

9192
@SuppressWarnings("unchecked")
9293
Observer<String> observer = (Observer<String>) mock(Observer.class);
93-
o.subscribe(observer);
94+
o.subscribe(new TestObserver<String>(observer));
9495

9596
scheduler.advanceTimeBy(100000, TimeUnit.MILLISECONDS);
9697

@@ -119,7 +120,7 @@ public void testAmb2() {
119120

120121
@SuppressWarnings("unchecked")
121122
Observer<String> observer = (Observer<String>) mock(Observer.class);
122-
o.subscribe(observer);
123+
o.subscribe(new TestObserver<String>(observer));
123124

124125
scheduler.advanceTimeBy(100000, TimeUnit.MILLISECONDS);
125126

@@ -146,7 +147,7 @@ public void testAmb3() {
146147

147148
@SuppressWarnings("unchecked")
148149
Observer<String> observer = (Observer<String>) mock(Observer.class);
149-
o.subscribe(observer);
150+
o.subscribe(new TestObserver<String>(observer));
150151

151152
scheduler.advanceTimeBy(100000, TimeUnit.MILLISECONDS);
152153
InOrder inOrder = inOrder(observer);

rxjava-core/src/test/java/rx/operators/OperationAverageTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import rx.Observable;
2525
import rx.Observer;
26+
import rx.observers.TestObserver;
2627
import rx.operators.OperationReduceTest.CustomException;
2728
import rx.util.functions.Func1;
2829

@@ -281,7 +282,7 @@ public Integer call(String t1) {
281282

282283
Observable<Integer> result = source.averageInteger(length);
283284
Observer<Object> o = mock(Observer.class);
284-
result.subscribe(o);
285+
result.subscribe(new TestObserver<Object>(o));
285286

286287
testThrows(o, CustomException.class);
287288
}
@@ -298,7 +299,7 @@ public Long call(String t1) {
298299

299300
Observable<Long> result = source.averageLong(length);
300301
Observer<Object> o = mock(Observer.class);
301-
result.subscribe(o);
302+
result.subscribe(new TestObserver<Object>(o));
302303

303304
testThrows(o, CustomException.class);
304305
}
@@ -315,7 +316,7 @@ public Float call(String t1) {
315316

316317
Observable<Float> result = source.averageFloat(length);
317318
Observer<Object> o = mock(Observer.class);
318-
result.subscribe(o);
319+
result.subscribe(new TestObserver<Object>(o));
319320

320321
testThrows(o, CustomException.class);
321322
}
@@ -332,7 +333,7 @@ public Double call(String t1) {
332333

333334
Observable<Double> result = source.averageDouble(length);
334335
Observer<Object> o = mock(Observer.class);
335-
result.subscribe(o);
336+
result.subscribe(new TestObserver<Object>(o));
336337

337338
testThrows(o, CustomException.class);
338339
}

rxjava-core/src/test/java/rx/operators/OperationBufferTest.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import rx.Observable;
3535
import rx.Observer;
3636
import rx.Subscription;
37+
import rx.observers.TestObserver;
3738
import rx.schedulers.TestScheduler;
3839
import rx.subjects.PublishSubject;
3940
import rx.subscriptions.Subscriptions;
@@ -166,7 +167,7 @@ public Subscription onSubscribe(Observer<? super String> observer) {
166167
});
167168

168169
Observable<List<String>> buffered = Observable.create(buffer(source, 100, TimeUnit.MILLISECONDS, 2, scheduler));
169-
buffered.subscribe(observer);
170+
buffered.subscribe(new TestObserver<List<String>>(observer));
170171

171172
InOrder inOrder = Mockito.inOrder(observer);
172173
scheduler.advanceTimeTo(100, TimeUnit.MILLISECONDS);
@@ -198,7 +199,7 @@ public Subscription onSubscribe(Observer<? super String> observer) {
198199
});
199200

200201
Observable<List<String>> buffered = Observable.create(buffer(source, 100, TimeUnit.MILLISECONDS, scheduler));
201-
buffered.subscribe(observer);
202+
buffered.subscribe(new TestObserver<List<String>>(observer));
202203

203204
InOrder inOrder = Mockito.inOrder(observer);
204205
scheduler.advanceTimeTo(101, TimeUnit.MILLISECONDS);
@@ -251,7 +252,7 @@ public Subscription onSubscribe(Observer<? super Object> observer) {
251252
};
252253

253254
Observable<List<String>> buffered = Observable.create(buffer(source, openings, closer));
254-
buffered.subscribe(observer);
255+
buffered.subscribe(new TestObserver<List<String>>(observer));
255256

256257
InOrder inOrder = Mockito.inOrder(observer);
257258
scheduler.advanceTimeTo(500, TimeUnit.MILLISECONDS);
@@ -292,7 +293,7 @@ public Subscription onSubscribe(Observer<? super Object> observer) {
292293
};
293294

294295
Observable<List<String>> buffered = Observable.create(buffer(source, closer));
295-
buffered.subscribe(observer);
296+
buffered.subscribe(new TestObserver<List<String>>(observer));
296297

297298
InOrder inOrder = Mockito.inOrder(observer);
298299
scheduler.advanceTimeTo(500, TimeUnit.MILLISECONDS);
@@ -370,7 +371,7 @@ public void testBufferStopsWhenUnsubscribed1() {
370371

371372
Observer<List<Integer>> o = mock(Observer.class);
372373

373-
Subscription s = source.buffer(100, 200, TimeUnit.MILLISECONDS, scheduler).subscribe(o);
374+
Subscription s = source.buffer(100, 200, TimeUnit.MILLISECONDS, scheduler).subscribe(new TestObserver<List<Integer>>(o));
374375

375376
InOrder inOrder = Mockito.inOrder(o);
376377

@@ -394,7 +395,7 @@ public void bufferWithBONormal1() {
394395
Observer<Object> o = mock(Observer.class);
395396
InOrder inOrder = Mockito.inOrder(o);
396397

397-
source.toObservable().buffer(boundary.toObservable()).subscribe(o);
398+
source.toObservable().buffer(boundary.toObservable()).subscribe(new TestObserver<Object>(o));
398399

399400
source.onNext(1);
400401
source.onNext(2);
@@ -430,7 +431,7 @@ public void bufferWithBOEmptyLastViaBoundary() {
430431
Observer<Object> o = mock(Observer.class);
431432
InOrder inOrder = Mockito.inOrder(o);
432433

433-
source.toObservable().buffer(boundary.toObservable()).subscribe(o);
434+
source.toObservable().buffer(boundary.toObservable()).subscribe(new TestObserver<Object>(o));
434435

435436
boundary.onCompleted();
436437

@@ -450,7 +451,7 @@ public void bufferWithBOEmptyLastViaSource() {
450451
Observer<Object> o = mock(Observer.class);
451452
InOrder inOrder = Mockito.inOrder(o);
452453

453-
source.toObservable().buffer(boundary.toObservable()).subscribe(o);
454+
source.toObservable().buffer(boundary.toObservable()).subscribe(new TestObserver<Object>(o));
454455

455456
source.onCompleted();
456457

@@ -470,7 +471,7 @@ public void bufferWithBOEmptyLastViaBoth() {
470471
Observer<Object> o = mock(Observer.class);
471472
InOrder inOrder = Mockito.inOrder(o);
472473

473-
source.toObservable().buffer(boundary.toObservable()).subscribe(o);
474+
source.toObservable().buffer(boundary.toObservable()).subscribe(new TestObserver<Object>(o));
474475

475476
source.onCompleted();
476477
boundary.onCompleted();
@@ -490,7 +491,7 @@ public void bufferWithBOSourceThrows() {
490491
@SuppressWarnings("unchecked")
491492
Observer<Object> o = mock(Observer.class);
492493

493-
source.toObservable().buffer(boundary.toObservable()).subscribe(o);
494+
source.toObservable().buffer(boundary.toObservable()).subscribe(new TestObserver<Object>(o));
494495
source.onNext(1);
495496
source.onError(new OperationReduceTest.CustomException());
496497

@@ -507,7 +508,7 @@ public void bufferWithBOBoundaryThrows() {
507508
@SuppressWarnings("unchecked")
508509
Observer<Object> o = mock(Observer.class);
509510

510-
source.toObservable().buffer(boundary.toObservable()).subscribe(o);
511+
source.toObservable().buffer(boundary.toObservable()).subscribe(new TestObserver<Object>(o));
511512

512513
source.onNext(1);
513514
boundary.onError(new OperationReduceTest.CustomException());

rxjava-core/src/test/java/rx/operators/OperationCombineLatestTest.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import rx.Observable;
2929
import rx.Observer;
3030
import rx.Subscription;
31+
import rx.observers.TestObserver;
3132
import rx.subjects.PublishSubject;
3233
import rx.subscriptions.Subscriptions;
3334
import rx.util.functions.Func2;
@@ -51,7 +52,7 @@ public String call(String v1, String v2) {
5152
throw new RuntimeException("I don't work.");
5253
}
5354
}));
54-
combined.subscribe(w);
55+
combined.subscribe(new TestObserver<String>(w));
5556

5657
w1.observer.onNext("first value of w1");
5758
w2.observer.onNext("first value of w2");
@@ -72,7 +73,7 @@ public void testCombineLatestDifferentLengthObservableSequences1() {
7273
TestObservable w3 = new TestObservable();
7374

7475
Observable<String> combineLatestW = Observable.create(combineLatest(Observable.create(w1), Observable.create(w2), Observable.create(w3), getConcat3StringsCombineLatestFunction()));
75-
combineLatestW.subscribe(w);
76+
combineLatestW.subscribe(new TestObserver<String>(w));
7677

7778
/* simulate sending data */
7879
// once for w1
@@ -110,7 +111,7 @@ public void testCombineLatestDifferentLengthObservableSequences2() {
110111
TestObservable w3 = new TestObservable();
111112

112113
Observable<String> combineLatestW = Observable.create(combineLatest(Observable.create(w1), Observable.create(w2), Observable.create(w3), getConcat3StringsCombineLatestFunction()));
113-
combineLatestW.subscribe(w);
114+
combineLatestW.subscribe(new TestObserver<String>(w));
114115

115116
/* simulate sending data */
116117
// 4 times for w1
@@ -146,7 +147,7 @@ public void testCombineLatestWithInterleavingSequences() {
146147
TestObservable w3 = new TestObservable();
147148

148149
Observable<String> combineLatestW = Observable.create(combineLatest(Observable.create(w1), Observable.create(w2), Observable.create(w3), getConcat3StringsCombineLatestFunction()));
149-
combineLatestW.subscribe(w);
150+
combineLatestW.subscribe(new TestObserver<String>(w));
150151

151152
/* simulate sending data */
152153
w1.observer.onNext("1a");
@@ -185,7 +186,7 @@ public void testCombineLatest2Types() {
185186
Observer<String> aObserver = mock(Observer.class);
186187

187188
Observable<String> w = Observable.create(combineLatest(Observable.from("one", "two"), Observable.from(2, 3, 4), combineLatestFunction));
188-
w.subscribe(aObserver);
189+
w.subscribe(new TestObserver<String>(aObserver));
189190

190191
verify(aObserver, never()).onError(any(Throwable.class));
191192
verify(aObserver, times(1)).onCompleted();
@@ -204,7 +205,7 @@ public void testCombineLatest3TypesA() {
204205
Observer<String> aObserver = mock(Observer.class);
205206

206207
Observable<String> w = Observable.create(combineLatest(Observable.from("one", "two"), Observable.from(2), Observable.from(new int[] { 4, 5, 6 }), combineLatestFunction));
207-
w.subscribe(aObserver);
208+
w.subscribe(new TestObserver<String>(aObserver));
208209

209210
verify(aObserver, never()).onError(any(Throwable.class));
210211
verify(aObserver, times(1)).onCompleted();
@@ -221,7 +222,7 @@ public void testCombineLatest3TypesB() {
221222
Observer<String> aObserver = mock(Observer.class);
222223

223224
Observable<String> w = Observable.create(combineLatest(Observable.from("one"), Observable.from(2), Observable.from(new int[] { 4, 5, 6 }, new int[] { 7, 8 }), combineLatestFunction));
224-
w.subscribe(aObserver);
225+
w.subscribe(new TestObserver<String>(aObserver));
225226

226227
verify(aObserver, never()).onError(any(Throwable.class));
227228
verify(aObserver, times(1)).onCompleted();
@@ -334,7 +335,7 @@ public void combineSimple() {
334335

335336
Observer<Object> observer = mock(Observer.class);
336337

337-
source.subscribe(observer);
338+
source.subscribe(new TestObserver<Object>(observer));
338339

339340
InOrder inOrder = inOrder(observer);
340341

@@ -380,8 +381,8 @@ public void combineMultipleObservers() {
380381
Observer<Object> observer1 = mock(Observer.class);
381382
Observer<Object> observer2 = mock(Observer.class);
382383

383-
source.subscribe(observer1);
384-
source.subscribe(observer2);
384+
source.subscribe(new TestObserver<Object>(observer1));
385+
source.subscribe(new TestObserver<Object>(observer2));
385386

386387
InOrder inOrder1 = inOrder(observer1);
387388
InOrder inOrder2 = inOrder(observer2);
@@ -435,7 +436,7 @@ public void testFirstNeverProduces() {
435436

436437
Observer<Object> observer = mock(Observer.class);
437438

438-
source.subscribe(observer);
439+
source.subscribe(new TestObserver<Object>(observer));
439440

440441
InOrder inOrder = inOrder(observer);
441442

@@ -458,7 +459,7 @@ public void testSecondNeverProduces() {
458459

459460
Observer<Object> observer = mock(Observer.class);
460461

461-
source.subscribe(observer);
462+
source.subscribe(new TestObserver<Object>(observer));
462463

463464
InOrder inOrder = inOrder(observer);
464465

rxjava-core/src/test/java/rx/operators/OperationConcatTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import rx.Observable;
3434
import rx.Observer;
3535
import rx.Subscription;
36+
import rx.observers.TestObserver;
3637
import rx.schedulers.TestScheduler;
3738
import rx.subscriptions.BooleanSubscription;
3839

@@ -308,7 +309,7 @@ public void testConcatConcurrentWithInfinity() {
308309

309310
Observable<String> concat = Observable.create(concatF);
310311

311-
concat.take(50).subscribe(aObserver);
312+
concat.take(50).subscribe(new TestObserver<String>(aObserver));
312313

313314
//Wait for the thread to start up.
314315
try {
@@ -404,7 +405,7 @@ public void testConcatUnsubscribe() {
404405

405406
try {
406407
// Subscribe
407-
s1.wrap(concat.subscribe(aObserver));
408+
s1.wrap(concat.subscribe(new TestObserver<String>(aObserver)));
408409
//Block main thread to allow observable "w1" to complete and observable "w2" to call onNext once.
409410
callOnce.await();
410411
// Unsubcribe
@@ -447,7 +448,7 @@ public void testConcatUnsubscribeConcurrent() {
447448

448449
Observable<String> concat = Observable.create(concatF);
449450

450-
Subscription s1 = concat.subscribe(aObserver);
451+
Subscription s1 = concat.subscribe(new TestObserver<String>(aObserver));
451452

452453
try {
453454
//Block main thread to allow observable "w1" to complete and observable "w2" to call onNext exactly once.
@@ -565,8 +566,8 @@ public void testMultipleObservers() {
565566
Observable<Long> timer = Observable.interval(500, TimeUnit.MILLISECONDS, s).take(2);
566567
Observable<Long> o = Observable.concat(timer, timer);
567568

568-
o.subscribe(o1);
569-
o.subscribe(o2);
569+
o.subscribe(new TestObserver<Object>(o1));
570+
o.subscribe(new TestObserver<Object>(o2));
570571

571572
InOrder inOrder1 = inOrder(o1);
572573
InOrder inOrder2 = inOrder(o2);

0 commit comments

Comments
 (0)