Skip to content

Commit ee94970

Browse files
Merge pull request #332 from benjchristensen/issue-329-unit-tests
Issue 329: Fix non-deterministic unit tests
2 parents 4f24e3d + 5b5aade commit ee94970

File tree

5 files changed

+32
-52
lines changed

5 files changed

+32
-52
lines changed

rxjava-core/src/main/java/rx/operators/OperationGroupBy.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -548,8 +548,8 @@ public void onNext(String outputMessage) {
548548
// sentEvents will go until 'eventCounter' hits 20 and then unsubscribes
549549
// which means it will also send (but ignore) the 19/20 events for the other group
550550
// It will not however send all 100 events.
551-
assertEquals(39, sentEventCounter.get(), 2);
552-
// gave it a delta of 2 so the threading/unsubscription race has wiggle
551+
assertEquals(39, sentEventCounter.get(), 10);
552+
// gave it a delta of 10 to account for the threading/unsubscription race condition which can vary depending on a machines performance, thread-scheduler, etc
553553
}
554554

555555
private static class Event {

rxjava-core/src/main/java/rx/operators/OperationMaterialize.java

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.util.List;
2121
import java.util.Vector;
22+
import java.util.concurrent.ExecutionException;
2223

2324
import org.junit.Test;
2425

@@ -139,25 +140,13 @@ public void testMaterialize2() {
139140
}
140141

141142
@Test
142-
public void testMultipleSubscribes() {
143-
final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", null, "three");
144-
145-
Observable<Notification<String>> m = Observable.create(materialize(o1));
146-
147-
TestObserver Observer1 = new TestObserver();
148-
m.subscribe(Observer1);
149-
150-
TestObserver Observer2 = new TestObserver();
151-
m.subscribe(Observer2);
143+
public void testMultipleSubscribes() throws InterruptedException, ExecutionException {
144+
final TestAsyncErrorObservable o = new TestAsyncErrorObservable("one", "two", null, "three");
152145

153-
try {
154-
o1.t.join();
155-
} catch (InterruptedException e) {
156-
throw new RuntimeException(e);
157-
}
146+
Observable<Notification<String>> m = Observable.create(materialize(o));
158147

159-
assertEquals(3, Observer1.notifications.size());
160-
assertEquals(3, Observer2.notifications.size());
148+
assertEquals(3, m.toList().toBlockingObservable().toFuture().get().size());
149+
assertEquals(3, m.toList().toBlockingObservable().toFuture().get().size());
161150
}
162151

163152
}
@@ -193,7 +182,7 @@ private static class TestAsyncErrorObservable extends Observable<String> {
193182
valuesToReturn = values;
194183
}
195184

196-
Thread t;
185+
volatile Thread t;
197186

198187
@Override
199188
public Subscription subscribe(final Observer<String> observer) {

rxjava-core/src/main/java/rx/operators/OperationNext.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.ExecutorService;
2828
import java.util.concurrent.Executors;
2929
import java.util.concurrent.Future;
30+
import java.util.concurrent.TimeUnit;
3031
import java.util.concurrent.atomic.AtomicBoolean;
3132
import java.util.concurrent.atomic.AtomicInteger;
3233

@@ -313,6 +314,8 @@ private static class TestException extends RuntimeException {
313314
@Test
314315
public void testNoBufferingOrBlockingOfSequence() throws Throwable {
315316
final CountDownLatch finished = new CountDownLatch(1);
317+
final int COUNT = 30;
318+
final CountDownLatch timeHasPassed = new CountDownLatch(COUNT);
316319
final AtomicBoolean running = new AtomicBoolean(true);
317320
final AtomicInteger count = new AtomicInteger(0);
318321
final Observable<Integer> obs = Observable.create(new Func1<Observer<Integer>, Subscription>() {
@@ -326,7 +329,7 @@ public void run() {
326329
try {
327330
while (running.get()) {
328331
o.onNext(count.incrementAndGet());
329-
Thread.sleep(0, 100);
332+
timeHasPassed.countDown();
330333
}
331334
o.onCompleted();
332335
} catch (Throwable e) {
@@ -350,19 +353,14 @@ public void run() {
350353
// we should have a different value
351354
assertTrue("a and b should be different", a != b);
352355

353-
// wait for some time
354-
Thread.sleep(100);
355-
// make sure the counter in the observable has increased beyond b
356-
while (count.get() <= (b + 10)) {
357-
Thread.sleep(100);
358-
}
356+
// wait for some time (if times out we are blocked somewhere so fail ... set very high for very slow, constrained machines)
357+
timeHasPassed.await(8000, TimeUnit.MILLISECONDS);
359358

360359
assertTrue(it.hasNext());
361-
int expectedHigherThan = count.get();
362360
int c = it.next();
363361

364362
assertTrue("c should not just be the next in sequence", c != (b + 1));
365-
assertTrue("expected that c [" + c + "] is higher than " + expectedHigherThan, c > expectedHigherThan);
363+
assertTrue("expected that c [" + c + "] is higher than or equal to " + COUNT, c >= COUNT);
366364

367365
assertTrue(it.hasNext());
368366

rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -153,33 +153,27 @@ private static Object getPluginImplementationViaProperty(Class<?> pluginClass) {
153153

154154
public static class UnitTest {
155155

156-
@After
157-
@Before
158-
public void reset() {
159-
// use private access to reset so we can test different initializations via the public static flow
160-
RxJavaPlugins.getInstance().errorHandler.set(null);
161-
RxJavaPlugins.getInstance().observableExecutionHook.set(null);
162-
}
163-
164156
@Test
165157
public void testErrorHandlerDefaultImpl() {
166-
RxJavaErrorHandler impl = RxJavaPlugins.getInstance().getErrorHandler();
158+
RxJavaErrorHandler impl = new RxJavaPlugins().getErrorHandler();
167159
assertTrue(impl instanceof RxJavaErrorHandlerDefault);
168160
}
169161

170162
@Test
171163
public void testErrorHandlerViaRegisterMethod() {
172-
RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandlerTestImpl());
173-
RxJavaErrorHandler impl = RxJavaPlugins.getInstance().getErrorHandler();
164+
RxJavaPlugins p = new RxJavaPlugins();
165+
p.registerErrorHandler(new RxJavaErrorHandlerTestImpl());
166+
RxJavaErrorHandler impl = p.getErrorHandler();
174167
assertTrue(impl instanceof RxJavaErrorHandlerTestImpl);
175168
}
176169

177170
@Test
178171
public void testErrorHandlerViaProperty() {
179172
try {
173+
RxJavaPlugins p = new RxJavaPlugins();
180174
String fullClass = getFullClassNameForTestClass(RxJavaErrorHandlerTestImpl.class);
181175
System.setProperty("rxjava.plugin.RxJavaErrorHandler.implementation", fullClass);
182-
RxJavaErrorHandler impl = RxJavaPlugins.getInstance().getErrorHandler();
176+
RxJavaErrorHandler impl = p.getErrorHandler();
183177
assertTrue(impl instanceof RxJavaErrorHandlerTestImpl);
184178
} finally {
185179
System.clearProperty("rxjava.plugin.RxJavaErrorHandler.implementation");
@@ -193,23 +187,26 @@ public static class RxJavaErrorHandlerTestImpl extends RxJavaErrorHandler {
193187

194188
@Test
195189
public void testObservableExecutionHookDefaultImpl() {
196-
RxJavaObservableExecutionHook impl = RxJavaPlugins.getInstance().getObservableExecutionHook();
190+
RxJavaPlugins p = new RxJavaPlugins();
191+
RxJavaObservableExecutionHook impl = p.getObservableExecutionHook();
197192
assertTrue(impl instanceof RxJavaObservableExecutionHookDefault);
198193
}
199194

200195
@Test
201196
public void testObservableExecutionHookViaRegisterMethod() {
202-
RxJavaPlugins.getInstance().registerObservableExecutionHook(new RxJavaObservableExecutionHookTestImpl());
203-
RxJavaObservableExecutionHook impl = RxJavaPlugins.getInstance().getObservableExecutionHook();
197+
RxJavaPlugins p = new RxJavaPlugins();
198+
p.registerObservableExecutionHook(new RxJavaObservableExecutionHookTestImpl());
199+
RxJavaObservableExecutionHook impl = p.getObservableExecutionHook();
204200
assertTrue(impl instanceof RxJavaObservableExecutionHookTestImpl);
205201
}
206202

207203
@Test
208204
public void testObservableExecutionHookViaProperty() {
209205
try {
206+
RxJavaPlugins p = new RxJavaPlugins();
210207
String fullClass = getFullClassNameForTestClass(RxJavaObservableExecutionHookTestImpl.class);
211208
System.setProperty("rxjava.plugin.RxJavaObservableExecutionHook.implementation", fullClass);
212-
RxJavaObservableExecutionHook impl = RxJavaPlugins.getInstance().getObservableExecutionHook();
209+
RxJavaObservableExecutionHook impl = p.getObservableExecutionHook();
213210
assertTrue(impl instanceof RxJavaObservableExecutionHookTestImpl);
214211
} finally {
215212
System.clearProperty("rxjava.plugin.RxJavaErrorHandler.implementation");

rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -306,12 +306,7 @@ public Subscription call(Scheduler scheduler, BooleanSubscription cancel) {
306306
observer.onNext(42);
307307
latch.countDown();
308308

309-
try {
310-
Thread.sleep(1);
311-
} catch (InterruptedException e) {
312-
e.printStackTrace();
313-
}
314-
309+
// this will recursively schedule this task for execution again
315310
scheduler.schedule(cancel, this);
316311

317312
return cancel;
@@ -353,7 +348,8 @@ public void onNext(Integer args) {
353348
fail("Timed out waiting on completion latch");
354349
}
355350

356-
assertEquals(10, count.get()); // wondering if this could be 11 in a race condition (which would be okay due to how unsubscribe works ... just it would make this test non-deterministic)
351+
// the count can be 10 or higher due to thread scheduling of the unsubscribe vs the scheduler looping to emit the count
352+
assertTrue(count.get() >= 10);
357353
assertTrue(completed.get());
358354
}
359355

0 commit comments

Comments
 (0)