Skip to content

Commit d1cd153

Browse files
authored
2.x: make Obs.combineLatest consistent with Flowable + doc cornercase (#4987)
* 2.x: make Obs.combineLatest consistent with Flowable + doc cornercase * early termination to cancel the other sources
1 parent 3768a53 commit d1cd153

File tree

6 files changed

+449
-4
lines changed

6 files changed

+449
-4
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,10 @@ public static int bufferSize() {
143143
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
144144
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
145145
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
146+
* <p>
147+
* If any of the sources never produces an item but only terminates (normally or with an error), the
148+
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
149+
* If that input source is also synchronous, other sources after it will not be subscribed to.
146150
*
147151
* <dl>
148152
* <dt><b>Backpressure:</b></dt>
@@ -180,6 +184,10 @@ public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources,
180184
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
181185
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
182186
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
187+
* <p>
188+
* If any of the sources never produces an item but only terminates (normally or with an error), the
189+
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
190+
* If that input source is also synchronous, other sources after it will not be subscribed to.
183191
*
184192
* <dl>
185193
* <dt><b>Backpressure:</b></dt>
@@ -217,6 +225,10 @@ public static <T, R> Flowable<R> combineLatest(Function<? super Object[], ? exte
217225
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
218226
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
219227
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
228+
* <p>
229+
* If any of the sources never produces an item but only terminates (normally or with an error), the
230+
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
231+
* If that input source is also synchronous, other sources after it will not be subscribed to.
220232
*
221233
* <dl>
222234
* <dt><b>Backpressure:</b></dt>
@@ -262,6 +274,10 @@ public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources,
262274
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
263275
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
264276
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
277+
* <p>
278+
* If any of the sources never produces an item but only terminates (normally or with an error), the
279+
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
280+
* If that input source is also synchronous, other sources after it will not be subscribed to.
265281
*
266282
* <dl>
267283
* <dt><b>Backpressure:</b></dt>
@@ -300,6 +316,10 @@ public static <T, R> Flowable<R> combineLatest(Iterable<? extends Publisher<? ex
300316
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
301317
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
302318
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
319+
* <p>
320+
* If any of the sources never produces an item but only terminates (normally or with an error), the
321+
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
322+
* If that input source is also synchronous, other sources after it will not be subscribed to.
303323
*
304324
* <dl>
305325
* <dt><b>Backpressure:</b></dt>
@@ -343,6 +363,10 @@ public static <T, R> Flowable<R> combineLatest(Iterable<? extends Publisher<? ex
343363
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
344364
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
345365
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
366+
* <p>
367+
* If any of the sources never produces an item but only terminates (normally or with an error), the
368+
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
369+
* If that input source is also synchronous, other sources after it will not be subscribed to.
346370
*
347371
* <dl>
348372
* <dt><b>Backpressure:</b></dt>
@@ -382,6 +406,10 @@ public static <T, R> Flowable<R> combineLatestDelayError(Publisher<? extends T>[
382406
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
383407
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
384408
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
409+
* <p>
410+
* If any of the sources never produces an item but only terminates (normally or with an error), the
411+
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
412+
* If that input source is also synchronous, other sources after it will not be subscribed to.
385413
*
386414
* <dl>
387415
* <dt><b>Backpressure:</b></dt>
@@ -421,6 +449,10 @@ public static <T, R> Flowable<R> combineLatestDelayError(Function<? super Object
421449
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
422450
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
423451
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
452+
* <p>
453+
* If any of the sources never produces an item but only terminates (normally or with an error), the
454+
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
455+
* If that input source is also synchronous, other sources after it will not be subscribed to.
424456
*
425457
* <dl>
426458
* <dt><b>Backpressure:</b></dt>
@@ -462,6 +494,10 @@ public static <T, R> Flowable<R> combineLatestDelayError(Function<? super Object
462494
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
463495
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
464496
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
497+
* <p>
498+
* If any of the sources never produces an item but only terminates (normally or with an error), the
499+
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
500+
* If that input source is also synchronous, other sources after it will not be subscribed to.
465501
*
466502
* <dl>
467503
* <dt><b>Backpressure:</b></dt>
@@ -509,6 +545,10 @@ public static <T, R> Flowable<R> combineLatestDelayError(Publisher<? extends T>[
509545
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
510546
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
511547
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
548+
* <p>
549+
* If any of the sources never produces an item but only terminates (normally or with an error), the
550+
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
551+
* If that input source is also synchronous, other sources after it will not be subscribed to.
512552
*
513553
* <dl>
514554
* <dt><b>Backpressure:</b></dt>
@@ -548,6 +588,10 @@ public static <T, R> Flowable<R> combineLatestDelayError(Iterable<? extends Publ
548588
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
549589
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
550590
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
591+
* <p>
592+
* If any of the sources never produces an item but only terminates (normally or with an error), the
593+
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
594+
* If that input source is also synchronous, other sources after it will not be subscribed to.
551595
*
552596
* <dl>
553597
* <dt><b>Backpressure:</b></dt>
@@ -588,6 +632,10 @@ public static <T, R> Flowable<R> combineLatestDelayError(Iterable<? extends Publ
588632
* source Publishers each time an item is received from either of the source Publishers, where this
589633
* aggregation is defined by a specified function.
590634
* <p>
635+
* If any of the sources never produces an item but only terminates (normally or with an error), the
636+
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
637+
* If that input source is also synchronous, other sources after it will not be subscribed to.
638+
* <p>
591639
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/combineLatest.png" alt="">
592640
* <dl>
593641
* <dt><b>Backpressure:</b></dt>
@@ -629,6 +677,10 @@ public static <T1, T2, R> Flowable<R> combineLatest(
629677
* source Publishers each time an item is received from any of the source Publishers, where this
630678
* aggregation is defined by a specified function.
631679
* <p>
680+
* If any of the sources never produces an item but only terminates (normally or with an error), the
681+
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
682+
* If that input source is also synchronous, other sources after it will not be subscribed to.
683+
* <p>
632684
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/combineLatest.png" alt="">
633685
* <dl>
634686
* <dt><b>Backpressure:</b></dt>
@@ -674,6 +726,10 @@ public static <T1, T2, T3, R> Flowable<R> combineLatest(
674726
* source Publishers each time an item is received from any of the source Publishers, where this
675727
* aggregation is defined by a specified function.
676728
* <p>
729+
* If any of the sources never produces an item but only terminates (normally or with an error), the
730+
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
731+
* If that input source is also synchronous, other sources after it will not be subscribed to.
732+
* <p>
677733
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/combineLatest.png" alt="">
678734
* <dl>
679735
* <dt><b>Backpressure:</b></dt>
@@ -723,6 +779,10 @@ public static <T1, T2, T3, T4, R> Flowable<R> combineLatest(
723779
* source Publishers each time an item is received from any of the source Publishers, where this
724780
* aggregation is defined by a specified function.
725781
* <p>
782+
* If any of the sources never produces an item but only terminates (normally or with an error), the
783+
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
784+
* If that input source is also synchronous, other sources after it will not be subscribed to.
785+
* <p>
726786
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/combineLatest.png" alt="">
727787
* <dl>
728788
* <dt><b>Backpressure:</b></dt>
@@ -777,6 +837,10 @@ public static <T1, T2, T3, T4, T5, R> Flowable<R> combineLatest(
777837
* source Publishers each time an item is received from any of the source Publishers, where this
778838
* aggregation is defined by a specified function.
779839
* <p>
840+
* If any of the sources never produces an item but only terminates (normally or with an error), the
841+
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
842+
* If that input source is also synchronous, other sources after it will not be subscribed to.
843+
* <p>
780844
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/combineLatest.png" alt="">
781845
* <dl>
782846
* <dt><b>Backpressure:</b></dt>
@@ -835,6 +899,10 @@ public static <T1, T2, T3, T4, T5, T6, R> Flowable<R> combineLatest(
835899
* source Publishers each time an item is received from any of the source Publishers, where this
836900
* aggregation is defined by a specified function.
837901
* <p>
902+
* If any of the sources never produces an item but only terminates (normally or with an error), the
903+
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
904+
* If that input source is also synchronous, other sources after it will not be subscribed to.
905+
* <p>
838906
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/combineLatest.png" alt="">
839907
* <dl>
840908
* <dt><b>Backpressure:</b></dt>
@@ -898,6 +966,10 @@ public static <T1, T2, T3, T4, T5, T6, T7, R> Flowable<R> combineLatest(
898966
* source Publishers each time an item is received from any of the source Publishers, where this
899967
* aggregation is defined by a specified function.
900968
* <p>
969+
* If any of the sources never produces an item but only terminates (normally or with an error), the
970+
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
971+
* If that input source is also synchronous, other sources after it will not be subscribed to.
972+
* <p>
901973
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/combineLatest.png" alt="">
902974
* <dl>
903975
* <dt><b>Backpressure:</b></dt>
@@ -965,6 +1037,10 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, R> Flowable<R> combineLatest(
9651037
* source Publishers each time an item is received from any of the source Publishers, where this
9661038
* aggregation is defined by a specified function.
9671039
* <p>
1040+
* If any of the sources never produces an item but only terminates (normally or with an error), the
1041+
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
1042+
* If that input source is also synchronous, other sources after it will not be subscribed to.
1043+
* <p>
9681044
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/combineLatest.png" alt="">
9691045
* <dl>
9701046
* <dt><b>Backpressure:</b></dt>

0 commit comments

Comments
 (0)