Skip to content

Commit e4f0348

Browse files
refactor: Avoid concurrency issues during some writes and reads.
1 parent b3dc32c commit e4f0348

File tree

2 files changed

+9
-9
lines changed

2 files changed

+9
-9
lines changed

src/main/java/org/springframework/data/neo4j/core/ReactiveNeo4jTemplate.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ <T, R> Flux<R> doSave(Iterable<R> instances, Class<T> domainType) {
401401
NestedRelationshipProcessingStateMachine stateMachine = new NestedRelationshipProcessingStateMachine(neo4jMappingContext);
402402
EntityFromDtoInstantiatingConverter<T> converter = new EntityFromDtoInstantiatingConverter<>(domainType, neo4jMappingContext);
403403
return Flux.fromIterable(instances)
404-
.flatMap(instance -> {
404+
.concatMap(instance -> {
405405
T domainObject = converter.convert(instance);
406406

407407
@SuppressWarnings("unchecked")
@@ -534,7 +534,7 @@ public <T, R> Flux<R> saveAllAs(Iterable<T> instances, Class<R> resultType) {
534534
Neo4jPersistentEntity<?> entityMetaData = neo4jMappingContext.getRequiredPersistentEntity(commonElementType);
535535
Neo4jPersistentProperty idProperty = entityMetaData.getIdProperty();
536536

537-
return savedInstances.flatMap(savedInstance -> {
537+
return savedInstances.concatMap(savedInstance -> {
538538
PersistentPropertyAccessor<T> propertyAccessor = entityMetaData.getPropertyAccessor(savedInstance);
539539
return findById(propertyAccessor.getProperty(idProperty), commonElementType);
540540
}).map(instance -> localProjectionFactory.createProjection(resultType, instance));
@@ -595,7 +595,7 @@ private <T> Flux<T> saveAllImpl(Iterable<T> instances, @Nullable Collection<Prop
595595
.all()
596596
.collectMap(m -> (Value) m.getT1(), m -> (String) m.getT2());
597597
}).flatMapMany(idToInternalIdMapping -> Flux.fromIterable(entitiesToBeSaved)
598-
.flatMap(t -> {
598+
.concatMap(t -> {
599599
PersistentPropertyAccessor<T> propertyAccessor = entityMetaData.getPropertyAccessor(t.getT3());
600600
Neo4jPersistentProperty idProperty = entityMetaData.getRequiredIdProperty();
601601
Object id = convertIdValues(idProperty, propertyAccessor.getProperty(idProperty));
@@ -722,7 +722,7 @@ private Mono<NodesAndRelationshipsByIdStatementProvider> createNodesAndRelations
722722
Set<String> processedRelationshipIds = ctx.get("processedRelationships");
723723
Set<String> processedNodeIds = ctx.get("processedNodes");
724724
return Flux.fromIterable(entityMetaData.getRelationshipsInHierarchy(queryFragments::includeField))
725-
.flatMap(relationshipDescription -> {
725+
.concatMap(relationshipDescription -> {
726726

727727
Statement statement = cypherGenerator.prepareMatchOf(entityMetaData, relationshipDescription,
728728
queryFragments.getMatchOn(), queryFragments.getCondition())
@@ -777,7 +777,7 @@ private Flux<Tuple2<Collection<String>, Collection<String>>> iterateNextLevel(Co
777777
return queryFragments.includeField(prepend);
778778
}
779779
))
780-
.flatMap(relDe -> {
780+
.concatMap(relDe -> {
781781
Node node = anyNode(Constants.NAME_OF_TYPED_ROOT_NODE.apply(target));
782782

783783
Statement statement = cypherGenerator
@@ -1188,7 +1188,7 @@ public Flux<T> getResults() {
11881188

11891189
return fetchSpec.all().switchOnFirst((signal, f) -> {
11901190
if (signal.hasValue() && preparedQuery.resultsHaveBeenAggregated()) {
1191-
return f.flatMap(nested -> Flux.fromIterable((Collection<T>) nested).distinct()).distinct();
1191+
return f.concatMap(nested -> Flux.fromIterable((Collection<T>) nested).distinct()).distinct();
11921192
}
11931193
return f;
11941194
});

src/main/java/org/springframework/data/neo4j/repository/support/SimpleReactiveNeo4jRepository.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public Flux<T> findAllById(Iterable<ID> ids) {
8585

8686
@Override
8787
public Flux<T> findAllById(Publisher<ID> idStream) {
88-
return Flux.from(idStream).buffer().flatMap(this::findAllById);
88+
return Flux.from(idStream).buffer().concatMap(this::findAllById);
8989
}
9090

9191
@Override
@@ -135,7 +135,7 @@ public <S extends T> Flux<S> saveAll(Iterable<S> entities) {
135135
@Transactional
136136
public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {
137137

138-
return Flux.from(entityStream).flatMap(this::save);
138+
return Flux.from(entityStream).concatMap(this::save);
139139
}
140140

141141
/*
@@ -218,7 +218,7 @@ public Mono<Void> deleteAll(Iterable<? extends T> entities) {
218218
public Mono<Void> deleteAll(Publisher<? extends T> entitiesPublisher) {
219219

220220
Assert.notNull(entitiesPublisher, "The given Publisher of entities must not be null");
221-
return Flux.from(entitiesPublisher).flatMap(this::delete).then();
221+
return Flux.from(entitiesPublisher).concatMap(this::delete).then();
222222
}
223223

224224
/*

0 commit comments

Comments
 (0)