@@ -7280,6 +7280,10 @@ public final <R> Flowable<R> compose(FlowableTransformer<? super T, ? extends R>
72807280 * that result from concatenating those resulting Publishers.
72817281 * <p>
72827282 * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
7283+ * <p>
7284+ * Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread,
7285+ * on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure
7286+ * the {@code mapper} function is confined to a known thread, use the {@link #concatMap(Function, int, Scheduler)} overload.
72837287 * <dl>
72847288 * <dt><b>Backpressure:</b></dt>
72857289 * <dd>The operator honors backpressure from downstream. Both this and the inner {@code Publisher}s are
@@ -7312,6 +7316,10 @@ public final <R> Flowable<R> concatMap(Function<? super T, ? extends Publisher<?
73127316 * that result from concatenating those resulting Publishers.
73137317 * <p>
73147318 * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
7319+ * <p>
7320+ * Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread,
7321+ * on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure
7322+ * the {@code mapper} function is confined to a known thread, use the {@link #concatMap(Function, int, Scheduler)} overload.
73157323 * <dl>
73167324 * <dt><b>Backpressure:</b></dt>
73177325 * <dd>The operator honors backpressure from downstream. Both this and the inner {@code Publisher}s are
@@ -7332,6 +7340,7 @@ public final <R> Flowable<R> concatMap(Function<? super T, ? extends Publisher<?
73327340 * @return a Flowable that emits the result of applying the transformation function to each item emitted
73337341 * by the source Publisher and concatenating the Publishers obtained from this transformation
73347342 * @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
7343+ * @see #concatMap(Function, int, Scheduler)
73357344 */
73367345 @CheckReturnValue
73377346 @NonNull
@@ -7351,6 +7360,52 @@ public final <R> Flowable<R> concatMap(Function<? super T, ? extends Publisher<?
73517360 return RxJavaPlugins.onAssembly(new FlowableConcatMap<T, R>(this, mapper, prefetch, ErrorMode.IMMEDIATE));
73527361 }
73537362
7363+ /**
7364+ * Returns a new Flowable that emits items resulting from applying a function (on a designated scheduler)
7365+ * that you supply to each item emitted by the source Publisher, where that function returns a Publisher, and then emitting the items
7366+ * that result from concatenating those resulting Publishers.
7367+ * <p>
7368+ * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
7369+ * <p>
7370+ * The difference between {@link #concatMap(Function, int)} and this operator is that this operator guarantees the {@code mapper}
7371+ * function is executed on the specified scheduler.
7372+ * <dl>
7373+ * <dt><b>Backpressure:</b></dt>
7374+ * <dd>The operator honors backpressure from downstream. Both this and the inner {@code Publisher}s are
7375+ * expected to honor backpressure as well. If the source {@code Publisher} violates the rule, the operator will
7376+ * signal a {@code MissingBackpressureException}. If any of the inner {@code Publisher}s doesn't honor
7377+ * backpressure, that <em>may</em> throw an {@code IllegalStateException} when that
7378+ * {@code Publisher} completes.</dd>
7379+ * <dt><b>Scheduler:</b></dt>
7380+ * <dd>{@code concatMap} executes the given {@code mapper} function on the provided {@link Scheduler}.</dd>
7381+ * </dl>
7382+ *
7383+ * @param <R> the type of the inner Publisher sources and thus the output type
7384+ * @param mapper
7385+ * a function that, when applied to an item emitted by the source Publisher, returns a
7386+ * Publisher
7387+ * @param prefetch
7388+ * the number of elements to prefetch from the current Flowable
7389+ * @param scheduler
7390+ * the scheduler where the {@code mapper} function will be executed
7391+ * @return a Flowable that emits the result of applying the transformation function to each item emitted
7392+ * by the source Publisher and concatenating the Publishers obtained from this transformation
7393+ * @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
7394+ * @since 3.0.0
7395+ * @see #concatMap(Function, int)
7396+ * @see #concatMapDelayError(Function, int, boolean, Scheduler)
7397+ */
7398+ @CheckReturnValue
7399+ @NonNull
7400+ @BackpressureSupport(BackpressureKind.FULL)
7401+ @SchedulerSupport(SchedulerSupport.CUSTOM)
7402+ public final <R> Flowable<R> concatMap(Function<? super T, ? extends Publisher<? extends R>> mapper, int prefetch, Scheduler scheduler) {
7403+ ObjectHelper.requireNonNull(mapper, "mapper is null");
7404+ ObjectHelper.verifyPositive(prefetch, "prefetch");
7405+ ObjectHelper.requireNonNull(scheduler, "scheduler");
7406+ return RxJavaPlugins.onAssembly(new FlowableConcatMapScheduler<T, R>(this, mapper, prefetch, ErrorMode.IMMEDIATE, scheduler));
7407+ }
7408+
73547409 /**
73557410 * Maps the upstream items into {@link CompletableSource}s and subscribes to them one after the
73567411 * other completes.
@@ -7520,7 +7575,10 @@ public final Completable concatMapCompletableDelayError(Function<? super T, ? ex
75207575 * one at a time and emits their values in order
75217576 * while delaying any error from either this or any of the inner Publishers
75227577 * till all of them terminate.
7523- *
7578+ * <p>
7579+ * Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread,
7580+ * on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure
7581+ * the {@code mapper} function is confined to a known thread, use the {@link #concatMapDelayError(Function, int, boolean, Scheduler)} overload.
75247582 * <dl>
75257583 * <dt><b>Backpressure:</b></dt>
75267584 * <dd>The operator honors backpressure from downstream. Both this and the inner {@code Publisher}s are
@@ -7535,6 +7593,7 @@ public final Completable concatMapCompletableDelayError(Function<? super T, ? ex
75357593 * @param <R> the result value type
75367594 * @param mapper the function that maps the items of this Publisher into the inner Publishers.
75377595 * @return the new Publisher instance with the concatenation behavior
7596+ * @see #concatMapDelayError(Function, int, boolean, Scheduler)
75387597 */
75397598 @CheckReturnValue
75407599 @BackpressureSupport(BackpressureKind.FULL)
@@ -7548,6 +7607,10 @@ public final <R> Flowable<R> concatMapDelayError(Function<? super T, ? extends P
75487607 * one at a time and emits their values in order
75497608 * while delaying any error from either this or any of the inner Publishers
75507609 * till all of them terminate.
7610+ * <p>
7611+ * Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread,
7612+ * on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure
7613+ * the {@code mapper} function is confined to a known thread, use the {@link #concatMapDelayError(Function, int, boolean, Scheduler)} overload.
75517614 *
75527615 * <dl>
75537616 * <dt><b>Backpressure:</b></dt>
@@ -7568,6 +7631,7 @@ public final <R> Flowable<R> concatMapDelayError(Function<? super T, ? extends P
75687631 * if true, all errors from the outer and inner Publisher sources are delayed until the end,
75697632 * if false, an error from the main source is signaled when the current Publisher source terminates
75707633 * @return the new Publisher instance with the concatenation behavior
7634+ * @see #concatMapDelayError(Function, int, boolean, Scheduler)
75717635 */
75727636 @CheckReturnValue
75737637 @NonNull
@@ -7588,6 +7652,51 @@ public final <R> Flowable<R> concatMapDelayError(Function<? super T, ? extends P
75887652 return RxJavaPlugins.onAssembly(new FlowableConcatMap<T, R>(this, mapper, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY));
75897653 }
75907654
7655+ /**
7656+ * Maps each of the upstream items into a Publisher, subscribes to them one after the other,
7657+ * one at a time and emits their values in order
7658+ * while executing the mapper function on the designated scheduler, delaying any error from either this or any of the
7659+ * inner Publishers till all of them terminate.
7660+ * <p>
7661+ * The difference between {@link #concatMapDelayError(Function, int, boolean)} and this operator is that this operator guarantees the {@code mapper}
7662+ * function is executed on the specified scheduler.
7663+ *
7664+ * <dl>
7665+ * <dt><b>Backpressure:</b></dt>
7666+ * <dd>The operator honors backpressure from downstream. Both this and the inner {@code Publisher}s are
7667+ * expected to honor backpressure as well. If the source {@code Publisher} violates the rule, the operator will
7668+ * signal a {@code MissingBackpressureException}. If any of the inner {@code Publisher}s doesn't honor
7669+ * backpressure, that <em>may</em> throw an {@code IllegalStateException} when that
7670+ * {@code Publisher} completes.</dd>
7671+ * <dt><b>Scheduler:</b></dt>
7672+ * <dd>{@code concatMapDelayError} executes the given {@code mapper} function on the provided {@link Scheduler}.</dd>
7673+ * </dl>
7674+ *
7675+ * @param <R> the result value type
7676+ * @param mapper the function that maps the items of this Publisher into the inner Publishers.
7677+ * @param prefetch
7678+ * the number of elements to prefetch from the current Flowable
7679+ * @param tillTheEnd
7680+ * if true, all errors from the outer and inner Publisher sources are delayed until the end,
7681+ * if false, an error from the main source is signaled when the current Publisher source terminates
7682+ * @param scheduler
7683+ * the scheduler where the {@code mapper} function will be executed
7684+ * @return the new Publisher instance with the concatenation behavior
7685+ * @see #concatMapDelayError(Function, int, boolean)
7686+ * @since 3.0.0
7687+ */
7688+ @CheckReturnValue
7689+ @NonNull
7690+ @BackpressureSupport(BackpressureKind.FULL)
7691+ @SchedulerSupport(SchedulerSupport.CUSTOM)
7692+ public final <R> Flowable<R> concatMapDelayError(Function<? super T, ? extends Publisher<? extends R>> mapper,
7693+ int prefetch, boolean tillTheEnd, Scheduler scheduler) {
7694+ ObjectHelper.requireNonNull(mapper, "mapper is null");
7695+ ObjectHelper.verifyPositive(prefetch, "prefetch");
7696+ ObjectHelper.requireNonNull(scheduler, "scheduler is null");
7697+ return RxJavaPlugins.onAssembly(new FlowableConcatMapScheduler<T, R>(this, mapper, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY, scheduler));
7698+ }
7699+
75917700 /**
75927701 * Maps a sequence of values into Publishers and concatenates these Publishers eagerly into a single
75937702 * Publisher.
0 commit comments