1818import reactor .core .publisher .Flux ;
1919import reactor .core .publisher .Mono ;
2020import reactor .util .function .Tuple2 ;
21+ import reactor .util .function .Tuples ;
2122
23+ import java .util .Collection ;
24+ import java .util .Collections ;
25+ import java .util .HashSet ;
2226import java .util .Map ;
27+ import java .util .Set ;
28+ import java .util .concurrent .locks .ReentrantReadWriteLock ;
29+ import java .util .function .BiConsumer ;
2330import java .util .function .BiFunction ;
2431import java .util .function .Function ;
2532import java .util .function .Supplier ;
2633
34+ import org .neo4j .driver .Bookmark ;
2735import org .neo4j .driver .Driver ;
36+ import org .neo4j .driver .Query ;
2837import org .neo4j .driver .Record ;
38+ import org .neo4j .driver .Value ;
2939import org .neo4j .driver .reactive .RxQueryRunner ;
3040import org .neo4j .driver .reactive .RxResult ;
3141import org .neo4j .driver .reactive .RxSession ;
@@ -57,6 +67,10 @@ class DefaultReactiveNeo4jClient implements ReactiveNeo4jClient {
5767 private final ConversionService conversionService ;
5868 private final Neo4jPersistenceExceptionTranslator persistenceExceptionTranslator = new Neo4jPersistenceExceptionTranslator ();
5969
70+ // Basically a local bookmark manager
71+ private final Set <Bookmark > bookmarks = new HashSet <>();
72+ private final ReentrantReadWriteLock bookmarksLock = new ReentrantReadWriteLock ();
73+
6074 DefaultReactiveNeo4jClient (Driver driver ) {
6175
6276 this .driver = driver ;
@@ -65,28 +79,96 @@ class DefaultReactiveNeo4jClient implements ReactiveNeo4jClient {
6579 new Neo4jConversions ().registerConvertersIn ((ConverterRegistry ) conversionService );
6680 }
6781
68- Mono <RxStatementRunnerHolder > retrieveRxStatementRunnerHolder (String targetDatabase ) {
82+ Mono <RxQueryRunner > retrieveRxStatementRunnerHolder (String targetDatabase ) {
6983
7084 return ReactiveNeo4jTransactionManager .retrieveReactiveTransaction (driver , targetDatabase )
71- .map (rxTransaction -> new RxStatementRunnerHolder (rxTransaction , Mono .empty (), Mono .empty ())) //
72- .switchIfEmpty (Mono .using (() -> driver .rxSession (Neo4jTransactionUtils .defaultSessionConfig (targetDatabase )),
73- session -> Mono .from (session .beginTransaction ())
74- .map (tx -> new RxStatementRunnerHolder (tx , tx .commit (), tx .rollback ())),
75- RxSession ::close ));
85+ .map (RxQueryRunner .class ::cast )
86+ .zipWith (Mono .just (Collections .<Bookmark >emptySet ()))
87+ .switchIfEmpty (Mono .fromSupplier (() -> {
88+ ReentrantReadWriteLock .ReadLock lock = bookmarksLock .readLock ();
89+ try {
90+ lock .lock ();
91+ Set <Bookmark > lastBookmarks = new HashSet <>(bookmarks );
92+ return Tuples .of (driver .rxSession (Neo4jTransactionUtils .sessionConfig (false , lastBookmarks , targetDatabase )), lastBookmarks );
93+ } finally {
94+ lock .unlock ();
95+ }
96+ }))
97+ .map (t -> new DelegatingQueryRunner (t .getT1 (), t .getT2 (), (usedBookmarks , newBookmark ) -> {
98+ ReentrantReadWriteLock .WriteLock lock = bookmarksLock .writeLock ();
99+ try {
100+ lock .lock ();
101+ bookmarks .removeAll (usedBookmarks );
102+ bookmarks .add (newBookmark );
103+ } finally {
104+ lock .unlock ();
105+ }
106+ }));
107+ }
108+
109+ private static class DelegatingQueryRunner implements RxQueryRunner {
110+
111+ private final RxQueryRunner delegate ;
112+ private final Collection <Bookmark > usedBookmarks ;
113+ private final BiConsumer <Collection <Bookmark >, Bookmark > newBookmarkConsumer ;
114+
115+ private DelegatingQueryRunner (RxQueryRunner delegate , Collection <Bookmark > lastBookmarks , BiConsumer <Collection <Bookmark >, Bookmark > newBookmarkConsumer ) {
116+ this .delegate = delegate ;
117+ this .usedBookmarks = lastBookmarks ;
118+ this .newBookmarkConsumer = newBookmarkConsumer ;
119+ }
120+
121+ Mono <Void > close () {
122+
123+ // We're only going to close sessions we have acquired inside the client, not something that
124+ // has been retrieved from the tx manager.
125+ if (this .delegate instanceof RxSession ) {
126+ RxSession session = (RxSession ) this .delegate ;
127+ return Mono .fromDirect (session .close ()).then ().doOnSuccess (signal ->
128+ this .newBookmarkConsumer .accept (usedBookmarks , session .lastBookmark ()));
129+ }
130+
131+ return Mono .empty ();
132+ }
133+
134+ @ Override
135+ public RxResult run (String query , Value parameters ) {
136+ return delegate .run (query , parameters );
137+ }
138+
139+ @ Override
140+ public RxResult run (String query , Map <String , Object > parameters ) {
141+ return delegate .run (query , parameters );
142+ }
143+
144+ @ Override
145+ public RxResult run (String query , Record parameters ) {
146+ return delegate .run (query , parameters );
147+ }
148+
149+ @ Override
150+ public RxResult run (String query ) {
151+ return delegate .run (query );
152+ }
153+
154+ @ Override
155+ public RxResult run (Query query ) {
156+ return delegate .run (query );
157+ }
76158 }
77159
78160 <T > Mono <T > doInQueryRunnerForMono (final String targetDatabase , Function <RxQueryRunner , Mono <T >> func ) {
79161
80162 return Mono .usingWhen (retrieveRxStatementRunnerHolder (targetDatabase ),
81- holder -> func . apply ( holder . getRxQueryRunner ()), RxStatementRunnerHolder :: getCommit ,
82- ( holder , ex ) -> holder . getRollback (), RxStatementRunnerHolder :: getCommit );
163+ func :: apply ,
164+ runner -> (( DelegatingQueryRunner ) runner ). close () );
83165 }
84166
85167 <T > Flux <T > doInStatementRunnerForFlux (final String targetDatabase , Function <RxQueryRunner , Flux <T >> func ) {
86168
87169 return Flux .usingWhen (retrieveRxStatementRunnerHolder (targetDatabase ),
88- holder -> func . apply ( holder . getRxQueryRunner ()), RxStatementRunnerHolder :: getCommit ,
89- ( holder , ex ) -> holder . getRollback (), RxStatementRunnerHolder :: getCommit );
170+ func :: apply ,
171+ runner -> (( DelegatingQueryRunner ) runner ). close () );
90172 }
91173
92174 @ Override
0 commit comments