Skip to content

Commit 976497d

Browse files
committed
fixed configuration_hints testkit tests
1 parent 6d8b5f5 commit 976497d

File tree

7 files changed

+287
-13
lines changed

7 files changed

+287
-13
lines changed

src/Bolt/Session.php

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
use Laudis\Neo4j\Neo4j\Neo4jConnectionPool;
3434
use Laudis\Neo4j\Types\CypherList;
3535
use Psr\Log\LogLevel;
36+
use Throwable;
3637

3738
/**
3839
* A session using bolt connections.
@@ -73,8 +74,56 @@ public function runStatements(iterable $statements, ?TransactionConfiguration $c
7374

7475
$this->getLogger()?->log(LogLevel::INFO, 'Running statements', ['statements' => $statements]);
7576
$config = $this->mergeTsxConfig($config);
77+
7678
foreach ($statements as $statement) {
77-
$tbr[] = $this->beginInstantTransaction($this->config, $config)->runStatement($statement);
79+
// Wrap in retry logic for connection errors
80+
$retries = 0;
81+
$maxRetries = 3;
82+
83+
while ($retries < $maxRetries) {
84+
try {
85+
$tbr[] = $this->beginInstantTransaction($this->config, $config)->runStatement($statement);
86+
break; // Success, exit retry loop
87+
} catch (Neo4jException $e) {
88+
if ($this->shouldClearRoutingTable($e)) {
89+
$this->getLogger()?->log(LogLevel::WARNING, 'Connection error in instant transaction, retrying', [
90+
'error' => $e->getMessage(),
91+
'retry' => $retries + 1,
92+
]);
93+
94+
if ($this->pool instanceof Neo4jConnectionPool) {
95+
$this->pool->clearRoutingTable($this->config);
96+
}
97+
$this->pool->close();
98+
99+
++$retries;
100+
if ($retries >= $maxRetries) {
101+
throw $e;
102+
}
103+
} else {
104+
throw $e;
105+
}
106+
} catch (Throwable $e) {
107+
if ($this->isConnectionError($e)) {
108+
$this->getLogger()?->log(LogLevel::WARNING, 'Connection error in instant transaction, retrying', [
109+
'error' => $e->getMessage(),
110+
'retry' => $retries + 1,
111+
]);
112+
113+
if ($this->pool instanceof Neo4jConnectionPool) {
114+
$this->pool->clearRoutingTable($this->config);
115+
}
116+
$this->pool->close();
117+
118+
++$retries;
119+
if ($retries >= $maxRetries) {
120+
throw $e;
121+
}
122+
} else {
123+
throw $e;
124+
}
125+
}
126+
}
78127
}
79128

80129
return new CypherList($tbr);
@@ -141,16 +190,74 @@ private function retry(callable $tsxHandler, bool $read, TransactionConfiguratio
141190
$transaction->rollback();
142191
}
143192

144-
if ($e->getTitle() === 'NotALeader') {
193+
// ADD THIS SECTION - Handle connection timeouts and routing failures
194+
if ($e->getTitle() === 'NotALeader'
195+
|| $e->getNeo4jCode() === 'Neo.ClientError.Cluster.NotALeader'
196+
|| $this->isConnectionError($e)) {
197+
// Clear routing table before closing pool to force fresh ROUTE request on retry
198+
if ($this->pool instanceof Neo4jConnectionPool) {
199+
$this->pool->clearRoutingTable($this->config);
200+
}
145201
// By closing the pool, we force the connection to be re-acquired and the routing table to be refetched
146202
$this->pool->close();
147203
} elseif ($e->getClassification() !== 'TransientError') {
148204
throw $e;
149205
}
206+
} catch (Exception $e) {
207+
if ($this->isConnectionError($e)) {
208+
if ($this->pool instanceof Neo4jConnectionPool) {
209+
$this->pool->clearRoutingTable($this->config);
210+
}
211+
$this->pool->close();
212+
} else {
213+
throw $e;
214+
}
150215
}
151216
}
152217
}
153218

219+
/**
220+
* Check if the exception is a connection-related error.
221+
*/
222+
private function isConnectionError(Throwable $e): bool
223+
{
224+
$message = strtolower($e->getMessage());
225+
226+
// Check for common connection error messages
227+
if (str_contains($message, 'interrupted system call')
228+
|| str_contains($message, 'broken pipe')
229+
|| str_contains($message, 'connection reset')
230+
|| str_contains($message, 'connection timeout')
231+
|| str_contains($message, 'connection closed')) {
232+
return true;
233+
}
234+
235+
// Check for Neo4jException-specific codes
236+
if ($e instanceof Neo4jException) {
237+
return $e->getNeo4jCode() === 'Neo.ClientError.Cluster.NotALeader';
238+
}
239+
240+
return false;
241+
}
242+
243+
/**
244+
* Check if the exception should trigger a routing table clear.
245+
*/
246+
private function shouldClearRoutingTable(Neo4jException $e): bool
247+
{
248+
$message = strtolower($e->getMessage());
249+
$title = $e->getTitle();
250+
251+
// Clear routing table for timeout, connection, and cluster errors
252+
return str_contains($message, 'interrupted system call')
253+
|| str_contains($message, 'broken pipe')
254+
|| str_contains($message, 'connection reset')
255+
|| str_contains($message, 'connection timeout')
256+
|| str_contains($message, 'connection closed')
257+
|| $e->getNeo4jCode() === 'Neo.ClientError.Cluster.NotALeader'
258+
|| $title === 'NotALeader';
259+
}
260+
154261
private static function triggerLazyResult(mixed $tbr): void
155262
{
156263
if ($tbr instanceof CypherSequence) {

src/Common/SysVSemaphore.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,11 @@
2929

3030
class SysVSemaphore implements SemaphoreInterface
3131
{
32+
/**
33+
* @psalm-suppress UndefinedClass
34+
*/
3235
private function __construct(
33-
private readonly \SysvSemaphore $semaphore,
36+
private readonly mixed $semaphore,
3437
) {
3538
}
3639

src/Neo4j/Neo4jConnectionPool.php

Lines changed: 107 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@ public function acquire(SessionConfiguration $config): Generator
134134
$latestError = null;
135135

136136
if ($table == null) {
137+
$this->getLogger()?->log(LogLevel::DEBUG, 'Routing table not found in cache, fetching new routing table');
138+
137139
$addresses = $this->getAddresses($this->data->getUri()->getHost());
138140
foreach ($addresses as $address) {
139141
$triedAddresses[] = $address;
@@ -150,8 +152,18 @@ public function acquire(SessionConfiguration $config): Generator
150152
*/
151153
$connection = GeneratorHelper::getReturnFromGenerator($pool->acquire($config));
152154
$table = $this->routingTable($connection, $config);
155+
156+
$this->getLogger()?->log(LogLevel::DEBUG, 'Successfully fetched routing table', [
157+
'ttl' => $table->getTtl(),
158+
'leaders' => $table->getWithRole(RoutingRoles::LEADER()),
159+
'followers' => $table->getWithRole(RoutingRoles::FOLLOWER()),
160+
'routers' => $table->getWithRole(RoutingRoles::ROUTE()),
161+
]);
153162
} catch (ConnectException $e) {
154-
// todo - once client side logging is implemented it must be conveyed here.
163+
$this->getLogger()?->log(LogLevel::WARNING, 'Failed to connect to address', [
164+
'address' => $address,
165+
'error' => $e->getMessage(),
166+
]);
155167
$latestError = $e;
156168
continue; // We continue if something is wrong with the current server
157169
}
@@ -174,6 +186,11 @@ public function acquire(SessionConfiguration $config): Generator
174186
$server = $server->withScheme($this->data->getUri()->getScheme());
175187
}
176188

189+
$this->getLogger()?->log(LogLevel::DEBUG, 'Acquiring connection from server', [
190+
'server' => (string) $server,
191+
'access_mode' => $config->getAccessMode()?->getValue(),
192+
]);
193+
177194
return $this->createOrGetPool($this->data->getUri()->getHost(), $server)->acquire($config);
178195
}
179196

@@ -182,6 +199,89 @@ public function getLogger(): ?Neo4jLogger
182199
return $this->logger;
183200
}
184201

202+
/**
203+
* Get the current routing table from cache for the given session configuration.
204+
*
205+
* @return RoutingTable|null The cached routing table, or null if not yet initialized
206+
*/
207+
public function getRoutingTable(SessionConfiguration $config): ?RoutingTable
208+
{
209+
$key = $this->createKey($this->data, $config);
210+
/** @var RoutingTable|null $table */
211+
$table = $this->cache->get($key);
212+
213+
return $table;
214+
}
215+
216+
/**
217+
* Clear the cached routing table for the given session configuration.
218+
* This forces a new routing table to be fetched on the next acquire() call.
219+
*/
220+
public function clearRoutingTable(SessionConfiguration $config): void
221+
{
222+
$key = $this->createKey($this->data, $config);
223+
$deleted = $this->cache->delete($key);
224+
225+
$this->getLogger()?->log(LogLevel::INFO, 'Cleared routing table from cache', [
226+
'key' => $key,
227+
'deleted' => $deleted,
228+
]);
229+
}
230+
231+
/**
232+
* Remove a failed server from the routing table.
233+
* This removes the server from all roles (leader, follower, router) and updates the cache.
234+
*
235+
* @param SessionConfiguration $config The session configuration
236+
* @param string $serverAddress The address of the failed server (e.g., "172.18.0.3:9010")
237+
*/
238+
public function removeFailedServer(SessionConfiguration $config, string $serverAddress): void
239+
{
240+
$key = $this->createKey($this->data, $config);
241+
/** @var RoutingTable|null $table */
242+
$table = $this->cache->get($key);
243+
244+
if ($table !== null) {
245+
$this->getLogger()?->log(LogLevel::WARNING, 'Removing failed server from routing table', [
246+
'server' => $serverAddress,
247+
]);
248+
249+
// Remove the server and update cache
250+
$updatedTable = $table->removeServer($serverAddress);
251+
252+
// Only update cache if the table actually changed
253+
if ($updatedTable !== $table) {
254+
$this->cache->set($key, $updatedTable, $updatedTable->getTtl());
255+
256+
$this->getLogger()?->log(LogLevel::INFO, 'Updated routing table after removing failed server', [
257+
'server' => $serverAddress,
258+
'remaining_leaders' => $updatedTable->getWithRole(RoutingRoles::LEADER()),
259+
'remaining_followers' => $updatedTable->getWithRole(RoutingRoles::FOLLOWER()),
260+
'remaining_routers' => $updatedTable->getWithRole(RoutingRoles::ROUTE()),
261+
]);
262+
}
263+
}
264+
}
265+
266+
/**
267+
* Check if a server exists in the routing table.
268+
*
269+
* @param SessionConfiguration $config The session configuration
270+
* @param string $serverAddress The address of the server to check
271+
*
272+
* @return bool True if the server exists in the routing table, false otherwise
273+
*/
274+
public function hasServer(SessionConfiguration $config, string $serverAddress): bool
275+
{
276+
$table = $this->getRoutingTable($config);
277+
278+
if ($table === null) {
279+
return false;
280+
}
281+
282+
return $table->hasServer($serverAddress);
283+
}
284+
185285
/**
186286
* @throws Exception
187287
*/
@@ -193,6 +293,10 @@ private function getNextServer(RoutingTable $table, ?AccessMode $mode): Uri
193293
$servers = $table->getWithRole(RoutingRoles::FOLLOWER());
194294
}
195295

296+
if (count($servers) === 0) {
297+
throw new RuntimeException(sprintf('No servers available for access mode: %s', $mode?->getValue() ?? 'WRITE'));
298+
}
299+
196300
return Uri::create($servers[random_int(0, count($servers) - 1)]);
197301
}
198302

@@ -252,6 +356,8 @@ private function createKey(ConnectionRequestData $data, ?SessionConfiguration $c
252356

253357
public function close(): void
254358
{
359+
$this->getLogger()?->log(LogLevel::INFO, 'Closing all connection pools');
360+
255361
foreach (self::$pools as $pool) {
256362
$pool->close();
257363
}

src/Neo4j/RoutingTable.php

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,51 @@ public function getWithRole(?RoutingRoles $role = null): array
6060

6161
return array_values(array_unique($tbr));
6262
}
63+
64+
/**
65+
* Remove a server from the routing table.
66+
*
67+
* @param string $serverAddress The address to remove
68+
*
69+
* @return RoutingTable A new routing table with the server removed
70+
*/
71+
public function removeServer(string $serverAddress): RoutingTable
72+
{
73+
/** @var list<array{addresses: list<string>, role: string}> $updatedServers */
74+
$updatedServers = [];
75+
76+
foreach ($this->servers as $server) {
77+
$updatedAddresses = array_filter(
78+
$server['addresses'],
79+
static fn (string $address): bool => $address !== $serverAddress
80+
);
81+
82+
if (!empty($updatedAddresses)) {
83+
$updatedServers[] = [
84+
'addresses' => array_values($updatedAddresses),
85+
'role' => $server['role'],
86+
];
87+
}
88+
}
89+
90+
return new self($updatedServers, $this->ttl);
91+
}
92+
93+
/**
94+
* Check if a server exists in the routing table.
95+
*
96+
* @param string $serverAddress The address to check
97+
*
98+
* @return bool True if the server exists, false otherwise
99+
*/
100+
public function hasServer(string $serverAddress): bool
101+
{
102+
foreach ($this->servers as $server) {
103+
if (in_array($serverAddress, $server['addresses'], true)) {
104+
return true;
105+
}
106+
}
107+
108+
return false;
109+
}
63110
}

testkit-backend/src/Handlers/ForcedRoutingTableUpdate.php

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
use Exception;
1717
use Laudis\Neo4j\Contracts\ConnectionPoolInterface;
18+
use Laudis\Neo4j\Databags\SessionConfiguration;
1819
use Laudis\Neo4j\Neo4j\Neo4jConnectionPool;
1920
use Laudis\Neo4j\Neo4j\Neo4jDriver;
2021
use Laudis\Neo4j\TestkitBackend\Contracts\RequestHandlerInterface;
@@ -53,11 +54,14 @@ public function handle($request): TestkitResponseInterface
5354
/** @var ConnectionPoolInterface $pool */
5455
$pool = $poolProperty->getValue($driver);
5556

56-
$tableProperty = (new ReflectionClass(Neo4jConnectionPool::class))->getProperty('table');
57-
$tableProperty->setAccessible(true);
58-
$tableProperty->setValue($pool, null);
57+
if ($pool instanceof Neo4jConnectionPool) {
58+
// Clear the cached routing table to force a refresh
59+
$config = new SessionConfiguration();
60+
$pool->clearRoutingTable($config);
61+
}
5962
}
6063

64+
// Running a query will trigger a new routing table fetch if needed
6165
$driver->createSession()->run('RETURN 1 AS x');
6266

6367
return new DriverResponse($request->getDriverId());

0 commit comments

Comments
 (0)