@@ -1648,9 +1648,80 @@ public static <T> Flowable<T> concatDelayError(@NonNull Publisher<@NonNull ? ext
16481648 return fromPublisher(sources).concatMapDelayError((Function)Functions.identity(), tillTheEnd, prefetch);
16491649 }
16501650
1651+ /**
1652+ * Concatenates a sequence of {@link Publisher}s eagerly into a single stream of values.
1653+ * <p>
1654+ * <img width="640" height="422" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatEager.i.png" alt="">
1655+ * <p>
1656+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
1657+ * source {@code Publisher}s. The operator buffers the values emitted by these {@code Publisher}s and then drains them
1658+ * in order, each one after the previous one completes.
1659+ * <dl>
1660+ * <dt><b>Backpressure:</b></dt>
1661+ * <dd>Backpressure is honored towards the downstream and the inner {@code Publisher}s are
1662+ * expected to support backpressure. Violating this assumption, the operator will
1663+ * signal {@link MissingBackpressureException}.</dd>
1664+ * <dt><b>Scheduler:</b></dt>
1665+ * <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
1666+ * </dl>
1667+ * @param <T> the value type
1668+ * @param sources a sequence of {@code Publisher}s that need to be eagerly concatenated
1669+ * @return the new {@code Flowable} instance with the specified concatenation behavior
1670+ * @throws NullPointerException if {@code sources} is {@code null}
1671+ * @since 2.0
1672+ */
1673+ @CheckReturnValue
1674+ @BackpressureSupport(BackpressureKind.FULL)
1675+ @SchedulerSupport(SchedulerSupport.NONE)
1676+ @NonNull
1677+ public static <T> Flowable<T> concatEager(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources) {
1678+ return concatEager(sources, bufferSize(), bufferSize());
1679+ }
1680+
1681+ /**
1682+ * Concatenates a sequence of {@link Publisher}s eagerly into a single stream of values and
1683+ * runs a limited number of inner sequences at once.
1684+ * <p>
1685+ * <img width="640" height="375" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatEager.in.png" alt="">
1686+ * <p>
1687+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
1688+ * source {@code Publisher}s. The operator buffers the values emitted by these {@code Publisher}s and then drains them
1689+ * in order, each one after the previous one completes.
1690+ * <dl>
1691+ * <dt><b>Backpressure:</b></dt>
1692+ * <dd>Backpressure is honored towards the downstream and both the outer and inner {@code Publisher}s are
1693+ * expected to support backpressure. Violating this assumption, the operator will
1694+ * signal {@link MissingBackpressureException}.</dd>
1695+ * <dt><b>Scheduler:</b></dt>
1696+ * <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
1697+ * </dl>
1698+ * @param <T> the value type
1699+ * @param sources a sequence of {@code Publisher}s that need to be eagerly concatenated
1700+ * @param maxConcurrency the maximum number of concurrently running inner {@code Publisher}s; {@link Integer#MAX_VALUE}
1701+ * is interpreted as all inner {@code Publisher}s can be active at the same time
1702+ * @param prefetch the number of elements to prefetch from each inner {@code Publisher} source
1703+ * @return the new {@code Flowable} instance with the specified concatenation behavior
1704+ * @throws NullPointerException if {@code sources} is {@code null}
1705+ * @throws IllegalArgumentException if {@code maxConcurrency} or {@code prefetch} is non-positive
1706+ * @since 2.0
1707+ */
1708+ @CheckReturnValue
1709+ @NonNull
1710+ @BackpressureSupport(BackpressureKind.FULL)
1711+ @SchedulerSupport(SchedulerSupport.NONE)
1712+ @SuppressWarnings({ "rawtypes", "unchecked" })
1713+ public static <T> Flowable<T> concatEager(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources, int maxConcurrency, int prefetch) {
1714+ Objects.requireNonNull(sources, "sources is null");
1715+ ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
1716+ ObjectHelper.verifyPositive(prefetch, "prefetch");
1717+ return RxJavaPlugins.onAssembly(new FlowableConcatMapEager(new FlowableFromIterable(sources), Functions.identity(), maxConcurrency, prefetch, ErrorMode.BOUNDARY));
1718+ }
1719+
16511720 /**
16521721 * Concatenates a {@link Publisher} sequence of {@code Publisher}s eagerly into a single stream of values.
16531722 * <p>
1723+ * <img width="640" height="490" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatEager.p.png" alt="">
1724+ * <p>
16541725 * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
16551726 * emitted source {@code Publisher}s as they are observed. The operator buffers the values emitted by these
16561727 * {@code Publisher}s and then drains them in order, each one after the previous one completes.
@@ -1677,7 +1748,10 @@ public static <T> Flowable<T> concatEager(@NonNull Publisher<@NonNull ? extends
16771748 }
16781749
16791750 /**
1680- * Concatenates a {@link Publisher} sequence of {@code Publisher}s eagerly into a single stream of values.
1751+ * Concatenates a {@link Publisher} sequence of {@code Publisher}s eagerly into a single stream of values and
1752+ * runs a limited number of inner sequences at once.
1753+ * <p>
1754+ * <img width="640" height="421" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatEager.pn.png" alt="">
16811755 * <p>
16821756 * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
16831757 * emitted source {@code Publisher}s as they are observed. The operator buffers the values emitted by these
@@ -1713,7 +1787,10 @@ public static <T> Flowable<T> concatEager(@NonNull Publisher<@NonNull ? extends
17131787 }
17141788
17151789 /**
1716- * Concatenates a sequence of {@link Publisher}s eagerly into a single stream of values.
1790+ * Concatenates a sequence of {@link Publisher}s eagerly into a single stream of values,
1791+ * delaying errors until all the inner sequences terminate.
1792+ * <p>
1793+ * <img width="640" height="428" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatEagerDelayError.i.png" alt="">
17171794 * <p>
17181795 * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
17191796 * source {@code Publisher}s. The operator buffers the values emitted by these {@code Publisher}s and then drains them
@@ -1730,18 +1807,22 @@ public static <T> Flowable<T> concatEager(@NonNull Publisher<@NonNull ? extends
17301807 * @param sources a sequence of {@code Publisher}s that need to be eagerly concatenated
17311808 * @return the new {@code Flowable} instance with the specified concatenation behavior
17321809 * @throws NullPointerException if {@code sources} is {@code null}
1733- * @since 2 .0
1810+ * @since 3.0 .0
17341811 */
17351812 @CheckReturnValue
17361813 @BackpressureSupport(BackpressureKind.FULL)
17371814 @SchedulerSupport(SchedulerSupport.NONE)
17381815 @NonNull
1739- public static <T> Flowable<T> concatEager (@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources) {
1740- return concatEager (sources, bufferSize(), bufferSize());
1816+ public static <T> Flowable<T> concatEagerDelayError (@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources) {
1817+ return concatEagerDelayError (sources, bufferSize(), bufferSize());
17411818 }
17421819
17431820 /**
1744- * Concatenates a sequence of {@link Publisher}s eagerly into a single stream of values.
1821+ * Concatenates a sequence of {@link Publisher}s eagerly into a single stream of values,
1822+ * delaying errors until all the inner sequences terminate and runs a limited number
1823+ * of inner sequences at once.
1824+ * <p>
1825+ * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatEagerDelayError.in.png" alt="">
17451826 * <p>
17461827 * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
17471828 * source {@code Publisher}s. The operator buffers the values emitted by these {@code Publisher}s and then drains them
@@ -1762,18 +1843,89 @@ public static <T> Flowable<T> concatEager(@NonNull Iterable<@NonNull ? extends P
17621843 * @return the new {@code Flowable} instance with the specified concatenation behavior
17631844 * @throws NullPointerException if {@code sources} is {@code null}
17641845 * @throws IllegalArgumentException if {@code maxConcurrency} or {@code prefetch} is non-positive
1765- * @since 2 .0
1846+ * @since 3.0 .0
17661847 */
17671848 @CheckReturnValue
17681849 @NonNull
17691850 @BackpressureSupport(BackpressureKind.FULL)
17701851 @SchedulerSupport(SchedulerSupport.NONE)
17711852 @SuppressWarnings({ "rawtypes", "unchecked" })
1772- public static <T> Flowable<T> concatEager (@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources, int maxConcurrency, int prefetch) {
1853+ public static <T> Flowable<T> concatEagerDelayError (@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources, int maxConcurrency, int prefetch) {
17731854 Objects.requireNonNull(sources, "sources is null");
17741855 ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
17751856 ObjectHelper.verifyPositive(prefetch, "prefetch");
1776- return RxJavaPlugins.onAssembly(new FlowableConcatMapEager(new FlowableFromIterable(sources), Functions.identity(), maxConcurrency, prefetch, ErrorMode.IMMEDIATE));
1857+ return RxJavaPlugins.onAssembly(new FlowableConcatMapEager(new FlowableFromIterable(sources), Functions.identity(), maxConcurrency, prefetch, ErrorMode.END));
1858+ }
1859+
1860+ /**
1861+ * Concatenates a {@link Publisher} sequence of {@code Publisher}s eagerly into a single stream of values,
1862+ * delaying errors until all the inner and the outer sequences terminate.
1863+ * <p>
1864+ * <img width="640" height="496" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatEagerDelayError.p.png" alt="">
1865+ * <p>
1866+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
1867+ * emitted source {@code Publisher}s as they are observed. The operator buffers the values emitted by these
1868+ * {@code Publisher}s and then drains them in order, each one after the previous one completes.
1869+ * <dl>
1870+ * <dt><b>Backpressure:</b></dt>
1871+ * <dd>Backpressure is honored towards the downstream and both the outer and inner {@code Publisher}s are
1872+ * expected to support backpressure. Violating this assumption, the operator will
1873+ * signal {@link MissingBackpressureException}.</dd>
1874+ * <dt><b>Scheduler:</b></dt>
1875+ * <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
1876+ * </dl>
1877+ * @param <T> the value type
1878+ * @param sources a sequence of {@code Publisher}s that need to be eagerly concatenated
1879+ * @return the new {@code Flowable} instance with the specified concatenation behavior
1880+ * @throws NullPointerException if {@code sources} is {@code null}
1881+ * @since 3.0.0
1882+ */
1883+ @CheckReturnValue
1884+ @BackpressureSupport(BackpressureKind.FULL)
1885+ @SchedulerSupport(SchedulerSupport.NONE)
1886+ @NonNull
1887+ public static <T> Flowable<T> concatEagerDelayError(@NonNull Publisher<@NonNull ? extends Publisher<@NonNull ? extends T>> sources) {
1888+ return concatEagerDelayError(sources, bufferSize(), bufferSize());
1889+ }
1890+
1891+ /**
1892+ * Concatenates a {@link Publisher} sequence of {@code Publisher}s eagerly into a single stream of values,
1893+ * delaying errors until all the inner and outer sequences terminate and runs a limited number of inner
1894+ * sequences at once.
1895+ * <p>
1896+ * <img width="640" height="421" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatEagerDelayError.pn.png" alt="">
1897+ * <p>
1898+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
1899+ * emitted source {@code Publisher}s as they are observed. The operator buffers the values emitted by these
1900+ * {@code Publisher}s and then drains them in order, each one after the previous one completes.
1901+ * <dl>
1902+ * <dt><b>Backpressure:</b></dt>
1903+ * <dd>Backpressure is honored towards the downstream and both the outer and inner {@code Publisher}s are
1904+ * expected to support backpressure. Violating this assumption, the operator will
1905+ * signal {@link MissingBackpressureException}.</dd>
1906+ * <dt><b>Scheduler:</b></dt>
1907+ * <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
1908+ * </dl>
1909+ * @param <T> the value type
1910+ * @param sources a sequence of {@code Publisher}s that need to be eagerly concatenated
1911+ * @param maxConcurrency the maximum number of concurrently running inner {@code Publisher}s; {@link Integer#MAX_VALUE}
1912+ * is interpreted as all inner {@code Publisher}s can be active at the same time
1913+ * @param prefetch the number of elements to prefetch from each inner {@code Publisher} source
1914+ * @return the new {@code Flowable} instance with the specified concatenation behavior
1915+ * @throws NullPointerException if {@code sources} is {@code null}
1916+ * @throws IllegalArgumentException if {@code maxConcurrency} or {@code prefetch} is non-positive
1917+ * @since 3.0.0
1918+ */
1919+ @CheckReturnValue
1920+ @NonNull
1921+ @BackpressureSupport(BackpressureKind.FULL)
1922+ @SchedulerSupport(SchedulerSupport.NONE)
1923+ @SuppressWarnings({ "rawtypes", "unchecked" })
1924+ public static <T> Flowable<T> concatEagerDelayError(@NonNull Publisher<@NonNull ? extends Publisher<@NonNull ? extends T>> sources, int maxConcurrency, int prefetch) {
1925+ Objects.requireNonNull(sources, "sources is null");
1926+ ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
1927+ ObjectHelper.verifyPositive(prefetch, "prefetch");
1928+ return RxJavaPlugins.onAssembly(new FlowableConcatMapEagerPublisher(sources, Functions.identity(), maxConcurrency, prefetch, ErrorMode.END));
17771929 }
17781930
17791931 /**
@@ -10134,6 +10286,7 @@ public final <R> Flowable<R> flatMap(@NonNull Function<? super T, ? extends Publ
1013410286 * if {@code false}, the first one signaling an exception will terminate the whole sequence immediately
1013510287 * @return the new {@code Flowable} instance
1013610288 * @throws NullPointerException if {@code mapper} is {@code null}
10289+ * @throws IllegalArgumentException if {@code maxConcurrency} is non-positive
1013710290 * @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
1013810291 * @since 2.0
1013910292 */
0 commit comments