1717
1818import static org .springframework .data .mongodb .core .query .Criteria .*;
1919
20- import reactor .core .CoreSubscriber ;
2120import reactor .core .publisher .Flux ;
2221import reactor .core .publisher .Mono ;
2322
@@ -114,7 +113,7 @@ public <S extends T> Flux<S> saveAll(Iterable<S> entities) {
114113 Streamable <S > source = Streamable .of (entities );
115114
116115 return source .stream ().allMatch (entityInformation ::isNew ) ? //
117- insert (entities ) : new AeonFlux <> (source ). combatMap ( this ::save );
116+ insert (entities ) : doItSomewhatSequentially (source , this ::save );
118117 }
119118
120119 @ Override
@@ -127,6 +126,20 @@ public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {
127126 mongoOperations .save (entity , entityInformation .getCollectionName ()));
128127 }
129128
129+ static <T > Flux <T > doItSomewhatSequentially /* how should we actually call this? */ (Streamable <T > ts , Function <? super T , ? extends Publisher <? extends T >> mapper ) {
130+
131+ List <T > list = ts .toList ();
132+ if (list .size () == 1 ) {
133+ return Flux .just (list .iterator ().next ()).flatMap (mapper );
134+ } else if (list .size () == 2 ) {
135+ return Flux .fromIterable (list ).concatMap (mapper );
136+ }
137+
138+ Flux <T > first = Flux .just (list .get (0 )).flatMap (mapper );
139+ Flux <T > theRest = Flux .fromIterable (list .subList (1 , list .size ())).flatMapSequential (mapper );
140+ return first .concatWith (theRest );
141+ }
142+
130143 @ Override
131144 public Mono <T > findById (ID id ) {
132145
@@ -566,46 +579,4 @@ private ReactiveFindOperation.TerminatingFind<T> createQuery(UnaryOperator<Query
566579 }
567580
568581 }
569-
570- static class AeonFlux <T > extends Flux <T > {
571-
572- private final Streamable <T > source ;
573- private final Flux <T > delegate ;
574-
575- AeonFlux (Streamable <T > source ) {
576- this (source , Flux .fromIterable (source ));
577- }
578-
579- private AeonFlux (Streamable <T > source , Flux <T > delegate ) {
580- this .source = source ;
581- this .delegate = delegate ;
582- }
583-
584- @ Override
585- public void subscribe (CoreSubscriber <? super T > actual ) {
586- delegate .subscribe (actual );
587- }
588-
589- Flux <T > combatMap (Function <? super T , ? extends Publisher <? extends T >> mapper ) {
590- return new AeonFlux <>(source , combatMapList (source .toList (), mapper ));
591- }
592-
593- private static <T > Flux <T > combatMapList (List <T > list ,
594- Function <? super T , ? extends Publisher <? extends T >> mapper ) {
595-
596- if (list .isEmpty ()) {
597- return Flux .empty ();
598- }
599- if (list .size () == 1 ) {
600- return Flux .just (list .iterator ().next ()).flatMap (mapper );
601- }
602- if (list .size () == 2 ) {
603- return Flux .fromIterable (list ).concatMap (mapper );
604- }
605-
606- Flux <T > first = Flux .just (list .get (0 )).flatMap (mapper );
607- Flux <T > theRest = Flux .fromIterable (list .subList (1 , list .size ())).flatMapSequential (mapper );
608- return first .concatWith (theRest );
609- }
610- }
611582}
0 commit comments