Skip to content

Commit 0c46e67

Browse files
committed
fix(routing): close pool on NotALeader to re-acquire leader connection (#270)
1 parent de6a404 commit 0c46e67

File tree

2 files changed

+48
-71
lines changed

2 files changed

+48
-71
lines changed

src/Bolt/Session.php

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
use Exception;
1717
use Laudis\Neo4j\Common\GeneratorHelper;
1818
use Laudis\Neo4j\Common\Neo4jLogger;
19-
use Laudis\Neo4j\Common\TransactionHelper;
2019
use Laudis\Neo4j\Contracts\ConnectionPoolInterface;
20+
use Laudis\Neo4j\Contracts\CypherSequence;
2121
use Laudis\Neo4j\Contracts\SessionInterface;
2222
use Laudis\Neo4j\Contracts\TransactionInterface;
2323
use Laudis\Neo4j\Contracts\UnmanagedTransactionInterface;
@@ -41,6 +41,7 @@ final class Session implements SessionInterface
4141
{
4242
/** @psalm-readonly */
4343
private readonly BookmarkHolder $bookmarkHolder;
44+
private const ROLLBACK_CLASSIFICATIONS = ['ClientError', 'TransientError', 'DatabaseError'];
4445

4546
/**
4647
* @param ConnectionPool|Neo4jConnectionPool $pool
@@ -100,21 +101,59 @@ public function writeTransaction(callable $tsxHandler, ?TransactionConfiguration
100101
$this->getLogger()?->log(LogLevel::INFO, 'Beginning write transaction', ['config' => $config]);
101102
$config = $this->mergeTsxConfig($config);
102103

103-
return TransactionHelper::retry(
104-
fn () => $this->startTransaction($config, $this->config->withAccessMode(AccessMode::WRITE())),
105-
$tsxHandler
106-
);
104+
return $this->retry($tsxHandler, false, $config);
107105
}
108106

109107
public function readTransaction(callable $tsxHandler, ?TransactionConfiguration $config = null)
110108
{
111109
$this->getLogger()?->log(LogLevel::INFO, 'Beginning read transaction', ['config' => $config]);
112110
$config = $this->mergeTsxConfig($config);
113111

114-
return TransactionHelper::retry(
115-
fn () => $this->startTransaction($config, $this->config->withAccessMode(AccessMode::READ())),
116-
$tsxHandler
117-
);
112+
return $this->retry($tsxHandler, true, $config);
113+
}
114+
115+
/**
116+
* @template U
117+
*
118+
* @param callable(TransactionInterface):U $tsxHandler
119+
*
120+
* @return U
121+
*/
122+
private function retry(callable $tsxHandler, bool $read, TransactionConfiguration $config)
123+
{
124+
while (true) {
125+
$transaction = null;
126+
try {
127+
if ($read) {
128+
$transaction = $this->startTransaction($config, $this->config->withAccessMode(AccessMode::READ()));
129+
} else {
130+
$transaction = $this->startTransaction($config, $this->config->withAccessMode(AccessMode::WRITE()));
131+
}
132+
$tbr = $tsxHandler($transaction);
133+
self::triggerLazyResult($tbr);
134+
$transaction->commit();
135+
136+
return $tbr;
137+
} catch (Neo4jException $e) {
138+
if ($transaction && !in_array($e->getClassification(), self::ROLLBACK_CLASSIFICATIONS)) {
139+
$transaction->rollback();
140+
}
141+
142+
if ($e->getTitle() === 'NotALeader') {
143+
// By closing the pool, we force the connection to be re-acquired and the routing table to be refetched
144+
$this->pool->close();
145+
} elseif ($e->getClassification() !== 'TransientError') {
146+
throw $e;
147+
}
148+
}
149+
}
150+
}
151+
152+
private static function triggerLazyResult(mixed $tbr): void
153+
{
154+
if ($tbr instanceof CypherSequence) {
155+
$tbr->preload();
156+
}
118157
}
119158

120159
public function transaction(callable $tsxHandler, ?TransactionConfiguration $config = null)

src/Common/TransactionHelper.php

Lines changed: 0 additions & 62 deletions
This file was deleted.

0 commit comments

Comments
 (0)