@@ -949,8 +949,9 @@ public static <T, R> Observable<R> combineLatestDelayError(Iterable<? extends Ob
949949 * {@code observables}, one after the other, without interleaving them
950950 * @see <a href="http://reactivex.io/documentation/operators/concat.html">ReactiveX operators documentation: Concat</a>
951951 */
952+ @SuppressWarnings({ "unchecked", "rawtypes" })
952953 public static <T> Observable<T> concat(Observable<? extends Observable<? extends T>> observables) {
953- return observables.lift(OperatorConcat.<T>instance ());
954+ return observables.concatMap((Func1)UtilityFunctions.identity ());
954955 }
955956
956957 /**
@@ -1185,6 +1186,45 @@ public static <T> Observable<T> concat(Observable<? extends T> t1, Observable<?
11851186 return concat(just(t1, t2, t3, t4, t5, t6, t7, t8, t9));
11861187 }
11871188
1189+ /**
1190+ * Concatenates the Observable sequence of Observables into a single sequence by subscribing to each inner Observable,
1191+ * one after the other, one at a time and delays any errors till the all inner and the outer Observables terminate.
1192+ *
1193+ * <dl>
1194+ * <dt><b>Backpressure:</b></dt>
1195+ * <dd>{@code concatDelayError} fully supports backpressure.</dd>
1196+ * <dt><b>Scheduler:</b></dt>
1197+ * <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
1198+ * </dl>
1199+ *
1200+ * @param sources the Observable sequence of Observables
1201+ * @return the new Observable with the concatenating behavior
1202+ */
1203+ @SuppressWarnings({ "rawtypes", "unchecked" })
1204+ @Experimental
1205+ public static <T> Observable<T> concatDelayError(Observable<? extends Observable<? extends T>> sources) {
1206+ return sources.concatMapDelayError((Func1)UtilityFunctions.identity());
1207+ }
1208+
1209+ /**
1210+ * Concatenates the Iterable sequence of Observables into a single sequence by subscribing to each Observable,
1211+ * one after the other, one at a time and delays any errors till the all inner Observables terminate.
1212+ *
1213+ * <dl>
1214+ * <dt><b>Backpressure:</b></dt>
1215+ * <dd>{@code concatDelayError} fully supports backpressure.</dd>
1216+ * <dt><b>Scheduler:</b></dt>
1217+ * <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
1218+ * </dl>
1219+ *
1220+ * @param sources the Iterable sequence of Observables
1221+ * @return the new Observable with the concatenating behavior
1222+ */
1223+ @Experimental
1224+ public static <T> Observable<T> concatDelayError(Iterable<? extends Observable<? extends T>> sources) {
1225+ return concatDelayError(from(sources));
1226+ }
1227+
11881228 /**
11891229 * Returns an Observable that calls an Observable factory to create an Observable for each new Observer
11901230 * that subscribes. That is, for each subscriber, the actual Observable that subscriber observes is
@@ -3984,7 +4024,37 @@ public final R call(R state, T value) {
39844024 * @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
39854025 */
39864026 public final <R> Observable<R> concatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
3987- return concat(map(func));
4027+ if (this instanceof ScalarSynchronousObservable) {
4028+ ScalarSynchronousObservable<T> scalar = (ScalarSynchronousObservable<T>) this;
4029+ return scalar.scalarFlatMap(func);
4030+ }
4031+ return create(new OnSubscribeConcatMap<T, R>(this, func, 2, OnSubscribeConcatMap.IMMEDIATE));
4032+ }
4033+
4034+ /**
4035+ * Maps each of the items into an Observable, subscribes to them one after the other,
4036+ * one at a time and emits their values in order
4037+ * while delaying any error from either this or any of the inner Observables
4038+ * till all of them terminate.
4039+ *
4040+ * <dl>
4041+ * <dt><b>Backpressure:</b></dt>
4042+ * <dd>{@code concatMapDelayError} fully supports backpressure.</dd>
4043+ * <dt><b>Scheduler:</b></dt>
4044+ * <dd>{@code concatMapDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
4045+ * </dl>
4046+ *
4047+ * @param <R> the result value type
4048+ * @param func the function that maps the items of this Observable into the inner Observables.
4049+ * @return the new Observable instance with the concatenation behavior
4050+ */
4051+ @Experimental
4052+ public final <R> Observable<R> concatMapDelayError(Func1<? super T, ? extends Observable<?extends R>> func) {
4053+ if (this instanceof ScalarSynchronousObservable) {
4054+ ScalarSynchronousObservable<T> scalar = (ScalarSynchronousObservable<T>) this;
4055+ return scalar.scalarFlatMap(func);
4056+ }
4057+ return create(new OnSubscribeConcatMap<T, R>(this, func, 2, OnSubscribeConcatMap.END));
39884058 }
39894059
39904060 /**
0 commit comments