Skip to content

Commit eea02d8

Browse files
Fast Producer / Slow Consumer Backpressure Tests for ObserveOn
1 parent df7bf90 commit eea02d8

File tree

3 files changed

+101
-67
lines changed

3 files changed

+101
-67
lines changed

rxjava-core/src/main/java/rx/schedulers/Schedulers.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,4 +183,8 @@ public Thread newThread(Runnable r) {
183183

184184
return result;
185185
}
186+
187+
public static TestScheduler test() {
188+
return new TestScheduler();
189+
}
186190
}

rxjava-core/src/test/java/rx/operators/OperatorObserveOnTest.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,15 @@
2929
import org.mockito.stubbing.Answer;
3030

3131
import rx.Observable;
32+
import rx.Observable.OnSubscribeFunc;
3233
import rx.Observer;
3334
import rx.Scheduler;
35+
import rx.Subscription;
36+
import rx.schedulers.ImmediateScheduler;
3437
import rx.schedulers.Schedulers;
3538
import rx.schedulers.TestScheduler;
39+
import rx.schedulers.TrampolineScheduler;
40+
import rx.subscriptions.BooleanSubscription;
3641
import rx.util.functions.Action0;
3742
import rx.util.functions.Action1;
3843
import rx.util.functions.Func1;
@@ -305,6 +310,98 @@ public void call(Integer t1) {
305310
});
306311
}
307312

313+
@Test
314+
public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeNewThread() throws InterruptedException {
315+
testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.newThread());
316+
}
317+
318+
@Test
319+
public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeIO() throws InterruptedException {
320+
testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.io());
321+
}
322+
323+
@Test
324+
public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeTrampoline() throws InterruptedException {
325+
testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.trampoline());
326+
}
327+
328+
@Test
329+
public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeTestScheduler() throws InterruptedException {
330+
testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.test());
331+
}
332+
333+
@Test
334+
public final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribeComputation() throws InterruptedException {
335+
testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Schedulers.computation());
336+
}
337+
338+
private final void testBackpressureOnFastProducerSlowConsumerWithUnsubscribe(Scheduler scheduler) throws InterruptedException {
339+
final AtomicInteger countEmitted = new AtomicInteger();
340+
final AtomicInteger countTaken = new AtomicInteger();
341+
int value = Observable.create(new OnSubscribeFunc<Integer>() {
342+
343+
@Override
344+
public Subscription onSubscribe(final Observer<? super Integer> o) {
345+
final BooleanSubscription s = BooleanSubscription.create();
346+
Thread t = new Thread(new Runnable() {
347+
348+
@Override
349+
public void run() {
350+
int i = 1;
351+
while (!s.isUnsubscribed() && i <= 100) {
352+
System.out.println("onNext from fast producer [" + Thread.currentThread() + "]: " + i);
353+
o.onNext(i++);
354+
}
355+
o.onCompleted();
356+
}
357+
});
358+
t.setDaemon(true);
359+
t.start();
360+
return s;
361+
}
362+
}).doOnNext(new Action1<Integer>() {
363+
364+
@Override
365+
public void call(Integer i) {
366+
countEmitted.incrementAndGet();
367+
}
368+
}).doOnCompleted(new Action0() {
369+
370+
@Override
371+
public void call() {
372+
System.out.println("-------- Done Emitting from Source ---------");
373+
}
374+
}).observeOn(scheduler).doOnNext(new Action1<Integer>() {
375+
376+
@Override
377+
public void call(Integer i) {
378+
System.out.println(">> onNext to slowConsumer [" + Thread.currentThread() + "] pre-take: " + i);
379+
//force it to be slower than the producer
380+
try {
381+
Thread.sleep(10);
382+
} catch (InterruptedException e) {
383+
e.printStackTrace();
384+
}
385+
countTaken.incrementAndGet();
386+
}
387+
}).take(10).toBlockingObservable().last();
388+
389+
if (scheduler instanceof TrampolineScheduler || scheduler instanceof ImmediateScheduler || scheduler instanceof TestScheduler) {
390+
// since there is no concurrency it will block and only emit as many as it can process
391+
assertEquals(10, countEmitted.get());
392+
} else {
393+
// the others with concurrency should not emit all 100 ... but 10 + 2 in the pipeline
394+
// NOTE: The +2 could change if the implementation of the queue logic changes. See Javadoc at top of class.
395+
assertEquals(12, countEmitted.get());
396+
}
397+
// number received after take (but take will filter any extra)
398+
assertEquals(10, value);
399+
// so we also want to check the doOnNext after observeOn to see if it got unsubscribed
400+
Thread.sleep(200); // let time pass to see if the scheduler is still doing work
401+
// we expect only 10 to make it through the observeOn side
402+
assertEquals(10, countTaken.get());
403+
}
404+
308405
private static int randomIntFrom0to100() {
309406
// XORShift instead of Math.random http://javamex.com/tutorials/random_numbers/xorshift.shtml
310407
long x = System.nanoTime();

rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerTests.java

Lines changed: 0 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -55,73 +55,6 @@ public abstract class AbstractSchedulerTests {
5555
*/
5656
protected abstract Scheduler getScheduler();
5757

58-
@Test
59-
public final void unsubscribeWithFastProducerWithSlowConsumerCausingQueuing() throws InterruptedException {
60-
final AtomicInteger countEmitted = new AtomicInteger();
61-
final AtomicInteger countTaken = new AtomicInteger();
62-
int value = Observable.create(new OnSubscribeFunc<Integer>() {
63-
64-
@Override
65-
public Subscription onSubscribe(final Observer<? super Integer> o) {
66-
final BooleanSubscription s = BooleanSubscription.create();
67-
Thread t = new Thread(new Runnable() {
68-
69-
@Override
70-
public void run() {
71-
int i = 1;
72-
while (!s.isUnsubscribed() && i <= 100) {
73-
System.out.println("onNext from fast producer: " + i);
74-
o.onNext(i++);
75-
}
76-
o.onCompleted();
77-
}
78-
});
79-
t.setDaemon(true);
80-
t.start();
81-
return s;
82-
}
83-
}).doOnNext(new Action1<Integer>() {
84-
85-
@Override
86-
public void call(Integer i) {
87-
countEmitted.incrementAndGet();
88-
}
89-
}).doOnCompleted(new Action0() {
90-
91-
@Override
92-
public void call() {
93-
System.out.println("-------- Done Emitting from Source ---------");
94-
}
95-
}).observeOn(getScheduler()).doOnNext(new Action1<Integer>() {
96-
97-
@Override
98-
public void call(Integer i) {
99-
System.out.println(">> onNext to slowConsumer pre-take: " + i);
100-
//force it to be slower than the producer
101-
try {
102-
Thread.sleep(10);
103-
} catch (InterruptedException e) {
104-
e.printStackTrace();
105-
}
106-
countTaken.incrementAndGet();
107-
}
108-
}).take(10).toBlockingObservable().last();
109-
110-
if (getScheduler() instanceof TrampolineScheduler || getScheduler() instanceof ImmediateScheduler) {
111-
// since there is no concurrency it will block and only emit as many as it can process
112-
assertEquals(10, countEmitted.get());
113-
} else {
114-
// they will all emit because the consumer is running slow
115-
assertEquals(100, countEmitted.get());
116-
}
117-
// number received after take (but take will filter any extra)
118-
assertEquals(10, value);
119-
// so we also want to check the doOnNext after observeOn to see if it got unsubscribed
120-
Thread.sleep(200); // let time pass to see if the scheduler is still doing work
121-
// we expect only 10 to make it through the observeOn side
122-
assertEquals(10, countTaken.get());
123-
}
124-
12558
@Test
12659
public void testNestedActions() throws InterruptedException {
12760
Scheduler scheduler = getScheduler();

0 commit comments

Comments
 (0)