Skip to content

Commit e085561

Browse files
refactor saveAll flow for publisher
1 parent ab1479c commit e085561

File tree

2 files changed

+38
-15
lines changed

2 files changed

+38
-15
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -116,13 +116,11 @@ public <S extends T> Flux<S> saveAll(Iterable<S> entities) {
116116
}
117117

118118
@Override
119-
public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {
119+
public <S extends T> Flux<S> saveAll(Publisher<S> publisher) {
120120

121-
Assert.notNull(entityStream, "The given Publisher of entities must not be null");
121+
Assert.notNull(publisher, "The given Publisher of entities must not be null");
122122

123-
return Flux.from(entityStream).concatMap(entity -> entityInformation.isNew(entity) ? //
124-
mongoOperations.insert(entity, entityInformation.getCollectionName()) : //
125-
mongoOperations.save(entity, entityInformation.getCollectionName()));
123+
return concatMapSequentially(publisher, this::save);
126124
}
127125

128126
@Override
@@ -498,6 +496,20 @@ static <T> Flux<T> concatMapSequentially(List<T> source,
498496
return first.concatWith(theRest);
499497
}
500498

499+
static <T> Flux<T> concatMapSequentially(Publisher<T> publisher,
500+
Function<? super T, ? extends Publisher<? extends T>> mapper) {
501+
502+
return Flux.from(publisher).switchOnFirst(((signal, source) -> {
503+
504+
if (!signal.hasValue()) {
505+
return source.concatMap(mapper);
506+
}
507+
508+
Mono<T> firstCall = Mono.from(mapper.apply(signal.get()));
509+
return firstCall.concatWith(source.skip(1).flatMapSequential(mapper));
510+
}));
511+
}
512+
501513
private static <E> List<E> toList(Iterable<E> source) {
502514
return source instanceof List<E> list ? list : new ArrayList<>(toCollection(source));
503515
}

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/SimpleReactiveMongoRepositoryTests.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,16 @@
1818
import static org.assertj.core.api.Assertions.*;
1919
import static org.springframework.data.domain.ExampleMatcher.*;
2020

21-
import org.junit.jupiter.api.RepeatedTest;
22-
import org.springframework.data.mongodb.ReactiveMongoTransactionManager;
23-
import org.springframework.transaction.TransactionDefinition;
24-
import org.springframework.transaction.reactive.TransactionalOperator;
2521
import reactor.core.publisher.Flux;
2622
import reactor.core.publisher.Mono;
2723
import reactor.test.StepVerifier;
2824

2925
import java.util.Arrays;
3026
import java.util.Objects;
27+
import java.util.stream.Stream;
3128

3229
import org.junit.jupiter.api.BeforeEach;
30+
import org.junit.jupiter.api.RepeatedTest;
3331
import org.junit.jupiter.api.Test;
3432
import org.junit.jupiter.api.extension.ExtendWith;
3533
import org.springframework.beans.BeansException;
@@ -44,6 +42,7 @@
4442
import org.springframework.data.domain.Sort;
4543
import org.springframework.data.domain.Sort.Direction;
4644
import org.springframework.data.domain.Sort.Order;
45+
import org.springframework.data.mongodb.ReactiveMongoTransactionManager;
4746
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
4847
import org.springframework.data.mongodb.repository.support.ReactiveMongoRepositoryFactory;
4948
import org.springframework.data.mongodb.repository.support.SimpleReactiveMongoRepository;
@@ -52,6 +51,8 @@
5251
import org.springframework.lang.Nullable;
5352
import org.springframework.test.context.ContextConfiguration;
5453
import org.springframework.test.context.junit.jupiter.SpringExtension;
54+
import org.springframework.transaction.TransactionDefinition;
55+
import org.springframework.transaction.reactive.TransactionalOperator;
5556
import org.springframework.util.ClassUtils;
5657

5758
/**
@@ -337,16 +338,26 @@ void savePublisherOfEntitiesShouldInsertEntity() {
337338
assertThat(boyd.getId()).isNotNull();
338339
}
339340

340-
@RepeatedTest(10)
341+
@RepeatedTest(10) // GH-4838
341342
void transactionalSaveAllForStuffThatIsConsideredAnUpdateOfExistingData() {
342343

343344
ReactiveMongoTransactionManager txmgr = new ReactiveMongoTransactionManager(template.getMongoDatabaseFactory());
344345
TransactionalOperator.create(txmgr, TransactionDefinition.withDefaults()).execute(callback -> {
345346
return repository.saveAll(Arrays.asList(oliver, dave, carter, boyd, stefan, leroi, alicia));
346-
})
347-
.as(StepVerifier::create) //
348-
.expectNext(oliver, dave, carter, boyd, stefan, leroi, alicia)
349-
.verifyComplete();
347+
}).as(StepVerifier::create) //
348+
.expectNext(oliver, dave, carter, boyd, stefan, leroi, alicia).verifyComplete();
349+
}
350+
351+
@RepeatedTest(10) // GH-4838
352+
void transactionalSaveAllWithPublisherForStuffThatIsConsideredAnUpdateOfExistingData() {
353+
354+
ReactiveMongoTransactionManager txmgr = new ReactiveMongoTransactionManager(template.getMongoDatabaseFactory());
355+
Flux<ReactivePerson> personFlux = Flux.fromStream(Stream.of(oliver, dave, carter, boyd, stefan, leroi, alicia));
356+
357+
TransactionalOperator.create(txmgr, TransactionDefinition.withDefaults()).execute(callback -> {
358+
return repository.saveAll(personFlux);
359+
}).as(StepVerifier::create) //
360+
.expectNextCount(7).verifyComplete();
350361
}
351362

352363
@Test // GH-3609
@@ -358,7 +369,7 @@ void savePublisherOfImmutableEntitiesShouldInsertEntity() {
358369
.consumeNextWith(actual -> {
359370
assertThat(actual.id).isNotNull();
360371
}) //
361-
.verifyComplete();
372+
.verifyComplete();
362373
}
363374

364375
@Test // DATAMONGO-1444

0 commit comments

Comments
 (0)