Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ services:
TEST_DRIVER_REPO: /opt/project
TEST_BACKEND_HOST: testkit_backend
TEST_STUB_HOST: testkit
BOLT_LISTEN_ADDR: "0.0.0.0:9001"
depends_on:
- testkit_backend

Expand Down
4 changes: 4 additions & 0 deletions src/Bolt/BoltConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,10 @@ public function assertNoFailure(Response $response): void
*/
public function discardUnconsumedResults(): void
{
if (!$this->isOpen()) {
return;
}

if (!in_array($this->protocol()->serverState, [ServerState::STREAMING, ServerState::TX_STREAMING], true)) {
return;
}
Expand Down
25 changes: 23 additions & 2 deletions src/Bolt/BoltResult.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,20 @@
namespace Laudis\Neo4j\Bolt;

use function array_splice;

use Bolt\error\ConnectException as BoltConnectException;

use function count;

use Generator;

use function in_array;

use Iterator;
use Laudis\Neo4j\Databags\Neo4jError;
use Laudis\Neo4j\Exception\Neo4jException;
use Laudis\Neo4j\Formatter\SummarizedResultFormatter;
use Throwable;

/**
* @psalm-import-type BoltCypherStats from SummarizedResultFormatter
Expand Down Expand Up @@ -100,7 +106,17 @@ public function consume(): array

private function fetchResults(): void
{
$meta = $this->connection->pull($this->qid, $this->fetchSize);
try {
$meta = $this->connection->pull($this->qid, $this->fetchSize);
} catch (BoltConnectException $e) {
// Close connection on socket errors
try {
$this->connection->close();
} catch (Throwable) {
// Ignore errors when closing
}
throw new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Cluster.NotALeader', 'Connection error: '.$e->getMessage())], $e);
}

/** @var list<list> $rows */
$rows = array_splice($meta, 0, count($meta) - 1);
Expand Down Expand Up @@ -154,6 +170,11 @@ public function __destruct()

/**
 * Asks the connection to discard whatever is still pending for this result.
 *
 * Exactly one discard request is issued, and it is issued inside the guard so
 * that a connection-level failure cannot escape from this method.
 */
public function discard(): void
{
    // The -1 sentinel is translated to null before it is handed to the connection.
    $qid = $this->qid === -1 ? null : $this->qid;

    try {
        $this->connection->discard($qid);
    } catch (BoltConnectException) {
        // Deliberately best effort: a connection error here means the connection is
        // already broken, so there is nothing left to discard.
        // NOTE(review): the original author expects a Neo4jException to surface on the
        // next operation against this connection - confirm against BoltConnection.
    }
}
}
2 changes: 1 addition & 1 deletion src/Bolt/BoltUnmanagedTransaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public function runStatement(Statement $statement): SummarizedResult
$this->database,
$this->tsxConfig->getTimeout(),
$this->isInstantTransaction ? $this->bookmarkHolder : null, // let the begin transaction pass the bookmarks if it is a managed transaction
$this->isInstantTransaction ? $this->config->getAccessMode() : null, // let the begin transaction decide if it is a managed transaction
null, // mode is never sent in RUN messages - it comes from session configuration
$this->tsxConfig->getMetaData()
);
} catch (Throwable $e) {
Expand Down
133 changes: 131 additions & 2 deletions src/Bolt/Session.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
use Laudis\Neo4j\Databags\TransactionConfiguration;
use Laudis\Neo4j\Enum\AccessMode;
use Laudis\Neo4j\Exception\Neo4jException;
use Laudis\Neo4j\Exception\TimeoutException;
use Laudis\Neo4j\Formatter\SummarizedResultFormatter;
use Laudis\Neo4j\Neo4j\Neo4jConnectionPool;
use Laudis\Neo4j\Types\CypherList;
Expand Down Expand Up @@ -74,12 +75,80 @@ public function runStatements(iterable $statements, ?TransactionConfiguration $c
$this->getLogger()?->log(LogLevel::INFO, 'Running statements', ['statements' => $statements]);
$config = $this->mergeTsxConfig($config);
foreach ($statements as $statement) {
$tbr[] = $this->beginInstantTransaction($this->config, $config)->runStatement($statement);
$tbr[] = $this->runStatementWithRetry($statement, $config);
}

return new CypherList($tbr);
}

/**
 * Runs a single statement in its own instant transaction, retrying on socket
 * timeouts, routing errors and transient errors.
 *
 * Each attempt begins a fresh instant transaction, runs the statement and forces
 * the lazy result to load so that failures during result iteration are handled
 * here as well. Depending on the failure the routing table is cleared or the pool
 * is closed before the next attempt; any other Neo4jException is rethrown.
 *
 * NOTE(review): the loop has no attempt cap and no back-off, so a permanently
 * failing server keeps this method spinning - consider bounding it.
 *
 * @throws Exception
 */
private function runStatementWithRetry(Statement $statement, TransactionConfiguration $config): SummarizedResult
{
    while (true) {
        $transaction = null;
        try {
            $transaction = $this->beginInstantTransaction($this->config, $config);
            $result = $transaction->runStatement($statement);
            // Trigger lazy loading of results to catch any timeout during result iteration
            self::triggerLazyResult($result);

            return $result;
        } catch (TimeoutException) {
            // Socket timeout - roll back on a best-effort basis, drop the connections and retry
            if ($transaction) {
                try {
                    $transaction->rollback();
                } catch (Exception) {
                    // Ignore rollback errors during timeout
                }
            }

            // Close the used connections so they won't be reused. The list is rebuilt
            // instead of spliced in place: foreach walks a snapshot of the array, so
            // splicing by the snapshot's keys removes the wrong entries once the live
            // array has been re-indexed by an earlier splice.
            $notClosed = [];
            foreach ($this->usedConnections as $usedConnection) {
                try {
                    $usedConnection->close();
                } catch (Exception) {
                    // Close failed: keep tracking the connection, as before
                    $notClosed[] = $usedConnection;
                }
            }
            $this->usedConnections = $notClosed;

            if ($this->pool instanceof Neo4jConnectionPool) {
                $this->pool->clearRoutingTable();
            }
            // Continue retry loop
        } catch (Neo4jException $e) {
            if ($transaction && !in_array($e->getClassification(), self::ROLLBACK_CLASSIFICATIONS, true)) {
                try {
                    $transaction->rollback();
                } catch (Exception) {
                    // Ignore rollback errors
                }
            }

            if ($this->isSocketTimeoutError($e)) {
                // When socket timeout occurs, clear routing table to force re-fetch with fresh server list
                if ($this->pool instanceof Neo4jConnectionPool) {
                    $this->pool->clearRoutingTable();
                }
                // Continue retry loop
            } elseif ($e->getTitle() === 'NotALeader') {
                // By closing the pool, we force the connection to be re-acquired and the routing table to be refetched
                $this->pool->close();
                // Continue retry loop
            } elseif ($e->getClassification() !== 'TransientError') {
                throw $e;
            }
            // For other transient errors, continue retry loop
        }
    }
}

/**
* @param iterable<Statement>|null $statements
*/
Expand Down Expand Up @@ -136,12 +205,41 @@ private function retry(callable $tsxHandler, bool $read, TransactionConfiguratio
$transaction->commit();

return $tbr;
} catch (TimeoutException $e) {
// Socket timeout - clear routing table and retry
if ($transaction) {
try {
$transaction->rollback();
} catch (Exception $rollbackException) {
// Ignore rollback errors during timeout
}
}

// Close broken connection so it won't be reused
foreach ($this->usedConnections as $i => $usedConnection) {
try {
$usedConnection->close();
array_splice($this->usedConnections, $i, 1);
} catch (Exception $closeException) {
// Ignore close errors
}
}

if ($this->pool instanceof Neo4jConnectionPool) {
$this->pool->clearRoutingTable();
}
// Continue retry loop
} catch (Neo4jException $e) {
if ($transaction && !in_array($e->getClassification(), self::ROLLBACK_CLASSIFICATIONS)) {
$transaction->rollback();
}

if ($e->getTitle() === 'NotALeader') {
if ($this->isSocketTimeoutError($e)) {
// When socket timeout occurs, clear routing table to force re-fetch with fresh server list
if ($this->pool instanceof Neo4jConnectionPool) {
$this->pool->clearRoutingTable();
}
} elseif ($e->getTitle() === 'NotALeader') {
// By closing the pool, we force the connection to be re-acquired and the routing table to be refetched
$this->pool->close();
} elseif ($e->getClassification() !== 'TransientError') {
Expand All @@ -151,6 +249,37 @@ private function retry(callable $tsxHandler, bool $read, TransactionConfiguratio
}
}

/**
 * Decides whether a Neo4jException should be handled as a socket timeout or
 * connection-level failure, i.e. one that warrants a routing table refresh.
 *
 * The answer is true when any of the following holds:
 *  - the previous (wrapped) exception's message mentions "timeout" or "time out",
 *    compared case-insensitively;
 *  - the classification is "TransientError";
 *  - the title is "ServiceUnavailable" or "FailedToRoute".
 *
 * @param Neo4jException $e The exception to check
 *
 * @return bool True if this is a socket timeout or connection failure
 */
private function isSocketTimeoutError(Neo4jException $e): bool
{
    // The wrapping exception's own title does not reveal a timeout, so the message of
    // the wrapped cause is inspected. An absent cause yields an empty string, which
    // matches neither marker.
    $cause = $e->getPrevious();
    $causeText = $cause === null ? '' : strtolower($cause->getMessage());

    foreach (['timeout', 'time out'] as $marker) {
        if (str_contains($causeText, $marker)) {
            return true;
        }
    }

    if ($e->getClassification() === 'TransientError') {
        return true;
    }

    return in_array($e->getTitle(), ['ServiceUnavailable', 'FailedToRoute'], true);
}

private static function triggerLazyResult(mixed $tbr): void
{
if ($tbr instanceof CypherSequence) {
Expand Down
7 changes: 6 additions & 1 deletion src/BoltFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ public function createConnection(ConnectionRequestData $data, SessionConfigurati

$config->setServerAgent($response['server']);

// Apply recv_timeout hint if present
if (array_key_exists('hints', $response) && array_key_exists('connection.recv_timeout_seconds', $response['hints'])) {
$connection->setTimeout((float) $response['hints']['connection.recv_timeout_seconds']);
}

return $connection;
}

Expand All @@ -92,7 +97,7 @@ public function canReuseConnection(ConnectionInterface $connection, SessionConfi
$database = $databaseInfo?->getName();

return $connection->getAccessMode() === $config->getAccessMode()
&& $database === $config->getDatabase();
&& $database === $config->getDatabase();
}

public function reuseConnection(BoltConnection $connection, SessionConfiguration $sessionConfig): BoltConnection
Expand Down
46 changes: 45 additions & 1 deletion src/Contracts/BoltMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
use Bolt\protocol\Response;
use Iterator;
use Laudis\Neo4j\Bolt\BoltConnection;
use Laudis\Neo4j\Databags\Neo4jError;
use Laudis\Neo4j\Exception\Neo4jException;
use Throwable;

abstract class BoltMessage
{
Expand All @@ -31,13 +34,54 @@ abstract public function send(): BoltMessage;

/**
 * Reads the response to this message from the connection's protocol and asserts
 * that it is not a failure.
 *
 * The response is read exactly once, inside the guard. If the read throws an error
 * recognised as a timeout or socket failure, the connection is closed on a
 * best-effort basis and the error is rethrown as a Neo4jException with the code
 * "Neo.ClientError.Cluster.NotALeader", keeping the original error as its previous
 * exception. Any other error is rethrown untouched.
 *
 * @throws Neo4jException on a timeout or socket failure while reading
 * @throws Throwable      any other error raised while reading the response
 */
public function getResponse(): Response
{
    try {
        $response = $this->connection->protocol()->getResponse();
    } catch (Throwable $e) {
        if ($this->isTimeoutException($e)) {
            $message = 'Connection timeout reached';
            // Carry the duration over from the underlying message. Decimals are kept
            // whole ("1.5 seconds" is not cut down to "5"), the unit reported is the
            // one that was matched, and the trailing \b stops digits followed by an
            // arbitrary word starting with "s" from being read as a duration.
            if (preg_match('/(\d+(?:\.\d+)?)\s*(milliseconds?|ms|seconds?|s)\b/', $e->getMessage(), $matches) === 1) {
                $unit = str_starts_with($matches[2], 'm') ? 'milliseconds' : 'seconds';
                $message .= ' after '.$matches[1].' '.$unit;
            }
        } elseif ($this->isSocketException($e)) {
            $message = 'Connection error: '.$e->getMessage();
        } else {
            throw $e;
        }

        try {
            $this->connection->close();
        } catch (Throwable) {
            // Best effort: the original failure is the one worth reporting
        }

        // Reported under the NotALeader code; the session's retry handling checks for that title.
        throw new Neo4jException([Neo4jError::fromMessageAndCode('Neo.ClientError.Cluster.NotALeader', $message)], $e);
    }

    $this->connection->assertNoFailure($response);

    return $response;
}

/**
 * Tells whether the error's message mentions a timeout.
 *
 * The message is lower-cased first, so the match on "timeout" / "time out" is
 * case-insensitive.
 */
private function isTimeoutException(Throwable $e): bool
{
    $haystack = strtolower($e->getMessage());

    foreach (['timeout', 'time out'] as $needle) {
        if (str_contains($haystack, $needle)) {
            return true;
        }
    }

    return false;
}

/**
 * Tells whether the error's message names one of the known socket-level failures.
 *
 * The message is lower-cased first, so the comparison against the markers below
 * is case-insensitive.
 */
private function isSocketException(Throwable $e): bool
{
    $haystack = strtolower($e->getMessage());
    $markers = [
        'broken pipe',
        'connection reset',
        'connection refused',
        'interrupted system call',
        'i/o error',
    ];

    foreach ($markers as $marker) {
        if (str_contains($haystack, $marker)) {
            return true;
        }
    }

    return false;
}

/**
* @return Iterator<Response>
*/
Expand Down
Loading
Loading