
Commit d0b39e8

feat: Enhance --groupby functionality and update documentation
This commit refines the `--groupby` feature to correctly align data partitioning with the specified grouping columns. Key changes include:

- The internal `params` key for `--groupby` is now consistently `groupby` (previously `split`). This change is reflected in both the code and the `performance_tuning.md` documentation.
- The documentation has been updated to clearly explain the race-condition problem that `--groupby` solves in multi-worker imports, and it now provides a clearer example.
- The recursive batch creation logic (`_recursive_create_batches`) has been adjusted so that all records sharing a value in the designated grouping column are processed by the same worker, preventing concurrent-update errors.
1 parent 9e0dbc5 commit d0b39e8
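With this change, `--groupby` accepts a comma-separated list of columns rather than a single column. A hypothetical invocation is sketched below; only `--groupby` and its value come from this commit, while the executable name and the other flags are illustrative assumptions:

    odoo-data-flow import --file data/res_partner.csv --groupby parent_id,category_id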

File tree: 5 files changed (+233, -51 lines)

src/odoo_data_flow/__main__.py (15 additions, 2 deletions)

@@ -183,6 +183,15 @@ def invoice_v9_cmd(**kwargs: Any) -> None:
     run_invoice_v9_workflow(**kwargs)


+def split_by_comma(
+    ctx: click.Context, param: click.Parameter, value: Optional[str]
+) -> Optional[tuple[str, ...]]:
+    """Split comma-separated values."""
+    if value:
+        return tuple(s.strip() for s in value.split(","))
+    return None
+
+
 # --- Import Command ---
 @cli.command(name="import")
 @click.option(
@@ -231,9 +240,13 @@ def invoice_v9_cmd(**kwargs: Any) -> None:
 @click.option("-s", "--sep", "separator", default=";", help="CSV separator character.")
 @click.option(
     "--groupby",
-    "split",
+    "split_by_cols",  # Renamed for clarity: plural, since it is now a list
     default=None,
-    help="Column to group data by to avoid concurrent updates.",
+    callback=split_by_comma,  # Use the callback to process the string
+    help="Comma-separated list of columns to group data by "
+    "(e.g., 'parent_id,category_id'). "
+    "Records with empty values for the first column are processed first, "
+    "then grouped by that column. This process repeats for subsequent columns.",
 )
 @click.option(
     "--ignore", default=None, help="Comma-separated list of columns to ignore."

src/odoo_data_flow/import_threaded.py (195 additions, 35 deletions)

@@ -267,58 +267,218 @@ def _read_data_file(
     return [], []


-def _create_batches(
-    data: list[list[Any]],
-    split_by_col: Optional[str],
+def _recursive_create_batches(
+    current_data: list[list[Any]],
+    group_cols: list[str],
     header: list[str],
     batch_size: int,
     o2m: bool,
+    batch_prefix: str = "",
+    level: int = 0,  # Current recursion level, for internal tracking
 ) -> Generator[tuple[Any, list[list[Any]]], None, None]:
-    """A generator that yields batches of data.
+    """Recursively creates batches of data, optionally grouping by columns.

-    If split_by_col is provided, it
-    groups records with the same value in that column into the same batch.
+    This generator processes data in batches. If `group_cols` is provided,
+    it recursively groups records based on the values in those columns,
+    ensuring that records belonging to the same group (and its subgroups)
+    stay together within a batch. It also handles one-to-many (o2m)
+    relationships, keeping child records with their parent records.
+
+    Args:
+        current_data: The list of data rows to process. Each row is
+            expected to be a list of values.
+        group_cols: A list of column names to group the data by. The
+            function processes these columns in order, from left to right.
+        header: The header row of the data, containing column names. Used
+            to find the index of each `group_cols` entry and the 'id' column.
+        batch_size: The maximum number of records allowed in a single batch
+            before a new batch is started.
+        o2m: If True, enables one-to-many handling: records with an empty
+            'id' field are kept with the preceding parent record in the
+            same batch.
+        batch_prefix: A prefix string used to construct unique batch
+            identifiers, primarily for internal tracking during recursion.
+            Defaults to "".
+        level: The current recursion depth, used internally when
+            constructing unique batch identifiers. Defaults to 0.
+
+    Yields:
+        A tuple whose first element is a unique identifier for the batch
+        (a string combining prefix, level, group counter, and group value)
+        and whose second element is the list of data rows in that batch.
+
+    Raises:
+        ValueError: If a column specified in `group_cols` is not found in
+            `header`. (In the current implementation this is logged and the
+            function returns instead of raising.)
     """
-    if not split_by_col:
-        # Simple batching without grouping
-        for i, data_batch in enumerate(batch(data, batch_size)):
-            yield i, list(data_batch)
+    if not group_cols:
+        # Base case: no more grouping columns, just yield regular batches
+        for i, data_batch in enumerate(batch(current_data, batch_size)):
+            yield (
+                f"{batch_prefix}-{i}" if batch_prefix else str(i),
+                list(data_batch),
+            )
         return

+    current_group_col = group_cols[0]
+    remaining_group_cols = group_cols[1:]
+
     try:
-        split_index = header.index(split_by_col)
-        id_index = header.index("id")
-    except ValueError as e:
-        log.error(f"Grouping column '{e}' not found in header. Cannot use --groupby.")
+        split_index = header.index(current_group_col)
+        id_index = header.index("id")  # Needed for o2m logic
+    except ValueError:
+        # This error should ideally be caught earlier, as the header is fixed
+        log.error(
+            f"Grouping column '{current_group_col}' not found in header. "
+            "Cannot use --groupby."
+        )
         return

-    data.sort(key=lambda row: row[split_index])
+    # Sort by the current grouping column so that rows with the same value
+    # are adjacent; rows with empty (None or "") values form their own group.
+    current_data.sort(
+        key=lambda row: (
+            row[split_index] is None or row[split_index] == "",
+            row[split_index],
+        )
+    )

     current_batch: list[list[Any]] = []
     current_split_value: Optional[str] = None
-    batch_num = 0
+    group_counter = 0

-    for row in data:
-        is_o2m_line = o2m and not row[id_index]
+    for row in current_data:
         row_split_value = row[split_index]
-
-        if (
-            current_batch
-            and not is_o2m_line
-            and (
-                row_split_value != current_split_value
-                or len(current_batch) >= batch_size
-            )
+        is_o2m_line = (
+            o2m and not row[id_index]
+        )  # O2M lines should stay with their parent
+
+        if not current_batch:
+            # First row in a new segment: initialize current_split_value
+            current_split_value = row_split_value
+        elif not is_o2m_line and (
+            row_split_value != current_split_value or len(current_batch) >= batch_size
         ):
-            yield f"{batch_num}-{current_split_value}", current_batch
+            # We have hit a new group value or the maximum batch size, so
+            # process the current batch recursively
+            yield from _recursive_create_batches(
+                current_batch,
+                remaining_group_cols,
+                header,
+                batch_size,
+                o2m,
+                f"{batch_prefix}{level}-{group_counter}-"
+                f"{current_split_value or 'empty'}",
+            )
             current_batch = []
-            batch_num += 1
+            group_counter += 1
+            current_split_value = row_split_value  # Start a new group value

         current_batch.append(row)
-        current_split_value = row_split_value

     if current_batch:
-        yield f"{batch_num}-{current_split_value}", current_batch
+        # Yield any remaining batch after the loop
+        yield from _recursive_create_batches(
+            current_batch,
+            remaining_group_cols,
+            header,
+            batch_size,
+            o2m,
+            f"{batch_prefix}{level}-{group_counter}-{current_split_value or 'empty'}",
+        )
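For readers skimming the diff, here is a minimal standalone sketch of the sort key used above; the header and rows are hypothetical sample data, not taken from the repository:

    header = ["id", "name", "parent_id"]
    rows = [
        ["p3", "Carol", "org_b"],
        ["p1", "Alice", ""],  # empty grouping value
        ["p4", "Dave", "org_a"],
        ["p2", "Bob", "org_a"],
    ]

    split_index = header.index("parent_id")
    # Rows with equal parent_id become adjacent; rows whose parent_id is
    # empty (None or "") form their own contiguous block.
    rows.sort(
        key=lambda row: (
            row[split_index] is None or row[split_index] == "",
            row[split_index],
        )
    )
    print([row[0] for row in rows])  # ['p4', 'p2', 'p3', 'p1']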
+
+
+def _create_batches(  # noqa: C901
+    data: list[list[Any]],
+    split_by_cols: Optional[list[str]],
+    header: list[str],
+    batch_size: int,
+    o2m: bool,
+) -> Generator[tuple[int, list[list[Any]]], None, None]:
+    """A generator that yields batches of data.
+
+    If split_by_cols is provided, records with the same value in the first
+    of those columns are grouped into the same batch.
+    """
+    if not data:
+        return
+
+    batch: list[list[Any]] = []
+    # Determine the grouping column. If several are given, only the first
+    # is used here for now; this may need refinement depending on the
+    # desired multi-column grouping behavior.
+    split_column_name: Optional[str] = None
+    if split_by_cols and len(split_by_cols) > 0:
+        split_column_name = split_by_cols[0]
+
+    split_index = -1
+    if split_column_name:
+        try:
+            split_index = header.index(split_column_name)
+        except ValueError:
+            log.error(
+                f"Grouping column '{split_column_name}' not found in header. "
+                f"Cannot use --groupby."
+            )
+            return
+
+    batch_counter = 0
+
+    # If splitting is enabled and a valid column was found
+    if split_index != -1 and split_column_name:
+        current_data = sorted(
+            data,
+            key=lambda row: (
+                row[split_index] is None or row[split_index] == "",
+                row[split_index],
+            ),
+        )
+
+        current_group_value = object()  # Sentinel for the initial comparison
+
+        for row in current_data:
+            row_group_value = row[split_index]
+            is_o2m_line = o2m and (
+                row[0] is None or row[0] == ""
+            )  # Assumes the id is in the first column
+
+            if o2m and is_o2m_line and batch:
+                # An o2m line while a batch exists: append to the current batch
+                batch.append(row)
+            elif row_group_value != current_group_value:
+                # New group (or first record): yield the current batch if any
+                if batch:
+                    batch_counter += 1
+                    yield batch_counter, batch
+                batch = [row]  # Start a new batch with the current row
+                current_group_value = row_group_value
+            elif len(batch) >= batch_size:
+                # Same group, but the batch is full: yield and start a new one
+                batch_counter += 1
+                yield batch_counter, batch
+                batch = [row]
+            else:
+                # Same group, batch not full: append
+                batch.append(row)
+
+        if batch:  # Yield any remaining batch
+            batch_counter += 1
+            yield batch_counter, batch
+    else:  # No splitting, just batch by size
+        for row in data:
+            batch.append(row)
+            if len(batch) >= batch_size:
+                batch_counter += 1
+                yield batch_counter, batch
+                batch = []
+        if batch:
+            batch_counter += 1
+            yield batch_counter, batch


 def _setup_fail_file(
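As a sanity check on the grouping guarantee, here is a compact, self-contained sketch. `group_batches` is a hypothetical, simplified stand-in for the split path of `_create_batches` (no o2m handling), and the sample data is invented:

    from typing import Any, Iterator

    def group_batches(
        data: list[list[Any]], header: list[str], col: str, batch_size: int
    ) -> Iterator[list[list[Any]]]:
        """Yield batches so rows sharing a value in `col` stay together,
        unless a single group itself exceeds batch_size."""
        idx = header.index(col)
        data = sorted(
            data, key=lambda r: (r[idx] is None or r[idx] == "", r[idx])
        )
        batch: list[list[Any]] = []
        current: Any = object()  # sentinel for the first comparison
        for row in data:
            if row[idx] != current:
                if batch:
                    yield batch
                batch, current = [row], row[idx]
            elif len(batch) >= batch_size:
                yield batch
                batch = [row]
            else:
                batch.append(row)
        if batch:
            yield batch

    header = ["id", "name", "parent_id"]
    rows = [
        ["p1", "Alice", "org_a"],
        ["p2", "Bob", "org_b"],
        ["p3", "Carol", "org_a"],
    ]
    for b in group_batches(rows, header, "parent_id", batch_size=10):
        print([r[0] for r in b])
    # ['p1', 'p3']  <- both org_a rows land in one batch (one worker)
    # ['p2']        <- org_b gets its own batch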
@@ -352,7 +512,7 @@ def _execute_import_in_threads(
     fail_file_writer: Optional[Any],
     context: dict[str, Any],
     is_fail_run: bool,
-    split: Optional[str],
+    split_by_cols: Optional[list[str]],
     batch_size: int,
     o2m: bool,
     check: bool,
@@ -389,7 +549,7 @@ def _execute_import_in_threads(
     )

     for batch_number, lines_batch in _create_batches(
-        final_data, split, final_header, batch_size, o2m
+        final_data, split_by_cols, final_header, batch_size, o2m
     ):
         if rpc_thread.abort_flag:
             log.error(
@@ -422,7 +582,7 @@ def import_data(
     encoding: str = "utf-8",
     separator: str = ";",
     ignore: Optional[list[str]] = None,
-    split: Optional[str] = None,
+    split_by_cols: Optional[list[str]] = None,
     check: bool = True,
     max_connection: int = 1,
     batch_size: int = 10,
@@ -445,7 +605,7 @@ def import_data(
         encoding: The file encoding of the source file.
         separator: The delimiter used in the CSV file.
         ignore: A list of column names to ignore during import.
-        split: The column name to group records by to avoid concurrent updates.
+        split_by_cols: The column names to group records by to avoid concurrent updates.
         check: If True, checks if records were successfully imported.
         max_connection: The number of simultaneous connections to use.
         batch_size: The number of records to process in each batch.
@@ -496,7 +656,7 @@ def import_data(
         fail_file_writer,
         _context,
         is_fail_run,
-        split,
+        split_by_cols,
         batch_size,
         o2m,
         check,

src/odoo_data_flow/importer.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,16 +145,18 @@ def run_import( # noqa: C901
145145
log.info(f"Workers: {max_connection_run}, Batch Size: {batch_size_run}")
146146
log.info(f"Failed records will be saved to: {fail_output_file}")
147147

148+
split_by_cols_for_import = [split] if split else None
149+
148150
success = import_threaded.import_data(
149-
config,
150-
final_model,
151+
config_file=config,
152+
model=final_model,
151153
file_csv=file_to_process,
152154
context=parsed_context,
153155
fail_file=fail_output_file,
154156
encoding=encoding,
155157
separator=separator,
156158
ignore=ignore_list,
157-
split=split,
159+
split_by_cols=split_by_cols_for_import,
158160
check=check,
159161
max_connection=max_connection_run,
160162
batch_size=batch_size_run,
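Note that at this layer `run_import` still receives a single `--groupby` column and adapts it to the new list-based API of `import_data`. A small sketch of that adapter logic in isolation (the function name is hypothetical):

    from typing import Optional

    def wrap_split(split: Optional[str]) -> Optional[list[str]]:
        """Mirror of the wrapping added in run_import."""
        return [split] if split else None

    assert wrap_split("parent_id") == ["parent_id"]
    assert wrap_split(None) is None
    assert wrap_split("") is None  # empty string means "no grouping"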
