Skip to content

Commit 43eed88

Browse files
refactor: Avoid concurrency issues during some writes and reads.
1 parent bdc718b commit 43eed88

File tree

2 files changed

+9
-9
lines changed

2 files changed

+9
-9
lines changed

src/main/java/org/springframework/data/neo4j/core/ReactiveNeo4jTemplate.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ <T, R> Flux<R> doSave(Iterable<R> instances, Class<T> domainType) {
395395
NestedRelationshipProcessingStateMachine stateMachine = new NestedRelationshipProcessingStateMachine(neo4jMappingContext);
396396
EntityFromDtoInstantiatingConverter<T> converter = new EntityFromDtoInstantiatingConverter<>(domainType, neo4jMappingContext);
397397
return Flux.fromIterable(instances)
398-
.flatMap(instance -> {
398+
.concatMap(instance -> {
399399
T domainObject = converter.convert(instance);
400400

401401
@SuppressWarnings("unchecked")
@@ -525,7 +525,7 @@ public <T, R> Flux<R> saveAllAs(Iterable<T> instances, Class<R> resultType) {
525525
Neo4jPersistentEntity<?> entityMetaData = neo4jMappingContext.getRequiredPersistentEntity(commonElementType);
526526
Neo4jPersistentProperty idProperty = entityMetaData.getIdProperty();
527527

528-
return savedInstances.flatMap(savedInstance -> {
528+
return savedInstances.concatMap(savedInstance -> {
529529
PersistentPropertyAccessor<T> propertyAccessor = entityMetaData.getPropertyAccessor(savedInstance);
530530
return findById(propertyAccessor.getProperty(idProperty), commonElementType);
531531
}).map(instance -> localProjectionFactory.createProjection(resultType, instance));
@@ -586,7 +586,7 @@ private <T> Flux<T> saveAllImpl(Iterable<T> instances, @Nullable Collection<Prop
586586
.all()
587587
.collectMap(m -> (Value) m.getT1(), m -> (Long) m.getT2());
588588
}).flatMapMany(idToInternalIdMapping -> Flux.fromIterable(entitiesToBeSaved)
589-
.flatMap(t -> {
589+
.concatMap(t -> {
590590
PersistentPropertyAccessor<T> propertyAccessor = entityMetaData.getPropertyAccessor(t.getT3());
591591
Neo4jPersistentProperty idProperty = entityMetaData.getRequiredIdProperty();
592592
Object id = convertIdValues(idProperty, propertyAccessor.getProperty(idProperty));
@@ -713,7 +713,7 @@ private Mono<NodesAndRelationshipsByIdStatementProvider> createNodesAndRelations
713713
Set<Long> processedRelationshipIds = ctx.get("processedRelationships");
714714
Set<Long> processedNodeIds = ctx.get("processedNodes");
715715
return Flux.fromIterable(entityMetaData.getRelationshipsInHierarchy(queryFragments::includeField))
716-
.flatMap(relationshipDescription -> {
716+
.concatMap(relationshipDescription -> {
717717

718718
Statement statement = cypherGenerator.prepareMatchOf(entityMetaData, relationshipDescription,
719719
queryFragments.getMatchOn(), queryFragments.getCondition())
@@ -781,7 +781,7 @@ private Flux<Tuple2<Collection<Long>, Collection<Long>>> iterateNextLevel(Collec
781781
return queryFragments.includeField(prepend);
782782
}
783783
))
784-
.flatMap(relDe -> {
784+
.concatMap(relDe -> {
785785
Node node = anyNode(Constants.NAME_OF_TYPED_ROOT_NODE.apply(target));
786786

787787
Statement statement = cypherGenerator
@@ -1193,7 +1193,7 @@ public Flux<T> getResults() {
11931193

11941194
return fetchSpec.all().switchOnFirst((signal, f) -> {
11951195
if (signal.hasValue() && preparedQuery.resultsHaveBeenAggregated()) {
1196-
return f.flatMap(nested -> Flux.fromIterable((Collection<T>) nested).distinct()).distinct();
1196+
return f.concatMap(nested -> Flux.fromIterable((Collection<T>) nested).distinct()).distinct();
11971197
}
11981198
return f;
11991199
});

src/main/java/org/springframework/data/neo4j/repository/support/SimpleReactiveNeo4jRepository.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public Flux<T> findAllById(Iterable<ID> ids) {
8585

8686
@Override
8787
public Flux<T> findAllById(Publisher<ID> idStream) {
88-
return Flux.from(idStream).buffer().flatMap(this::findAllById);
88+
return Flux.from(idStream).buffer().concatMap(this::findAllById);
8989
}
9090

9191
@Override
@@ -135,7 +135,7 @@ public <S extends T> Flux<S> saveAll(Iterable<S> entities) {
135135
@Transactional
136136
public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {
137137

138-
return Flux.from(entityStream).flatMap(this::save);
138+
return Flux.from(entityStream).concatMap(this::save);
139139
}
140140

141141
/*
@@ -218,7 +218,7 @@ public Mono<Void> deleteAll(Iterable<? extends T> entities) {
218218
public Mono<Void> deleteAll(Publisher<? extends T> entitiesPublisher) {
219219

220220
Assert.notNull(entitiesPublisher, "The given Publisher of entities must not be null");
221-
return Flux.from(entitiesPublisher).flatMap(this::delete).then();
221+
return Flux.from(entitiesPublisher).concatMap(this::delete).then();
222222
}
223223

224224
/*

0 commit comments

Comments
 (0)