2020import org .reactivestreams .*;
2121
2222import io .reactivex .rxjava3 .annotations .*;
23- import io .reactivex .rxjava3 .core .Observable ;
2423import io .reactivex .rxjava3 .disposables .Disposable ;
2524import io .reactivex .rxjava3 .exceptions .*;
2625import io .reactivex .rxjava3 .functions .*;
3231import io .reactivex .rxjava3 .internal .operators .flowable .*;
3332import io .reactivex .rxjava3 .internal .operators .maybe .*;
3433import io .reactivex .rxjava3 .internal .operators .mixed .*;
35- import io .reactivex .rxjava3 .internal .operators .observable .* ;
34+ import io .reactivex .rxjava3 .internal .operators .observable .ObservableSingleSingle ;
3635import io .reactivex .rxjava3 .internal .operators .single .*;
3736import io .reactivex .rxjava3 .internal .util .ErrorMode ;
3837import io .reactivex .rxjava3 .observers .TestObserver ;
@@ -195,7 +194,7 @@ public static <T> Single<T> ambArray(@NonNull SingleSource<? extends T>... sourc
195194 @ SchedulerSupport (SchedulerSupport .NONE )
196195 @ BackpressureSupport (BackpressureKind .FULL )
197196 public static <T > Flowable <T > concat (@ NonNull Iterable <@ NonNull ? extends SingleSource <? extends T >> sources ) {
198- return concat ( Flowable .fromIterable (sources ));
197+ return Flowable .fromIterable (sources ). concatMapSingleDelayError ( Functions . identity (), false );
199198 }
200199
201200 /**
@@ -216,10 +215,9 @@ public static <T> Flowable<T> concat(@NonNull Iterable<@NonNull ? extends Single
216215 @ CheckReturnValue
217216 @ NonNull
218217 @ SchedulerSupport (SchedulerSupport .NONE )
219- @ SuppressWarnings ({ "unchecked" , "rawtypes" })
220218 public static <T > Observable <T > concat (@ NonNull ObservableSource <? extends SingleSource <? extends T >> sources ) {
221219 Objects .requireNonNull (sources , "sources is null" );
222- return RxJavaPlugins .onAssembly (new ObservableConcatMap (sources , SingleInternalHelper . toObservable (), 2 , ErrorMode .IMMEDIATE ));
220+ return RxJavaPlugins .onAssembly (new ObservableConcatMapSingle <> (sources , Functions . identity (), ErrorMode .IMMEDIATE , 2 ));
223221 }
224222
225223 /**
@@ -272,11 +270,10 @@ public static <T> Flowable<T> concat(@NonNull Publisher<@NonNull ? extends Singl
272270 @ NonNull
273271 @ BackpressureSupport (BackpressureKind .FULL )
274272 @ SchedulerSupport (SchedulerSupport .NONE )
275- @ SuppressWarnings ({ "unchecked" , "rawtypes" })
276273 public static <T > Flowable <T > concat (@ NonNull Publisher <@ NonNull ? extends SingleSource <? extends T >> sources , int prefetch ) {
277274 Objects .requireNonNull (sources , "sources is null" );
278275 ObjectHelper .verifyPositive (prefetch , "prefetch" );
279- return RxJavaPlugins .onAssembly (new FlowableConcatMapPublisher (sources , SingleInternalHelper . toFlowable (), prefetch , ErrorMode .IMMEDIATE ));
276+ return RxJavaPlugins .onAssembly (new FlowableConcatMapSinglePublisher <> (sources , Functions . identity (), ErrorMode .IMMEDIATE , prefetch ));
280277 }
281278
282279 /**
@@ -308,7 +305,7 @@ public static <T> Flowable<T> concat(
308305 ) {
309306 Objects .requireNonNull (source1 , "source1 is null" );
310307 Objects .requireNonNull (source2 , "source2 is null" );
311- return concat ( Flowable .fromArray (source1 , source2 ));
308+ return Flowable .fromArray (source1 , source2 ). concatMapSingleDelayError ( Functions . identity (), false );
312309 }
313310
314311 /**
@@ -344,7 +341,7 @@ public static <T> Flowable<T> concat(
344341 Objects .requireNonNull (source1 , "source1 is null" );
345342 Objects .requireNonNull (source2 , "source2 is null" );
346343 Objects .requireNonNull (source3 , "source3 is null" );
347- return concat ( Flowable .fromArray (source1 , source2 , source3 ));
344+ return Flowable .fromArray (source1 , source2 , source3 ). concatMapSingleDelayError ( Functions . identity (), false );
348345 }
349346
350347 /**
@@ -383,7 +380,7 @@ public static <T> Flowable<T> concat(
383380 Objects .requireNonNull (source2 , "source2 is null" );
384381 Objects .requireNonNull (source3 , "source3 is null" );
385382 Objects .requireNonNull (source4 , "source4 is null" );
386- return concat ( Flowable .fromArray (source1 , source2 , source3 , source4 ));
383+ return Flowable .fromArray (source1 , source2 , source3 , source4 ). concatMapSingleDelayError ( Functions . identity (), false );
387384 }
388385
389386 /**
@@ -409,7 +406,7 @@ public static <T> Flowable<T> concat(
409406 @ SchedulerSupport (SchedulerSupport .NONE )
410407 @ SafeVarargs
411408 public static <T > Flowable <T > concatArray (@ NonNull SingleSource <? extends T >... sources ) {
412- return Flowable .fromArray (sources ).concatMap ( SingleInternalHelper . toFlowable (), 2 );
409+ return Flowable .fromArray (sources ).concatMapSingleDelayError ( Functions . identity (), false );
413410 }
414411
415412 /**
@@ -435,7 +432,7 @@ public static <T> Flowable<T> concatArray(@NonNull SingleSource<? extends T>...
435432 @ SchedulerSupport (SchedulerSupport .NONE )
436433 @ SafeVarargs
437434 public static <T > Flowable <T > concatArrayDelayError (@ NonNull SingleSource <? extends T >... sources ) {
438- return Flowable .fromArray (sources ).concatMapDelayError ( SingleInternalHelper . toFlowable (), true , 2 );
435+ return Flowable .fromArray (sources ).concatMapSingleDelayError ( Functions . identity (), true );
439436 }
440437
441438 /**
@@ -1091,7 +1088,7 @@ public static <T> Single<T> fromObservable(@NonNull ObservableSource<? extends T
10911088 @ BackpressureSupport (BackpressureKind .FULL )
10921089 @ SchedulerSupport (SchedulerSupport .NONE )
10931090 public static <T > Flowable <T > merge (@ NonNull Iterable <@ NonNull ? extends SingleSource <? extends T >> sources ) {
1094- return merge ( Flowable .fromIterable (sources ));
1091+ return Flowable .fromIterable (sources ). flatMapSingle ( Functions . identity ( ));
10951092 }
10961093
10971094 /**
@@ -1129,10 +1126,9 @@ public static <T> Flowable<T> merge(@NonNull Iterable<@NonNull ? extends SingleS
11291126 @ NonNull
11301127 @ BackpressureSupport (BackpressureKind .FULL )
11311128 @ SchedulerSupport (SchedulerSupport .NONE )
1132- @ SuppressWarnings ({ "unchecked" , "rawtypes" })
11331129 public static <T > Flowable <T > merge (@ NonNull Publisher <@ NonNull ? extends SingleSource <? extends T >> sources ) {
11341130 Objects .requireNonNull (sources , "sources is null" );
1135- return RxJavaPlugins .onAssembly (new FlowableFlatMapPublisher (sources , SingleInternalHelper . toFlowable (), false , Integer .MAX_VALUE , Flowable . bufferSize () ));
1131+ return RxJavaPlugins .onAssembly (new FlowableFlatMapSinglePublisher <> (sources , Functions . identity (), false , Integer .MAX_VALUE ));
11361132 }
11371133
11381134 /**
@@ -1212,7 +1208,7 @@ public static <T> Flowable<T> merge(
12121208 ) {
12131209 Objects .requireNonNull (source1 , "source1 is null" );
12141210 Objects .requireNonNull (source2 , "source2 is null" );
1215- return merge ( Flowable .fromArray (source1 , source2 ));
1211+ return Flowable .fromArray (source1 , source2 ). flatMapSingle ( Functions . identity (), false , Integer . MAX_VALUE );
12161212 }
12171213
12181214 /**
@@ -1265,7 +1261,7 @@ public static <T> Flowable<T> merge(
12651261 Objects .requireNonNull (source1 , "source1 is null" );
12661262 Objects .requireNonNull (source2 , "source2 is null" );
12671263 Objects .requireNonNull (source3 , "source3 is null" );
1268- return merge ( Flowable .fromArray (source1 , source2 , source3 ));
1264+ return Flowable .fromArray (source1 , source2 , source3 ). flatMapSingle ( Functions . identity (), false , Integer . MAX_VALUE );
12691265 }
12701266
12711267 /**
@@ -1321,7 +1317,7 @@ public static <T> Flowable<T> merge(
13211317 Objects .requireNonNull (source2 , "source2 is null" );
13221318 Objects .requireNonNull (source3 , "source3 is null" );
13231319 Objects .requireNonNull (source4 , "source4 is null" );
1324- return merge ( Flowable .fromArray (source1 , source2 , source3 , source4 ));
1320+ return Flowable .fromArray (source1 , source2 , source3 , source4 ). flatMapSingle ( Functions . identity (), false , Integer . MAX_VALUE );
13251321 }
13261322
13271323 /**
@@ -1360,7 +1356,7 @@ public static <T> Flowable<T> merge(
13601356 @ SchedulerSupport (SchedulerSupport .NONE )
13611357 @ SafeVarargs
13621358 public static <T > Flowable <T > mergeArray (SingleSource <? extends T >... sources ) {
1363- return Flowable .fromArray (sources ).flatMapSingle (Functions .identity (), false , sources .length );
1359+ return Flowable .fromArray (sources ).flatMapSingle (Functions .identity (), false , Math . max ( 1 , sources .length ) );
13641360 }
13651361
13661362 /**
@@ -1396,7 +1392,7 @@ public static <T> Flowable<T> mergeArray(SingleSource<? extends T>... sources) {
13961392 @ SafeVarargs
13971393 @ NonNull
13981394 public static <T > Flowable <T > mergeArrayDelayError (@ NonNull SingleSource <? extends T >... sources ) {
1399- return Flowable .fromArray (sources ).flatMapSingle (Functions .identity (), true , sources .length );
1395+ return Flowable .fromArray (sources ).flatMapSingle (Functions .identity (), true , Math . max ( 1 , sources .length ) );
14001396 }
14011397
14021398 /**
@@ -1423,7 +1419,7 @@ public static <T> Flowable<T> mergeArrayDelayError(@NonNull SingleSource<? exten
14231419 @ BackpressureSupport (BackpressureKind .FULL )
14241420 @ SchedulerSupport (SchedulerSupport .NONE )
14251421 public static <T > Flowable <T > mergeDelayError (@ NonNull Iterable <@ NonNull ? extends SingleSource <? extends T >> sources ) {
1426- return mergeDelayError ( Flowable .fromIterable (sources ));
1422+ return Flowable .fromIterable (sources ). flatMapSingle ( Functions . identity (), true , Integer . MAX_VALUE );
14271423 }
14281424
14291425 /**
@@ -1449,10 +1445,9 @@ public static <T> Flowable<T> mergeDelayError(@NonNull Iterable<@NonNull ? exten
14491445 @ NonNull
14501446 @ BackpressureSupport (BackpressureKind .FULL )
14511447 @ SchedulerSupport (SchedulerSupport .NONE )
1452- @ SuppressWarnings ({ "unchecked" , "rawtypes" })
14531448 public static <T > Flowable <T > mergeDelayError (@ NonNull Publisher <@ NonNull ? extends SingleSource <? extends T >> sources ) {
14541449 Objects .requireNonNull (sources , "sources is null" );
1455- return RxJavaPlugins .onAssembly (new FlowableFlatMapPublisher (sources , SingleInternalHelper . toFlowable (), true , Integer .MAX_VALUE , Flowable . bufferSize () ));
1450+ return RxJavaPlugins .onAssembly (new FlowableFlatMapSinglePublisher <> (sources , Functions . identity (), true , Integer .MAX_VALUE ));
14561451 }
14571452
14581453 /**
@@ -1490,7 +1485,7 @@ public static <T> Flowable<T> mergeDelayError(
14901485 ) {
14911486 Objects .requireNonNull (source1 , "source1 is null" );
14921487 Objects .requireNonNull (source2 , "source2 is null" );
1493- return mergeDelayError ( Flowable .fromArray (source1 , source2 ));
1488+ return Flowable .fromArray (source1 , source2 ). flatMapSingle ( Functions . identity (), true , Integer . MAX_VALUE );
14941489 }
14951490
14961491 /**
@@ -1532,7 +1527,7 @@ public static <T> Flowable<T> mergeDelayError(
15321527 Objects .requireNonNull (source1 , "source1 is null" );
15331528 Objects .requireNonNull (source2 , "source2 is null" );
15341529 Objects .requireNonNull (source3 , "source3 is null" );
1535- return mergeDelayError ( Flowable .fromArray (source1 , source2 , source3 ));
1530+ return Flowable .fromArray (source1 , source2 , source3 ). flatMapSingle ( Functions . identity (), true , Integer . MAX_VALUE );
15361531 }
15371532
15381533 /**
@@ -1577,7 +1572,7 @@ public static <T> Flowable<T> mergeDelayError(
15771572 Objects .requireNonNull (source2 , "source2 is null" );
15781573 Objects .requireNonNull (source3 , "source3 is null" );
15791574 Objects .requireNonNull (source4 , "source4 is null" );
1580- return mergeDelayError ( Flowable .fromArray (source1 , source2 , source3 , source4 ));
1575+ return Flowable .fromArray (source1 , source2 , source3 , source4 ). flatMapSingle ( Functions . identity (), true , Integer . MAX_VALUE );
15811576 }
15821577
15831578 /**
0 commit comments