From 0c46e677771d861a78a8fd52b66efab7951f3fd9 Mon Sep 17 00:00:00 2001 From: p123-stack Date: Thu, 21 Aug 2025 13:37:52 +0530 Subject: [PATCH] fix(routing): close pool on NotALeader to re-acquire leader connection (#270) --- src/Bolt/Session.php | 57 ++++++++++++++++++++++++----- src/Common/TransactionHelper.php | 62 -------------------------------- 2 files changed, 48 insertions(+), 71 deletions(-) delete mode 100644 src/Common/TransactionHelper.php diff --git a/src/Bolt/Session.php b/src/Bolt/Session.php index c0557f27..8b161fa0 100644 --- a/src/Bolt/Session.php +++ b/src/Bolt/Session.php @@ -16,8 +16,8 @@ use Exception; use Laudis\Neo4j\Common\GeneratorHelper; use Laudis\Neo4j\Common\Neo4jLogger; -use Laudis\Neo4j\Common\TransactionHelper; use Laudis\Neo4j\Contracts\ConnectionPoolInterface; +use Laudis\Neo4j\Contracts\CypherSequence; use Laudis\Neo4j\Contracts\SessionInterface; use Laudis\Neo4j\Contracts\TransactionInterface; use Laudis\Neo4j\Contracts\UnmanagedTransactionInterface; @@ -41,6 +41,7 @@ final class Session implements SessionInterface { /** @psalm-readonly */ private readonly BookmarkHolder $bookmarkHolder; + private const ROLLBACK_CLASSIFICATIONS = ['ClientError', 'TransientError', 'DatabaseError']; /** * @param ConnectionPool|Neo4jConnectionPool $pool @@ -100,10 +101,7 @@ public function writeTransaction(callable $tsxHandler, ?TransactionConfiguration $this->getLogger()?->log(LogLevel::INFO, 'Beginning write transaction', ['config' => $config]); $config = $this->mergeTsxConfig($config); - return TransactionHelper::retry( - fn () => $this->startTransaction($config, $this->config->withAccessMode(AccessMode::WRITE())), - $tsxHandler - ); + return $this->retry($tsxHandler, false, $config); } public function readTransaction(callable $tsxHandler, ?TransactionConfiguration $config = null) @@ -111,10 +109,51 @@ public function readTransaction(callable $tsxHandler, ?TransactionConfiguration $this->getLogger()?->log(LogLevel::INFO, 'Beginning read transaction', ['config' => $config]); $config = $this->mergeTsxConfig($config); - return TransactionHelper::retry( - fn () => $this->startTransaction($config, $this->config->withAccessMode(AccessMode::READ())), - $tsxHandler - ); + return $this->retry($tsxHandler, true, $config); + } + + /** + * @template U + * + * @param callable(TransactionInterface):U $tsxHandler + * + * @return U + */ + private function retry(callable $tsxHandler, bool $read, TransactionConfiguration $config) + { + while (true) { + $transaction = null; + try { + if ($read) { + $transaction = $this->startTransaction($config, $this->config->withAccessMode(AccessMode::READ())); + } else { + $transaction = $this->startTransaction($config, $this->config->withAccessMode(AccessMode::WRITE())); + } + $tbr = $tsxHandler($transaction); + self::triggerLazyResult($tbr); + $transaction->commit(); + + return $tbr; + } catch (Neo4jException $e) { + if ($transaction && !in_array($e->getClassification(), self::ROLLBACK_CLASSIFICATIONS)) { + $transaction->rollback(); + } + + if ($e->getTitle() === 'NotALeader') { + // By closing the pool, we force the connection to be re-acquired and the routing table to be refetched + $this->pool->close(); + } elseif ($e->getClassification() !== 'TransientError') { + throw $e; + } + } + } + } + + private static function triggerLazyResult(mixed $tbr): void + { + if ($tbr instanceof CypherSequence) { + $tbr->preload(); + } } public function transaction(callable $tsxHandler, ?TransactionConfiguration $config = null) diff --git a/src/Common/TransactionHelper.php b/src/Common/TransactionHelper.php deleted file mode 100644 index 83c7e086..00000000 --- a/src/Common/TransactionHelper.php +++ /dev/null @@ -1,62 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Laudis\Neo4j\Common; - -use Laudis\Neo4j\Contracts\CypherSequence; -use Laudis\Neo4j\Contracts\TransactionInterface; -use Laudis\Neo4j\Contracts\UnmanagedTransactionInterface; -use Laudis\Neo4j\Exception\Neo4jException; - -final class TransactionHelper -{ - public const ROLLBACK_CLASSIFICATIONS = ['ClientError', 'TransientError', 'DatabaseError']; - - /** - * @template U - * - * @param callable():UnmanagedTransactionInterface $tsxFactory - * @param callable(TransactionInterface):U $tsxHandler - * - * @return U - */ - public static function retry(callable $tsxFactory, callable $tsxHandler) - { - while (true) { - $transaction = null; - try { - $transaction = $tsxFactory(); - $tbr = $tsxHandler($transaction); - self::triggerLazyResult($tbr); - $transaction->commit(); - - return $tbr; - } catch (Neo4jException $e) { - if ($transaction && !in_array($e->getClassification(), self::ROLLBACK_CLASSIFICATIONS)) { - $transaction->rollback(); - } - - if ($e->getClassification() !== 'TransientError') { - throw $e; - } - } - } - } - - private static function triggerLazyResult(mixed $tbr): void - { - if ($tbr instanceof CypherSequence) { - $tbr->preload(); - } - } -}