23
23
use Bolt \protocol \V5_3 ;
24
24
use Bolt \protocol \V5_4 ;
25
25
use Exception ;
26
+ use Laudis \Neo4j \Bolt \Messages \BoltGoodbyeMessage ;
26
27
use Laudis \Neo4j \Common \ConnectionConfiguration ;
27
28
use Laudis \Neo4j \Common \Neo4jLogger ;
28
29
use Laudis \Neo4j \Contracts \AuthenticateInterface ;
37
38
use Laudis \Neo4j \Types \CypherList ;
38
39
use Psr \Http \Message \UriInterface ;
39
40
use Psr \Log \LogLevel ;
41
+ use Throwable ;
40
42
use WeakReference ;
41
43
42
44
/**
46
48
*/
47
49
class BoltConnection implements ConnectionInterface
48
50
{
51
+ private BoltMessageFactory $ messageFactory ;
52
+
49
53
/**
50
54
* @note We are using references to "subscribed results" to maintain backwards compatibility and try and strike
51
55
* a balance between performance and ease of use.
@@ -80,7 +84,9 @@ public function __construct(
80
84
/** @psalm-readonly */
81
85
private readonly ConnectionConfiguration $ config ,
82
86
private readonly ?Neo4jLogger $ logger ,
83
- ) {}
87
+ ) {
88
+ $ this ->messageFactory = new BoltMessageFactory ($ this ->protocol (), $ this ->logger );
89
+ }
84
90
85
91
public function getEncryptionLevel (): string
86
92
{
@@ -194,10 +200,8 @@ public function consumeResults(): void
194
200
*/
195
201
public function reset (): void
196
202
{
197
- $ this ->logger ?->log(LogLevel::DEBUG , 'RESET ' );
198
- $ response = $ this ->protocol ()
199
- ->reset ()
200
- ->getResponse ();
203
+ $ message = $ this ->messageFactory ->createResetMessage ();
204
+ $ response = $ message ->send ()->getResponse ();
201
205
$ this ->assertNoFailure ($ response );
202
206
$ this ->subscribedResults = [];
203
207
}
@@ -212,10 +216,8 @@ public function begin(?string $database, ?float $timeout, BookmarkHolder $holder
212
216
$ this ->consumeResults ();
213
217
214
218
$ extra = $ this ->buildRunExtra ($ database , $ timeout , $ holder , AccessMode::WRITE ());
215
- $ this ->logger ?->log(LogLevel::DEBUG , 'BEGIN ' , $ extra );
216
- $ response = $ this ->protocol ()
217
- ->begin ($ extra )
218
- ->getResponse ();
219
+ $ message = $ this ->messageFactory ->createBeginMessage ($ extra );
220
+ $ response = $ message ->send ()->getResponse ();
219
221
$ this ->assertNoFailure ($ response );
220
222
}
221
223
@@ -227,10 +229,9 @@ public function begin(?string $database, ?float $timeout, BookmarkHolder $holder
227
229
public function discard (?int $ qid ): void
228
230
{
229
231
$ extra = $ this ->buildResultExtra (null , $ qid );
230
- $ this ->logger ?->log(LogLevel::DEBUG , 'DISCARD ' , $ extra );
231
- $ response = $ this ->protocol ()
232
- ->discard ($ extra )
233
- ->getResponse ();
232
+
233
+ $ message = $ this ->messageFactory ->createDiscardMessage ($ extra );
234
+ $ response = $ message ->send ()->getResponse ();
234
235
$ this ->assertNoFailure ($ response );
235
236
}
236
237
@@ -247,14 +248,13 @@ public function run(
247
248
?string $ database ,
248
249
?float $ timeout ,
249
250
BookmarkHolder $ holder ,
250
- ?AccessMode $ mode
251
+ ?AccessMode $ mode,
251
252
): array {
252
253
$ extra = $ this ->buildRunExtra ($ database , $ timeout , $ holder , $ mode );
253
- $ this ->logger ?->log(LogLevel::DEBUG , 'RUN ' , $ extra );
254
- $ response = $ this ->protocol ()
255
- ->run ($ text , $ parameters , $ extra )
256
- ->getResponse ();
254
+ $ message = $ this ->messageFactory ->createRunMessage ($ text , $ parameters , $ extra );
255
+ $ response = $ message ->send ()->getResponse ();
257
256
$ this ->assertNoFailure ($ response );
257
+
258
258
/** @var BoltMeta */
259
259
return $ response ->content ;
260
260
}
@@ -266,12 +266,10 @@ public function run(
266
266
*/
267
267
public function commit (): void
268
268
{
269
- $ this ->logger ?->log(LogLevel::DEBUG , 'COMMIT ' );
270
269
$ this ->consumeResults ();
271
270
272
- $ response = $ this ->protocol ()
273
- ->commit ()
274
- ->getResponse ();
271
+ $ message = $ this ->messageFactory ->createCommitMessage ();
272
+ $ response = $ message ->send ()->getResponse ();
275
273
$ this ->assertNoFailure ($ response );
276
274
}
277
275
@@ -282,12 +280,10 @@ public function commit(): void
282
280
*/
283
281
public function rollback (): void
284
282
{
285
- $ this ->logger ?->log(LogLevel::DEBUG , 'ROLLBACK ' );
286
283
$ this ->consumeResults ();
287
284
288
- $ response = $ this ->protocol ()
289
- ->rollback ()
290
- ->getResponse ();
285
+ $ message = $ this ->messageFactory ->createRollbackMessage ();
286
+ $ response = $ message ->send ()->getResponse ();
291
287
$ this ->assertNoFailure ($ response );
292
288
}
293
289
@@ -313,8 +309,10 @@ public function pull(?int $qid, ?int $fetchSize): array
313
309
$ this ->logger ?->log(LogLevel::DEBUG , 'PULL ' , $ extra );
314
310
315
311
$ tbr = [];
312
+ $ message = $ this ->messageFactory ->createPullMessage ($ extra );
313
+
316
314
/** @var Response $response */
317
- foreach ($ this -> protocol ()-> pull ( $ extra )->getResponses () as $ response ) {
315
+ foreach ($ message -> send ( )->getResponses () as $ response ) {
318
316
$ this ->assertNoFailure ($ response );
319
317
$ tbr [] = $ response ->content ;
320
318
}
@@ -336,12 +334,15 @@ public function close(): void
336
334
$ this ->consumeResults ();
337
335
}
338
336
339
- $ this ->logger ?->log(LogLevel::DEBUG , 'GOODBYE ' );
340
- $ this ->protocol ()->goodbye ();
337
+ $ message = new BoltGoodbyeMessage (
338
+ $ this ->protocol (),
339
+ $ this ->logger
340
+ );
341
+ $ message ->send ();
341
342
342
343
unset($ this ->boltProtocol ); // has to be set to null as the sockets don't recover nicely contrary to what the underlying code might lead you to believe;
343
344
}
344
- } catch (\ Throwable ) {
345
+ } catch (Throwable ) {
345
346
}
346
347
}
347
348
@@ -403,7 +404,8 @@ private function assertNoFailure(Response $response): void
403
404
{
404
405
if ($ response ->signature === Signature::FAILURE ) {
405
406
$ this ->logger ?->log(LogLevel::ERROR , 'FAILURE ' );
406
- $ resetResponse = $ this ->protocol ()->reset ()->getResponse ();
407
+ $ message = $ this ->messageFactory ->createResetMessage ();
408
+ $ resetResponse = $ message ->send ()->getResponse ();
407
409
$ this ->subscribedResults = [];
408
410
if ($ resetResponse ->signature === Signature::FAILURE ) {
409
411
throw new Neo4jException ([Neo4jError::fromBoltResponse ($ resetResponse ), Neo4jError::fromBoltResponse ($ response )]);
0 commit comments