15
15
*/
16
16
package org .springframework .data .neo4j .core ;
17
17
18
+ import org .neo4j .driver .Bookmark ;
18
19
import org .neo4j .driver .Driver ;
20
+ import org .neo4j .driver .Query ;
19
21
import org .neo4j .driver .Record ;
22
+ import org .neo4j .driver .Value ;
20
23
import org .neo4j .driver .reactive .RxQueryRunner ;
21
24
import org .neo4j .driver .reactive .RxResult ;
22
25
import org .neo4j .driver .reactive .RxSession ;
23
26
import org .neo4j .driver .summary .ResultSummary ;
24
27
import org .neo4j .driver .types .TypeSystem ;
25
- import org .reactivestreams .Publisher ;
26
28
import org .springframework .core .convert .ConversionService ;
27
29
import org .springframework .core .convert .converter .ConverterRegistry ;
28
30
import org .springframework .core .convert .support .DefaultConversionService ;
35
37
import reactor .core .publisher .Flux ;
36
38
import reactor .core .publisher .Mono ;
37
39
import reactor .util .function .Tuple2 ;
40
+ import reactor .util .function .Tuples ;
38
41
42
+ import java .util .Collection ;
43
+ import java .util .Collections ;
44
+ import java .util .HashSet ;
39
45
import java .util .Map ;
46
+ import java .util .Set ;
47
+ import java .util .concurrent .locks .ReentrantReadWriteLock ;
48
+ import java .util .function .BiConsumer ;
40
49
import java .util .function .BiFunction ;
41
50
import java .util .function .Function ;
42
51
import java .util .function .Supplier ;
@@ -57,6 +66,10 @@ class DefaultReactiveNeo4jClient implements ReactiveNeo4jClient {
57
66
private final ConversionService conversionService ;
58
67
private final Neo4jPersistenceExceptionTranslator persistenceExceptionTranslator = new Neo4jPersistenceExceptionTranslator ();
59
68
69
+ // Basically a local bookmark manager
70
+ private final Set <Bookmark > bookmarks = new HashSet <>();
71
+ private final ReentrantReadWriteLock bookmarksLock = new ReentrantReadWriteLock ();
72
+
60
73
DefaultReactiveNeo4jClient (Driver driver , @ Nullable ReactiveDatabaseSelectionProvider databaseSelectionProvider ) {
61
74
62
75
this .driver = driver ;
@@ -66,31 +79,97 @@ class DefaultReactiveNeo4jClient implements ReactiveNeo4jClient {
66
79
new Neo4jConversions ().registerConvertersIn ((ConverterRegistry ) conversionService );
67
80
}
68
81
69
- Mono <RxStatementRunnerHolder > retrieveRxStatementRunnerHolder (Mono <DatabaseSelection > databaseSelection ) {
82
+ Mono <RxQueryRunner > retrieveRxStatementRunnerHolder (Mono <DatabaseSelection > databaseSelection ) {
70
83
71
84
return databaseSelection .flatMap (targetDatabase ->
72
85
ReactiveNeo4jTransactionManager .retrieveReactiveTransaction (driver , targetDatabase .getValue ())
73
- .map (rxTransaction -> new RxStatementRunnerHolder (rxTransaction , Mono .empty (), Mono .empty ())) //
74
- .switchIfEmpty (Mono .using (
75
- () -> driver .rxSession (Neo4jTransactionUtils .defaultSessionConfig (targetDatabase .getValue ())),
76
- session -> Mono .from (session .beginTransaction ()).map (tx -> new RxStatementRunnerHolder (tx , tx .commit (), tx .rollback ())),
77
- RxSession ::close )
78
- )
79
- );
86
+ .map (RxQueryRunner .class ::cast )
87
+ .zipWith (Mono .just (Collections .<Bookmark >emptySet ()))
88
+ .switchIfEmpty (Mono .fromSupplier (() -> {
89
+ ReentrantReadWriteLock .ReadLock lock = bookmarksLock .readLock ();
90
+ try {
91
+ lock .lock ();
92
+ Set <Bookmark > lastBookmarks = new HashSet <>(bookmarks );
93
+ return Tuples .of (driver .rxSession (Neo4jTransactionUtils .sessionConfig (false , lastBookmarks , targetDatabase .getValue ())), lastBookmarks );
94
+ } finally {
95
+ lock .unlock ();
96
+ }
97
+ })))
98
+ .map (t -> new DelegatingQueryRunner (t .getT1 (), t .getT2 (), (usedBookmarks , newBookmark ) -> {
99
+ ReentrantReadWriteLock .WriteLock lock = bookmarksLock .writeLock ();
100
+ try {
101
+ lock .lock ();
102
+ bookmarks .removeAll (usedBookmarks );
103
+ bookmarks .add (newBookmark );
104
+ } finally {
105
+ lock .unlock ();
106
+ }
107
+ }));
108
+ }
109
+
110
+ private static class DelegatingQueryRunner implements RxQueryRunner {
111
+
112
+ private final RxQueryRunner delegate ;
113
+ private final Collection <Bookmark > usedBookmarks ;
114
+ private final BiConsumer <Collection <Bookmark >, Bookmark > newBookmarkConsumer ;
115
+
116
+ private DelegatingQueryRunner (RxQueryRunner delegate , Collection <Bookmark > lastBookmarks , BiConsumer <Collection <Bookmark >, Bookmark > newBookmarkConsumer ) {
117
+ this .delegate = delegate ;
118
+ this .usedBookmarks = lastBookmarks ;
119
+ this .newBookmarkConsumer = newBookmarkConsumer ;
120
+ }
121
+
122
+ Mono <Void > close () {
123
+
124
+ // We're only going to close sessions we have acquired inside the client, not something that
125
+ // has been retrieved from the tx manager.
126
+ if (this .delegate instanceof RxSession ) {
127
+ RxSession session = (RxSession ) this .delegate ;
128
+ return Mono .fromDirect (session .close ()).then ().doOnSuccess (signal ->
129
+ this .newBookmarkConsumer .accept (usedBookmarks , session .lastBookmark ()));
130
+ }
131
+
132
+ return Mono .empty ();
133
+ }
134
+
135
+ @ Override
136
+ public RxResult run (String query , Value parameters ) {
137
+ return delegate .run (query , parameters );
138
+ }
139
+
140
+ @ Override
141
+ public RxResult run (String query , Map <String , Object > parameters ) {
142
+ return delegate .run (query , parameters );
143
+ }
144
+
145
+ @ Override
146
+ public RxResult run (String query , Record parameters ) {
147
+ return delegate .run (query , parameters );
148
+ }
149
+
150
+ @ Override
151
+ public RxResult run (String query ) {
152
+ return delegate .run (query );
153
+ }
154
+
155
+ @ Override
156
+ public RxResult run (Query query ) {
157
+ return delegate .run (query );
158
+ }
80
159
}
81
160
82
161
<T > Mono <T > doInQueryRunnerForMono (Mono <DatabaseSelection > databaseSelection , Function <RxQueryRunner , Mono <T >> func ) {
83
162
84
163
return Mono .usingWhen (retrieveRxStatementRunnerHolder (databaseSelection ),
85
- holder -> func . apply ( holder . getRxQueryRunner ()), RxStatementRunnerHolder :: getCommit ,
86
- ( holder , ex ) -> holder . getRollback (), RxStatementRunnerHolder :: getCommit );
164
+ func :: apply ,
165
+ runner -> (( DelegatingQueryRunner ) runner ). close () );
87
166
}
88
167
89
168
<T > Flux <T > doInStatementRunnerForFlux (Mono <DatabaseSelection > databaseSelection , Function <RxQueryRunner , Flux <T >> func ) {
90
169
91
170
return Flux .usingWhen (retrieveRxStatementRunnerHolder (databaseSelection ),
92
- holder -> func . apply ( holder . getRxQueryRunner ()), RxStatementRunnerHolder :: getCommit ,
93
- ( holder , ex ) -> holder . getRollback (), RxStatementRunnerHolder :: getCommit );
171
+ func :: apply ,
172
+ runner -> (( DelegatingQueryRunner ) runner ). close () );
94
173
}
95
174
96
175
@ Override
@@ -314,29 +393,4 @@ public Mono<T> run() {
314
393
return doInQueryRunnerForMono (databaseSelection , callback );
315
394
}
316
395
}
317
-
318
- static final class RxStatementRunnerHolder {
319
- private final RxQueryRunner rxQueryRunner ;
320
-
321
- private final Publisher <Void > commit ;
322
- private final Publisher <Void > rollback ;
323
-
324
- RxStatementRunnerHolder (RxQueryRunner rxQueryRunner , Publisher <Void > commit , Publisher <Void > rollback ) {
325
- this .rxQueryRunner = rxQueryRunner ;
326
- this .commit = commit ;
327
- this .rollback = rollback ;
328
- }
329
-
330
- public RxQueryRunner getRxQueryRunner () {
331
- return rxQueryRunner ;
332
- }
333
-
334
- public Publisher <Void > getCommit () {
335
- return commit ;
336
- }
337
-
338
- public Publisher <Void > getRollback () {
339
- return rollback ;
340
- }
341
- }
342
396
}
0 commit comments