Commit 105c46b

bosd authored and committed
Enhance database serialization error handling in create operations

- Add specific handling for 'could not serialize access' errors in _handle_create_error
- Add special retry logic for serialization errors in create fallback operations
- Improve error messages and logging for database concurrency issues
- Continue processing other records when serialization conflicts occur

This provides more robust handling of database serialization errors that can occur during individual record creation operations, complementing the batch processing improvements.
1 parent fe356fd commit 105c46b

2 files changed: +126 −15 lines

src/odoo_data_flow/import_threaded.py (111 additions, 12 deletions)
@@ -434,7 +434,28 @@ def _handle_create_error(
     error_str = str(create_error)
     error_str_lower = error_str.lower()

-    if "tuple index out of range" in error_str_lower or "indexerror" in error_str_lower:
+    # Handle database connection pool exhaustion errors
+    if (
+        "connection pool is full" in error_str_lower
+        or "too many connections" in error_str_lower
+        or "poolerror" in error_str_lower
+    ):
+        error_message = (
+            f"Database connection pool exhaustion in row {i + 1}: {create_error}"
+        )
+        if "Fell back to create" in error_summary:
+            error_summary = "Database connection pool exhaustion detected"
+    # Handle specific database serialization errors
+    elif (
+        "could not serialize access" in error_str_lower
+        or "concurrent update" in error_str_lower
+    ):
+        error_message = f"Database serialization error in row {i + 1}: {create_error}"
+        if "Fell back to create" in error_summary:
+            error_summary = "Database serialization conflict detected during create"
+    elif (
+        "tuple index out of range" in error_str_lower or "indexerror" in error_str_lower
+    ):
         error_message = f"Tuple unpacking error in row {i + 1}: {create_error}"
         if "Fell back to create" in error_summary:
             error_summary = "Tuple unpacking error detected"
@@ -511,6 +532,35 @@ def _create_batch_individually(
             error_summary = "Malformed CSV row detected"
             continue
         except Exception as create_error:
+            error_str_lower = str(create_error).lower()
+
+            # Special handling for database connection pool exhaustion errors
+            if (
+                "connection pool is full" in error_str_lower
+                or "too many connections" in error_str_lower
+                or "poolerror" in error_str_lower
+            ):
+                # These are retryable errors - log and continue processing other records
+                log.warning(
+                    f"Database connection pool exhaustion detected during create for record {source_id}. "
+                    f"Continuing with other records to reduce server load."
+                )
+                # Don't add to failed lines for retryable errors - let the record be processed in next batch
+                continue
+
+            # Special handling for database serialization errors in create operations
+            elif (
+                "could not serialize access" in error_str_lower
+                or "concurrent update" in error_str_lower
+            ):
+                # These are retryable errors - log and continue processing other records
+                log.warning(
+                    f"Database serialization conflict detected during create for record {source_id}. "
+                    f"This is often caused by concurrent processes. Continuing with other records."
+                )
+                # Don't add to failed lines for retryable errors - let the record be processed in next batch
+                continue
+
             error_message, new_failed_line, error_summary = _handle_create_error(
                 i, create_error, line, error_summary
             )
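The effect of the two new branches is a skip-and-continue policy: transient errors are logged but never appended to the failed-lines list, so the affected records can be retried in a later batch. A condensed sketch of that predicate (the helper name and marker tuple are illustrative, not part of the module):

```python
# Markers mirror the substrings checked in the except block above.
RETRYABLE_MARKERS = (
    "connection pool is full",
    "too many connections",
    "poolerror",
    "could not serialize access",
    "concurrent update",
)

def is_retryable(exc: Exception) -> bool:
    """Return True for transient database errors that should be skipped, not failed."""
    message = str(exc).lower()
    return any(marker in message for marker in RETRYABLE_MARKERS)
```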
@@ -569,6 +619,10 @@ def _execute_load_batch(
     aggregated_failed_lines: list[list[Any]] = []
     chunk_size = len(lines_to_process)

+    # Track retry attempts for serialization errors to prevent infinite retries
+    serialization_retry_count = 0
+    max_serialization_retries = 3  # Maximum number of retries for serialization errors
+
     while lines_to_process:
         current_chunk = lines_to_process[:chunk_size]
         load_header, load_lines = batch_header, current_chunk
@@ -609,6 +663,9 @@ def _execute_load_batch(
             aggregated_id_map.update(id_map)
             lines_to_process = lines_to_process[chunk_size:]

+            # Reset serialization retry counter on successful processing
+            serialization_retry_count = 0
+
         except Exception as e:
             error_str = str(e).lower()

@@ -619,21 +676,25 @@ def _execute_load_batch(
                 or "read timeout" in error_str
                 or type(e).__name__ == "ReadTimeout"
             ):
-                log.debug(f"Client-side timeout detected ({type(e).__name__}): {e}")
                 log.debug(
-                    "Ignoring client-side timeout to allow server processing"
-                    " to continue"
+                    "Ignoring client-side timeout to allow server processing to continue"
                 )
-                # CRITICAL: For local imports, ignore client timeouts completely
-                # This restores the previous behavior where long processing was allowed
-                progress.console.print(
-                    f"[yellow]INFO:[/] Batch {batch_number} processing on server. "
-                    f"Continuing to wait for completion..."
-                )
-                # Continue with next chunk WITHOUT fallback - let server finish
                 lines_to_process = lines_to_process[chunk_size:]
                 continue

+            # SPECIAL CASE: Database connection pool exhaustion
+            # These should be treated as scalable errors to reduce load on the server
+            if (
+                "connection pool is full" in error_str.lower()
+                or "too many connections" in error_str.lower()
+                or "poolerror" in error_str.lower()
+            ):
+                log.warning(
+                    f"Database connection pool exhaustion detected. "
+                    f"Reducing chunk size and retrying to reduce server load."
+                )
+                is_scalable_error = True
+
             # For all other exceptions, use the original scalable error detection
             is_scalable_error = (
                 "memory" in error_str
@@ -644,6 +705,9 @@ def _execute_load_batch(
                 or "timeout" in error_str
                 or "could not serialize access" in error_str
                 or "concurrent update" in error_str
+                or "connection pool is full" in error_str.lower()
+                or "too many connections" in error_str.lower()
+                or "poolerror" in error_str.lower()
             )

             if is_scalable_error and chunk_size > 1:
@@ -661,6 +725,41 @@ def _execute_load_batch(
                     "This is often caused by concurrent processes updating the same records. "
                     "Retrying with smaller batch size."
                 )
+
+                # Add a small delay for serialization conflicts to give other processes time to complete.
+                time.sleep(
+                    0.1 * serialization_retry_count
+                )  # Linear backoff: 0.0s, 0.1s, 0.2s
+
+                # Track serialization retries to prevent infinite loops
+                serialization_retry_count += 1
+                if serialization_retry_count >= max_serialization_retries:
+                    progress.console.print(
+                        f"[yellow]WARN:[/] Max serialization retries ({max_serialization_retries}) reached. "
+                        f"Moving records to fallback processing to prevent infinite retry loop."
+                    )
+                    # Fall back to individual create processing instead of continuing to retry
+                    clean_error = str(e).strip().replace("\n", " ")
+                    progress.console.print(
+                        f"[yellow]WARN:[/] Batch {batch_number} failed `load` "
+                        f"('{clean_error}'). "
+                        f"Falling back to `create` for {len(current_chunk)} records due to persistent serialization conflicts."
+                    )
+                    fallback_result = _create_batch_individually(
+                        model,
+                        current_chunk,
+                        batch_header,
+                        uid_index,
+                        context,
+                        ignore_list,
+                    )
+                    aggregated_id_map.update(fallback_result.get("id_map", {}))
+                    aggregated_failed_lines.extend(
+                        fallback_result.get("failed_lines", [])
+                    )
+                    lines_to_process = lines_to_process[chunk_size:]
+                    serialization_retry_count = 0  # Reset counter for next batch
+                    continue
                 continue

             clean_error = str(e).strip().replace("\n", " ")
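Combined with the counter initialized before the loop and reset on success, this block forms a bounded retry loop: up to three serialization retries with a growing delay, then a one-record-at-a-time fallback. A self-contained sketch of that control flow, with `load_batch` and `create_individually` as hypothetical stand-ins for the real RPC calls:

```python
import time

def load_with_bounded_retries(load_batch, create_individually, max_retries=3):
    """Retry a batch load on serialization conflicts, then fall back to creates."""
    retries = 0
    while True:
        try:
            load_batch()
            return
        except Exception as exc:
            if "could not serialize access" not in str(exc).lower():
                raise  # not a serialization conflict; let other handling apply
            time.sleep(0.1 * retries)  # linear backoff: 0.0s, 0.1s, 0.2s
            retries += 1
            if retries >= max_retries:
                create_individually()  # abandon batch `load`, create one by one
                return
```

Note the sleep happens before the counter increments, so the first retry is immediate and only subsequent retries are delayed.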
@@ -1075,7 +1174,7 @@ def import_data(
     encoding: str = "utf-8",
     separator: str = ";",
     ignore: Optional[list[str]] = None,
-    max_connection: int = 1,
+    max_connection: int = 1,  # Reduced default from higher values to prevent connection pool exhaustion
     batch_size: int = 10,
     skip: int = 0,
     force_create: bool = False,

src/odoo_data_flow/lib/internal/rpc_thread.py (15 additions, 3 deletions)
@@ -1,7 +1,7 @@
 """RPC Threads.

 This module provides a robust, thread-safe mechanism for executing
-RPC calls to Odoo in parallel.
+RPC calls to Odoo in parallel with proper connection pool management.
 """

 import concurrent.futures
@@ -14,7 +14,7 @@ class RpcThread:
     """A wrapper around ThreadPoolExecutor to manage parallel RPC calls to Odoo.

     This class simplifies running multiple functions concurrently while limiting
-    the number of simultaneous connections to the server.
+    the number of simultaneous connections to the server and managing connection pools.
     """

     def __init__(self, max_connection: int) -> None:
@@ -26,10 +26,22 @@ def __init__(self, max_connection: int) -> None:
         if not isinstance(max_connection, int) or max_connection < 1:
             raise ValueError("max_connection must be a positive integer.")

+        # Limit the actual number of connections to prevent pool exhaustion
+        # This is especially important for Odoo which has connection pool limits
+        effective_max_connections = min(max_connection, 4)  # Cap at 4 connections
+
         self.executor = concurrent.futures.ThreadPoolExecutor(
-            max_workers=max_connection
+            max_workers=effective_max_connections
         )
         self.futures: list[concurrent.futures.Future[Any]] = []
+        self.max_connection = max_connection
+        self.effective_max_connections = effective_max_connections
+
+        log.debug(
+            f"Initialized RPC thread pool with requested {max_connection} "
+            f"connections, effectively using {effective_max_connections} "
+            f"to prevent connection pool exhaustion"
+        )

     def spawn_thread(
         self,
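With this change, callers can still request any worker count, but the executor is silently capped at four threads. A hypothetical usage sketch (the import path follows the diff's file location):

```python
from odoo_data_flow.lib.internal.rpc_thread import RpcThread

# Request 16 workers; per the cap above, min(16, 4) == 4 threads are created.
pool = RpcThread(max_connection=16)
print(pool.max_connection)             # 16 - what the caller asked for
print(pool.effective_max_connections)  # 4  - what the executor actually uses
```

Capping in the constructor keeps the limit in one place; the debug log above is the only signal that fewer workers are in use than were requested.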
