@@ -64,6 +64,7 @@ class BoltConnection implements ConnectionInterface
64
64
* @var list<WeakReference<CypherList>>
65
65
*/
66
66
private array $ subscribedResults = [];
67
+ private bool $ inTransaction = false ;
67
68
68
69
/**
69
70
* @return array{0: V4_4|V5|V5_1|V5_2|V5_3|V5_4|null, 1: Connection}
@@ -206,21 +207,27 @@ public function reset(): void
206
207
$ this ->subscribedResults = [];
207
208
}
208
209
210
+ private function prepareForBegin (): void
211
+ {
212
+ if (in_array ($ this ->getServerState (), ['STREAMING ' , 'TX_STREAMING ' ], true )) {
213
+ $ this ->discardUnconsumedResults ();
214
+ }
215
+ }
216
+
209
217
/**
210
218
* Begins a transaction.
211
219
*
212
220
* Any of the preconditioned states are: 'READY', 'INTERRUPTED'.
213
221
*
214
- * @param iterable <string, scalar|array|null>|null $txMetaData
222
+ * @param array <string, scalar|array|null>|null $txMetaData
215
223
*/
216
- public function begin (?string $ database , ?float $ timeout , BookmarkHolder $ holder , ?iterable $ txMetaData ): void
224
+ public function begin (?string $ database , ?float $ timeout , BookmarkHolder $ holder , ?array $ txMetaData ): void
217
225
{
218
- $ this ->consumeResults ();
219
-
220
- $ extra = $ this ->buildRunExtra ($ database , $ timeout , $ holder , AccessMode::WRITE (), $ txMetaData );
226
+ $ extra = $ this ->buildRunExtra ($ database , $ timeout , $ holder , null , $ txMetaData , true );
221
227
$ message = $ this ->messageFactory ->createBeginMessage ($ extra );
222
228
$ response = $ message ->send ()->getResponse ();
223
229
$ this ->assertNoFailure ($ response );
230
+ $ this ->inTransaction = true ;
224
231
}
225
232
226
233
/**
@@ -253,7 +260,11 @@ public function run(
253
260
?AccessMode $ mode ,
254
261
?iterable $ tsxMetadata ,
255
262
): array {
256
- $ extra = $ this ->buildRunExtra ($ database , $ timeout , $ holder , $ mode , $ tsxMetadata );
263
+ if ($ this ->isInTransaction ()) {
264
+ $ extra = [];
265
+ } else {
266
+ $ extra = $ this ->buildRunExtra ($ database , $ timeout , $ holder , $ mode , $ tsxMetadata , false );
267
+ }
257
268
$ message = $ this ->messageFactory ->createRunMessage ($ text , $ parameters , $ extra );
258
269
$ response = $ message ->send ()->getResponse ();
259
270
$ this ->assertNoFailure ($ response );
@@ -321,7 +332,6 @@ public function close(): void
321
332
if ($ this ->isStreaming ()) {
322
333
$ this ->discardUnconsumedResults ();
323
334
}
324
-
325
335
$ message = $ this ->messageFactory ->createGoodbyeMessage ();
326
336
$ message ->send ();
327
337
@@ -331,7 +341,7 @@ public function close(): void
331
341
}
332
342
}
333
343
334
- private function buildRunExtra (?string $ database , ?float $ timeout , BookmarkHolder $ holder , ?AccessMode $ mode , ?iterable $ metadata ): array
344
+ private function buildRunExtra (?string $ database , ?float $ timeout , BookmarkHolder $ holder , ?AccessMode $ mode , ?iterable $ metadata, bool $ forBegin = false ): array
335
345
{
336
346
$ extra = [];
337
347
if ($ database !== null ) {
@@ -341,18 +351,26 @@ private function buildRunExtra(?string $database, ?float $timeout, BookmarkHolde
341
351
$ extra ['tx_timeout ' ] = (int ) ($ timeout * 1000 );
342
352
}
343
353
344
- if (!$ holder ->getBookmark ()->isEmpty ()) {
354
+ $ bookmarks = $ holder ->getBookmark ()->values ();
355
+ if (!empty ($ bookmarks )) {
345
356
$ extra ['bookmarks ' ] = $ holder ->getBookmark ()->values ();
346
357
}
347
358
348
- if ($ mode ) {
349
- $ extra ['mode ' ] = AccessMode::WRITE () === $ mode ? 'w ' : 'r ' ;
350
- }
359
+ if ($ forBegin ) {
360
+ $ bookmarks = $ holder ->getBookmark ()->values ();
361
+ if (!empty ($ bookmarks )) {
362
+ $ extra ['bookmarks ' ] = $ bookmarks ;
363
+ }
351
364
352
- if ($ metadata !== null ) {
353
- $ metadataArray = $ metadata instanceof Traversable ? iterator_to_array ($ metadata ) : $ metadata ;
354
- if (count ($ metadataArray ) > 0 ) {
355
- $ extra ['tx_metadata ' ] = $ metadataArray ;
365
+ if ($ mode !== null ) {
366
+ $ extra ['mode ' ] = $ mode === AccessMode::WRITE () ? 'w ' : 'r ' ;
367
+ }
368
+
369
+ if ($ metadata !== null ) {
370
+ $ metadataArray = $ metadata instanceof Traversable ? iterator_to_array ($ metadata ) : $ metadata ;
371
+ if (!empty ($ metadataArray )) {
372
+ $ extra ['tx_metadata ' ] = $ metadataArray ;
373
+ }
356
374
}
357
375
}
358
376
@@ -362,11 +380,13 @@ private function buildRunExtra(?string $database, ?float $timeout, BookmarkHolde
362
380
private function buildResultExtra (?int $ fetchSize , ?int $ qid ): array
363
381
{
364
382
$ extra = [];
383
+ $ fetchSize = 1000 ;
384
+ /** @psalm-suppress RedundantCondition */
365
385
if ($ fetchSize !== null ) {
366
386
$ extra ['n ' ] = $ fetchSize ;
367
387
}
368
388
369
- if ($ qid !== null ) {
389
+ if ($ qid !== null && $ qid >= 0 ) {
370
390
$ extra ['qid ' ] = $ qid ;
371
391
}
372
392
@@ -412,23 +432,38 @@ private function assertNoFailure(Response $response): void
412
432
public function discardUnconsumedResults (): void
413
433
{
414
434
$ this ->logger ?->log(LogLevel::DEBUG , 'Discarding unconsumed results ' );
415
-
416
435
$ this ->subscribedResults = array_values (array_filter (
417
436
$ this ->subscribedResults ,
418
437
static fn (WeakReference $ ref ): bool => $ ref ->get () !== null
419
438
));
420
439
421
- if (!empty ($ this ->subscribedResults )) {
422
- try {
440
+ if (empty ($ this ->subscribedResults )) {
441
+ $ this ->logger ?->log(LogLevel::DEBUG , 'No unconsumed results to discard ' );
442
+
443
+ return ;
444
+ }
445
+
446
+ $ state = $ this ->getServerState ();
447
+ $ this ->logger ?->log(LogLevel::DEBUG , "Server state before discard: {$ state }" );
448
+
449
+ try {
450
+ if (in_array ($ state , ['STREAMING ' , 'TX_STREAMING ' ], true )) {
423
451
$ this ->discard (null );
424
452
$ this ->logger ?->log(LogLevel::DEBUG , 'Sent DISCARD ALL for unconsumed results ' );
425
- } catch (Throwable $ e ) {
426
- $ this ->logger ?->log(LogLevel::ERROR , 'Failed to discard results ' , [
427
- 'exception ' => $ e ->getMessage (),
428
- ]);
453
+ } else {
454
+ $ this ->logger ?->log(LogLevel::DEBUG , 'Skipping discard - server not in streaming state ' );
429
455
}
456
+ } catch (Throwable $ e ) {
457
+ $ this ->logger ?->log(LogLevel::ERROR , 'Failed to discard results ' , [
458
+ 'exception ' => $ e ->getMessage (),
459
+ ]);
430
460
}
431
461
432
462
$ this ->subscribedResults = [];
433
463
}
464
+
465
+ private function isInTransaction (): bool
466
+ {
467
+ return $ this ->inTransaction ;
468
+ }
434
469
}
0 commit comments