1515 */
1616package org .springframework .data .neo4j .core ;
1717
18+ import org .neo4j .driver .Bookmark ;
1819import org .neo4j .driver .Driver ;
20+ import org .neo4j .driver .Query ;
1921import org .neo4j .driver .Record ;
22+ import org .neo4j .driver .Value ;
2023import org .neo4j .driver .reactive .RxQueryRunner ;
2124import org .neo4j .driver .reactive .RxResult ;
2225import org .neo4j .driver .reactive .RxSession ;
2326import org .neo4j .driver .summary .ResultSummary ;
2427import org .neo4j .driver .types .TypeSystem ;
25- import org .reactivestreams .Publisher ;
2628import org .springframework .core .convert .ConversionService ;
2729import org .springframework .core .convert .converter .ConverterRegistry ;
2830import org .springframework .core .convert .support .DefaultConversionService ;
3537import reactor .core .publisher .Flux ;
3638import reactor .core .publisher .Mono ;
3739import reactor .util .function .Tuple2 ;
40+ import reactor .util .function .Tuples ;
3841
42+ import java .util .Collection ;
43+ import java .util .Collections ;
44+ import java .util .HashSet ;
3945import java .util .Map ;
46+ import java .util .Set ;
47+ import java .util .concurrent .locks .ReentrantReadWriteLock ;
48+ import java .util .function .BiConsumer ;
4049import java .util .function .BiFunction ;
4150import java .util .function .Function ;
4251import java .util .function .Supplier ;
@@ -57,6 +66,10 @@ class DefaultReactiveNeo4jClient implements ReactiveNeo4jClient {
5766 private final ConversionService conversionService ;
5867 private final Neo4jPersistenceExceptionTranslator persistenceExceptionTranslator = new Neo4jPersistenceExceptionTranslator ();
5968
69+ // Basically a local bookmark manager
70+ private final Set <Bookmark > bookmarks = new HashSet <>();
71+ private final ReentrantReadWriteLock bookmarksLock = new ReentrantReadWriteLock ();
72+
6073 DefaultReactiveNeo4jClient (Driver driver , @ Nullable ReactiveDatabaseSelectionProvider databaseSelectionProvider ) {
6174
6275 this .driver = driver ;
@@ -66,31 +79,97 @@ class DefaultReactiveNeo4jClient implements ReactiveNeo4jClient {
6679 new Neo4jConversions ().registerConvertersIn ((ConverterRegistry ) conversionService );
6780 }
6881
69- Mono <RxStatementRunnerHolder > retrieveRxStatementRunnerHolder (Mono <DatabaseSelection > databaseSelection ) {
82+ Mono <RxQueryRunner > retrieveRxStatementRunnerHolder (Mono <DatabaseSelection > databaseSelection ) {
7083
7184 return databaseSelection .flatMap (targetDatabase ->
7285 ReactiveNeo4jTransactionManager .retrieveReactiveTransaction (driver , targetDatabase .getValue ())
73- .map (rxTransaction -> new RxStatementRunnerHolder (rxTransaction , Mono .empty (), Mono .empty ())) //
74- .switchIfEmpty (Mono .using (
75- () -> driver .rxSession (Neo4jTransactionUtils .defaultSessionConfig (targetDatabase .getValue ())),
76- session -> Mono .from (session .beginTransaction ()).map (tx -> new RxStatementRunnerHolder (tx , tx .commit (), tx .rollback ())),
77- RxSession ::close )
78- )
79- );
86+ .map (RxQueryRunner .class ::cast )
87+ .zipWith (Mono .just (Collections .<Bookmark >emptySet ()))
88+ .switchIfEmpty (Mono .fromSupplier (() -> {
89+ ReentrantReadWriteLock .ReadLock lock = bookmarksLock .readLock ();
90+ try {
91+ lock .lock ();
92+ Set <Bookmark > lastBookmarks = new HashSet <>(bookmarks );
93+ return Tuples .of (driver .rxSession (Neo4jTransactionUtils .sessionConfig (false , lastBookmarks , targetDatabase .getValue ())), lastBookmarks );
94+ } finally {
95+ lock .unlock ();
96+ }
97+ })))
98+ .map (t -> new DelegatingQueryRunner (t .getT1 (), t .getT2 (), (usedBookmarks , newBookmark ) -> {
99+ ReentrantReadWriteLock .WriteLock lock = bookmarksLock .writeLock ();
100+ try {
101+ lock .lock ();
102+ bookmarks .removeAll (usedBookmarks );
103+ bookmarks .add (newBookmark );
104+ } finally {
105+ lock .unlock ();
106+ }
107+ }));
108+ }
109+
110+ private static class DelegatingQueryRunner implements RxQueryRunner {
111+
112+ private final RxQueryRunner delegate ;
113+ private final Collection <Bookmark > usedBookmarks ;
114+ private final BiConsumer <Collection <Bookmark >, Bookmark > newBookmarkConsumer ;
115+
116+ private DelegatingQueryRunner (RxQueryRunner delegate , Collection <Bookmark > lastBookmarks , BiConsumer <Collection <Bookmark >, Bookmark > newBookmarkConsumer ) {
117+ this .delegate = delegate ;
118+ this .usedBookmarks = lastBookmarks ;
119+ this .newBookmarkConsumer = newBookmarkConsumer ;
120+ }
121+
122+ Mono <Void > close () {
123+
124+ // We're only going to close sessions we have acquired inside the client, not something that
125+ // has been retrieved from the tx manager.
126+ if (this .delegate instanceof RxSession ) {
127+ RxSession session = (RxSession ) this .delegate ;
128+ return Mono .fromDirect (session .close ()).then ().doOnSuccess (signal ->
129+ this .newBookmarkConsumer .accept (usedBookmarks , session .lastBookmark ()));
130+ }
131+
132+ return Mono .empty ();
133+ }
134+
135+ @ Override
136+ public RxResult run (String query , Value parameters ) {
137+ return delegate .run (query , parameters );
138+ }
139+
140+ @ Override
141+ public RxResult run (String query , Map <String , Object > parameters ) {
142+ return delegate .run (query , parameters );
143+ }
144+
145+ @ Override
146+ public RxResult run (String query , Record parameters ) {
147+ return delegate .run (query , parameters );
148+ }
149+
150+ @ Override
151+ public RxResult run (String query ) {
152+ return delegate .run (query );
153+ }
154+
155+ @ Override
156+ public RxResult run (Query query ) {
157+ return delegate .run (query );
158+ }
80159 }
81160
82161 <T > Mono <T > doInQueryRunnerForMono (Mono <DatabaseSelection > databaseSelection , Function <RxQueryRunner , Mono <T >> func ) {
83162
84163 return Mono .usingWhen (retrieveRxStatementRunnerHolder (databaseSelection ),
85- holder -> func . apply ( holder . getRxQueryRunner ()), RxStatementRunnerHolder :: getCommit ,
86- ( holder , ex ) -> holder . getRollback (), RxStatementRunnerHolder :: getCommit );
164+ func :: apply ,
165+ runner -> (( DelegatingQueryRunner ) runner ). close () );
87166 }
88167
89168 <T > Flux <T > doInStatementRunnerForFlux (Mono <DatabaseSelection > databaseSelection , Function <RxQueryRunner , Flux <T >> func ) {
90169
91170 return Flux .usingWhen (retrieveRxStatementRunnerHolder (databaseSelection ),
92- holder -> func . apply ( holder . getRxQueryRunner ()), RxStatementRunnerHolder :: getCommit ,
93- ( holder , ex ) -> holder . getRollback (), RxStatementRunnerHolder :: getCommit );
171+ func :: apply ,
172+ runner -> (( DelegatingQueryRunner ) runner ). close () );
94173 }
95174
96175 @ Override
@@ -314,29 +393,4 @@ public Mono<T> run() {
314393 return doInQueryRunnerForMono (databaseSelection , callback );
315394 }
316395 }
317-
318- final class RxStatementRunnerHolder {
319- private final RxQueryRunner rxQueryRunner ;
320-
321- private final Publisher <Void > commit ;
322- private final Publisher <Void > rollback ;
323-
324- RxStatementRunnerHolder (RxQueryRunner rxQueryRunner , Publisher <Void > commit , Publisher <Void > rollback ) {
325- this .rxQueryRunner = rxQueryRunner ;
326- this .commit = commit ;
327- this .rollback = rollback ;
328- }
329-
330- public RxQueryRunner getRxQueryRunner () {
331- return rxQueryRunner ;
332- }
333-
334- public Publisher <Void > getCommit () {
335- return commit ;
336- }
337-
338- public Publisher <Void > getRollback () {
339- return rollback ;
340- }
341- }
342396}
0 commit comments