@@ -922,8 +922,9 @@ public static <T, R> Observable<R> combineLatest(Iterable<? extends Observable<?
922922 * {@code observables}, one after the other, without interleaving them
923923 * @see <a href="http://reactivex.io/documentation/operators/concat.html">ReactiveX operators documentation: Concat</a>
924924 */
925+ @SuppressWarnings({ "unchecked", "rawtypes" })
925926 public static <T> Observable<T> concat(Observable<? extends Observable<? extends T>> observables) {
926- return observables.lift(OperatorConcat.<T>instance ());
927+ return observables.concatMap((Func1)UtilityFunctions.identity ());
927928 }
928929
929930 /**
@@ -1158,6 +1159,45 @@ public static <T> Observable<T> concat(Observable<? extends T> t1, Observable<?
11581159 return concat(just(t1, t2, t3, t4, t5, t6, t7, t8, t9));
11591160 }
11601161
1162+ /**
1163+ * Concatenates the Observable sequence of Observables into a single sequence by subscribing to each inner Observable,
1164+ * one after the other, one at a time and delays any errors till the all inner and the outer Observables terminate.
1165+ *
1166+ * <dl>
1167+ * <dt><b>Backpressure:</b></dt>
1168+ * <dd>{@code concatDelayError} fully supports backpressure.</dd>
1169+ * <dt><b>Scheduler:</b></dt>
1170+ * <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
1171+ * </dl>
1172+ *
1173+ * @param sources the Observable sequence of Observables
1174+ * @return the new Observable with the concatenating behavior
1175+ */
1176+ @SuppressWarnings({ "rawtypes", "unchecked" })
1177+ @Experimental
1178+ public static <T> Observable<T> concatDelayError(Observable<? extends Observable<? extends T>> sources) {
1179+ return sources.concatMapDelayError((Func1)UtilityFunctions.identity());
1180+ }
1181+
1182+ /**
1183+ * Concatenates the Iterable sequence of Observables into a single sequence by subscribing to each Observable,
1184+ * one after the other, one at a time and delays any errors till the all inner Observables terminate.
1185+ *
1186+ * <dl>
1187+ * <dt><b>Backpressure:</b></dt>
1188+ * <dd>{@code concatDelayError} fully supports backpressure.</dd>
1189+ * <dt><b>Scheduler:</b></dt>
1190+ * <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
1191+ * </dl>
1192+ *
1193+ * @param sources the Iterable sequence of Observables
1194+ * @return the new Observable with the concatenating behavior
1195+ */
1196+ @Experimental
1197+ public static <T> Observable<T> concatDelayError(Iterable<? extends Observable<? extends T>> sources) {
1198+ return concatDelayError(from(sources));
1199+ }
1200+
11611201 /**
11621202 * Returns an Observable that calls an Observable factory to create an Observable for each new Observer
11631203 * that subscribes. That is, for each subscriber, the actual Observable that subscriber observes is
@@ -3957,7 +3997,37 @@ public final R call(R state, T value) {
39573997 * @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
39583998 */
39593999 public final <R> Observable<R> concatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
3960- return concat(map(func));
4000+ if (this instanceof ScalarSynchronousObservable) {
4001+ ScalarSynchronousObservable<T> scalar = (ScalarSynchronousObservable<T>) this;
4002+ return scalar.scalarFlatMap(func);
4003+ }
4004+ return create(new OnSubscribeConcatMap<T, R>(this, func, 2, OnSubscribeConcatMap.IMMEDIATE));
4005+ }
4006+
4007+ /**
4008+ * Maps each of the items into an Observable, subscribes to them one after the other,
4009+ * one at a time and emits their values in order
4010+ * while delaying any error from either this or any of the inner Observables
4011+ * till all of them terminate.
4012+ *
4013+ * <dl>
4014+ * <dt><b>Backpressure:</b></dt>
4015+ * <dd>{@code concatMapDelayError} fully supports backpressure.</dd>
4016+ * <dt><b>Scheduler:</b></dt>
4017+ * <dd>{@code concatMapDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
4018+ * </dl>
4019+ *
4020+ * @param <R> the result value type
4021+ * @param func the function that maps the items of this Observable into the inner Observables.
4022+ * @return the new Observable instance with the concatenation behavior
4023+ */
4024+ @Experimental
4025+ public final <R> Observable<R> concatMapDelayError(Func1<? super T, ? extends Observable<?extends R>> func) {
4026+ if (this instanceof ScalarSynchronousObservable) {
4027+ ScalarSynchronousObservable<T> scalar = (ScalarSynchronousObservable<T>) this;
4028+ return scalar.scalarFlatMap(func);
4029+ }
4030+ return create(new OnSubscribeConcatMap<T, R>(this, func, 2, OnSubscribeConcatMap.END));
39614031 }
39624032
39634033 /**
0 commit comments