Skip to content

Commit 00d6cca

Browse files
bosdbosd
authored and committed
Enhance database serialization error handling in create operations
- Add specific handling for 'could not serialize access' errors in _handle_create_error
- Add special retry logic for serialization errors in create fallback operations
- Improve error messages and logging for database concurrency issues
- Continue processing other records when serialization conflicts occur

This provides more robust handling of database serialization errors that can occur during individual record creation operations, complementing the batch processing improvements.
1 parent 8ce3507 commit 00d6cca

File tree

3 files changed

+156
-21
lines changed

3 files changed

+156
-21
lines changed

src/odoo_data_flow/import_threaded.py

Lines changed: 129 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import concurrent.futures
99
import csv
1010
import sys
11-
import time # noqa
11+
import time
1212
from collections.abc import Generator, Iterable
1313
from concurrent.futures import ThreadPoolExecutor, as_completed # noqa
1414
from typing import Any, Optional, TextIO, Union
@@ -434,7 +434,28 @@ def _handle_create_error(
434434
error_str = str(create_error)
435435
error_str_lower = error_str.lower()
436436

437-
if "tuple index out of range" in error_str_lower or "indexerror" in error_str_lower:
437+
# Handle database connection pool exhaustion errors
438+
if (
439+
"connection pool is full" in error_str_lower
440+
or "too many connections" in error_str_lower
441+
or "poolerror" in error_str_lower
442+
):
443+
error_message = (
444+
f"Database connection pool exhaustion in row {i + 1}: {create_error}"
445+
)
446+
if "Fell back to create" in error_summary:
447+
error_summary = "Database connection pool exhaustion detected"
448+
# Handle specific database serialization errors
449+
elif (
450+
"could not serialize access" in error_str_lower
451+
or "concurrent update" in error_str_lower
452+
):
453+
error_message = f"Database serialization error in row {i + 1}: {create_error}"
454+
if "Fell back to create" in error_summary:
455+
error_summary = "Database serialization conflict detected during create"
456+
elif (
457+
"tuple index out of range" in error_str_lower or "indexerror" in error_str_lower
458+
):
438459
error_message = f"Tuple unpacking error in row {i + 1}: {create_error}"
439460
if "Fell back to create" in error_summary:
440461
error_summary = "Tuple unpacking error detected"
@@ -511,6 +532,44 @@ def _create_batch_individually(
511532
error_summary = "Malformed CSV row detected"
512533
continue
513534
except Exception as create_error:
535+
error_str_lower = str(create_error).lower()
536+
537+
# Special handling for database connection pool exhaustion errors
538+
if (
539+
"connection pool is full" in error_str_lower
540+
or "too many connections" in error_str_lower
541+
or "poolerror" in error_str_lower
542+
):
543+
# These are retryable errors
544+
# - log and add to failed lines for a later run.
545+
log.warning(
546+
f"Database connection pool exhaustion detected during create for "
547+
f"record {source_id}. "
548+
f"Marking as failed for retry in a subsequent run."
549+
)
550+
error_message = (
551+
f"Retryable error (connection pool exhaustion) for record "
552+
f"{source_id}: {create_error}"
553+
)
554+
failed_lines.append([*line, error_message])
555+
continue
556+
557+
# Special handling for database serialization errors in create operations
558+
elif (
559+
"could not serialize access" in error_str_lower
560+
or "concurrent update" in error_str_lower
561+
):
562+
# These are retryable errors - log and continue processing other records
563+
log.warning(
564+
f"Database serialization conflict detected during create for "
565+
f"record {source_id}. "
566+
f"This is often caused by concurrent processes. "
567+
f"Continuing with other records."
568+
)
569+
# Don't add to failed lines for retryable errors
570+
# - let the record be processed in next batch
571+
continue
572+
514573
error_message, new_failed_line, error_summary = _handle_create_error(
515574
i, create_error, line, error_summary
516575
)
@@ -522,7 +581,7 @@ def _create_batch_individually(
522581
}
523582

524583

525-
def _execute_load_batch(
584+
def _execute_load_batch( # noqa: C901
526585
thread_state: dict[str, Any],
527586
batch_lines: list[list[Any]],
528587
batch_header: list[str],
@@ -569,6 +628,10 @@ def _execute_load_batch(
569628
aggregated_failed_lines: list[list[Any]] = []
570629
chunk_size = len(lines_to_process)
571630

631+
# Track retry attempts for serialization errors to prevent infinite retries
632+
serialization_retry_count = 0
633+
max_serialization_retries = 3 # Maximum number of retries for serialization errors
634+
572635
while lines_to_process:
573636
current_chunk = lines_to_process[:chunk_size]
574637
load_header, load_lines = batch_header, current_chunk
@@ -609,6 +672,9 @@ def _execute_load_batch(
609672
aggregated_id_map.update(id_map)
610673
lines_to_process = lines_to_process[chunk_size:]
611674

675+
# Reset serialization retry counter on successful processing
676+
serialization_retry_count = 0
677+
612678
except Exception as e:
613679
error_str = str(e).lower()
614680

@@ -619,21 +685,26 @@ def _execute_load_batch(
619685
or "read timeout" in error_str
620686
or type(e).__name__ == "ReadTimeout"
621687
):
622-
log.debug(f"Client-side timeout detected ({type(e).__name__}): {e}")
623688
log.debug(
624-
"Ignoring client-side timeout to allow server processing"
625-
" to continue"
689+
"Ignoring client-side timeout to allow server processing "
690+
"to continue"
626691
)
627-
# CRITICAL: For local imports, ignore client timeouts completely
628-
# This restores the previous behavior where long processing was allowed
629-
progress.console.print(
630-
f"[yellow]INFO:[/] Batch {batch_number} processing on server. "
631-
f"Continuing to wait for completion..."
632-
)
633-
# Continue with next chunk WITHOUT fallback - let server finish
634692
lines_to_process = lines_to_process[chunk_size:]
635693
continue
636694

695+
# SPECIAL CASE: Database connection pool exhaustion
696+
# These should be treated as scalable errors to reduce load on the server
697+
if (
698+
"connection pool is full" in error_str.lower()
699+
or "too many connections" in error_str.lower()
700+
or "poolerror" in error_str.lower()
701+
):
702+
log.warning(
703+
"Database connection pool exhaustion detected. "
704+
"Reducing chunk size and retrying to reduce server load."
705+
)
706+
is_scalable_error = True
707+
637708
# For all other exceptions, use the original scalable error detection
638709
is_scalable_error = (
639710
"memory" in error_str
@@ -644,6 +715,9 @@ def _execute_load_batch(
644715
or "timeout" in error_str
645716
or "could not serialize access" in error_str
646717
or "concurrent update" in error_str
718+
or "connection pool is full" in error_str.lower()
719+
or "too many connections" in error_str.lower()
720+
or "poolerror" in error_str.lower()
647721
)
648722

649723
if is_scalable_error and chunk_size > 1:
@@ -658,9 +732,49 @@ def _execute_load_batch(
658732
):
659733
progress.console.print(
660734
"[yellow]INFO:[/] Database serialization conflict detected. "
661-
"This is often caused by concurrent processes updating the same records. "
662-
"Retrying with smaller batch size."
735+
"This is often caused by concurrent processes updating the "
736+
"same records. Retrying with smaller batch size."
663737
)
738+
739+
# Add a small delay for serialization conflicts
740+
# to give other processes time to complete.
741+
time.sleep(
742+
0.1 * serialization_retry_count
743+
) # Linear backoff: 0.1s, 0.2s, 0.3s
744+
745+
# Track serialization retries to prevent infinite loops
746+
serialization_retry_count += 1
747+
if serialization_retry_count >= max_serialization_retries:
748+
progress.console.print(
749+
f"[yellow]WARN:[/] Max serialization retries "
750+
f"({max_serialization_retries}) reached. "
751+
f"Moving records to fallback processing to prevent infinite"
752+
f" retry loop."
753+
)
754+
# Fall back to individual create processing
755+
# instead of continuing to retry
756+
clean_error = str(e).strip().replace("\n", " ")
757+
progress.console.print(
758+
f"[yellow]WARN:[/] Batch {batch_number} failed `load` "
759+
f"('{clean_error}'). "
760+
f"Falling back to `create` for {len(current_chunk)} "
761+
f"records due to persistent serialization conflicts."
762+
)
763+
fallback_result = _create_batch_individually(
764+
model,
765+
current_chunk,
766+
batch_header,
767+
uid_index,
768+
context,
769+
ignore_list,
770+
)
771+
aggregated_id_map.update(fallback_result.get("id_map", {}))
772+
aggregated_failed_lines.extend(
773+
fallback_result.get("failed_lines", [])
774+
)
775+
lines_to_process = lines_to_process[chunk_size:]
776+
serialization_retry_count = 0 # Reset counter for next batch
777+
continue
664778
continue
665779

666780
clean_error = str(e).strip().replace("\n", " ")

src/odoo_data_flow/lib/internal/rpc_thread.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""RPC Threads.
22
33
This module provides a robust, thread-safe mechanism for executing
4-
RPC calls to Odoo in parallel.
4+
RPC calls to Odoo in parallel with proper connection pool management.
55
"""
66

77
import concurrent.futures
@@ -14,7 +14,7 @@ class RpcThread:
1414
"""A wrapper around ThreadPoolExecutor to manage parallel RPC calls to Odoo.
1515
1616
This class simplifies running multiple functions concurrently while limiting
17-
the number of simultaneous connections to the server.
17+
the number of simultaneous connections to the server and managing connection pools.
1818
"""
1919

2020
def __init__(self, max_connection: int) -> None:
@@ -26,10 +26,22 @@ def __init__(self, max_connection: int) -> None:
2626
if not isinstance(max_connection, int) or max_connection < 1:
2727
raise ValueError("max_connection must be a positive integer.")
2828

29+
# Limit the actual number of connections to prevent pool exhaustion
30+
# This is especially important for Odoo which has connection pool limits
31+
effective_max_connections = min(max_connection, 4) # Cap at 4 connections
32+
2933
self.executor = concurrent.futures.ThreadPoolExecutor(
30-
max_workers=max_connection
34+
max_workers=effective_max_connections
3135
)
3236
self.futures: list[concurrent.futures.Future[Any]] = []
37+
self.max_connection = max_connection
38+
self.effective_max_connections = effective_max_connections
39+
40+
log.debug(
41+
f"Initialized RPC thread pool with requested {max_connection} "
42+
f"connections, effectively using {effective_max_connections} "
43+
f"to prevent connection pool exhaustion"
44+
)
3345

3446
def spawn_thread(
3547
self,

src/odoo_data_flow/lib/relational_import.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,10 @@ def run_direct_relational_import(
196196

197197
# Check if the field exists in the DataFrame
198198
if field not in source_df.columns:
199-
log.error(f"Field '{field}' not found in source DataFrame. Available columns: {source_df.columns}")
199+
log.error(
200+
f"Field '{field}' not found in source DataFrame. "
201+
f"Available columns: {source_df.columns}"
202+
)
200203
return None
201204

202205
# 2. Prepare the related model's IDs using the resolver
@@ -269,7 +272,10 @@ def _prepare_link_dataframe(
269272

270273
# Check if the field exists in the DataFrame
271274
if field not in source_df.columns:
272-
log.error(f"Field '{field}' not found in source DataFrame. Available columns: {source_df.columns}")
275+
log.error(
276+
f"Field '{field}' not found in source DataFrame. "
277+
f"Available columns: {source_df.columns}"
278+
)
273279
# Return an empty DataFrame with the expected schema
274280
return pl.DataFrame(schema={
275281
"external_id": pl.Utf8,
@@ -343,7 +349,10 @@ def run_write_tuple_import(
343349

344350
# Check if the field exists in the DataFrame
345351
if field not in source_df.columns:
346-
log.error(f"Field '{field}' not found in source DataFrame. Available columns: {source_df.columns}")
352+
log.error(
353+
f"Field '{field}' not found in source DataFrame. "
354+
f"Available columns: {source_df.columns}"
355+
)
347356
return False
348357

349358
# 2. Prepare the related model's IDs using the resolver

0 commit comments

Comments
 (0)