Skip to content

Commit 18e9640

Browse files
Polish things up a bit
1 parent 23fd571 commit 18e9640

File tree

1 file changed

+52
-26
lines changed

1 file changed

+52
-26
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java

Lines changed: 52 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import reactor.core.publisher.Mono;
2222

2323
import java.io.Serializable;
24+
import java.util.ArrayList;
2425
import java.util.Collection;
2526
import java.util.Collections;
2627
import java.util.List;
@@ -47,7 +48,6 @@
4748
import org.springframework.data.mongodb.repository.query.MongoEntityInformation;
4849
import org.springframework.data.repository.query.FluentQuery;
4950
import org.springframework.data.util.StreamUtils;
50-
import org.springframework.data.util.Streamable;
5151
import org.springframework.lang.Nullable;
5252
import org.springframework.util.Assert;
5353

@@ -110,10 +110,9 @@ public <S extends T> Flux<S> saveAll(Iterable<S> entities) {
110110

111111
Assert.notNull(entities, "The given Iterable of entities must not be null");
112112

113-
Streamable<S> source = Streamable.of(entities);
114-
113+
List<S> source = toList(entities);
115114
return source.stream().allMatch(entityInformation::isNew) ? //
116-
insert(entities) : doItSomewhatSequentially(source, this::save);
115+
insert(source) : concatMapSequentially(source, this::save);
117116
}
118117

119118
@Override
@@ -126,20 +125,6 @@ public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {
126125
mongoOperations.save(entity, entityInformation.getCollectionName()));
127126
}
128127

129-
static <T> Flux<T> doItSomewhatSequentially/* how should we actually call this? */(Streamable<T> ts, Function<? super T, ? extends Publisher<? extends T>> mapper) {
130-
131-
List<T> list = ts.toList();
132-
if (list.size() == 1) {
133-
return Flux.just(list.iterator().next()).flatMap(mapper);
134-
} else if (list.size() == 2) {
135-
return Flux.fromIterable(list).concatMap(mapper);
136-
}
137-
138-
Flux<T> first = Flux.just(list.get(0)).flatMap(mapper);
139-
Flux<T> theRest = Flux.fromIterable(list.subList(1, list.size())).flatMapSequential(mapper);
140-
return first.concatWith(theRest);
141-
}
142-
143128
@Override
144129
public Mono<T> findById(ID id) {
145130

@@ -349,8 +334,11 @@ public <S extends T> Flux<S> insert(Iterable<S> entities) {
349334

350335
Assert.notNull(entities, "The given Iterable of entities must not be null");
351336

352-
Collection<S> source = toCollection(entities);
353-
return source.isEmpty() ? Flux.empty() : mongoOperations.insert(source, entityInformation.getCollectionName());
337+
return insert(toCollection(entities));
338+
}
339+
340+
private <S extends T> Flux<S> insert(Collection<S> entities) {
341+
return entities.isEmpty() ? Flux.empty() : mongoOperations.insert(entities, entityInformation.getCollectionName());
354342
}
355343

356344
@Override
@@ -453,6 +441,12 @@ void setRepositoryMethodMetadata(CrudMethodMetadata crudMethodMetadata) {
453441
this.crudMethodMetadata = crudMethodMetadata;
454442
}
455443

444+
private Flux<T> findAll(Query query) {
445+
446+
getReadPreference().ifPresent(query::withReadPreference);
447+
return mongoOperations.find(query, entityInformation.getJavaType(), entityInformation.getCollectionName());
448+
}
449+
456450
private Optional<ReadPreference> getReadPreference() {
457451

458452
if (crudMethodMetadata == null) {
@@ -474,15 +468,47 @@ private Query getIdQuery(Iterable<? extends ID> ids) {
474468
return new Query(where(entityInformation.getIdAttribute()).in(toCollection(ids)));
475469
}
476470

477-
private static <E> Collection<E> toCollection(Iterable<E> ids) {
478-
return ids instanceof Collection<E> collection ? collection
479-
: StreamUtils.createStreamFromIterator(ids.iterator()).collect(Collectors.toList());
471+
/**
472+
* Transform the elements emitted by this Flux into Publishers, then flatten these inner publishers into a single
473+
* Flux. The operation does not allow interleave between performing the map operation for the first and second source
474+
* element guaranteeing the mapping operation completed before subscribing to its following inners, that will then be
475+
* subscribed to eagerly emitting elements in order of their source.
476+
*
477+
* <pre class="code">
478+
* Flux.just(first-element).flatMap(...)
479+
* .concatWith(Flux.fromIterable(remaining-elements).flatMapSequential(...))
480+
* </pre>
481+
*
482+
* @param source the collection of elements to transform.
483+
* @param mapper the transformation {@link Function}. Must not be {@literal null}.
484+
* @return never {@literal null}.
485+
* @param <T> source type
486+
*/
487+
static <T> Flux<T> concatMapSequentially(List<T> source,
488+
Function<? super T, ? extends Publisher<? extends T>> mapper) {
489+
490+
if (source.isEmpty()) {
491+
return Flux.empty();
492+
}
493+
if (source.size() == 1) {
494+
return Flux.just(source.iterator().next()).flatMap(mapper);
495+
}
496+
if (source.size() == 2) {
497+
return Flux.fromIterable(source).concatMap(mapper);
498+
}
499+
500+
Flux<T> first = Flux.just(source.get(0)).flatMap(mapper);
501+
Flux<T> theRest = Flux.fromIterable(source.subList(1, source.size())).flatMapSequential(mapper);
502+
return first.concatWith(theRest);
480503
}
481504

482-
private Flux<T> findAll(Query query) {
505+
private static <E> List<E> toList(Iterable<E> source) {
506+
return source instanceof List<E> list ? list : new ArrayList<>(toCollection(source));
507+
}
483508

484-
getReadPreference().ifPresent(query::withReadPreference);
485-
return mongoOperations.find(query, entityInformation.getJavaType(), entityInformation.getCollectionName());
509+
private static <E> Collection<E> toCollection(Iterable<E> source) {
510+
return source instanceof Collection<E> collection ? collection
511+
: StreamUtils.createStreamFromIterator(source.iterator()).collect(Collectors.toList());
486512
}
487513

488514
/**

0 commit comments

Comments
 (0)