Commit d3acf0c

bosd and claude committed
feat: integrate retry, idempotent, and throttle modules
Complete integration of the remaining 3 stability features:

1. **Smarter Retry Logic** - Integrated into error handling:
   - Uses ErrorCategory enum to classify errors as transient/permanent
   - Exponential backoff with jitter for server overload (502/503)
   - Database serialization conflict handling with backoff

2. **Idempotent Import Mode** (`--skip-unchanged`):
   - Fetches existing records from Odoo before import
   - Compares field values to detect unchanged records
   - Skips records that haven't changed, making imports idempotent
   - Reports skip statistics in final output

3. **Health-Aware Throttling** (`--adaptive-throttle`):
   - ThrottleController monitors server response times
   - Automatically adjusts delays based on server health
   - Records timing after each batch load operation
   - Reports throttle statistics at end of import

All 597 tests passing.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent 9896813 commit d3acf0c

File tree

4 files changed, +194 -93 lines changed

src/odoo_data_flow/__main__.py

Lines changed: 14 additions & 0 deletions
@@ -487,6 +487,20 @@ def invoice_v9_cmd(connection_file: str, **kwargs: Any) -> None:
     help="Validate data without importing. Checks required fields, "
     "selection values, and reference existence.",
 )
+@click.option(
+    "--skip-unchanged",
+    is_flag=True,
+    default=False,
+    help="Skip records that already exist with identical values. "
+    "Makes imports idempotent by comparing field values before importing.",
+)
+@click.option(
+    "--adaptive-throttle",
+    is_flag=True,
+    default=False,
+    help="Enable health-aware throttling that automatically adjusts batch sizes "
+    "and delays based on server response times. Helps prevent server overload.",
+)
 def import_cmd(connection_file: str, **kwargs: Any) -> None:  # noqa: C901
     """Runs the data import process."""
     # Handle dry-run mode early
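
A quick way to confirm the two new options are registered is click's test runner. A minimal sketch, assuming `import_cmd` is importable from `odoo_data_flow.__main__` as this diff implies; `--help` exits before any Odoo connection is attempted:

```python
# Sketch: check that the new flags show up in the command's help text.
from click.testing import CliRunner

from odoo_data_flow.__main__ import import_cmd

runner = CliRunner()
result = runner.invoke(import_cmd, ["--help"])
assert "--skip-unchanged" in result.output
assert "--adaptive-throttle" in result.output
```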

src/odoo_data_flow/import_threaded.py

Lines changed: 159 additions & 80 deletions
@@ -26,6 +26,9 @@
 
 from .lib import checkpoint as ckpt
 from .lib import conf_lib
+from .lib import idempotent as idempotent_lib
+from .lib import retry as retry_lib
+from .lib import throttle as throttle_lib
 from .lib.internal.rpc_thread import RpcThread
 from .lib.internal.tools import batch, to_xmlid
 from .logging_config import log, suppress_console_handler
@@ -1168,7 +1171,15 @@ def _execute_load_batch(  # noqa: C901
                 f"{preview_line}"
             )
 
+            # Record timing for throttle controller
+            load_start = time.time()
             res = model.load(load_header, sanitized_load_lines, context=context)
+            load_time = time.time() - load_start
+
+            # Record response time for health-aware throttling
+            throttle_ctrl = thread_state.get("throttle_controller")
+            if throttle_ctrl:
+                throttle_ctrl.record_response(load_time)
 
             # DEBUG: Log detailed information about the load response
             log.debug(f"Load response type: {type(res)}")
@@ -1398,13 +1409,17 @@ def _execute_load_batch(  # noqa: C901
                 serialization_retry_count = 0
 
         except Exception as e:
-            error_str = str(e).lower()
+            error_str = str(e)
+            error_str_lower = error_str.lower()
+
+            # Use retry module to categorize the error
+            error_category, error_pattern = retry_lib.categorize_error(error_str)
 
             # SPECIAL CASE: Client-side timeouts for local processing
             # These should be IGNORED entirely to allow long server processing
             if (
-                "timed out" == error_str.strip()
-                or "read timeout" in error_str
+                "timed out" == error_str_lower.strip()
+                or "read timeout" in error_str_lower
                 or type(e).__name__ == "ReadTimeout"
             ):
                 log.debug(
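
For readers without the new module handy, here is a plausible sketch of the `retry_lib.categorize_error` contract as used above. It is inferred from the call sites in this diff, not taken from the actual `src/odoo_data_flow/lib/retry.py`:

```python
# Hypothetical sketch of categorize_error: returns the error's category plus
# the first matching pattern string (or None), as the call sites here suggest.
from enum import Enum
from typing import Optional

class ErrorCategory(Enum):
    TRANSIENT = "transient"
    PERMANENT = "permanent"

TRANSIENT_PATTERNS = (
    "502", "503", "service unavailable", "bad gateway",
    "could not serialize access", "deadlock", "timeout",
)

def categorize_error(error_str: str) -> tuple[ErrorCategory, Optional[str]]:
    """Classify an error message and report which pattern matched."""
    lowered = error_str.lower()
    for pattern in TRANSIENT_PATTERNS:
        if pattern in lowered:
            return ErrorCategory.TRANSIENT, pattern
    return ErrorCategory.PERMANENT, None
```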
@@ -1414,95 +1429,57 @@ def _execute_load_batch(  # noqa: C901
                 lines_to_process = lines_to_process[chunk_size:]
                 continue
 
-            # SPECIAL CASE: Database connection pool exhaustion
-            # These should be treated as scalable errors to reduce load on the server
-            if (
-                "connection pool is full" in error_str.lower()
-                or "too many connections" in error_str.lower()
-                or "poolerror" in error_str.lower()
-            ):
-                log.warning(
-                    "Database connection pool exhaustion detected. "
-                    "Reducing chunk size and retrying to reduce server load."
-                )
-                is_scalable_error = True
-
-            # For all other exceptions, use the original scalable error detection
-            is_scalable_error = (
-                "memory" in error_str
-                or "out of memory" in error_str
-                or "502" in error_str
-                or "503" in error_str
-                or "service unavailable" in error_str
-                or "gateway" in error_str
-                or "proxy" in error_str
-                or "timeout" in error_str
-                or "could not serialize access" in error_str
-                or "concurrent update" in error_str
-                or "connection pool is full" in error_str.lower()
-                or "too many connections" in error_str.lower()
-                or "poolerror" in error_str.lower()
-            )
+            # Transient errors: retry with exponential backoff
+            is_transient = error_category == retry_lib.ErrorCategory.TRANSIENT
 
-            # Detect server overload (502/503) for adaptive throttling
-            is_server_overload = (
-                "502" in error_str
-                or "503" in error_str
-                or "service unavailable" in error_str
-                or "bad gateway" in error_str
+            # Detect server overload for adaptive throttling
+            is_server_overload = error_pattern in (
+                "502", "503", "service unavailable", "bad gateway"
             )
 
             if is_server_overload:
-                # Adaptive throttling: increase delay exponentially on server overload
-                current_throttle = thread_state.get("adaptive_throttle", 0.0)
-                new_throttle = min(current_throttle + 1.0, 10.0)  # Cap at 10 seconds
-                thread_state["adaptive_throttle"] = new_throttle
+                # Adaptive throttling with exponential backoff
+                retry_attempt = thread_state.get("retry_attempt", 0) + 1
+                thread_state["retry_attempt"] = retry_attempt
+                backoff_config = retry_lib.RetryConfig(
+                    base_delay=1.0, max_delay=30.0, exponential_base=2.0
+                )
+                delay = retry_lib.calculate_backoff_delay(retry_attempt, backoff_config)
                 progress.console.print(
-                    f"[yellow]WARN:[/] Server overload detected (502/503). "
-                    f"Adding {new_throttle:.1f}s delay between batches."
+                    f"[yellow]WARN:[/] Server overload detected ({error_pattern}). "
+                    f"Backing off for {delay:.1f}s (attempt {retry_attempt})."
                 )
-                time.sleep(new_throttle)
+                time.sleep(delay)
 
-            if is_scalable_error and chunk_size > 1:
+            if is_transient and chunk_size > 1:
                 chunk_size = max(1, chunk_size // 2)
                 progress.console.print(
-                    f"[yellow]WARN:[/] Batch {batch_number} hit scalable error. "
-                    f"Reducing chunk size to {chunk_size} and retrying."
+                    f"[yellow]WARN:[/] Batch {batch_number} hit transient error "
+                    f"({error_pattern}). Reducing chunk size to {chunk_size}."
                 )
-                if (
-                    "could not serialize access" in error_str
-                    or "concurrent update" in error_str
-                ):
+
+                # Serialization conflicts get exponential backoff
+                if error_pattern in ("could not serialize access", "deadlock"):
+                    backoff_config = retry_lib.RetryConfig(
+                        base_delay=0.1, max_delay=5.0, exponential_base=2.0
+                    )
+                    delay = retry_lib.calculate_backoff_delay(
+                        serialization_retry_count + 1, backoff_config
+                    )
                     progress.console.print(
-                        "[yellow]INFO:[/] Database serialization conflict detected. "
-                        "This is often caused by concurrent processes updating the "
-                        "same records. Retrying with smaller batch size."
+                        f"[yellow]INFO:[/] Database serialization conflict. "
+                        f"Waiting {delay:.2f}s before retry."
                     )
+                    time.sleep(delay)
 
-                    # Add a small delay for serialization conflicts
-                    # to give other processes time to complete.
-                    time.sleep(
-                        0.1 * serialization_retry_count
-                    )  # Linear backoff: 0.1s, 0.2s, 0.3s
-
-                    # Track serialization retries to prevent infinite loops
                     serialization_retry_count += 1
                     if serialization_retry_count >= max_serialization_retries:
                         progress.console.print(
                             f"[yellow]WARN:[/] Max serialization retries "
                             f"({max_serialization_retries}) reached. "
-                            f"Moving records to fallback processing to prevent infinite"
-                            f" retry loop."
-                        )
-                        # Fall back to individual create processing
-                        # instead of continuing to retry
-                        clean_error = str(e).strip().replace("\n", " ")
-                        progress.console.print(
-                            f"[yellow]WARN:[/] Batch {batch_number} failed `load` "
-                            f"('{clean_error}'). "
-                            f"Falling back to `create` for {len(current_chunk)} "
-                            f"records due to persistent serialization conflicts."
+                            f"Falling back to individual processing."
                         )
+                        clean_error = error_str.strip().replace("\n", " ")
                         fallback_result = _create_batch_individually(
                             model,
                             current_chunk,
@@ -1517,11 +1494,19 @@ def _execute_load_batch(  # noqa: C901
                             fallback_result.get("failed_lines", [])
                         )
                         lines_to_process = lines_to_process[chunk_size:]
-                        serialization_retry_count = 0  # Reset counter for next batch
+                        serialization_retry_count = 0
+                        thread_state["retry_attempt"] = 0  # Reset on success
                         continue
                 continue
 
-            clean_error = str(e).strip().replace("\n", " ")
+            # For permanent/recoverable errors, get recommendation and fall back
+            recommendation = retry_lib.get_retry_recommendation(error_str)
+            log.debug(
+                f"Error category: {error_category.value}, "
+                f"recommendation: {recommendation['action']}"
+            )
+
+            clean_error = error_str.strip().replace("\n", " ")
             progress.console.print(
                 f"[yellow]WARN:[/] Batch {batch_number} failed `load` "
                 f"('{clean_error}'). "
@@ -1628,16 +1613,24 @@ def _run_threaded_pass(  # noqa: C901
     # Spawn threads with optional delay between batches to reduce server load.
     futures = set()
     batch_count = 0
+    throttle_ctrl = thread_state.get("throttle_controller")
     for num, data in batches:
         if rpc_thread.abort_flag:
             break
 
         # Add delay between batches (except before the first batch).
-        # Combine user-specified delay with adaptive throttle for server overload.
-        adaptive_throttle = thread_state.get("adaptive_throttle", 0.0)
-        total_delay = batch_delay + adaptive_throttle
-        if total_delay > 0 and batch_count > 0:
-            time.sleep(total_delay)
+        # Use throttle controller if available, otherwise use simple delay
+        if throttle_ctrl and batch_count > 0:
+            # Use health-aware throttle controller
+            delay = throttle_ctrl.get_delay()
+            if delay > 0:
+                time.sleep(delay)
+        elif batch_delay > 0 and batch_count > 0:
+            # Fallback to simple delay
+            adaptive_throttle = thread_state.get("adaptive_throttle", 0.0)
+            total_delay = batch_delay + adaptive_throttle
+            if total_delay > 0:
+                time.sleep(total_delay)
 
         args = (
             [thread_state, data, num]
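
The controller consulted here only needs `record_response()`, `get_delay()`, and a `.stats` attribute (read in the stats block further down). A minimal sketch of that interface, assuming a moving-average health heuristic; the real `src/odoo_data_flow/lib/throttle.py` may work differently:

```python
# Hypothetical sketch of the ThrottleController interface used in this diff.
from dataclasses import dataclass

@dataclass
class ThrottleStats:
    total_delay_added: float = 0.0
    batch_size_reductions: int = 0
    health_recoveries: int = 0
    avg_response_time: float = 0.0

class ThrottleController:
    def __init__(self, base_delay: float = 0.0, slow_threshold: float = 5.0):
        self.base_delay = base_delay
        self.slow_threshold = slow_threshold  # seconds; assumed cutoff
        self._ema = 0.0  # smoothed batch load time
        self.stats = ThrottleStats()

    def record_response(self, seconds: float) -> None:
        # Smooth response times so one slow batch doesn't overreact.
        self._ema = seconds if self._ema == 0.0 else 0.8 * self._ema + 0.2 * seconds
        self.stats.avg_response_time = self._ema

    def get_delay(self) -> float:
        # Healthy server: just the base delay. Slow server: add the overshoot.
        if self._ema <= self.slow_threshold:
            return self.base_delay
        extra = min(self._ema - self.slow_threshold, 10.0)
        self.stats.total_delay_added += extra
        return self.base_delay + extra

def create_throttle_controller(base_delay: float = 0.0) -> ThrottleController:
    return ThrottleController(base_delay=base_delay)
```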
@@ -1760,6 +1753,7 @@ def _orchestrate_pass_1(
     o2m: bool,
     split_by_cols: Optional[list[str]],
     force_create: bool = False,
+    throttle_controller: Optional[throttle_lib.ThrottleController] = None,
 ) -> dict[str, Any]:
     """Orchestrates the multi-threaded Pass 1 (load/create).
@@ -1831,6 +1825,7 @@ def _orchestrate_pass_1(
         "force_create": force_create,
         "progress": progress,
         "ignore_list": pass_1_ignore_list,
+        "throttle_controller": throttle_controller,
     }
 
     results, aborted = _run_threaded_pass(
@@ -2132,6 +2127,8 @@ def import_data(
     stream: bool = False,
     resume: bool = True,
     enable_checkpoint: bool = True,
+    skip_unchanged: bool = False,
+    adaptive_throttle: bool = False,
 ) -> tuple[bool, dict[str, int]]:
     """Orchestrates a robust, multi-threaded, two-pass import process.
@@ -2246,12 +2243,72 @@ def import_data(
         _show_error_panel(title, friendly_message)
         return False, {}
 
+    # Apply idempotent filtering if enabled (skip unchanged records)
+    idempotent_stats = None
+    if skip_unchanged and not can_stream and header and all_data:
+        log.info("Idempotent mode: checking for unchanged records...")
+        try:
+            # Get the ID field index
+            id_field = unique_id_field or "id"
+            if id_field in header:
+                id_index = header.index(id_field)
+                # Extract external IDs from the data
+                external_ids = [
+                    str(row[id_index]).strip()
+                    for row in all_data
+                    if id_index < len(row) and row[id_index]
+                ]
+
+                if external_ids:
+                    # Get fields to compare (exclude ignored fields)
+                    compare_fields = [
+                        h for h in header
+                        if h != id_field and h not in (ignore or [])
+                    ]
+
+                    # Fetch existing records from Odoo
+                    existing_records = idempotent_lib.get_existing_records(
+                        connection, model, external_ids, compare_fields
+                    )
+
+                    if existing_records:
+                        # Filter out unchanged rows
+                        original_count = len(all_data)
+                        all_data, idempotent_stats = idempotent_lib.filter_unchanged_rows(
+                            all_data, header, existing_records,
+                            id_field=id_field, compare_fields=compare_fields
+                        )
+                        record_count = len(all_data)
+
+                        log.info(
+                            f"Idempotent filter: {original_count} -> {record_count} "
+                            f"records (skipped {idempotent_stats.skipped_records} "
+                            f"unchanged)"
+                        )
+                    else:
+                        log.debug("No existing records found, all records are new")
+            else:
+                log.warning(
+                    f"ID field '{id_field}' not found in header, "
+                    "skipping idempotent filtering"
+                )
+        except Exception as e:
+            log.warning(f"Error during idempotent filtering, continuing: {e}")
+
     # For streaming mode, we defer fail file setup (header not known yet)
     # For standard mode, set up fail file now
     fail_writer, fail_handle = None, None
     if not can_stream and fail_file:
         fail_writer, fail_handle = _setup_fail_file(fail_file, header, separator, encoding)
 
+    # Create throttle controller for adaptive throttling
+    throttle_controller = None
+    if adaptive_throttle:
+        throttle_controller = throttle_lib.create_throttle_controller(
+            base_delay=batch_delay
+        )
+        log.info("Adaptive throttle enabled: will adjust delays based on server health")
+
     console = Console()
     progress = Progress(
         SpinnerColumn(),
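
The comparison helper only needs to pair each row's external ID with the record already in Odoo and drop exact matches. A minimal sketch of `filter_unchanged_rows` consistent with the stats fields read later (`skipped_records`, `new_records`, `changed_records`); the actual `src/odoo_data_flow/lib/idempotent.py` may normalize values differently:

```python
# Hypothetical sketch of filter_unchanged_rows: keep new/changed rows, skip
# rows whose compared fields already match the server-side record.
from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class IdempotentStats:
    skipped_records: int = 0
    new_records: int = 0
    changed_records: int = 0

def filter_unchanged_rows(
    rows: list[list[Any]],
    header: list[str],
    existing: dict[str, dict[str, Any]],  # external ID -> current field values
    id_field: str = "id",
    compare_fields: Optional[list[str]] = None,
) -> tuple[list[list[Any]], IdempotentStats]:
    stats = IdempotentStats()
    fields = compare_fields or [h for h in header if h != id_field]
    id_idx = header.index(id_field)
    kept: list[list[Any]] = []
    for row in rows:
        current = existing.get(str(row[id_idx]).strip())
        if current is None:
            stats.new_records += 1  # not in Odoo yet: import it
            kept.append(row)
        elif all(
            str(row[header.index(f)]) == str(current.get(f, "")) for f in fields
        ):
            stats.skipped_records += 1  # identical: skip for idempotency
        else:
            stats.changed_records += 1  # differs: re-import to update
            kept.append(row)
    return kept, stats
```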
@@ -2323,6 +2380,7 @@ def import_data(
         o2m,
         split_by_cols,
         force_create,
+        throttle_controller,
     )
 
     # A pass is only successful if it wasn't aborted.
@@ -2390,6 +2448,27 @@ def import_data(
         "id_map": id_map,
     }
 
+    # Add idempotent stats if available
+    if idempotent_stats:
+        stats["skipped_unchanged"] = idempotent_stats.skipped_records
+        stats["new_records"] = idempotent_stats.new_records
+        stats["changed_records"] = idempotent_stats.changed_records
+
+    # Add throttle stats if available
+    if throttle_controller:
+        throttle_stats = throttle_controller.stats
+        stats["throttle_stats"] = {
+            "total_delay_added": throttle_stats.total_delay_added,
+            "batch_size_reductions": throttle_stats.batch_size_reductions,
+            "health_recoveries": throttle_stats.health_recoveries,
+            "avg_response_time": throttle_stats.avg_response_time,
+        }
+        if throttle_stats.total_delay_added > 0:
+            log.info(
+                f"Throttle summary: {throttle_stats.total_delay_added:.1f}s total delay, "
+                f"{throttle_stats.health_recoveries} recoveries"
+            )
+
     # --- Checkpoint: Clean up on success ---
     if overall_success and enable_checkpoint and session_id:
         ckpt.delete_checkpoint(file_csv, session_id)

src/odoo_data_flow/importer.py

Lines changed: 4 additions & 0 deletions
@@ -114,6 +114,8 @@ def run_import(  # noqa: C901
     resume: bool = True,
     no_checkpoint: bool = False,
     check_refs: str = "warn",
+    skip_unchanged: bool = False,
+    adaptive_throttle: bool = False,
 ) -> None:
     """Main entry point for the import command, handling all orchestration."""
     log.info("Starting data import process from file...")
@@ -244,6 +246,8 @@ def run_import(  # noqa: C901
             stream=stream,
             resume=resume,
             enable_checkpoint=not no_checkpoint,
+            skip_unchanged=skip_unchanged,
+            adaptive_throttle=adaptive_throttle,
         )
     finally:
         if (
