Skip to content

Commit d6bc923

Browse files
davidmotenakarnokd
authored andcommitted
add TestSubscriber2 to provide method chained version of TestSubscriber and support Observable.test() (#4777)
1 parent 6385051 commit d6bc923

File tree

6 files changed

+648
-0
lines changed

6 files changed

+648
-0
lines changed

src/main/java/rx/Completable.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import rx.annotations.*;
2424
import rx.exceptions.*;
2525
import rx.functions.*;
26+
import rx.internal.observers.AssertableSubscriberObservable;
2627
import rx.internal.operators.*;
2728
import rx.internal.util.*;
2829
import rx.observers.*;
@@ -2364,4 +2365,26 @@ public void call() {
23642365
}
23652366
});
23662367
}
2368+
2369+
// -------------------------------------------------------------------------
2370+
// Fluent test support, super handy and reduces test preparation boilerplate
2371+
// -------------------------------------------------------------------------
2372+
/**
2373+
* Creates an AssertableSubscriber that requests {@code Long.MAX_VALUE} and subscribes
2374+
* it to this Observable.
2375+
* <dl>
2376+
* <dt><b>Backpressure:</b><dt>
2377+
* <dd>The returned AssertableSubscriber consumes this Observable in an unbounded fashion.</dd>
2378+
* <dt><b>Scheduler:</b></dt>
2379+
* <dd>{@code test} does not operate by default on a particular {@link Scheduler}.</dd>
2380+
* </dl>
2381+
* @return the new AssertableSubscriber instance
2382+
* @since 1.2.3
2383+
*/
2384+
@Experimental
2385+
public final AssertableSubscriber<Void> test() {
2386+
AssertableSubscriberObservable<Void> ts = AssertableSubscriberObservable.create(Long.MAX_VALUE);
2387+
subscribe(ts);
2388+
return ts;
2389+
}
23672390
}

src/main/java/rx/Observable.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818
import rx.annotations.*;
1919
import rx.exceptions.*;
2020
import rx.functions.*;
21+
import rx.internal.observers.AssertableSubscriberObservable;
2122
import rx.internal.operators.*;
2223
import rx.internal.util.*;
2324
import rx.observables.*;
2425
import rx.observers.SafeSubscriber;
26+
import rx.observers.AssertableSubscriber;
2527
import rx.plugins.*;
2628
import rx.schedulers.*;
2729
import rx.subscriptions.Subscriptions;
@@ -12640,4 +12642,45 @@ public final <T2, R> Observable<R> zipWith(Iterable<? extends T2> other, Func2<?
1264012642
public final <T2, R> Observable<R> zipWith(Observable<? extends T2> other, Func2<? super T, ? super T2, ? extends R> zipFunction) {
1264112643
return (Observable<R>)zip(this, other, zipFunction);
1264212644
}
12645+
12646+
// -------------------------------------------------------------------------
12647+
// Fluent test support, super handy and reduces test preparation boilerplate
12648+
// -------------------------------------------------------------------------
12649+
/**
12650+
* Creates a AssertableSubscriber that requests {@code Long.MAX_VALUE} and subscribes
12651+
* it to this Observable.
12652+
* <dl>
12653+
* <dt><b>Backpressure:</b><dt>
12654+
* <dd>The returned AssertableSubscriber consumes this Observable in an unbounded fashion.</dd>
12655+
* <dt><b>Scheduler:</b></dt>
12656+
* <dd>{@code test} does not operate by default on a particular {@link Scheduler}.</dd>
12657+
* </dl>
12658+
* @return the new AssertableSubscriber instance
12659+
* @since 1.2.3
12660+
*/
12661+
@Experimental
12662+
public final AssertableSubscriber<T> test() {
12663+
AssertableSubscriber<T> ts = AssertableSubscriberObservable.create(Long.MAX_VALUE);
12664+
subscribe(ts);
12665+
return ts;
12666+
}
12667+
12668+
/**
12669+
* Creates an AssertableSubscriber with the initial request amount and subscribes
12670+
* it to this Observable.
12671+
* <dl>
12672+
* <dt><b>Backpressure:</b><dt>
12673+
* <dd>The returned AssertableSubscriber requests the given {@code initialRequest} amount upfront.</dd>
12674+
* <dt><b>Scheduler:</b></dt>
12675+
* <dd>{@code test} does not operate by default on a particular {@link Scheduler}.</dd>
12676+
* </dl>
12677+
* @return the new AssertableSubscriber instance
12678+
* @since 1.2.3
12679+
*/
12680+
@Experimental
12681+
public final AssertableSubscriber<T> test(long initialRequestAmount) {
12682+
AssertableSubscriber<T> ts = AssertableSubscriberObservable.create(initialRequestAmount);
12683+
subscribe(ts);
12684+
return ts;
12685+
}
1264312686
}

src/main/java/rx/Single.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import rx.annotations.*;
2020
import rx.exceptions.*;
2121
import rx.functions.*;
22+
import rx.internal.observers.AssertableSubscriberObservable;
2223
import rx.internal.operators.*;
2324
import rx.internal.util.*;
2425
import rx.observables.ConnectableObservable;
@@ -2675,4 +2676,26 @@ public final Single<T> delaySubscription(Observable<?> other) {
26752676
}
26762677
return create(new SingleOnSubscribeDelaySubscriptionOther<T>(this, other));
26772678
}
2679+
2680+
// -------------------------------------------------------------------------
2681+
// Fluent test support, super handy and reduces test preparation boilerplate
2682+
// -------------------------------------------------------------------------
2683+
/**
2684+
* Creates an AssertableSubscriber that requests {@code Long.MAX_VALUE} and subscribes
2685+
* it to this Observable.
2686+
* <dl>
2687+
* <dt><b>Backpressure:</b><dt>
2688+
* <dd>The returned AssertableSubscriber consumes this Observable in an unbounded fashion.</dd>
2689+
* <dt><b>Scheduler:</b></dt>
2690+
* <dd>{@code test} does not operate by default on a particular {@link Scheduler}.</dd>
2691+
* </dl>
2692+
* @return the new AssertableSubscriber instance
2693+
* @since 1.2.3
2694+
*/
2695+
@Experimental
2696+
public final AssertableSubscriber<T> test() {
2697+
AssertableSubscriberObservable<T> ts = AssertableSubscriberObservable.create(Long.MAX_VALUE);
2698+
subscribe(ts);
2699+
return ts;
2700+
}
26782701
}

0 commit comments

Comments
 (0)