@@ -6973,6 +6973,74 @@ public final <U, R> Observable<R> flatMap(final Func1<? super T, ? extends Obser
69736973 return merge(lift(new OperatorMapPair<T, U, R>(collectionSelector, resultSelector)), maxConcurrent);
69746974 }
69756975
6976+ /**
6977+ * Maps all upstream values to Completables and runs them together until the upstream
6978+ * and all inner Completables complete normally.
6979+ * <dl>
6980+ * <dt><b>Backpressure:</b></dt>
6981+ * <dd>The operator consumes items from upstream in an unbounded manner and ignores downstream backpressure
6982+ * as it doesn't emit items but only terminal event.</dd>
6983+ * <dt><b>Scheduler:</b></dt>
6984+ * <dd>{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
6985+ * </dl>
6986+ * @param mapper the function that receives an upstream value and turns it into a Completable
6987+ * to be merged.
6988+ * @return the new Observable instance
6989+ * @see #flatMapCompletable(Func1, boolean, int)
6990+ * @since 1.2.7 - experimental
6991+ */
6992+ @Experimental
6993+ public final Observable<T> flatMapCompletable(Func1<? super T, ? extends Completable> mapper) {
6994+ return flatMapCompletable(mapper, false, Integer.MAX_VALUE);
6995+ }
6996+
6997+ /**
6998+ * Maps all upstream values to Completables and runs them together, optionally delaying any errors, until the upstream
6999+ * and all inner Completables terminate.
7000+ * <dl>
7001+ * <dt><b>Backpressure:</b></dt>
7002+ * <dd>The operator consumes items from upstream in an unbounded manner and ignores downstream backpressure
7003+ * as it doesn't emit items but only terminal event.</dd>
7004+ * <dt><b>Scheduler:</b></dt>
7005+ * <dd>{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
7006+ * </dl>
7007+ * @param mapper the function that receives an upstream value and turns it into a Completable
7008+ * to be merged.
7009+ * @param delayErrors if true, errors from the upstream and from the inner Completables get delayed till
7010+ * the all of them terminate.
7011+ * @return the new Observable instance
7012+ * @since 1.2.7 - experimental
7013+ * @see #flatMapCompletable(Func1, boolean, int)
7014+ */
7015+ @Experimental
7016+ public final Observable<T> flatMapCompletable(Func1<? super T, ? extends Completable> mapper, boolean delayErrors) {
7017+ return flatMapCompletable(mapper, delayErrors, Integer.MAX_VALUE);
7018+ }
7019+
7020+ /**
7021+ * Maps upstream values to Completables and runs up to the given number of them together at a time,
7022+ * optionally delaying any errors, until the upstream and all inner Completables terminate.
7023+ * <dl>
7024+ * <dt><b>Backpressure:</b></dt>
7025+ * <dd>The operator consumes at most maxConcurrent items from upstream and one-by-one after as the inner
7026+ * Completables terminate. The operator ignores downstream backpressure as it doesn't emit items but
7027+ * only the terminal event.</dd>
7028+ * <dt><b>Scheduler:</b></dt>
7029+ * <dd>{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
7030+ * </dl>
7031+ * @param mapper the function that receives an upstream value and turns it into a Completable
7032+ * to be merged.
7033+ * @param delayErrors if true, errors from the upstream and from the inner Completables get delayed till
7034+ * the all of them terminate.
7035+ * @param maxConcurrency the maximum number of inner Completables to run at a time
7036+ * @return the new Observable instance
7037+ * @since 1.2.7 - experimental
7038+ */
7039+ @Experimental
7040+ public final Observable<T> flatMapCompletable(Func1<? super T, ? extends Completable> mapper, boolean delayErrors, int maxConcurrency) {
7041+ return unsafeCreate(new OnSubscribeFlatMapCompletable<T>(this, mapper, delayErrors, maxConcurrency));
7042+ }
7043+
69767044 /**
69777045 * Returns an Observable that merges each item emitted by the source Observable with the values in an
69787046 * Iterable corresponding to that item that is generated by a selector.
@@ -7106,6 +7174,74 @@ public final <U, R> Observable<R> flatMapIterable(Func1<? super T, ? extends Ite
71067174 return (Observable<R>)flatMap(OperatorMapPair.convertSelector(collectionSelector), resultSelector, maxConcurrent);
71077175 }
71087176
7177+ /**
7178+ * Maps all upstream values to Singles and runs them together until the upstream
7179+ * and all inner Singles complete normally.
7180+ * <dl>
7181+ * <dt><b>Backpressure:</b></dt>
7182+ * <dd>The operator consumes items from upstream in an unbounded manner and honors downstream backpressure.</dd>
7183+ * <dt><b>Scheduler:</b></dt>
7184+ * <dd>{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
7185+ * </dl>
7186+ * @param <R> the value type of the inner Singles and the resulting Observable
7187+ * @param mapper the function that receives an upstream value and turns it into a Single
7188+ * to be merged.
7189+ * @return the new Observable instance
7190+ * @see #flatMapSingle(Func1, boolean, int)
7191+ * @since 1.2.7 - experimental
7192+ */
7193+ @Experimental
7194+ public final <R> Observable<R> flatMapSingle(Func1<? super T, ? extends Single<? extends R>> mapper) {
7195+ return flatMapSingle(mapper, false, Integer.MAX_VALUE);
7196+ }
7197+
7198+ /**
7199+ * Maps all upstream values to Singles and runs them together, optionally delaying any errors, until the upstream
7200+ * and all inner Singles terminate.
7201+ * <dl>
7202+ * <dt><b>Backpressure:</b></dt>
7203+ * <dd>The operator consumes items from upstream in an unbounded manner and honors downstream backpressure.</dd>
7204+ * <dt><b>Scheduler:</b></dt>
7205+ * <dd>{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
7206+ * </dl>
7207+ * @param <R> the value type of the inner Singles and the resulting Observable
7208+ * @param mapper the function that receives an upstream value and turns it into a Single
7209+ * to be merged.
7210+ * @param delayErrors if true, errors from the upstream and from the inner Singles get delayed till
7211+ * the all of them terminate.
7212+ * @return the new Observable instance
7213+ * @since 1.2.7 - experimental
7214+ * @see #flatMapSingle(Func1, boolean, int)
7215+ */
7216+ @Experimental
7217+ public final <R> Observable<R> flatMapSingle(Func1<? super T, ? extends Single<? extends R>> mapper, boolean delayErrors) {
7218+ return flatMapSingle(mapper, delayErrors, Integer.MAX_VALUE);
7219+ }
7220+
7221+ /**
7222+ * Maps upstream values to Singles and runs up to the given number of them together at a time,
7223+ * optionally delaying any errors, until the upstream and all inner Singles terminate.
7224+ * <dl>
7225+ * <dt><b>Backpressure:</b></dt>
7226+ * <dd>The operator consumes at most maxConcurrent items from upstream and one-by-one after as the inner
7227+ * Singles terminate. The operator honors downstream backpressure.</dd>
7228+ * <dt><b>Scheduler:</b></dt>
7229+ * <dd>{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
7230+ * </dl>
7231+ * @param <R> the value type of the inner Singles and the resulting Observable
7232+ * @param mapper the function that receives an upstream value and turns it into a Single
7233+ * to be merged.
7234+ * @param delayErrors if true, errors from the upstream and from the inner Singles get delayed till
7235+ * the all of them terminate.
7236+ * @param maxConcurrency the maximum number of inner Singles to run at a time
7237+ * @return the new Observable instance
7238+ * @since 1.2.7 - experimental
7239+ */
7240+ @Experimental
7241+ public final <R> Observable<R> flatMapSingle(Func1<? super T, ? extends Single<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
7242+ return unsafeCreate(new OnSubscribeFlatMapSingle<T, R>(this, mapper, delayErrors, maxConcurrency));
7243+ }
7244+
71097245 /**
71107246 * Subscribes to the {@link Observable} and receives notifications for each element.
71117247 * <p>
0 commit comments