2323use Laudis \Neo4j \Contracts \UnmanagedTransactionInterface ;
2424use Laudis \Neo4j \Databags \Bookmark ;
2525use Laudis \Neo4j \Databags \BookmarkHolder ;
26+ use Laudis \Neo4j \Databags \Neo4jError ;
2627use Laudis \Neo4j \Databags \SessionConfiguration ;
2728use Laudis \Neo4j \Databags \Statement ;
2829use Laudis \Neo4j \Databags \SummarizedResult ;
@@ -76,54 +77,7 @@ public function runStatements(iterable $statements, ?TransactionConfiguration $c
7677 $ config = $ this ->mergeTsxConfig ($ config );
7778
7879 foreach ($ statements as $ statement ) {
79- // Wrap in retry logic for connection errors
80- $ retries = 0 ;
81- $ maxRetries = 3 ;
82-
83- while ($ retries < $ maxRetries ) {
84- try {
85- $ tbr [] = $ this ->beginInstantTransaction ($ this ->config , $ config )->runStatement ($ statement );
86- break ; // Success, exit retry loop
87- } catch (Neo4jException $ e ) {
88- if ($ this ->shouldClearRoutingTable ($ e )) {
89- $ this ->getLogger ()?->log(LogLevel::WARNING , 'Connection error in instant transaction, retrying ' , [
90- 'error ' => $ e ->getMessage (),
91- 'retry ' => $ retries + 1 ,
92- ]);
93-
94- if ($ this ->pool instanceof Neo4jConnectionPool) {
95- $ this ->pool ->clearRoutingTable ($ this ->config );
96- }
97- $ this ->pool ->close ();
98-
99- ++$ retries ;
100- if ($ retries >= $ maxRetries ) {
101- throw $ e ;
102- }
103- } else {
104- throw $ e ;
105- }
106- } catch (Throwable $ e ) {
107- if ($ this ->isConnectionError ($ e )) {
108- $ this ->getLogger ()?->log(LogLevel::WARNING , 'Connection error in instant transaction, retrying ' , [
109- 'error ' => $ e ->getMessage (),
110- 'retry ' => $ retries + 1 ,
111- ]);
112-
113- if ($ this ->pool instanceof Neo4jConnectionPool) {
114- $ this ->pool ->clearRoutingTable ($ this ->config );
115- }
116- $ this ->pool ->close ();
117-
118- ++$ retries ;
119- if ($ retries >= $ maxRetries ) {
120- throw $ e ;
121- }
122- } else {
123- throw $ e ;
124- }
125- }
126- }
80+ $ tbr [] = $ this ->executeStatementWithRetry ($ statement , $ config );
12781 }
12882
12983 return new CypherList ($ tbr );
@@ -186,44 +140,73 @@ private function retry(callable $tsxHandler, bool $read, TransactionConfiguratio
186140
187141 return $ tbr ;
188142 } catch (Neo4jException $ e ) {
189- if ($ transaction && !in_array ($ e ->getClassification (), self ::ROLLBACK_CLASSIFICATIONS )) {
190- $ transaction ->rollback ();
191- }
192-
193- // ADD THIS SECTION - Handle connection timeouts and routing failures
194- if ($ e ->getTitle () === 'NotALeader '
195- || $ e ->getNeo4jCode () === 'Neo.ClientError.Cluster.NotALeader '
196- || $ this ->isConnectionError ($ e )) {
197- // Clear routing table before closing pool to force fresh ROUTE request on retry
198- if ($ this ->pool instanceof Neo4jConnectionPool) {
199- $ this ->pool ->clearRoutingTable ($ this ->config );
200- }
201- // By closing the pool, we force the connection to be re-acquired and the routing table to be refetched
202- $ this ->pool ->close ();
203- } elseif ($ e ->getClassification () !== 'TransientError ' ) {
204- throw $ e ;
205- }
206- } catch (Exception $ e ) {
207- if ($ this ->isConnectionError ($ e )) {
208- if ($ this ->pool instanceof Neo4jConnectionPool) {
209- $ this ->pool ->clearRoutingTable ($ this ->config );
210- }
211- $ this ->pool ->close ();
212- } else {
143+ $ this ->handleManagedTransactionError ($ transaction , $ e );
144+ } catch (Throwable $ e ) {
145+ // For non-Neo4jException errors, only retry on connection errors
146+ if (!$ this ->isConnectionError ($ e )) {
213147 throw $ e ;
214148 }
149+ // Connection error - clear routing and retry
150+ $ this ->handleConnectionFailure ();
215151 }
216152 }
217153 }
218154
155+ /**
156+ * Handle Neo4jException in managed transaction - either rollback and retry or throw.
157+ *
158+ * @param Neo4jException $e The exception that occurred
159+ */
160+ private function handleManagedTransactionError (?UnmanagedTransactionInterface $ transaction , Neo4jException $ e ): void
161+ {
162+ if ($ transaction && !in_array ($ e ->getClassification (), self ::ROLLBACK_CLASSIFICATIONS )) {
163+ $ transaction ->rollback ();
164+ }
165+
166+ if ($ this ->shouldRetryManagedTransaction ($ e )) {
167+ $ this ->handleConnectionFailure ();
168+
169+ return ;
170+ }
171+
172+ throw $ e ;
173+ }
174+
175+ /**
176+ * Determine if a Neo4jException should trigger retry of managed transaction.
177+ */
178+ private function shouldRetryManagedTransaction (Neo4jException $ e ): bool
179+ {
180+ if ($ e ->getTitle () === 'NotALeader ' || $ e ->getNeo4jCode () === 'Neo.ClientError.Cluster.NotALeader ' ) {
181+ return true ;
182+ }
183+
184+ if ($ this ->isConnectionError ($ e )) {
185+ return true ;
186+ }
187+
188+ return $ e ->getClassification () === 'TransientError ' ;
189+ }
190+
191+ /**
192+ * Handle connection failure by clearing routing table and closing pool.
193+ * This forces fresh connection acquisition and routing table refresh on next attempt.
194+ */
195+ private function handleConnectionFailure (): void
196+ {
197+ if ($ this ->pool instanceof Neo4jConnectionPool) {
198+ $ this ->pool ->clearRoutingTable ($ this ->config );
199+ }
200+ $ this ->pool ->close ();
201+ }
202+
219203 /**
220204 * Check if the exception is a connection-related error.
221205 */
222206 private function isConnectionError (Throwable $ e ): bool
223207 {
224208 $ message = strtolower ($ e ->getMessage ());
225209
226- // Check for common connection error messages
227210 if (str_contains ($ message , 'interrupted system call ' )
228211 || str_contains ($ message , 'broken pipe ' )
229212 || str_contains ($ message , 'connection reset ' )
@@ -232,7 +215,6 @@ private function isConnectionError(Throwable $e): bool
232215 return true ;
233216 }
234217
235- // Check for Neo4jException-specific codes
236218 if ($ e instanceof Neo4jException) {
237219 return $ e ->getNeo4jCode () === 'Neo.ClientError.Cluster.NotALeader ' ;
238220 }
@@ -248,7 +230,6 @@ private function shouldClearRoutingTable(Neo4jException $e): bool
248230 $ message = strtolower ($ e ->getMessage ());
249231 $ title = $ e ->getTitle ();
250232
251- // Clear routing table for timeout, connection, and cluster errors
252233 return str_contains ($ message , 'interrupted system call ' )
253234 || str_contains ($ message , 'broken pipe ' )
254235 || str_contains ($ message , 'connection reset ' )
@@ -258,6 +239,58 @@ private function shouldClearRoutingTable(Neo4jException $e): bool
258239 || $ title === 'NotALeader ' ;
259240 }
260241
242+ /**
243+ * Execute a statement with automatic retry on connection errors.
244+ * Retries up to 3 times on connection failures, clearing routing table between attempts.
245+ *
246+ * @param Statement $statement The statement to execute
247+ * @param TransactionConfiguration $config Transaction configuration
248+ *
249+ * @return SummarizedResult The result of the statement
250+ */
251+ private function executeStatementWithRetry (Statement $ statement , TransactionConfiguration $ config ): SummarizedResult
252+ {
253+ $ maxRetries = 3 ;
254+ $ retries = 0 ;
255+
256+ while ($ retries < $ maxRetries ) {
257+ try {
258+ return $ this ->beginInstantTransaction ($ this ->config , $ config )->runStatement ($ statement );
259+ } catch (Neo4jException $ e ) {
260+ if (!$ this ->shouldClearRoutingTable ($ e )) {
261+ throw $ e ;
262+ }
263+ $ this ->handleStatementRetry ($ retries , $ maxRetries , $ e );
264+ } catch (Throwable $ e ) {
265+ if (!$ this ->isConnectionError ($ e )) {
266+ throw $ e ;
267+ }
268+ $ this ->handleStatementRetry ($ retries , $ maxRetries , $ e );
269+ }
270+ }
271+
272+ throw new Neo4jException ([Neo4jError::fromMessageAndCode ('Neo.ClientError.General ' , 'Statement execution failed after maximum retries ' )]);
273+ }
274+
275+ /**
276+ * Handle retry logic for statement execution - clear routing and increment counter.
277+ * Throws the exception if max retries exceeded.
278+ */
279+ private function handleStatementRetry (int &$ retries , int $ maxRetries , Throwable $ e ): void
280+ {
281+ $ this ->getLogger ()?->log(LogLevel::WARNING , 'Connection error in instant transaction, retrying ' , [
282+ 'error ' => $ e ->getMessage (),
283+ 'retry ' => $ retries + 1 ,
284+ ]);
285+
286+ $ this ->handleConnectionFailure ();
287+
288+ ++$ retries ;
289+ if ($ retries >= $ maxRetries ) {
290+ throw $ e ;
291+ }
292+ }
293+
261294 private static function triggerLazyResult (mixed $ tbr ): void
262295 {
263296 if ($ tbr instanceof CypherSequence) {
@@ -339,18 +372,13 @@ private function acquireConnection(TransactionConfiguration $config, SessionConf
339372 private function startTransaction (TransactionConfiguration $ config , SessionConfiguration $ sessionConfig ): UnmanagedTransactionInterface
340373 {
341374 $ this ->getLogger ()?->log(LogLevel::INFO , 'Starting transaction ' , ['config ' => $ config , 'sessionConfig ' => $ sessionConfig ]);
342- try {
343- $ connection = $ this ->acquireConnection ($ config , $ sessionConfig );
375+ $ connection = $ this ->acquireConnection ($ config , $ sessionConfig );
344376
377+ try {
345378 $ connection ->begin ($ this ->config ->getDatabase (), $ config ->getTimeout (), $ this ->bookmarkHolder , $ config ->getMetaData ());
346379 } catch (Neo4jException $ e ) {
347- if (isset ($ connection ) && $ connection ->getServerState () === 'FAILED ' ) {
348- $ connection ->reset ();
349- }
350- // Release connection back to pool if available
351- if (isset ($ connection )) {
352- $ this ->pool ->release ($ connection );
353- }
380+ // BEGIN failed - clean up connection before rethrowing
381+ $ this ->cleanupFailedConnection ($ connection );
354382 throw $ e ;
355383 }
356384
@@ -370,6 +398,19 @@ private function startTransaction(TransactionConfiguration $config, SessionConfi
370398 );
371399 }
372400
401+ /**
402+ * Clean up a connection that failed during BEGIN or other initialization.
403+ * Resets the connection if it's in FAILED state and releases it back to the pool.
404+ */
405+ private function cleanupFailedConnection (BoltConnection $ connection ): void
406+ {
407+ if ($ connection ->getServerState () === 'FAILED ' ) {
408+ $ connection ->reset ();
409+ }
410+ // Release connection back to pool for reuse
411+ $ this ->pool ->release ($ connection );
412+ }
413+
373414 private function mergeTsxConfig (?TransactionConfiguration $ config ): TransactionConfiguration
374415 {
375416 return TransactionConfiguration::default ()->merge ($ config );
0 commit comments