File tree Expand file tree Collapse file tree 2 files changed +67
-0
lines changed
src/test/java/rx/internal/operators Expand file tree Collapse file tree 2 files changed +67
-0
lines changed Original file line number Diff line number Diff line change @@ -309,4 +309,39 @@ public void run() {
309309 }
310310
311311 }
312+
313+ @ Test
314+ public void testBackpressure () {
315+ TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
316+ Observable .range (0 , 100000 )
317+ .onErrorResumeNext (new Func1 <Throwable , Observable <Integer >>() {
318+
319+ @ Override
320+ public Observable <Integer > call (Throwable t1 ) {
321+ return Observable .just (1 );
322+ }
323+
324+ })
325+ .observeOn (Schedulers .computation ())
326+ .map (new Func1 <Integer , Integer >() {
327+ int c = 0 ;
328+
329+ @ Override
330+ public Integer call (Integer t1 ) {
331+ if (c ++ <= 1 ) {
332+ // slow
333+ try {
334+ Thread .sleep (500 );
335+ } catch (InterruptedException e ) {
336+ e .printStackTrace ();
337+ }
338+ }
339+ return t1 ;
340+ }
341+
342+ })
343+ .subscribe (ts );
344+ ts .awaitTerminalEvent ();
345+ ts .assertNoErrors ();
346+ }
312347}
Original file line number Diff line number Diff line change 2121import static org .mockito .Mockito .times ;
2222import static org .mockito .Mockito .verify ;
2323
24+ import java .util .concurrent .TimeUnit ;
25+
2426import org .junit .Test ;
2527import org .mockito .Mockito ;
2628
2931import rx .Subscriber ;
3032import rx .Subscription ;
3133import rx .functions .Func1 ;
34+ import rx .observers .TestSubscriber ;
35+ import rx .schedulers .Schedulers ;
3236
3337public class OperatorOnErrorResumeNextViaObservableTest {
3438
@@ -143,4 +147,32 @@ public void run() {
143147 System .out .println ("done starting TestObservable thread" );
144148 }
145149 }
150+
151+ @ Test
152+ public void testBackpressure () {
153+ TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
154+ Observable .range (0 , 100000 )
155+ .onErrorResumeNext (Observable .just (1 ))
156+ .observeOn (Schedulers .computation ())
157+ .map (new Func1 <Integer , Integer >() {
158+ int c = 0 ;
159+
160+ @ Override
161+ public Integer call (Integer t1 ) {
162+ if (c ++ <= 1 ) {
163+ // slow
164+ try {
165+ Thread .sleep (500 );
166+ } catch (InterruptedException e ) {
167+ e .printStackTrace ();
168+ }
169+ }
170+ return t1 ;
171+ }
172+
173+ })
174+ .subscribe (ts );
175+ ts .awaitTerminalEvent ();
176+ ts .assertNoErrors ();
177+ }
146178}
You can’t perform that action at this time.
0 commit comments