Skip to content

Commit 87d33c4

Browse files
GH-2238 - Expose the Neo4j-Client QueryRunner to be used with the Cypher-DSL 2021.2.1+.
Enhanced both imperative and reactive clients to return a query runner that integrates with ongoing Spring transactions to be used with the Cypher DSL. Closes #2238.
1 parent 5b3fb3e commit 87d33c4

File tree

7 files changed

+319
-7
lines changed

7 files changed

+319
-7
lines changed

src/main/java/org/springframework/data/neo4j/core/DefaultNeo4jClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ class DefaultNeo4jClient implements Neo4jClient {
7979
new Neo4jConversions().registerConvertersIn((ConverterRegistry) conversionService);
8080
}
8181

82-
QueryRunner getQueryRunner(DatabaseSelection databaseSelection) {
82+
@Override
83+
public QueryRunner getQueryRunner(DatabaseSelection databaseSelection) {
8384

8485
String targetDatabase = databaseSelection.getValue();
8586
QueryRunner queryRunner = Neo4jTransactionManager.retrieveTransaction(driver, targetDatabase);

src/main/java/org/springframework/data/neo4j/core/DefaultReactiveNeo4jClient.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ class DefaultReactiveNeo4jClient implements ReactiveNeo4jClient {
7979
new Neo4jConversions().registerConvertersIn((ConverterRegistry) conversionService);
8080
}
8181

82-
Mono<RxQueryRunner> retrieveRxStatementRunnerHolder(Mono<DatabaseSelection> databaseSelection) {
82+
@Override
83+
public Mono<RxQueryRunner> getQueryRunner(Mono<DatabaseSelection> databaseSelection) {
8384

8485
return databaseSelection.flatMap(targetDatabase ->
8586
ReactiveNeo4jTransactionManager.retrieveReactiveTransaction(driver, targetDatabase.getValue())
@@ -160,14 +161,14 @@ public RxResult run(Query query) {
160161

161162
<T> Mono<T> doInQueryRunnerForMono(Mono<DatabaseSelection> databaseSelection, Function<RxQueryRunner, Mono<T>> func) {
162163

163-
return Mono.usingWhen(retrieveRxStatementRunnerHolder(databaseSelection),
164+
return Mono.usingWhen(getQueryRunner(databaseSelection),
164165
func::apply,
165166
runner -> ((DelegatingQueryRunner) runner).close());
166167
}
167168

168169
<T> Flux<T> doInStatementRunnerForFlux(Mono<DatabaseSelection> databaseSelection, Function<RxQueryRunner, Flux<T>> func) {
169170

170-
return Flux.usingWhen(retrieveRxStatementRunnerHolder(databaseSelection),
171+
return Flux.usingWhen(getQueryRunner(databaseSelection),
171172
func::apply,
172173
runner -> ((DelegatingQueryRunner) runner).close());
173174
}

src/main/java/org/springframework/data/neo4j/core/Neo4jClient.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@
4242
@API(status = API.Status.STABLE, since = "6.0")
4343
public interface Neo4jClient {
4444

45-
// TODO Create examples how to use the callbacks etc. with Springs TransactionTemplate to deal with rollbacks etc.
46-
4745
LogAccessor cypherLog = new LogAccessor(LogFactory.getLog("org.springframework.data.neo4j.cypher"));
4846

4947
static Neo4jClient create(Driver driver) {
@@ -56,6 +54,27 @@ static Neo4jClient create(Driver driver, DatabaseSelectionProvider databaseSelec
5654
return new DefaultNeo4jClient(driver, databaseSelectionProvider);
5755
}
5856

57+
/**
58+
* @return A managed query runner
59+
* @see #getQueryRunner(DatabaseSelection)
60+
* @since 6.2
61+
*/
62+
default QueryRunner getQueryRunner() {
63+
return getQueryRunner(DatabaseSelection.undecided());
64+
}
65+
66+
/**
67+
* Retrieves a query runner that will participate in ongoing Spring transactions (either in declarative
68+
* (implicit via {@code @Transactional}) or in programmatically (explicit via transaction template) ones).
69+
* This runner can be used with the Cypher-DSL for example.
70+
* If the client cannot retrieve an ongoing Spring transaction, this runner will use auto-commit semantics.
71+
*
72+
* @param databaseSelection The target database.
73+
* @return A managed query runner
74+
* @since 6.2
75+
*/
76+
QueryRunner getQueryRunner(DatabaseSelection databaseSelection);
77+
5978
/**
6079
* Entrypoint for creating a new Cypher query. Doesn't matter at this point whether it's a match, merge, create or
6180
* removal of things.

src/main/java/org/springframework/data/neo4j/core/ReactiveNeo4jClient.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,27 @@ static ReactiveNeo4jClient create(Driver driver, ReactiveDatabaseSelectionProvid
5757
return new DefaultReactiveNeo4jClient(driver, databaseSelectionProvider);
5858
}
5959

60+
/**
61+
* @return A managed query runner
62+
* @see #getQueryRunner(Mono)
63+
* @since 6.2
64+
*/
65+
default Mono<RxQueryRunner> getQueryRunner() {
66+
return getQueryRunner(Mono.just(DatabaseSelection.undecided()));
67+
}
68+
69+
/**
70+
* Retrieves a query runner that will participate in ongoing Spring transactions (either in declarative
71+
* (implicit via {@code @Transactional}) or in programmatically (explicit via transaction template) ones).
72+
* This runner can be used with the Cypher-DSL for example.
73+
* If the client cannot retrieve an ongoing Spring transaction, this runner will use auto-commit semantics.
74+
*
75+
* @param databaseSelection The target database.
76+
* @return A managed query runner
77+
* @since 6.2
78+
*/
79+
Mono<RxQueryRunner> getQueryRunner(Mono<DatabaseSelection> databaseSelection);
80+
6081
/**
6182
* Entrypoint for creating a new Cypher query. Doesn't matter at this point whether it's a match, merge, create or
6283
* removal of things.
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Copyright 2011-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.neo4j.integration.imperative;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
20+
import java.util.Collection;
21+
import java.util.Collections;
22+
import java.util.List;
23+
24+
import org.junit.jupiter.api.BeforeEach;
25+
import org.junit.jupiter.api.Test;
26+
import org.neo4j.cypherdsl.core.Cypher;
27+
import org.neo4j.cypherdsl.core.Node;
28+
import org.neo4j.cypherdsl.core.executables.ExecutableResultStatement;
29+
import org.neo4j.cypherdsl.core.executables.ExecutableStatement;
30+
import org.neo4j.driver.Driver;
31+
import org.neo4j.driver.Record;
32+
import org.neo4j.driver.Session;
33+
import org.neo4j.driver.Transaction;
34+
import org.springframework.beans.factory.annotation.Autowired;
35+
import org.springframework.context.annotation.Bean;
36+
import org.springframework.context.annotation.Configuration;
37+
import org.springframework.data.neo4j.config.AbstractNeo4jConfig;
38+
import org.springframework.data.neo4j.core.DatabaseSelectionProvider;
39+
import org.springframework.data.neo4j.core.Neo4jClient;
40+
import org.springframework.data.neo4j.core.transaction.Neo4jBookmarkManager;
41+
import org.springframework.data.neo4j.core.transaction.Neo4jTransactionManager;
42+
import org.springframework.data.neo4j.integration.shared.common.PersonWithAllConstructor;
43+
import org.springframework.data.neo4j.test.BookmarkCapture;
44+
import org.springframework.data.neo4j.test.Neo4jExtension.Neo4jConnectionSupport;
45+
import org.springframework.data.neo4j.test.Neo4jIntegrationTest;
46+
import org.springframework.transaction.PlatformTransactionManager;
47+
import org.springframework.transaction.annotation.EnableTransactionManagement;
48+
import org.springframework.transaction.support.TransactionTemplate;
49+
50+
/**
51+
* @author Michael J. Simons
52+
*/
53+
@Neo4jIntegrationTest
54+
class Neo4jClientIT {
55+
56+
protected static Neo4jConnectionSupport neo4jConnectionSupport;
57+
58+
@BeforeEach
59+
void setupData(@Autowired BookmarkCapture bookmarkCapture, @Autowired Driver driver) {
60+
61+
try (
62+
Session session = driver.session(bookmarkCapture.createSessionConfig());
63+
Transaction transaction = session.beginTransaction()
64+
) {
65+
transaction.run("MATCH (n) detach delete n");
66+
transaction.commit();
67+
}
68+
}
69+
70+
@Test // GH-2238
71+
void clientShouldIntegrateWithCypherDSL(@Autowired TransactionTemplate transactionTemplate,
72+
@Autowired Neo4jClient client,
73+
@Autowired BookmarkCapture bookmarkCapture) {
74+
75+
Node namedAnswer = Cypher.node("TheAnswer", Cypher.mapOf("value",
76+
Cypher.literalOf(23).multiply(Cypher.literalOf(2)).subtract(Cypher.literalOf(4)))).named("n");
77+
ExecutableResultStatement statement = ExecutableStatement.makeExecutable(Cypher.create(namedAnswer)
78+
.returning(namedAnswer)
79+
.build());
80+
81+
Long vanishedId = transactionTemplate.execute(transactionStatus -> {
82+
List<Record> records = statement.fetchWith(client.getQueryRunner());
83+
assertThat(records).hasSize(1)
84+
.first().extracting(r -> r.get("n").get("value").asLong()).isEqualTo(42L);
85+
86+
transactionStatus.setRollbackOnly();
87+
return records.get(0).get("n").asNode().id();
88+
});
89+
90+
// Make sure we actually interacted with the managed transaction (that had been rolled back)
91+
try (Session session = neo4jConnectionSupport.getDriver().session(bookmarkCapture.createSessionConfig())) {
92+
long cnt = session.run("MATCH (n) WHERE id(n) = $id RETURN count(n)",
93+
Collections.singletonMap("id", vanishedId)).single().get(0).asLong();
94+
assertThat(cnt).isEqualTo(0L);
95+
}
96+
}
97+
98+
@Configuration
99+
@EnableTransactionManagement
100+
static class Config extends AbstractNeo4jConfig {
101+
102+
@Bean
103+
@Override
104+
public Driver driver() {
105+
return neo4jConnectionSupport.getDriver();
106+
}
107+
108+
@Override // needed here because there is no implicit registration of entities upfront some methods under test
109+
protected Collection<String> getMappingBasePackages() {
110+
return Collections.singletonList(PersonWithAllConstructor.class.getPackage().getName());
111+
}
112+
113+
@Bean
114+
public BookmarkCapture bookmarkCapture() {
115+
return new BookmarkCapture();
116+
}
117+
118+
@Override
119+
public PlatformTransactionManager transactionManager(Driver driver,
120+
DatabaseSelectionProvider databaseNameProvider) {
121+
122+
BookmarkCapture bookmarkCapture = bookmarkCapture();
123+
return new Neo4jTransactionManager(driver, databaseNameProvider,
124+
Neo4jBookmarkManager.create(bookmarkCapture));
125+
}
126+
127+
@Bean
128+
public TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) {
129+
return new TransactionTemplate(transactionManager);
130+
}
131+
}
132+
}

src/test/java/org/springframework/data/neo4j/integration/imperative/Neo4jTemplateIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ void setupData() {
9191

9292
try (
9393
Session session = driver.session(bookmarkCapture.createSessionConfig());
94-
Transaction transaction = session.beginTransaction();
94+
Transaction transaction = session.beginTransaction()
9595
) {
9696
transaction.run("MATCH (n) detach delete n");
9797

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Copyright 2011-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.neo4j.integration.reactive;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
20+
import reactor.core.publisher.Flux;
21+
import reactor.test.StepVerifier;
22+
23+
import java.util.Collection;
24+
import java.util.Collections;
25+
import java.util.concurrent.atomic.AtomicLong;
26+
27+
import org.junit.jupiter.api.BeforeEach;
28+
import org.junit.jupiter.api.Test;
29+
import org.neo4j.cypherdsl.core.Cypher;
30+
import org.neo4j.cypherdsl.core.Node;
31+
import org.neo4j.cypherdsl.core.executables.ReactiveExecutableResultStatement;
32+
import org.neo4j.cypherdsl.core.executables.ReactiveExecutableStatement;
33+
import org.neo4j.driver.Driver;
34+
import org.neo4j.driver.Session;
35+
import org.neo4j.driver.Transaction;
36+
import org.springframework.beans.factory.annotation.Autowired;
37+
import org.springframework.context.annotation.Bean;
38+
import org.springframework.context.annotation.Configuration;
39+
import org.springframework.data.neo4j.config.AbstractReactiveNeo4jConfig;
40+
import org.springframework.data.neo4j.core.ReactiveDatabaseSelectionProvider;
41+
import org.springframework.data.neo4j.core.ReactiveNeo4jClient;
42+
import org.springframework.data.neo4j.core.transaction.Neo4jBookmarkManager;
43+
import org.springframework.data.neo4j.core.transaction.ReactiveNeo4jTransactionManager;
44+
import org.springframework.data.neo4j.integration.shared.common.PersonWithAllConstructor;
45+
import org.springframework.data.neo4j.test.BookmarkCapture;
46+
import org.springframework.data.neo4j.test.Neo4jExtension.Neo4jConnectionSupport;
47+
import org.springframework.data.neo4j.test.Neo4jIntegrationTest;
48+
import org.springframework.transaction.ReactiveTransactionManager;
49+
import org.springframework.transaction.annotation.EnableTransactionManagement;
50+
import org.springframework.transaction.reactive.TransactionalOperator;
51+
52+
/**
53+
* @author Michael J. Simons
54+
*/
55+
@Neo4jIntegrationTest
56+
class ReactiveNeo4jClientIT {
57+
58+
protected static Neo4jConnectionSupport neo4jConnectionSupport;
59+
60+
@BeforeEach
61+
void setupData(@Autowired BookmarkCapture bookmarkCapture, @Autowired Driver driver) {
62+
63+
try (
64+
Session session = driver.session(bookmarkCapture.createSessionConfig());
65+
Transaction transaction = session.beginTransaction()
66+
) {
67+
transaction.run("MATCH (n) detach delete n");
68+
transaction.commit();
69+
}
70+
}
71+
72+
@Test // GH-2238
73+
void clientShouldIntegrateWithCypherDSL(@Autowired TransactionalOperator transactionalOperator,
74+
@Autowired ReactiveNeo4jClient client,
75+
@Autowired BookmarkCapture bookmarkCapture) {
76+
77+
Node namedAnswer = Cypher.node("TheAnswer", Cypher.mapOf("value",
78+
Cypher.literalOf(23).multiply(Cypher.literalOf(2)).subtract(Cypher.literalOf(4)))).named("n");
79+
ReactiveExecutableResultStatement statement = ReactiveExecutableStatement.makeExecutable(
80+
Cypher.create(namedAnswer)
81+
.returning(namedAnswer)
82+
.build());
83+
84+
AtomicLong vanishedId = new AtomicLong();
85+
transactionalOperator.execute(transaction -> {
86+
Flux<Long> inner = client.getQueryRunner()
87+
.flatMapMany(statement::fetchWith)
88+
.doOnNext(r -> vanishedId.set(r.get("n").asNode().id()))
89+
.map(record -> record.get("n").get("value").asLong());
90+
91+
transaction.setRollbackOnly();
92+
return inner;
93+
}).as(StepVerifier::create)
94+
.expectNext(42L)
95+
.verifyComplete();
96+
97+
// Make sure we actually interacted with the managed transaction (that had been rolled back)
98+
try (Session session = neo4jConnectionSupport.getDriver().session(bookmarkCapture.createSessionConfig())) {
99+
long cnt = session.run("MATCH (n) WHERE id(n) = $id RETURN count(n)",
100+
Collections.singletonMap("id", vanishedId.get())).single().get(0).asLong();
101+
assertThat(cnt).isEqualTo(0L);
102+
}
103+
}
104+
105+
@Configuration
106+
@EnableTransactionManagement
107+
static class Config extends AbstractReactiveNeo4jConfig {
108+
109+
@Bean
110+
public Driver driver() {
111+
return neo4jConnectionSupport.getDriver();
112+
}
113+
114+
@Override // needed here because there is no implicit registration of entities upfront some methods under test
115+
protected Collection<String> getMappingBasePackages() {
116+
return Collections.singletonList(PersonWithAllConstructor.class.getPackage().getName());
117+
}
118+
119+
@Bean
120+
public BookmarkCapture bookmarkCapture() {
121+
return new BookmarkCapture();
122+
}
123+
124+
@Override
125+
public ReactiveTransactionManager reactiveTransactionManager(Driver driver,
126+
ReactiveDatabaseSelectionProvider databaseNameProvider) {
127+
128+
BookmarkCapture bookmarkCapture = bookmarkCapture();
129+
return new ReactiveNeo4jTransactionManager(driver, databaseNameProvider,
130+
Neo4jBookmarkManager.create(bookmarkCapture));
131+
}
132+
133+
@Bean
134+
public TransactionalOperator transactionalOperator(ReactiveTransactionManager transactionManager) {
135+
return TransactionalOperator.create(transactionManager);
136+
}
137+
}
138+
}

0 commit comments

Comments
 (0)