Skip to content

Commit b434813

Browse files
bosdbosd
authored and committed
[IMP] Enhanced feedback panels
1 parent 384bfec commit b434813

File tree

6 files changed

+260
-111
lines changed

6 files changed

+260
-111
lines changed

src/odoo_data_flow/import_threaded.py

Lines changed: 143 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@
99
import sys
1010
from collections.abc import Generator
1111
from time import time
12-
from typing import Any, Optional
12+
from typing import Any, Optional, TextIO
1313

14+
import requests # type: ignore[import-untyped]
1415
from rich.progress import (
1516
BarColumn,
1617
Progress,
@@ -69,6 +70,7 @@ def __init__(
6970
self.add_error_reason = add_error_reason
7071
self.progress = progress
7172
self.task_id = task_id
73+
self.abort_flag = False
7274

7375
def _handle_odoo_messages(
7476
self, messages: list[dict[str, Any]], original_lines: list[list[Any]]
@@ -126,9 +128,13 @@ def launch_batch(
126128
check: bool = False,
127129
) -> None:
128130
"""Submits a batch of data lines to be imported by a worker thread."""
131+
if self.abort_flag:
132+
return
129133

130134
def launch_batch_fun(lines: list[list[Any]], num: Any, do_check: bool) -> int:
131135
"""The actual function executed by the worker thread."""
136+
if self.abort_flag:
137+
return 0
132138
start_time = time()
133139
failed_lines = []
134140
try:
@@ -140,7 +146,12 @@ def launch_batch_fun(lines: list[list[Any]], num: Any, do_check: bool) -> int:
140146
elif do_check and len(res.get("ids", [])) != len(lines):
141147
failed_lines = self._handle_record_mismatch(res, lines)
142148

149+
except requests.exceptions.ConnectionError as e:
150+
log.error(f"Connection to Odoo failed: {e}. Aborting import.")
151+
failed_lines = self._handle_rpc_error(e, lines)
152+
self.abort_flag = True
143153
except Exception as e:
154+
# For all other unexpected errors, log the full traceback.
144155
log.error(f"RPC call for batch {num} failed: {e}", exc_info=True)
145156
failed_lines = self._handle_rpc_error(e, lines)
146157

@@ -166,10 +177,16 @@ def wait(self) -> None:
166177
return
167178

168179
for future in concurrent.futures.as_completed(self.futures):
180+
if self.abort_flag:
181+
# If a critical error occurred, don't wait for other tasks.
182+
# Cancel pending futures and shut down immediately.
183+
self.executor.shutdown(wait=True, cancel_futures=True)
184+
break
169185
try:
170186
# The number of processed lines is returned by the future
171187
num_processed = future.result()
172-
self.progress.update(self.task_id, advance=num_processed)
188+
if self.progress:
189+
self.progress.update(self.task_id, advance=num_processed)
173190
except Exception as e:
174191
log.error(f"A task in a worker thread failed: {e}", exc_info=True)
175192

@@ -273,6 +290,82 @@ def _create_batches(
273290
yield f"{batch_num}-{current_split_value}", current_batch
274291

275292

293+
def _setup_fail_file(
294+
fail_file: str, header: list[str], is_fail_run: bool, separator: str, encoding: str
295+
) -> tuple[Optional[Any], Optional[TextIO]]:
296+
"""Opens the fail file and returns the writer and file handle."""
297+
try:
298+
fail_file_handle = open(fail_file, "w", newline="", encoding=encoding)
299+
fail_file_writer = csv.writer(
300+
fail_file_handle, delimiter=separator, quoting=csv.QUOTE_ALL
301+
)
302+
header_to_write = list(header)
303+
if is_fail_run:
304+
header_to_write.append("_ERROR_REASON")
305+
fail_file_writer.writerow(header_to_write)
306+
return fail_file_writer, fail_file_handle
307+
except OSError as e:
308+
log.error(f"Could not open fail file for writing: {fail_file}. Error: {e}")
309+
return None, None
310+
311+
312+
def _execute_import_in_threads(
    max_connection: int,
    model_obj: Any,
    final_header: list[str],
    final_data: list[list[Any]],
    fail_file_writer: Optional[Any],
    context: dict[str, Any],
    is_fail_run: bool,
    split: Optional[str],
    batch_size: int,
    o2m: bool,
    check: bool,
    model: str,
) -> bool:
    """Drive the threaded import under a rich progress display.

    Batches ``final_data``, dispatches each batch to the RPC worker pool
    and blocks until all submitted work has completed.

    Returns:
        True when every batch was submitted, False when a critical
        connection error caused the run to abort early.
    """
    progress_bar = Progress(
        SpinnerColumn(),
        TextColumn("[bold blue]{task.description}", justify="right"),
        BarColumn(bar_width=None),
        "[progress.percentage]{task.percentage:>3.0f}%",
        TextColumn("•"),
        TextColumn("[green]{task.completed} of {task.total} records"),
        TextColumn("•"),
        TimeRemainingColumn(),
    )

    with progress_bar:
        import_task = progress_bar.add_task(
            f"Importing to [bold]{model}[/bold]", total=len(final_data)
        )

        worker_pool = RPCThreadImport(
            max_connection,
            model_obj,
            final_header,
            fail_file_writer,
            context,
            add_error_reason=is_fail_run,
            progress=progress_bar,
            task_id=import_task,
        )

        for batch_id, batch_lines in _create_batches(
            final_data, split, final_header, batch_size, o2m
        ):
            # A worker may have tripped the abort flag (fatal connection
            # failure); stop feeding new batches as soon as that happens.
            if worker_pool.abort_flag:
                log.error(
                    "Aborting further processing due to critical connection error."
                )
                break
            worker_pool.launch_batch(batch_lines, batch_id, check)

        worker_pool.wait()

    return not worker_pool.abort_flag
367+
368+
276369
def import_data(
277370
config_file: str,
278371
model: str,
@@ -291,18 +384,40 @@ def import_data(
291384
skip: int = 0,
292385
o2m: bool = False,
293386
is_fail_run: bool = False,
294-
) -> None:
387+
) -> bool:
295388
"""Main function to orchestrate the import process.
296389
297390
Can be run from a file or from in-memory data.
391+
392+
Args:
393+
config_file: Path to the connection configuration file.
394+
model: The Odoo model to import data into.
395+
header: A list of strings for the header row (for in-memory data).
396+
data: A list of lists representing the data rows (for in-memory data).
397+
file_csv: Path to the source CSV file to import.
398+
context: A dictionary for the Odoo context.
399+
fail_file: Path to write failed records to.
400+
encoding: The file encoding of the source file.
401+
separator: The delimiter used in the CSV file.
402+
ignore: A list of column names to ignore during import.
403+
split: The column name to group records by to avoid concurrent updates.
404+
check: If True, checks if records were successfully imported.
405+
max_connection: The number of simultaneous connections to use.
406+
batch_size: The number of records to process in each batch.
407+
skip: The number of initial lines to skip in the source file.
408+
o2m: If True, enables special handling for one-to-many imports.
409+
is_fail_run: If True, indicates a run to re-process failed records.
410+
411+
Returns:
412+
True if the import completed, False if it was aborted.
298413
"""
299414
_ignore = ignore or []
300415
_context = context or {}
301416

302417
if file_csv:
303418
header, data = _read_data_file(file_csv, separator, encoding, skip)
304-
if not data:
305-
return # Stop if file reading failed
419+
if not data and not header:
420+
return False
306421

307422
if header is None or data is None:
308423
raise ValueError(
@@ -317,65 +432,37 @@ def import_data(
317432
model_obj = connection.get_model(model)
318433
except Exception as e:
319434
log.error(f"Failed to connect to Odoo: {e}")
320-
return
435+
return False
321436

322-
# Set up the writer for the fail file
323-
fail_file_writer: Optional[Any] = None
324-
fail_file_handle = None
437+
fail_file_writer, fail_file_handle = None, None
325438
if fail_file:
326-
try:
327-
fail_file_handle = open(fail_file, "w", newline="", encoding=encoding)
328-
fail_file_writer = csv.writer(
329-
fail_file_handle, delimiter=separator, quoting=csv.QUOTE_ALL
330-
)
331-
# Add the error reason column to the header only for the second failure file
332-
header_to_write = list(final_header)
333-
if is_fail_run:
334-
header_to_write.append("_ERROR_REASON")
335-
fail_file_writer.writerow(header_to_write)
336-
except OSError as e:
337-
log.error(f"Could not open fail file for writing: {fail_file}. Error: {e}")
338-
return
339-
340-
progress = Progress(
341-
SpinnerColumn(),
342-
TextColumn("[bold blue]{task.description}", justify="right"),
343-
BarColumn(bar_width=None),
344-
"[progress.percentage]{task.percentage:>3.0f}%",
345-
TextColumn("•"),
346-
TextColumn("[green]{task.completed} of {task.total} records"),
347-
TextColumn("•"),
348-
TimeRemainingColumn(),
439+
fail_file_writer, fail_file_handle = _setup_fail_file(
440+
fail_file, final_header, is_fail_run, separator, encoding
441+
)
442+
if not fail_file_writer:
443+
return False
444+
445+
success = _execute_import_in_threads(
446+
max_connection,
447+
model_obj,
448+
final_header,
449+
final_data,
450+
fail_file_writer,
451+
_context,
452+
is_fail_run,
453+
split,
454+
batch_size,
455+
o2m,
456+
check,
457+
model,
349458
)
350459
start_time = time()
351460

352-
with progress:
353-
task_id = progress.add_task(
354-
f"[green]Importing to [bold]{model}[/bold]", total=len(final_data)
355-
)
356-
357-
rpc_thread = RPCThreadImport(
358-
max_connection,
359-
model_obj,
360-
final_header,
361-
fail_file_writer,
362-
_context,
363-
add_error_reason=is_fail_run,
364-
progress=progress,
365-
task_id=task_id,
366-
)
367-
368-
for batch_number, lines_batch in _create_batches(
369-
final_data, split, final_header, batch_size, o2m
370-
):
371-
rpc_thread.launch_batch(lines_batch, batch_number, check)
372-
373-
rpc_thread.wait()
374-
375461
if fail_file_handle:
376462
fail_file_handle.close()
377-
378463
log.info(
379464
f"{len(final_data)} records processed for model '{model}'. "
380465
f"Total time: {time() - start_time:.2f}s."
381466
)
467+
468+
return success

0 commit comments

Comments
 (0)