13
13
14
14
namespace Laudis \Neo4j \Common ;
15
15
16
+ use BadMethodCallException ;
16
17
use Bolt \protocol \V3 ;
17
18
use Bolt \protocol \V4 ;
18
19
use Laudis \Neo4j \Bolt \ServerStateTransition ;
23
24
use Laudis \Neo4j \Databags \DriverConfiguration ;
24
25
use Laudis \Neo4j \Enum \AccessMode ;
25
26
use Laudis \Neo4j \Enum \ConnectionProtocol ;
27
+ use LogicException ;
26
28
use Psr \Http \Message \UriInterface ;
27
29
use RuntimeException ;
28
30
use Throwable ;
@@ -41,7 +43,7 @@ final class BoltConnection implements ConnectionInterface
41
43
private BoltFactory $ factory ;
42
44
43
45
private int $ ownerCount = 0 ;
44
- private string $ expectedState = 'READY ' ;
46
+ private ? string $ expectedState = 'READY ' ;
45
47
/** @var list<callable(list<ServerStateTransition>): void> */
46
48
private array $ beforeTransitionEventListeners = [];
47
49
/** @var list<callable(ServerStateTransition): void> */
@@ -132,6 +134,10 @@ public function isOpen(): bool
132
134
133
135
public function open (): void
134
136
{
137
+ if ($ this ->boltProtocol !== null ) {
138
+ throw new BadMethodCallException ('Cannot open a connection that is already open ' );
139
+ }
140
+
135
141
$ this ->boltProtocol = $ this ->factory ->build ()[0 ];
136
142
}
137
143
@@ -142,17 +148,24 @@ public function setTimeout(float $timeout): void
142
148
143
149
public function close (): void
144
150
{
145
- if ($ this ->ownerCount === 0 ) {
146
- $ this ->boltProtocol = null ;
147
- }
151
+ $ this ->handleMessage ('GOODBYE ' , function () {
152
+ $ this ->boltProtocol ->goodbye ();
153
+ });
154
+
155
+ $ this ->boltProtocol = null ;
156
+ $ this ->beforeTransitionEventListeners = [];
157
+ $ this ->afterTransitionEventListeners = [];
148
158
}
149
159
150
160
public function reset (): void
151
161
{
152
- if ( $ this ->boltProtocol ) {
162
+ $ this ->handleMessage ( ' RESET ' , function ( ) {
153
163
$ this ->boltProtocol ->reset ();
154
- $ this ->boltProtocol = $ this ->factory ->build ()[0 ];
155
- }
164
+ });
165
+
166
+ $ this ->boltProtocol = $ this ->factory ->build ()[0 ];
167
+ $ this ->beforeTransitionEventListeners = [];
168
+ $ this ->afterTransitionEventListeners = [];
156
169
}
157
170
158
171
/**
@@ -161,95 +174,109 @@ public function reset(): void
161
174
*/
162
175
public function begin (?string $ database , ?float $ timeout ): void
163
176
{
164
- if ($ this ->boltProtocol === null ) {
165
- throw new RuntimeException ('Cannot begin on a closed connection ' );
177
+ $ this ->handleMessage ('BEGIN ' , function () use ($ database , $ timeout ) {
178
+ $ this ->boltProtocol ->begin ($ this ->buildExtra ($ database , $ timeout ));
179
+ });
180
+ }
181
+
182
+ /**
183
+ * @template T
184
+ *
185
+ * @param string $message the bolt message we are trying to send
186
+ * @param callable(): T $action the actual action to send the message
187
+ *
188
+ * @return T
189
+ */
190
+ private function handleMessage (string $ message , $ action )
191
+ {
192
+ if ($ this ->boltProtocol === null || $ this ->expectedState === null ) {
193
+ throw new LogicException ("Cannot send \"$ message \" message on a closed connection " );
166
194
}
167
195
168
- $ transitions = $ this ->transitions ->getAvailableTransitionsForStateAndMessage ($ this ->expectedState , 'BEGIN ' );
196
+ // First, we fetch the available transitions for the given state of the server
197
+ // and the intended message to send.
198
+ $ transitions = $ this ->transitions ->getAvailableTransitionsForStateAndMessage ($ this ->expectedState , $ message );
169
199
200
+ // We notify the event listeners before sending the message.
201
+ // Since we don't know whether it will fail or not, we need to send all
202
+ // possible transitions.
170
203
$ this ->triggerBeforeEvents ($ transitions );
171
204
172
205
try {
173
- $ this ->boltProtocol ->begin ($ this ->buildExtra ($ database , $ timeout ));
174
- $ transition = $ this ->getSuccessTransition ($ transitions );
206
+ $ tbr = $ action ();
175
207
176
- $ this ->expectedState = $ transition ->getNewState () ?? 'READY ' ;
208
+ // If no exceptions are thrown, we know the underlying bolt library
209
+ // received a success response, making sure we can use the success transition
210
+ $ transition = $ this ->getSuccessTransition ($ transitions );
211
+ $ this ->expectedState = $ transition ->getNewState ();
177
212
} catch (Throwable $ e ) {
213
+ // If an an exception is thrown, we know the underlying bolt library
214
+ // received a failure response, making sure we can use the failure transition
215
+ // and propagate the exception further
178
216
$ transition = $ this ->getFailureTransition ($ transitions );
179
- $ this ->expectedState = $ transition ->getNewState () ?? ' READY ' ;
217
+ $ this ->expectedState = $ transition ->getNewState ();
180
218
181
219
throw $ e ;
182
220
} finally {
183
- if ( isset ( $ transition )) {
184
- $ this -> triggerAfterEvents ( $ transition );
185
- }
221
+ // In the end, all listeners need to be able to handle the state transition,
222
+ // regardless of the call stack.
223
+ $ this -> triggerAfterEvents ( $ transition );
186
224
}
225
+
226
+ return $ tbr ;
187
227
}
188
228
189
229
/**
190
230
* @return BoltMeta
191
231
*/
192
232
public function run (string $ text , array $ parameters , ?string $ database , ?float $ timeout ): array
193
233
{
194
- if ($ this ->boltProtocol === null ) {
195
- throw new RuntimeException ('Cannot run on a closed connection ' );
196
- }
197
-
198
- /** @var BoltMeta */
199
- return $ this ->boltProtocol ->run ($ text , $ parameters , $ this ->buildExtra ($ database , $ timeout ));
234
+ return $ this ->handleMessage ('RUN ' , function () use ($ text , $ parameters , $ database , $ timeout ) {
235
+ return $ this ->boltProtocol ->run ($ text , $ parameters , $ this ->buildExtra ($ database , $ timeout ));
236
+ });
200
237
}
201
238
202
239
public function commit (): void
203
240
{
204
- if ($ this ->boltProtocol === null ) {
205
- throw new RuntimeException ('Cannot commit on a closed connection ' );
206
- }
207
-
208
- $ this ->boltProtocol ->commit ();
241
+ $ this ->handleMessage ('COMMIT ' , fn () => $ this ->boltProtocol ->commit ());
209
242
}
210
243
211
244
public function rollback (): void
212
245
{
213
- if ($ this ->boltProtocol === null ) {
214
- throw new RuntimeException ('Cannot commit on a closed connection ' );
215
- }
216
-
217
- $ this ->boltProtocol ->rollback ();
246
+ $ this ->handleMessage ('ROLLBACK ' , fn () => $ this ->boltProtocol ->commit ());
218
247
}
219
248
220
249
/**
221
250
* @return non-empty-list<list>
222
251
*/
223
252
public function pull (?int $ qid , ?int $ fetchSize ): array
224
253
{
225
- if ($ this ->boltProtocol === null ) {
226
- throw new RuntimeException ('Cannot pull on a closed connection ' );
227
- }
254
+ return $ this ->handleMessage ('PULL ' , function () use ($ qid , $ fetchSize ) {
255
+ $ extra = [];
256
+ if ($ fetchSize ) {
257
+ $ extra ['n ' ] = $ fetchSize ;
258
+ }
228
259
229
- $ extra = [];
230
- if ($ fetchSize ) {
231
- $ extra ['n ' ] = $ fetchSize ;
232
- }
260
+ if ($ qid ) {
261
+ $ extra ['qid ' ] = $ qid ;
262
+ }
233
263
234
- if ($ qid ) {
235
- $ extra ['qid ' ] = $ qid ;
236
- }
264
+ if (!$ this ->boltProtocol instanceof V4 ) {
265
+ /** @var non-empty-list<list> */
266
+ return $ this ->boltProtocol ->pullAll ($ extra );
267
+ }
237
268
238
- if (!$ this ->boltProtocol instanceof V4 ) {
239
269
/** @var non-empty-list<list> */
240
- return $ this ->boltProtocol ->pullAll ($ extra );
241
- }
242
-
243
- /** @var non-empty-list<list> */
244
- return $ this ->boltProtocol ->pull ($ extra );
270
+ return $ this ->boltProtocol ->pull ($ extra );
271
+ });
245
272
}
246
273
247
274
/**
248
275
* @psalm-mutation-free
249
276
*/
250
277
public function getDriverConfiguration (): DriverConfiguration
251
278
{
252
- return $ this ->driverConfiguration ;
279
+ return $ this ->config -> getDriverConfiguration () ;
253
280
}
254
281
255
282
public function __destruct ()
@@ -297,10 +324,13 @@ private function bindAfterTransitionEventListener($listener): void
297
324
$ this ->afterTransitionEventListeners [] = $ listener ;
298
325
}
299
326
327
+ public function getExpectedState (): ?string
328
+ {
329
+ return $ this ->expectedState ;
330
+ }
331
+
300
332
/**
301
333
* @param list<ServerStateTransition> $states
302
- *
303
- * @return ServerStateTransition
304
334
*/
305
335
private function getFailureTransition (array $ states ): ServerStateTransition
306
336
{
@@ -309,8 +339,6 @@ private function getFailureTransition(array $states): ServerStateTransition
309
339
310
340
/**
311
341
* @param list<ServerStateTransition> $states
312
- *
313
- * @return ServerStateTransition
314
342
*/
315
343
private function getSuccessTransition (array $ states ): ServerStateTransition
316
344
{
@@ -319,8 +347,6 @@ private function getSuccessTransition(array $states): ServerStateTransition
319
347
320
348
/**
321
349
* @param list<ServerStateTransition> $states
322
- *
323
- * @return ServerStateTransition
324
350
*/
325
351
private function getTransitionForResponse (array $ states , string $ response ): ServerStateTransition
326
352
{
0 commit comments