Skip to content

Commit b8b8334

Browse files
Unit test to assert correct scheduler thread
1 parent d1f0258 commit b8b8334

File tree

1 file changed

+55
-4
lines changed

1 file changed

+55
-4
lines changed

rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2013 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,6 +16,7 @@
1616
package rx.operators;
1717

1818
import static org.junit.Assert.*;
19+
import static org.mockito.Matchers.*;
1920
import static org.mockito.Mockito.*;
2021
import static rx.operators.OperationObserveOn.*;
2122

@@ -30,6 +31,7 @@
3031
import rx.Observable;
3132
import rx.Observer;
3233
import rx.concurrency.Schedulers;
34+
import rx.util.functions.Action1;
3335

3436
public class OperationObserveOnTest {
3537

@@ -81,4 +83,53 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
8183
inOrder.verify(observer, times(1)).onCompleted();
8284
inOrder.verifyNoMoreInteractions();
8385
}
86+
87+
@Test
88+
@SuppressWarnings("unchecked")
89+
public void testThreadName() throws InterruptedException {
90+
Observable<String> obs = Observable.from("one", null, "two", "three", "four");
91+
92+
Observer<String> observer = mock(Observer.class);
93+
94+
InOrder inOrder = inOrder(observer);
95+
96+
final CountDownLatch completedLatch = new CountDownLatch(1);
97+
doAnswer(new Answer<Void>() {
98+
99+
@Override
100+
public Void answer(InvocationOnMock invocation) throws Throwable {
101+
completedLatch.countDown();
102+
103+
return null;
104+
}
105+
}).when(observer).onCompleted();
106+
107+
doAnswer(new Answer<Void>() {
108+
109+
@Override
110+
public Void answer(InvocationOnMock invocation) throws Throwable {
111+
completedLatch.countDown();
112+
113+
return null;
114+
}
115+
}).when(observer).onError(any(Exception.class));
116+
117+
obs.observeOn(Schedulers.newThread()).doOnEach(new Action1<String>() {
118+
119+
@Override
120+
public void call(String t1) {
121+
String threadName = Thread.currentThread().getName();
122+
boolean correctThreadName = threadName.startsWith("RxNewThreadScheduler");
123+
System.out.println("ThreadName: " + threadName + " Correct => " + correctThreadName);
124+
assertTrue(correctThreadName);
125+
}
126+
127+
}).subscribe(observer);
128+
129+
if (!completedLatch.await(1000, TimeUnit.MILLISECONDS)) {
130+
fail("timed out waiting");
131+
}
132+
133+
inOrder.verify(observer, times(1)).onCompleted();
134+
}
84135
}

0 commit comments

Comments
 (0)