Skip to content

Commit 0b51d41

Browse files
JakeWhartonakarnokd
authored andcommitted
Document and test amb subscription ordering. (#5047)
1 parent 0ccc453 commit 0b51d41

File tree

10 files changed

+128
-15
lines changed

10 files changed

+128
-15
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ public abstract class Completable implements CompletableSource {
4646
* <dt><b>Scheduler:</b></dt>
4747
* <dd>{@code ambArray} does not operate by default on a particular {@link Scheduler}.</dd>
4848
* </dl>
49-
* @param sources the array of source Completables
49+
* @param sources the array of source Completables. A subscription to each source will
50+
* occur in the same order as in this array.
5051
* @return the new Completable instance
5152
* @throws NullPointerException if sources is null
5253
*/
@@ -71,7 +72,8 @@ public static Completable ambArray(final CompletableSource... sources) {
7172
* <dt><b>Scheduler:</b></dt>
7273
* <dd>{@code amb} does not operate by default on a particular {@link Scheduler}.</dd>
7374
* </dl>
74-
* @param sources the array of source Completables
75+
* @param sources the array of source Completables. A subscription to each source will
76+
* occur in the same order as in this Iterable.
7577
* @return the new Completable instance
7678
* @throws NullPointerException if sources is null
7779
*/
@@ -776,7 +778,8 @@ public static Completable wrap(CompletableSource source) {
776778
* <dt><b>Scheduler:</b></dt>
777779
* <dd>{@code ambWith} does not operate by default on a particular {@link Scheduler}.</dd>
778780
* </dl>
779-
* @param other the other Completable, not null
781+
* @param other the other Completable, not null. A subscription to this provided source will occur after subscribing
782+
* to the current source.
780783
* @return the new Completable instance
781784
* @throws NullPointerException if other is null
782785
*/

src/main/java/io/reactivex/Flowable.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ public abstract class Flowable<T> implements Publisher<T> {
7878
*
7979
* @param <T> the common element type
8080
* @param sources
81-
* an Iterable of Publishers sources competing to react first
81+
* an Iterable of Publishers sources competing to react first. A subscription to each Publisher will
82+
* occur in the same order as in this Iterable.
8283
* @return a Flowable that emits the same sequence as whichever of the source Publishers first
8384
* emitted an item or sent a termination notification
8485
* @see <a href="http://reactivex.io/documentation/operators/amb.html">ReactiveX operators documentation: Amb</a>
@@ -106,7 +107,8 @@ public static <T> Flowable<T> amb(Iterable<? extends Publisher<? extends T>> sou
106107
*
107108
* @param <T> the common element type
108109
* @param sources
109-
* an array of Publisher sources competing to react first
110+
* an array of Publisher sources competing to react first. A subscription to each Publisher will
111+
* occur in the same order as in this Iterable.
110112
* @return a Flowable that emits the same sequence as whichever of the source Publishers first
111113
* emitted an item or sent a termination notification
112114
* @see <a href="http://reactivex.io/documentation/operators/amb.html">ReactiveX operators documentation: Amb</a>
@@ -5131,7 +5133,8 @@ public final Single<Boolean> all(Predicate<? super T> predicate) {
51315133
* </dl>
51325134
*
51335135
* @param other
5134-
* a Publisher competing to react first
5136+
* a Publisher competing to react first. A subscription to this provided Publisher will occur after subscribing
5137+
* to the current Publisher.
51355138
* @return a Flowable that emits the same sequence as whichever of the source Publishers first
51365139
* emitted an item or sent a termination notification
51375140
* @see <a href="http://reactivex.io/documentation/operators/amb.html">ReactiveX operators documentation: Amb</a>

src/main/java/io/reactivex/Maybe.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ public abstract class Maybe<T> implements MaybeSource<T> {
5454
* <dd>{@code amb} does not operate by default on a particular {@link Scheduler}.</dd>
5555
* </dl>
5656
* @param <T> the value type
57-
* @param sources the Iterable sequence of sources
57+
* @param sources the Iterable sequence of sources. A subscription to each source will
58+
* occur in the same order as in the Iterable.
5859
* @return the new Maybe instance
5960
*/
6061
@CheckReturnValue
@@ -72,7 +73,8 @@ public static <T> Maybe<T> amb(final Iterable<? extends MaybeSource<? extends T>
7273
* <dd>{@code ambArray} does not operate by default on a particular {@link Scheduler}.</dd>
7374
* </dl>
7475
* @param <T> the value type
75-
* @param sources the array of sources
76+
* @param sources the array of sources. A subscription to each source will
77+
* occur in the same order as in the array.
7678
* @return the new Maybe instance
7779
*/
7880
@CheckReturnValue
@@ -1966,7 +1968,8 @@ public static <T, R> Maybe<R> zipArray(Function<? super Object[], ? extends R> z
19661968
* </dl>
19671969
*
19681970
* @param other
1969-
* a MaybeSource competing to react first
1971+
* a MaybeSource competing to react first. A subscription to this provided source will occur after
1972+
* subscribing to the current source.
19701973
* @return a Maybe that emits the same sequence as whichever of the source MaybeSources first
19711974
* signalled
19721975
* @see <a href="http://reactivex.io/documentation/operators/amb.html">ReactiveX operators documentation: Amb</a>

src/main/java/io/reactivex/Observable.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ public abstract class Observable<T> implements ObservableSource<T> {
6969
*
7070
* @param <T> the common element type
7171
* @param sources
72-
* an Iterable of ObservableSources sources competing to react first
72+
* an Iterable of ObservableSource sources competing to react first. A subscription to each source will
73+
* occur in the same order as in the Iterable.
7374
* @return an Observable that emits the same sequence as whichever of the source ObservableSources first
7475
* emitted an item or sent a termination notification
7576
* @see <a href="http://reactivex.io/documentation/operators/amb.html">ReactiveX operators documentation: Amb</a>
@@ -93,7 +94,8 @@ public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extend
9394
*
9495
* @param <T> the common element type
9596
* @param sources
96-
* an array of ObservableSource sources competing to react first
97+
* an array of ObservableSource sources competing to react first. A subscription to each source will
98+
* occur in the same order as in the array.
9799
* @return an Observable that emits the same sequence as whichever of the source ObservableSources first
98100
* emitted an item or sent a termination notification
99101
* @see <a href="http://reactivex.io/documentation/operators/amb.html">ReactiveX operators documentation: Amb</a>
@@ -4547,7 +4549,8 @@ public final Single<Boolean> all(Predicate<? super T> predicate) {
45474549
* </dl>
45484550
*
45494551
* @param other
4550-
* an ObservableSource competing to react first
4552+
* an ObservableSource competing to react first. A subscription to this provided source will occur after
4553+
* subscribing to the current source.
45514554
* @return an Observable that emits the same sequence as whichever of the source ObservableSources first
45524555
* emitted an item or sent a termination notification
45534556
* @see <a href="http://reactivex.io/documentation/operators/amb.html">ReactiveX operators documentation: Amb</a>

src/main/java/io/reactivex/Single.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ public abstract class Single<T> implements SingleSource<T> {
6767
* <dd>{@code amb} does not operate by default on a particular {@link Scheduler}.</dd>
6868
* </dl>
6969
* @param <T> the value type
70-
* @param sources the Iterable sequence of sources
70+
* @param sources the Iterable sequence of sources. A subscription to each source will
71+
* occur in the same order as in this Iterable.
7172
* @return the new Single instance
7273
* @since 2.0
7374
*/
@@ -86,7 +87,8 @@ public static <T> Single<T> amb(final Iterable<? extends SingleSource<? extends
8687
* <dd>{@code ambArray} does not operate by default on a particular {@link Scheduler}.</dd>
8788
* </dl>
8889
* @param <T> the value type
89-
* @param sources the array of sources
90+
* @param sources the array of sources. A subscription to each source will
91+
* occur in the same order as in this array.
9092
* @return the new Single instance
9193
* @since 2.0
9294
*/
@@ -1493,7 +1495,8 @@ public static <T, R> Single<R> zipArray(Function<? super Object[], ? extends R>
14931495
* <dd>{@code ambWith} does not operate by default on a particular {@link Scheduler}.</dd>
14941496
* </dl>
14951497
* @param other the other SingleSource to race for the first emission of success or error
1496-
* @return the new Single instance
1498+
* @return the new Single instance. A subscription to this provided source will occur after subscribing
1499+
* to the current source.
14971500
* @since 2.0
14981501
*/
14991502
@CheckReturnValue

src/test/java/io/reactivex/internal/operators/completable/CompletableAmbTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,4 +145,22 @@ public void run() {
145145
}
146146
}
147147

148+
@Test
149+
public void ambWithOrder() {
150+
Completable error = Completable.error(new RuntimeException());
151+
Completable.complete().ambWith(error).test().assertComplete();
152+
}
153+
154+
@Test
155+
public void ambIterableOrder() {
156+
Completable error = Completable.error(new RuntimeException());
157+
Completable.amb(Arrays.asList(Completable.complete(), error)).test().assertComplete();
158+
}
159+
160+
@Test
161+
public void ambArrayOrder() {
162+
Completable error = Completable.error(new RuntimeException());
163+
Completable.ambArray(Completable.complete(), error).test().assertComplete();
164+
}
165+
148166
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableAmbTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -697,4 +697,24 @@ public Flowable<Integer> apply(Integer v) throws Exception {
697697
.test()
698698
.assertFailureAndMessage(TestException.class, "next()");
699699
}
700+
701+
@Test
702+
public void ambWithOrder() {
703+
Flowable<Integer> error = Flowable.error(new RuntimeException());
704+
Flowable.just(1).ambWith(error).test().assertValue(1).assertComplete();
705+
}
706+
707+
@SuppressWarnings("unchecked")
708+
@Test
709+
public void ambIterableOrder() {
710+
Flowable<Integer> error = Flowable.error(new RuntimeException());
711+
Flowable.amb(Arrays.asList(Flowable.just(1), error)).test().assertValue(1).assertComplete();
712+
}
713+
714+
@SuppressWarnings("unchecked")
715+
@Test
716+
public void ambArrayOrder() {
717+
Flowable<Integer> error = Flowable.error(new RuntimeException());
718+
Flowable.ambArray(Flowable.just(1), error).test().assertValue(1).assertComplete();
719+
}
700720
}

src/test/java/io/reactivex/internal/operators/observable/ObservableAmbTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,4 +363,24 @@ public void run() {
363363
}
364364
}
365365
}
366+
367+
@Test
368+
public void ambWithOrder() {
369+
Observable<Integer> error = Observable.error(new RuntimeException());
370+
Observable.just(1).ambWith(error).test().assertValue(1).assertComplete();
371+
}
372+
373+
@SuppressWarnings("unchecked")
374+
@Test
375+
public void ambIterableOrder() {
376+
Observable<Integer> error = Observable.error(new RuntimeException());
377+
Observable.amb(Arrays.asList(Observable.just(1), error)).test().assertValue(1).assertComplete();
378+
}
379+
380+
@SuppressWarnings("unchecked")
381+
@Test
382+
public void ambArrayOrder() {
383+
Observable<Integer> error = Observable.error(new RuntimeException());
384+
Observable.ambArray(Observable.just(1), error).test().assertValue(1).assertComplete();
385+
}
366386
}

src/test/java/io/reactivex/internal/operators/single/SingleAmbTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,4 +261,24 @@ public void manySources() {
261261
.test()
262262
.assertResult(31);
263263
}
264+
265+
@Test
266+
public void ambWithOrder() {
267+
Single<Integer> error = Single.error(new RuntimeException());
268+
Single.just(1).ambWith(error).test().assertValue(1);
269+
}
270+
271+
@SuppressWarnings("unchecked")
272+
@Test
273+
public void ambIterableOrder() {
274+
Single<Integer> error = Single.error(new RuntimeException());
275+
Single.amb(Arrays.asList(Single.just(1), error)).test().assertValue(1);
276+
}
277+
278+
@SuppressWarnings("unchecked")
279+
@Test
280+
public void ambArrayOrder() {
281+
Single<Integer> error = Single.error(new RuntimeException());
282+
Single.ambArray(Single.just(1), error).test().assertValue(1);
283+
}
264284
}

src/test/java/io/reactivex/maybe/MaybeTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1580,6 +1580,26 @@ public void ambArrayOne() {
15801580
assertSame(Maybe.never(), Maybe.ambArray(Maybe.never()));
15811581
}
15821582

1583+
@Test
1584+
public void ambWithOrder() {
1585+
Maybe<Integer> error = Maybe.error(new RuntimeException());
1586+
Maybe.just(1).ambWith(error).test().assertValue(1);
1587+
}
1588+
1589+
@SuppressWarnings("unchecked")
1590+
@Test
1591+
public void ambIterableOrder() {
1592+
Maybe<Integer> error = Maybe.error(new RuntimeException());
1593+
Maybe.amb(Arrays.asList(Maybe.just(1), error)).test().assertValue(1);
1594+
}
1595+
1596+
@SuppressWarnings("unchecked")
1597+
@Test
1598+
public void ambArrayOrder() {
1599+
Maybe<Integer> error = Maybe.error(new RuntimeException());
1600+
Maybe.ambArray(Maybe.just(1), error).test().assertValue(1);
1601+
}
1602+
15831603
@SuppressWarnings("unchecked")
15841604
@Test
15851605
public void ambArray1SignalsSuccess() {

0 commit comments

Comments
 (0)