@@ -434,7 +434,28 @@ def _handle_create_error(
     error_str = str(create_error)
     error_str_lower = error_str.lower()
 
-    if "tuple index out of range" in error_str_lower or "indexerror" in error_str_lower:
+    # Handle database connection pool exhaustion errors
+    if (
+        "connection pool is full" in error_str_lower
+        or "too many connections" in error_str_lower
+        or "poolerror" in error_str_lower
+    ):
+        error_message = (
+            f"Database connection pool exhaustion in row {i + 1}: {create_error}"
+        )
+        if "Fell back to create" in error_summary:
+            error_summary = "Database connection pool exhaustion detected"
+    # Handle specific database serialization errors
+    elif (
+        "could not serialize access" in error_str_lower
+        or "concurrent update" in error_str_lower
+    ):
+        error_message = f"Database serialization error in row {i + 1}: {create_error}"
+        if "Fell back to create" in error_summary:
+            error_summary = "Database serialization conflict detected during create"
+    elif (
+        "tuple index out of range" in error_str_lower or "indexerror" in error_str_lower
+    ):
         error_message = f"Tuple unpacking error in row {i + 1}: {create_error}"
         if "Fell back to create" in error_summary:
             error_summary = "Tuple unpacking error detected"
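The three branches above all follow one pattern: match substrings of the lowercased error text, build a row-specific message, and replace a generic summary. A table-driven variant of the same pattern keeps the chain flat as more error classes are added — a minimal sketch, with illustrative names only (`ERROR_CLASSES` and `classify_error` are not from this codebase):

```python
from typing import Optional

# Sketch only: table-driven variant of the substring classification above.
# ERROR_CLASSES and classify_error are illustrative names, not codebase APIs.
ERROR_CLASSES: list[tuple[tuple[str, ...], str, str]] = [
    (
        ("connection pool is full", "too many connections", "poolerror"),
        "Database connection pool exhaustion",
        "Database connection pool exhaustion detected",
    ),
    (
        ("could not serialize access", "concurrent update"),
        "Database serialization error",
        "Database serialization conflict detected during create",
    ),
    (
        ("tuple index out of range", "indexerror"),
        "Tuple unpacking error",
        "Tuple unpacking error detected",
    ),
]

def classify_error(i: int, create_error: Exception) -> tuple[str, Optional[str]]:
    """Return (error_message, new_summary or None) for a create error."""
    error_str_lower = str(create_error).lower()
    for needles, prefix, summary in ERROR_CLASSES:
        if any(n in error_str_lower for n in needles):
            return f"{prefix} in row {i + 1}: {create_error}", summary
    return f"Unexpected error in row {i + 1}: {create_error}", None
```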
@@ -511,6 +532,36 @@ def _create_batch_individually(
                 error_summary = "Malformed CSV row detected"
                 continue
             except Exception as create_error:
+                error_str_lower = str(create_error).lower()
+
+                # Special handling for database connection pool exhaustion errors
+                if (
+                    "connection pool is full" in error_str_lower
+                    or "too many connections" in error_str_lower
+                    or "poolerror" in error_str_lower
+                ):
+                    # These are retryable errors - log and add to failed lines for a later run
+                    log.warning(
+                        f"Database connection pool exhaustion detected during create for record {source_id}. "
+                        "Marking as failed for retry in a subsequent run."
+                    )
+                    error_message = f"Retryable error (connection pool exhaustion) for record {source_id}: {create_error}"
+                    failed_lines.append([*line, error_message])
+                    continue
+
+                # Special handling for database serialization errors in create operations
+                elif (
+                    "could not serialize access" in error_str_lower
+                    or "concurrent update" in error_str_lower
+                ):
+                    # Retryable - mark the line as failed so a later run can retry it
+                    log.warning(
+                        f"Database serialization conflict detected during create for record {source_id}. "
+                        "This is often caused by concurrent processes. Marking as failed for retry."
+                    )
+                    failed_lines.append([*line, f"Retryable error (serialization conflict) for record {source_id}: {create_error}"])
+                    continue
+
                 error_message, new_failed_line, error_summary = _handle_create_error(
                     i, create_error, line, error_summary
                 )
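Retryable failures are re-queued by appending the error text as an extra trailing column on the original CSV row (`[*line, error_message]`), so a later run can both retry the row and explain the failure. A sketch of how such rows might be persisted for that later run (the file layout and `_ERROR_REASON` column name are assumptions, not codebase conventions):

```python
import csv

# Sketch only: persist rows that failed with retryable errors so a later
# run can re-import them. The delimiter matches the importer's default ";".
def write_failed_lines(path: str, header: list[str], failed_lines: list[list]) -> None:
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f, delimiter=";")
        # One extra column mirrors the [*line, error_message] rows built above.
        writer.writerow([*header, "_ERROR_REASON"])
        writer.writerows(failed_lines)
```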
@@ -569,6 +620,10 @@ def _execute_load_batch(
     aggregated_failed_lines: list[list[Any]] = []
     chunk_size = len(lines_to_process)
 
+    # Track retry attempts for serialization errors to prevent infinite retries
+    serialization_retry_count = 0
+    max_serialization_retries = 3  # Maximum number of retries for serialization errors
+
     while lines_to_process:
         current_chunk = lines_to_process[:chunk_size]
         load_header, load_lines = batch_header, current_chunk
@@ -609,6 +664,9 @@ def _execute_load_batch(
             aggregated_id_map.update(id_map)
             lines_to_process = lines_to_process[chunk_size:]
 
+            # Reset serialization retry counter on successful processing
+            serialization_retry_count = 0
+
         except Exception as e:
             error_str = str(e).lower()
 
@@ -619,21 +677,25 @@ def _execute_load_batch(
                 or "read timeout" in error_str
                 or type(e).__name__ == "ReadTimeout"
             ):
-                log.debug(f"Client-side timeout detected ({type(e).__name__}): {e}")
                 log.debug(
-                    "Ignoring client-side timeout to allow server processing"
-                    " to continue"
+                    "Ignoring client-side timeout to allow server processing to continue"
                 )
-                # CRITICAL: For local imports, ignore client timeouts completely
-                # This restores the previous behavior where long processing was allowed
-                progress.console.print(
-                    f"[yellow]INFO:[/] Batch {batch_number} processing on server. "
-                    f"Continuing to wait for completion..."
-                )
-                # Continue with next chunk WITHOUT fallback - let server finish
                 lines_to_process = lines_to_process[chunk_size:]
                 continue
 
686+ # SPECIAL CASE: Database connection pool exhaustion
687+ # These should be treated as scalable errors to reduce load on the server
688+ if (
689+ "connection pool is full" in error_str .lower ()
690+ or "too many connections" in error_str .lower ()
691+ or "poolerror" in error_str .lower ()
692+ ):
693+ log .warning (
694+ f"Database connection pool exhaustion detected. "
695+ f"Reducing chunk size and retrying to reduce server load."
696+ )
697+ is_scalable_error = True
698+
             # For all other exceptions, use the original scalable error detection
             is_scalable_error = (
                 "memory" in error_str
@@ -644,6 +706,9 @@ def _execute_load_batch(
                 or "timeout" in error_str
                 or "could not serialize access" in error_str
                 or "concurrent update" in error_str
+                or "connection pool is full" in error_str
+                or "too many connections" in error_str
+                or "poolerror" in error_str
             )
 
             if is_scalable_error and chunk_size > 1:
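The same three pool-exhaustion substrings now appear in three places (`_handle_create_error`, `_create_batch_individually`, and this detection list). A shared predicate would keep them in sync — a sketch only, assuming nothing about the real module layout:

```python
# Sketch only: one definition of what counts as pool exhaustion.
POOL_EXHAUSTION_MARKERS = (
    "connection pool is full",
    "too many connections",
    "poolerror",
)

def is_pool_exhaustion(error: Exception) -> bool:
    """True if the error text looks like database connection pool exhaustion."""
    error_str = str(error).lower()
    return any(marker in error_str for marker in POOL_EXHAUSTION_MARKERS)
```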
@@ -661,6 +726,41 @@ def _execute_load_batch(
                         "This is often caused by concurrent processes updating the same records. "
                         "Retrying with smaller batch size."
                     )
+
+                    # Track serialization retries to prevent infinite loops
+                    serialization_retry_count += 1
+
+                    # Brief linear backoff (0.1s, 0.2s, 0.3s) gives concurrent
+                    # processes time to finish before the retry
+                    time.sleep(0.1 * serialization_retry_count)
+
+                    if serialization_retry_count >= max_serialization_retries:
+                        progress.console.print(
+                            f"[yellow]WARN:[/] Max serialization retries ({max_serialization_retries}) reached. "
+                            "Moving records to fallback processing to prevent infinite retry loop."
+                        )
+                        # Fall back to individual create processing instead of retrying again
+                        clean_error = str(e).strip().replace("\n", " ")
+                        progress.console.print(
+                            f"[yellow]WARN:[/] Batch {batch_number} failed `load` "
+                            f"('{clean_error}'). "
+                            f"Falling back to `create` for {len(current_chunk)} records due to persistent serialization conflicts."
+                        )
+                        fallback_result = _create_batch_individually(
+                            model,
+                            current_chunk,
+                            batch_header,
+                            uid_index,
+                            context,
+                            ignore_list,
+                        )
+                        aggregated_id_map.update(fallback_result.get("id_map", {}))
+                        aggregated_failed_lines.extend(
+                            fallback_result.get("failed_lines", [])
+                        )
+                        lines_to_process = lines_to_process[chunk_size:]
+                        serialization_retry_count = 0  # Reset counter for next batch
+                        continue
                 continue
 
             clean_error = str(e).strip().replace("\n", " ")
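Taken together, the retry logic combines a counter that resets on success, a linear backoff of `0.1 * n` seconds, and a hard cap that diverts the chunk to the `create` fallback. The bare skeleton of that pattern, stripped of the import-specific details (all names illustrative; the hunk also assumes `import time` is present at module level):

```python
import time

MAX_RETRIES = 3  # mirrors max_serialization_retries above

def process_with_backoff(chunks, try_batch, fallback):
    """Sketch: bounded retry with linear backoff and a fallback path."""
    retries = 0
    while chunks:
        chunk = chunks[0]
        try:
            try_batch(chunk)
            chunks = chunks[1:]
            retries = 0  # reset on success, as the hunk above does
        except Exception:
            retries += 1
            time.sleep(0.1 * retries)  # linear backoff: 0.1s, 0.2s, 0.3s
            if retries >= MAX_RETRIES:
                fallback(chunk)  # e.g. per-record create instead of batch load
                chunks = chunks[1:]
                retries = 0
```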
@@ -1075,7 +1175,7 @@ def import_data(
     encoding: str = "utf-8",
     separator: str = ";",
     ignore: Optional[list[str]] = None,
-    max_connection: int = 1,
+    max_connection: int = 1,  # keep at 1 by default to avoid exhausting the server connection pool
     batch_size: int = 10,
     skip: int = 0,
     force_create: bool = False,
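With the pool-exhaustion handling in place, the conservative defaults matter: `max_connection=1` serializes RPC calls so a single import cannot flood the server's worker pool, and small batches keep retries and fallbacks cheap. A hypothetical call spelling those defaults out (only the keyword names visible in this hunk are real; the positional arguments are elided):

```python
# Hypothetical usage; only keywords from the signature above are used.
import_data(
    ...,  # connection, model, and file arguments are not shown in this hunk
    encoding="utf-8",
    separator=";",
    ignore=None,
    max_connection=1,  # one worker avoids exhausting the server connection pool
    batch_size=10,     # small batches keep retry/fallback cost low
    skip=0,
    force_create=False,
)
```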