Skip to content

Commit 90f814f

Browse files
committed
Using inner and fix unit tests
1 parent 8915b8b commit 90f814f

File tree

2 files changed

+40
-69
lines changed

2 files changed

+40
-69
lines changed

rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java

Lines changed: 34 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -58,44 +58,41 @@ public void onNext(final Observable<T> o) {
5858

5959
@Override
6060
public void call(final Inner inner) {
61-
if (!inner.isUnsubscribed()) {
62-
final CompositeSubscription cs = new CompositeSubscription();
63-
subscriber.add(Subscriptions.create(new Action0() {
64-
65-
@Override
66-
public void call() {
67-
scheduler.schedule(new Action1<Inner>() {
68-
69-
@Override
70-
public void call(final Inner inner) {
71-
cs.unsubscribe();
72-
}
73-
74-
});
75-
}
76-
77-
}));
78-
cs.add(subscriber);
79-
o.subscribe(new Subscriber<T>(cs) {
80-
81-
@Override
82-
public void onCompleted() {
83-
subscriber.onCompleted();
84-
}
85-
86-
@Override
87-
public void onError(Throwable e) {
88-
subscriber.onError(e);
89-
}
90-
91-
@Override
92-
public void onNext(T t) {
93-
subscriber.onNext(t);
94-
}
95-
});
96-
}
61+
final CompositeSubscription cs = new CompositeSubscription();
62+
subscriber.add(Subscriptions.create(new Action0() {
63+
64+
@Override
65+
public void call() {
66+
inner.schedule(new Action1<Inner>() {
67+
68+
@Override
69+
public void call(final Inner inner) {
70+
cs.unsubscribe();
71+
}
72+
73+
});
74+
}
75+
76+
}));
77+
cs.add(subscriber);
78+
o.subscribe(new Subscriber<T>(cs) {
79+
80+
@Override
81+
public void onCompleted() {
82+
subscriber.onCompleted();
83+
}
84+
85+
@Override
86+
public void onError(Throwable e) {
87+
subscriber.onError(e);
88+
}
89+
90+
@Override
91+
public void onNext(T t) {
92+
subscriber.onNext(t);
93+
}
94+
});
9795
}
98-
9996
});
10097
}
10198

rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java

Lines changed: 6 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -18,53 +18,25 @@
1818
import static org.junit.Assert.assertEquals;
1919
import static org.junit.Assert.assertNotNull;
2020
import static org.junit.Assert.assertNotSame;
21-
import static org.mockito.Matchers.isA;
22-
import static org.mockito.Mockito.inOrder;
23-
import static org.mockito.Mockito.spy;
24-
import static org.mockito.Mockito.times;
21+
import static org.junit.Assert.assertTrue;
2522

2623
import java.util.Arrays;
2724
import java.util.concurrent.CountDownLatch;
2825
import java.util.concurrent.atomic.AtomicReference;
2926

3027
import org.junit.Test;
31-
import org.mockito.InOrder;
3228

3329
import rx.Observable;
3430
import rx.Observable.OnSubscribe;
35-
import rx.Scheduler;
3631
import rx.Subscriber;
3732
import rx.Subscription;
3833
import rx.observers.TestObserver;
3934
import rx.schedulers.Schedulers;
4035
import rx.subscriptions.Subscriptions;
41-
import rx.test.OperatorTester;
4236
import rx.util.functions.Action0;
43-
import rx.util.functions.Action1;
4437

4538
public class OperatorSubscribeOnTest {
4639

47-
@Test
48-
@SuppressWarnings("unchecked")
49-
public void testSubscribeOn() {
50-
Observable<Integer> w = Observable.from(Arrays.asList(1, 2, 3));
51-
52-
Scheduler scheduler = spy(OperatorTester.forwardingScheduler(Schedulers
53-
.immediate()));
54-
55-
TestObserver<Integer> observer = new TestObserver<Integer>();
56-
w.subscribeOn(scheduler).subscribe(observer);
57-
58-
InOrder inOrder = inOrder(scheduler);
59-
// The first one is for "subscribe", the second one is for
60-
// "unsubscribe".
61-
inOrder.verify(scheduler, times(2)).schedule(isA(Action1.class));
62-
inOrder.verifyNoMoreInteractions();
63-
64-
observer.assertReceivedOnNext(Arrays.asList(1, 2, 3));
65-
observer.assertTerminalEvent();
66-
}
67-
6840
private class ThreadSubscription implements Subscription {
6941
private volatile Thread thread;
7042

@@ -114,15 +86,17 @@ public void call(Subscriber<? super Integer> t1) {
11486
});
11587

11688
TestObserver<Integer> observer = new TestObserver<Integer>();
117-
w.subscribeOn(Schedulers.computation()).subscribe(observer);
89+
w.subscribeOn(Schedulers.newThread()).subscribe(observer);
11890

11991
Thread unsubscribeThread = subscription.getThread();
12092

12193
assertNotNull(unsubscribeThread);
12294
assertNotSame(Thread.currentThread(), unsubscribeThread);
12395

124-
assertNotNull(subscribeThread);
125-
assertNotSame(Thread.currentThread(), subscribeThread);
96+
assertNotNull(subscribeThread.get());
97+
assertNotSame(Thread.currentThread(), subscribeThread.get());
98+
// True for Schedulers.newThread()
99+
assertTrue(unsubscribeThread == subscribeThread.get());
126100

127101
observer.assertReceivedOnNext(Arrays.asList(1, 2));
128102
observer.assertTerminalEvent();

0 commit comments

Comments
 (0)