src/odoo_data_flow/import_threaded.py (192 changes: 145 additions & 47 deletions)
@@ -9,6 +9,7 @@
import csv
import sys
import time
import traceback
from collections.abc import Generator, Iterable
from concurrent.futures import ThreadPoolExecutor, as_completed # noqa
from typing import Any, Optional, TextIO, Union
@@ -25,7 +26,7 @@

from .lib import conf_lib
from .lib.internal.rpc_thread import RpcThread
from .lib.internal.tools import batch
from .lib.internal.tools import batch, to_xmlid
from .logging_config import log

try:
@@ -91,6 +92,10 @@ def _read_data_file(
return [], []
except Exception as e:
log.error(f"Failed to read file {file_path}: {e}")
log.error(f"Exception type: {type(e).__name__}")
log.error(f"Exception args: {e.args}")

log.error(f"Full traceback: {traceback.format_exc()}")
if isinstance(e, ValueError):
raise
return [], []
@@ -414,7 +419,7 @@ def _process_external_id_fields(
return converted_vals, external_id_fields


def _handle_create_error(
def _handle_create_error(  # noqa: C901
i: int,
create_error: Exception,
line: list[Any],
@@ -509,8 +514,9 @@ def _create_batch_individually(
source_id = line[uid_index]
# Sanitize source_id to ensure it's a valid XML ID
from .lib.internal.tools import to_xmlid

sanitized_source_id = to_xmlid(source_id)

# 1. SEARCH BEFORE CREATE
existing_record = model.browse().env.ref(
f"__export__.{sanitized_source_id}", raise_if_not_found=False
@@ -693,16 +699,75 @@ def _execute_load_batch(  # noqa: C901
log.debug(f"Load header: {load_header}")
log.debug(f"Load lines count: {len(load_lines)}")
if load_lines:
log.debug(f"First load line (first 10 fields): {load_lines[0][:10] if len(load_lines[0]) > 10 else load_lines[0]}")
first_line_preview = (
load_lines[0][:10] if len(load_lines[0]) > 10 else load_lines[0]
)
log.debug(f"First load line (first 10 fields): {first_line_preview}")
log.debug(f"Full header: {load_header}")
# Log the full header and first line for debugging
if len(load_header) > 10:
log.debug(f"Full load_header: {load_header}")
if len(load_lines[0]) > 10:
log.debug(f"Full first load_line: {load_lines[0]}")


# Sanitize the id column values to prevent XML ID constraint
# violations
sanitized_load_lines = []
for _i, line in enumerate(load_lines):
sanitized_line = list(line)
if uid_index < len(sanitized_line):
# Sanitize the source_id (which is in the id column)
original_id = sanitized_line[uid_index]
sanitized_id = to_xmlid(original_id)
sanitized_line[uid_index] = sanitized_id
if _i < 3: # Only log first 3 lines for debugging
log.debug(
f"Sanitized ID for line {_i}: '{original_id}' -> "
f"'{sanitized_id}'"
)
else:
if _i < 3: # Only log first 3 lines for debugging
log.warning(
f"Line {_i} does not have enough columns for "
f"uid_index {uid_index}. "
f"Line has {len(line)} columns."
)
sanitized_load_lines.append(sanitized_line)

# Log sample of sanitized data without large base64 content
log.debug(f"Load header: {load_header}")
log.debug(f"Load lines count: {len(sanitized_load_lines)}")
if sanitized_load_lines and len(sanitized_load_lines) > 0:
# Show first line but truncate large base64 data
preview_line = []
for _i, field_value in enumerate(
sanitized_load_lines[0][:10]
if len(sanitized_load_lines[0]) > 10
else sanitized_load_lines[0]
):
if isinstance(field_value, str) and len(field_value) > 100:
# Truncate large strings (likely base64 data)
preview_line.append(
f"{field_value[:50]}...[{len(field_value) - 100} "
f"chars truncated]...{field_value[-50:]}"
)
else:
preview_line.append(field_value)
log.debug(
f"First load line (first 10 fields, truncated if large): "
f"{preview_line}"
)

res = model.load(load_header, sanitized_load_lines, context=context)


# DEBUG: Log detailed information about the load response
log.debug(f"Load response type: {type(res)}")
log.debug(
f"Load response keys: "
f"{list(res.keys()) if hasattr(res, 'keys') else 'Not a dict'}"
)
log.debug(f"Load response full content: {res}")

# DEBUG: Log what we got back from Odoo
log.debug(
f"Load response - messages: {res.get('messages', 'None')}, "
@@ -718,39 +783,57 @@
log.warning(f"Load operation returned {msg_type}: {msg_text}")
else:
log.info(f"Load operation returned {msg_type}: {msg_text}")

# Check for any Odoo server errors in the response

# Check for any Odoo server errors in the response that should halt
# processing
if res.get("messages"):
error = res["messages"][0].get("message", "Batch load failed.")
# Don't raise immediately, log and continue to capture in fail file
log.error(f"Odoo server error during load: {error}")

for message in res["messages"]:
msg_type = message.get("type", "unknown")
msg_text = message.get("message", "")
if msg_type == "error":
# Only raise for actual errors, not warnings
log.error(f"Load operation returned fatal error: {msg_text}")
raise ValueError(msg_text)
elif msg_type in ["warning", "info"]:
log.warning(f"Load operation returned {msg_type}: {msg_text}")
else:
log.info(f"Load operation returned {msg_type}: {msg_text}")

created_ids = res.get("ids", [])
log.debug(f"Expected records: {len(sanitized_load_lines)}, Created records: {len(created_ids)}")

log.debug(
f"Expected records: {len(sanitized_load_lines)}, "
f"Created records: {len(created_ids)}"
)

# Always log detailed information about record creation
if len(created_ids) != len(sanitized_load_lines):
log.warning(
f"Record creation mismatch: Expected {len(sanitized_load_lines)} records, "
f"Record creation mismatch: Expected "
f"{len(sanitized_load_lines)} records, "
f"but only {len(created_ids)} were created"
)
if len(created_ids) == 0:
log.error(
f"No records were created in this batch of {len(sanitized_load_lines)}. "
f"This may indicate silent failures in the Odoo load operation. "
f"No records were created in this batch of "
f"{len(sanitized_load_lines)}. "
f"This may indicate silent failures in the Odoo load "
f"operation. "
f"Check Odoo server logs for validation errors."
)
# Log the actual data being sent for debugging
if sanitized_load_lines:
log.debug(f"First few lines being sent:")
log.debug("First few lines being sent:")
for i, line in enumerate(sanitized_load_lines[:3]):
log.debug(f" Line {i}: {dict(zip(load_header, line))}")
else:
log.warning(
f"Partial record creation: {len(created_ids)}/{len(sanitized_load_lines)} "
f"records were created. Some records may have failed validation."
f"Partial record creation: {len(created_ids)}/"
f"{len(sanitized_load_lines)} "
f"records were created. Some records may have "
f"failed validation."
)
# Check for any Odoo server errors in the response that should halt processing
# Check for any Odoo server errors in the response that should
# halt processing
if res.get("messages"):
for message in res["messages"]:
msg_type = message.get("type", "unknown")
Comment on lines +835 to 839

gemini-code-assist (severity: high):

This block of code, which checks for messages in the response, appears to be a duplicate. A more complete version of this logic already exists earlier in the function (lines 789-801), which properly handles different message types (error, warning, info) and raises an exception for fatal errors. This duplicated block is incomplete and redundant. It should be removed to avoid confusion and potential issues.

Member Author:

@gemini-code-assist can you turn this into a code suggestion?

gemini-code-assist:

You are correct! This block of code is indeed a duplicate and should be removed. The more comprehensive error handling logic is already present earlier in the function (lines 789-801).

Here's a code suggestion to remove the redundant block:
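A minimal sketch of the consolidated handling (illustrative; it keeps the earlier, complete loop and drops the later duplicate check):

```python
# Single, complete handling of server messages returned by model.load().
# Keep only this loop; the later bare `if res.get("messages"):` block that
# re-iterates the same messages should be deleted.
if res.get("messages"):
    for message in res["messages"]:
        msg_type = message.get("type", "unknown")
        msg_text = message.get("message", "")
        if msg_type == "error":
            # Only raise for actual errors, not warnings.
            log.error(f"Load operation returned fatal error: {msg_text}")
            raise ValueError(msg_text)
        elif msg_type in ["warning", "info"]:
            log.warning(f"Load operation returned {msg_type}: {msg_text}")
        else:
            log.info(f"Load operation returned {msg_type}: {msg_text}")
```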

@@ -765,59 +848,74 @@ def _execute_load_batch(  # noqa: C901
log.info(f"Load operation returned {msg_type}: {msg_text}")

created_ids = res.get("ids", [])
log.debug(f"Expected records: {len(sanitized_load_lines)}, Created records: {len(created_ids)}")

log.debug(
f"Expected records: {len(sanitized_load_lines)}, "
f"Created records: {len(created_ids)}"
)

# Always log detailed information about record creation
if len(created_ids) != len(sanitized_load_lines):
log.warning(
f"Record creation mismatch: Expected {len(sanitized_load_lines)} records, "
f"Record creation mismatch: Expected "
f"{len(sanitized_load_lines)} records, "
f"but only {len(created_ids)} were created"
)
if len(created_ids) == 0:
log.error(
f"No records were created in this batch of {len(sanitized_load_lines)}. "
f"This may indicate silent failures in the Odoo load operation. "
f"No records were created in this batch of "
f"{len(sanitized_load_lines)}. "
f"This may indicate silent failures in the Odoo load "
f"operation. "
f"Check Odoo server logs for validation errors."
)
# Log the actual data being sent for debugging
if sanitized_load_lines:
log.debug(f"First few lines being sent:")
log.debug("First few lines being sent:")
for i, line in enumerate(sanitized_load_lines[:3]):
log.debug(f" Line {i}: {dict(zip(load_header, line))}")
else:
log.warning(
f"Partial record creation: {len(created_ids)}/{len(sanitized_load_lines)} "
f"records were created. Some records may have failed validation."
f"Partial record creation: {len(created_ids)}/"
f"{len(sanitized_load_lines)} "
f"records were created. "
f"Some records may have failed validation."
)

# Instead of raising an exception, capture failures for the fail file
# But still create what records we can
if res.get("messages"):
# Extract error information and add to failed_lines to be written to fail file
# Extract error information and add to failed_lines to be
# written to fail file
error_msg = res["messages"][0].get("message", "Batch load failed.")
log.error(f"Capturing load failure for fail file: {error_msg}")
# We'll add the failed lines to aggregated_failed_lines at the end

# Use sanitized IDs for the id_map to match what was actually sent to Odoo
id_map = {
to_xmlid(line[uid_index]): created_ids[i] if i < len(created_ids) else None
for i, line in enumerate(current_chunk)
}

# Remove None entries (failed creations) from id_map
id_map = {k: v for k, v in id_map.items() if v is not None}

# We'll add the failed lines to aggregated_failed_lines
# at the end

id_map = {}
for i, line in enumerate(current_chunk):
# Ensure there's a corresponding created ID and that
# it's a valid integer.
# The 'incompatible type' error happens when the
# value could be None.
if i < len(created_ids) and created_ids[i] is not None:
sanitized_id = to_xmlid(line[uid_index])
db_id = created_ids[i]
id_map[sanitized_id] = db_id

# The update call remains the same and will now be type-safe.
aggregated_id_map.update(id_map)

# Log id_map information for debugging
log.debug(f"Created {len(id_map)} records in batch {batch_number}")
if id_map:
log.debug(f"Sample id_map entries: {dict(list(id_map.items())[:3])}")
else:
log.warning(f"No id_map entries created for batch {batch_number}")

# Capture failed lines for writing to fail file
successful_count = len(created_ids)
total_count = len(sanitized_load_lines)

if successful_count < total_count:
failed_count = total_count - successful_count
log.info(f"Capturing {failed_count} failed records for fail file")
Expand All @@ -829,10 +927,10 @@ def _execute_load_batch( # noqa: C901
error_msg = "Record creation failed"
if res.get("messages"):
error_msg = res["messages"][0].get("message", error_msg)
failed_line = list(line) + [f"Load failed: {error_msg}"]

failed_line = [*list(line), f"Load failed: {error_msg}"]
aggregated_failed_lines.append(failed_line)

aggregated_id_map.update(id_map)
lines_to_process = lines_to_process[chunk_size:]

src/odoo_data_flow/lib/internal/tools.py (14 changes: 9 additions & 5 deletions)
@@ -40,13 +40,17 @@ def batch(iterable: Iterable[Any], size: int) -> Iterator[list[Any]]:
def to_xmlid(name: str) -> str:
"""Create valid xmlid.

Sanitizes a string to make it a valid XML ID, replacing special
characters with underscores.
Sanitizes a string to make it a valid XML ID, replacing only characters
that are invalid in XML IDs. Preserves the required '.' separator between
module name and identifier in Odoo XML IDs (e.g., 'module.identifier').
"""
# A mapping of characters to replace.
replacements = {".": "_", ",": "_", "\n": "_", "|": "_", " ": "_"}
for old, new in replacements.items():
name = name.replace(old, new)
# NOTE: Do NOT replace '.' as it's required to separate module.name in Odoo XML IDs
# Only replace characters that are actually invalid in XML IDs:
# - Spaces, commas, newlines, and pipe characters are invalid
# - Keep dots as they are required for module.identifier format
translation_table = str.maketrans({",": "_", "\n": "_", "|": "_", " ": "_"})
name = name.translate(translation_table)
return name.strip()
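As a quick sanity check of the new behavior (illustrative only; the second case mirrors the updated test in tests/test_tools.py):

```python
from odoo_data_flow.lib.internal.tools import to_xmlid

# Dots are preserved, so 'module.identifier' references stay intact.
assert to_xmlid("__export__.partner_42") == "__export__.partner_42"
# Commas, newlines, pipes, and spaces still become underscores.
assert to_xmlid("A.B,C\nD|E F") == "A.B_C_D_E_F"
```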


src/odoo_data_flow/lib/relational_import.py (15 changes: 10 additions & 5 deletions)
@@ -242,15 +242,18 @@ def _derive_relation_info(
"""
# Hardcoded mappings for known self-referencing fields
known_self_referencing_fields = {
('product.template', 'optional_product_ids'): ('product_optional_rel', 'product_template_id'),
("product.template", "optional_product_ids"): (
"product_optional_rel",
"product_template_id",
),
# Add more known self-referencing fields here as needed
}

# Check if we have a known mapping for this field
key = (model, field)
if key in known_self_referencing_fields:
return known_self_referencing_fields[key]

# Derive relation table name (typically follows pattern: model1_model2_rel)
# with models sorted alphabetically for canonical naming
models = sorted([model.replace(".", "_"), related_model_fk.replace(".", "_")])
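For illustration, assuming the table name is the sorted, underscored model names joined with a `_rel` suffix as the comment above describes (a sketch with hypothetical models, not code from this PR):

```python
# Hypothetical model pair, showing the canonical alphabetical ordering.
model, related_model_fk = "product.template", "res.partner"
models = sorted([model.replace(".", "_"), related_model_fk.replace(".", "_")])
relation_table = f"{models[0]}_{models[1]}_rel"
print(relation_table)  # -> product_template_res_partner_rel
```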
@@ -716,15 +719,17 @@ def run_write_o2m_tuple_import(
# Check if the field with /id suffix exists (common for relation fields)
field_with_id = f"{field}/id"
if field_with_id in source_df.columns:
log.debug(f"Using field '{field_with_id}' instead of '{field}' for O2M filtering")
log.debug(
f"Using field '{field_with_id}' instead of '{field}' for O2M filtering"
)
actual_field = field_with_id
else:
log.error(
f"Field '{field}' not found in source DataFrame. "
f"Available columns: {list(source_df.columns)}"
)
return False

o2m_df = source_df.filter(pl.col(actual_field).is_not_null())

for record in o2m_df.iter_rows(named=True):
tests/test_tools.py (7 changes: 5 additions & 2 deletions)
@@ -11,7 +11,7 @@

def test_to_xmlid() -> None:
"""Test the to_xmlid function."""
assert to_xmlid("A.B,C\nD|E F") == "A_B_C_D_E_F"
assert to_xmlid("A.B,C\nD|E F") == "A.B_C_D_E_F"
assert (
to_xmlid(" leading and trailing spaces ") == "__leading_and_trailing_spaces__"
)
@@ -51,7 +51,10 @@ def test_attribute_line_dict() -> None:
def id_gen_fun(template_id: str, attributes: dict[str, list[str]]) -> str:
return f"id_{template_id}_{next(iter(attributes.keys()))}"

attribute_list_ids = [["att_id_1", "att_name_1"], ["att_id_2", "att_name_2"]]
attribute_list_ids = [
["att_id_1", "att_name_1"],
["att_id_2", "att_name_2"],
]
aggregator = AttributeLineDict(attribute_list_ids, id_gen_fun)
header = ["product_tmpl_id/id", "attribute_id/id", "value_ids/id"]
line1 = [