From bc951f563ebe304161c340e79b1e8c8a9e3abed1 Mon Sep 17 00:00:00 2001 From: Nick Manganelli Date: Sun, 30 Mar 2025 23:40:37 -0500 Subject: [PATCH 1/9] Uniformize parquet handling by adding the equivalent of functions for determining the form, steps, uuid if available --- src/coffea/dataset_tools/preprocess.py | 439 +++++++++++++++++++++++++ 1 file changed, 439 insertions(+) diff --git a/src/coffea/dataset_tools/preprocess.py b/src/coffea/dataset_tools/preprocess.py index 7bea0699d..3de3f7515 100644 --- a/src/coffea/dataset_tools/preprocess.py +++ b/src/coffea/dataset_tools/preprocess.py @@ -509,3 +509,442 @@ def preprocess( out_available = DataGroupSpec(out_available) out_updated = DataGroupSpec(out_updated) return out_available, out_updated + +def _normalize_parquet_file_info(file_info): + """ + Structure file info akin to _normalize_file_info for uproot files, which returns a list of (filename, object_path, steps, num_entries, uuid) tuples. + """ + normed_files = None + if isinstance(file_info, list): + normed_files = [(file, None, None, None, None) for file in file_info] + elif(isinstance(file_info, dict) and "files" not in file_info): + normed_files = [(file, object_path, None, None, None) for file, object_path in file_info.items()] + elif isinstance(file_info, dict) and "files" in file_info: + normed_files = [] + for filename, maybe_nested in file_info["files"].items(): + if isinstance(maybe_nested, dict): + object_path = maybe_nested.get("object_path", None) + steps = maybe_nested.get("steps", None) + num_entries = maybe_nested.get("num_entries", None) + uuid = maybe_nested.get("uuid", None) + normed_files.append((filename, object_path, steps, num_entries, uuid)) + elif isinstance(maybe_nested, str) or maybe_nested is None: + normed_files.append((filename, maybe_nested, None, None, None)) + else: + raise ValueError( + f"The file_info dictionary must contain either a string, dictionary, or None as the value. _normalize_parquet_file_info got {file_info}" + ) + return normed_files + +def get_parquet_form_uuid_steps( + normed_files: awkward.Array | dask_awkward.Array, + step_size: int | None = None, + use_row_groups: bool = False, + recalculate_steps: bool = False, + skip_bad_files: bool = False, + file_exceptions: Exception | Warning | tuple[Exception | Warning] = (OSError,), + save_form: bool = False, + step_size_safety_factor: float = 0.5, + parquet_options: dict = {}, +) -> awkward.Array | dask_awkward.Array: + """ + Given a list of normalized file and object paths, determine the form, steps, uuid for each file according to the supplied processing options. + + Parameters + ---------- + normed_files: awkward.Array | dask_awkward.Array + The list of normalized file descriptions to process for steps. + step_size: int | None, default None + If specified, the size of the steps to make when analyzing the input files. + use_row_groups: bool, default False + Calculate steps according to the row_groups in the parquet files. + recalculate_steps: bool, default False + If steps are present in the input normed files, force the recalculation of those steps, instead + of only recalculating the steps if the uuid has changed. + skip_bad_files: bool, False + Instead of failing, catch exceptions specified by file_exceptions and return null data. + file_exceptions: Exception | Warning | tuple[Exception | Warning], default (OSError,) + What exceptions to catch when skipping bad files. + save_form: bool, default False + Extract the form of the TTree from the file so we can skip opening files later. 
+ step_size_safety_factor: float, default 0.5 + When using align_clusters, if a resulting step is larger than step_size by this factor + warn the user that the resulting steps may be highly irregular. + + Returns + ------- + array : awkward.Array | dask_awkward.Array + The normalized file descriptions, appended with the calculated steps for those files. + """ + nf_backend = awkward.backend(normed_files) + lz_or_nf = awkward.typetracer.length_zero_if_typetracer(normed_files) + + array = [] if nf_backend != "typetracer" else lz_or_nf + for arg in lz_or_nf: + try: + the_file = awkward.metadata_from_parquet(arg.file, **parquet_options) + except file_exceptions as e: + if skip_bad_files: + array.append(None) + continue + else: + raise e + + num_entries = the_file['num_rows'] + + form_json = None + form_hash = None + if save_form: + form = the_file['form'] + form_str = form.to_json() + # the function cache needs to be popped if present to prevent memory growth + if hasattr(dask.base, "function_cache"): + dask.base.function_cache.popitem() + + form_hash = hashlib.md5(form_str.encode("utf-8")).hexdigest() + form_json = compress_form(form_str) + + target_step_size = num_entries if step_size is None else step_size + + file_uuid = the_file.get('uuid', None) + + out_uuid = arg.uuid + out_steps = arg.steps + + if out_uuid != file_uuid or recalculate_steps: + if use_row_groups: + row_group_entries = the_file['col_counts'] + out = [0] + this_offset = 0 + for c in row_group_entries: + this_offset += c + if this_offset >= out[-1] + target_step_size: + out.append(this_offset) + if this_offset != out[-1]: + out.append(this_offset) + out = numpy.array(out, dtype="int64") + out = numpy.stack((out[:-1], out[1:]), axis=1) + + step_mask = ( + out[:, 1] - out[:, 0] + > (1 + step_size_safety_factor) * target_step_size + ) + if numpy.any(step_mask): + warnings.warn( + f"In file {arg.file}, steps: {out[step_mask]} with use_row_groups=True are " + f"{step_size_safety_factor*100:.0f}% larger than target " + f"step size: {target_step_size}!" 
+ ) + else: + n_steps_target = max(round(num_entries / target_step_size), 1) + actual_step_size = math.ceil(num_entries / n_steps_target) + out = numpy.array( + [ + [ + i * actual_step_size, + min((i + 1) * actual_step_size, num_entries), + ] + for i in range(n_steps_target) + ], + dtype="int64", + ) + + out_uuid = file_uuid + out_steps = out.tolist() + + if out_steps is not None and len(out_steps) == 0: + out_steps = [[0, 0]] + + array.append( + { + "file": arg.file, + "object_path": arg.object_path, + "steps": out_steps, + "num_entries": num_entries, + "uuid": out_uuid, + "form": form_json, + "form_hash_md5": form_hash, + } + ) + + if len(array) == 0: + array = awkward.Array( + [ + { + "file": "junk", + "object_path": "junk", + "steps": [[0, 0]], + "num_entries": 0, + "uuid": "junk", + "form": "junk", + "form_hash_md5": "junk", + }, + None, + ] + ) + array = awkward.Array(array.layout.form.length_zero_array(highlevel=False)) + else: + array = awkward.Array(array) + + if nf_backend == "typetracer": + array = awkward.Array( + array.layout.to_typetracer(forget_length=True), + ) + + return array + +def _preprocess_parquet( + fileset: FilesetSpecOptional, + step_size: None | int = None, + use_row_groups: bool = False, + recalculate_steps: bool = False, + files_per_batch: int = 1, + skip_bad_files: bool = False, + file_exceptions: Exception | Warning | tuple[Exception | Warning] = (OSError,), + save_form: bool = False, + scheduler: None | Callable | str = None, + parquet_options: dict = {}, + step_size_safety_factor: float = 0.5, + allow_empty_datasets: bool = False, +) -> tuple[FilesetSpec, FilesetSpecOptional]: + """ + Given a list of normalized files, determine the form, steps, and add the metadata for each file according to the supplied processing options. + + Parameters + ---------- + fileset: FilesetSpecOptional + The set of datasets whose files will be preprocessed. + step_size: int | None, default None + If specified, the size of the steps to make when analyzing the input files. + use_row_groups: bool, default False + Use the row groups in the parquet files to determine the steps. + recalculate_steps: bool, default False + If steps are present in the input normed files, force the recalculation of those steps, + instead of only recalculating the steps if the uuid has changed. + skip_bad_files: bool, False + Instead of failing, catch exceptions specified by file_exceptions and return null data. + file_exceptions: Exception | Warning | tuple[Exception | Warning], default (FileNotFoundError, OSError) + What exceptions to catch when skipping bad files. + save_form: bool, default False + Extract the form of the TTree from each file in each dataset, creating the union of the forms over the dataset. + scheduler: None | Callable | str, default None + Specifies the scheduler that dask should use to execute the preprocessing task graph. + parquet_options: dict, default {} + Options to pass to get_parquet_form_uuid_steps for opening files + step_size_safety_factor: float, default 0.5 + When using use_row_groups, if a resulting step is larger than step_size by this factor + warn the user that the resulting steps may be highly irregular. + allow_empty_datasets: bool, default False + When a dataset query comes back completely empty, this is normally considered a processing error. + Toggle this argument to True to change this to warnings and allow incomplete returned filesets. 
+ Returns + ------- + out_available : FilesetSpec + The subset of files in each dataset that were successfully preprocessed, organized by dataset. + out_updated : FilesetSpecOptional + The original set of datasets including files that were not accessible, updated to include the result of preprocessing where available. + """ + out_updated = copy.deepcopy(fileset) + out_available = copy.deepcopy(fileset) + + all_ak_norm_files = {} + files_to_preprocess = {} + for name, info in fileset.items(): + norm_files = _normalize_parquet_file_info(info) + fields = ["file", "object_path", "steps", "num_entries", "uuid"] + ak_norm_files = awkward.from_iter(norm_files) + ak_norm_files = awkward.Array( + {field: ak_norm_files[str(ifield)] for ifield, field in enumerate(fields)} + ) + all_ak_norm_files[name] = ak_norm_files + + dak_norm_files = dask_awkward.from_awkward( + ak_norm_files, math.ceil(len(ak_norm_files) / files_per_batch) + ) + + concat_fn = partial( + awkward.concatenate, + axis=0, + ) + + split_every = 8 + + files_trl_label = f"preprocess-{name}" + files_trl_token = dask.base.tokenize(dak_norm_files, concat_fn, split_every) + files_trl_name = f"{files_trl_label}-{files_trl_token}" + files_trl_tree_node_name = f"{files_trl_label}-tree-node-{files_trl_token}" + + files_part = dask_awkward.map_partitions( + get_parquet_form_uuid_steps, + dak_norm_files, + step_size=step_size, + use_row_groups=use_row_groups, + recalculate_steps=recalculate_steps, + skip_bad_files=skip_bad_files, + file_exceptions=file_exceptions, + save_form=save_form, + step_size_safety_factor=step_size_safety_factor, + parquet_options=parquet_options, + meta=dask_awkward.lib.core.empty_typetracer(), + ) + + files_trl = dask_awkward.layers.layers.AwkwardTreeReductionLayer( + name=files_trl_name, + name_input=files_part.name, + npartitions_input=files_part.npartitions, + concat_func=concat_fn, + tree_node_func=lambda x: x, + finalize_func=lambda x: x, + split_every=split_every, + tree_node_name=files_trl_tree_node_name, + ) + + files_graph = dask.highlevelgraph.HighLevelGraph.from_collections( + files_trl_name, files_trl, dependencies=[files_part] + ) + + files_to_preprocess[name] = dask_awkward.lib.core.new_array_object( + files_graph, + files_trl_name, + meta=dask_awkward.lib.core.empty_typetracer(), + npartitions=len(files_trl.output_partitions), + ) + + (all_processed_files,) = dask.compute(files_to_preprocess, scheduler=scheduler) + + for name, processed_files in all_processed_files.items(): + + if len(awkward.drop_none(processed_files, axis=0)) == 0: + ds_empty_msg = ( + "There was no populated list of files returned from querying your input dataset." + "\nPlease check your xrootd endpoints, and avoid redirectors." 
+ f"\nInput dataset: {name}" + f"\nAs parsed for querying: {awkward.to_list(all_ak_norm_files[name])}" + ) + + if not allow_empty_datasets: + raise Exception(ds_empty_msg) + + warnings.warn(ds_empty_msg) + del out_available[name] + continue + + processed_files_without_forms = processed_files[ + ["file", "object_path", "steps", "num_entries", "uuid"] + ] + + forms = processed_files[["file", "form", "form_hash_md5", "num_entries"]][ + ~awkward.is_none(processed_files.form_hash_md5) + ] + + _, unique_forms_idx = numpy.unique( + forms.form_hash_md5.to_numpy(), return_index=True + ) + + dataset_forms = [] + unique_forms = forms[unique_forms_idx] + for thefile, formstr, num_entries in zip( + unique_forms.file, unique_forms.form, unique_forms.num_entries + ): + # skip trivially filled or empty files + form = awkward.forms.from_json(decompress_form(formstr)) + if num_entries >= 0 and set(form.fields) != _trivial_file_fields: + dataset_forms.append(form) + else: + warnings.warn( + f"{thefile} has fields {form.fields} and num_entries={num_entries} " + "and has been skipped during form-union determination. You will need " + "to skip this file when processing. You can either manually remove it " + "or, if it is an empty file, dynamically remove it with the function " + "dataset_tools.filter_files which takes the output of preprocess and " + ", by default, removes empty files each dataset in a fileset." + ) + + union_array = None + union_form_jsonstr = None + while len(dataset_forms): + new_array = awkward.Array(dataset_forms.pop().length_zero_array()) + if union_array is None: + union_array = new_array + else: + union_array = awkward.to_packed( + awkward.merge_union_of_records( + awkward.concatenate([union_array, new_array]), axis=0 + ) + ) + union_array.layout.parameters.update(new_array.layout.parameters) + if union_array is not None: + union_form = union_array.layout.form + + for icontent, content in enumerate(union_form.contents): + if isinstance(content, awkward.forms.IndexedOptionForm): + if ( + not isinstance(content.content, awkward.forms.NumpyForm) + or content.content.primitive != "bool" + ): + raise ValueError( + "IndexedOptionArrays can only contain NumpyArrays of " + "bools in mergers of flat-tuple-like schemas!" 
+ ) + parameters = ( + content.content.parameters.copy() + if content.content.parameters is not None + else {} + ) + # re-create IndexOptionForm with parameters of lower level array + union_form.contents[icontent] = awkward.forms.IndexedOptionForm( + content.index, + content.content, + parameters=parameters, + form_key=content.form_key, + ) + + union_form_jsonstr = union_form.to_json() + + files_available = { + item["file"]: { + "object_path": item["object_path"], + "steps": item["steps"], + "num_entries": item["num_entries"], + "uuid": item["uuid"], + } + for item in awkward.drop_none(processed_files_without_forms).to_list() + } + + files_out = {} + for proc_item, orig_item in zip( + processed_files_without_forms.to_list(), all_ak_norm_files[name].to_list() + ): + item = orig_item if proc_item is None else proc_item + files_out[item["file"]] = { + "object_path": item["object_path"], + "steps": item["steps"], + "num_entries": item["num_entries"], + "uuid": item["uuid"], + } + + if "files" in out_updated[name]: + out_updated[name]["files"] = files_out + out_available[name]["files"] = files_available + else: + out_updated[name] = {"files": files_out, "metadata": None, "form": None} + out_available[name] = { + "files": files_available, + "metadata": None, + "form": None, + } + + compressed_union_form = None + if union_form_jsonstr is not None: + compressed_union_form = compress_form(union_form_jsonstr) + out_updated[name]["form"] = compressed_union_form + out_available[name]["form"] = compressed_union_form + else: + out_updated[name]["form"] = None + out_available[name]["form"] = None + + if "metadata" not in out_updated[name]: + out_updated[name]["metadata"] = None + out_available[name]["metadata"] = None + + return out_available, out_updated From 437e39b7596eb3b4eb0550a6b1a5745c89a868ee Mon Sep 17 00:00:00 2001 From: Nick Manganelli Date: Sat, 22 Nov 2025 17:47:00 -0600 Subject: [PATCH 2/9] Pieces missed from commit 4b875008 in datafactory_parquet branch --- src/coffea/dataset_tools/preprocess.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/coffea/dataset_tools/preprocess.py b/src/coffea/dataset_tools/preprocess.py index 3de3f7515..a9a82cee7 100644 --- a/src/coffea/dataset_tools/preprocess.py +++ b/src/coffea/dataset_tools/preprocess.py @@ -510,16 +510,20 @@ def preprocess( out_updated = DataGroupSpec(out_updated) return out_available, out_updated -def _normalize_parquet_file_info(file_info): +def _normalize_parquet_file_info(file_info, return_form_or_metadata=False): """ Structure file info akin to _normalize_file_info for uproot files, which returns a list of (filename, object_path, steps, num_entries, uuid) tuples. """ normed_files = None + form = None + metadata = None if isinstance(file_info, list): normed_files = [(file, None, None, None, None) for file in file_info] elif(isinstance(file_info, dict) and "files" not in file_info): normed_files = [(file, object_path, None, None, None) for file, object_path in file_info.items()] elif isinstance(file_info, dict) and "files" in file_info: + form = file_info.get("form", None) + metadata = file_info.get("metadata", None) normed_files = [] for filename, maybe_nested in file_info["files"].items(): if isinstance(maybe_nested, dict): @@ -534,6 +538,8 @@ def _normalize_parquet_file_info(file_info): raise ValueError( f"The file_info dictionary must contain either a string, dictionary, or None as the value. 
_normalize_parquet_file_info got {file_info}" ) + if return_form_or_metadata: + return normed_files, form, metadata return normed_files def get_parquet_form_uuid_steps( From 6ba42057638b11481f170fe758b58ce180e1b0d6 Mon Sep 17 00:00:00 2001 From: Nick Manganelli Date: Sat, 22 Nov 2025 17:47:26 -0600 Subject: [PATCH 3/9] The try-except block for mapping when utilizing parquet --- src/coffea/nanoevents/factory.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/coffea/nanoevents/factory.py b/src/coffea/nanoevents/factory.py index 51ad4556c..14a899f19 100644 --- a/src/coffea/nanoevents/factory.py +++ b/src/coffea/nanoevents/factory.py @@ -764,7 +764,10 @@ def events(self): """ if self._mode == "dask": dask_awkward.lib.core.dak_cache.clear() - events = self._mapping(form_mapping=self._schema) + try: + events = self._mapping(form_mapping=self._schema) + except TypeError: + events = self._mapping() report = None if isinstance(events, tuple): events, report = events From e00502d5aaa64a07ab056d7784d21e0f301d8a57 Mon Sep 17 00:00:00 2001 From: Nick Manganelli Date: Sat, 22 Nov 2025 17:56:29 -0600 Subject: [PATCH 4/9] pre-commit fixes --- src/coffea/dataset_tools/preprocess.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/coffea/dataset_tools/preprocess.py b/src/coffea/dataset_tools/preprocess.py index a9a82cee7..488a1dbee 100644 --- a/src/coffea/dataset_tools/preprocess.py +++ b/src/coffea/dataset_tools/preprocess.py @@ -510,6 +510,7 @@ def preprocess( out_updated = DataGroupSpec(out_updated) return out_available, out_updated + def _normalize_parquet_file_info(file_info, return_form_or_metadata=False): """ Structure file info akin to _normalize_file_info for uproot files, which returns a list of (filename, object_path, steps, num_entries, uuid) tuples. 
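For orientation, the tuples built here mirror the uproot helper _normalize_file_info. A minimal sketch of the dict-based behaviour as of this commit (before the later pydantic rewrite); this is not part of the patch and the file names and values are invented:

    file_info = {
        "files": {
            "data/part0.parquet": {"steps": [[0, 1000]], "uuid": "abc123"},
            "data/part1.parquet": None,
        }
    }
    normed = _normalize_parquet_file_info(file_info)
    # normed == [
    #     ("data/part0.parquet", None, [[0, 1000]], None, "abc123"),
    #     ("data/part1.parquet", None, None, None, None),
    # ]
    # i.e. one (filename, object_path, steps, num_entries, uuid) tuple per file,
    # with None for anything the caller did not supply.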
@@ -519,8 +520,11 @@ def _normalize_parquet_file_info(file_info, return_form_or_metadata=False): metadata = None if isinstance(file_info, list): normed_files = [(file, None, None, None, None) for file in file_info] - elif(isinstance(file_info, dict) and "files" not in file_info): - normed_files = [(file, object_path, None, None, None) for file, object_path in file_info.items()] + elif isinstance(file_info, dict) and "files" not in file_info: + normed_files = [ + (file, object_path, None, None, None) + for file, object_path in file_info.items() + ] elif isinstance(file_info, dict) and "files" in file_info: form = file_info.get("form", None) metadata = file_info.get("metadata", None) @@ -542,6 +546,7 @@ def _normalize_parquet_file_info(file_info, return_form_or_metadata=False): return normed_files, form, metadata return normed_files + def get_parquet_form_uuid_steps( normed_files: awkward.Array | dask_awkward.Array, step_size: int | None = None, @@ -596,12 +601,12 @@ def get_parquet_form_uuid_steps( else: raise e - num_entries = the_file['num_rows'] + num_entries = the_file["num_rows"] form_json = None form_hash = None if save_form: - form = the_file['form'] + form = the_file["form"] form_str = form.to_json() # the function cache needs to be popped if present to prevent memory growth if hasattr(dask.base, "function_cache"): @@ -612,14 +617,14 @@ def get_parquet_form_uuid_steps( target_step_size = num_entries if step_size is None else step_size - file_uuid = the_file.get('uuid', None) + file_uuid = the_file.get("uuid", None) out_uuid = arg.uuid out_steps = arg.steps if out_uuid != file_uuid or recalculate_steps: if use_row_groups: - row_group_entries = the_file['col_counts'] + row_group_entries = the_file["col_counts"] out = [0] this_offset = 0 for c in row_group_entries: @@ -699,6 +704,7 @@ def get_parquet_form_uuid_steps( return array + def _preprocess_parquet( fileset: FilesetSpecOptional, step_size: None | int = None, From 5e29c6bec1025454f85ff0b6646df32f36a53f11 Mon Sep 17 00:00:00 2001 From: Nick Manganelli Date: Sat, 22 Nov 2025 20:17:50 -0600 Subject: [PATCH 5/9] Start adapting to pydantic path only in _preprocess_parquet --- src/coffea/dataset_tools/preprocess.py | 86 ++++++++------------------ 1 file changed, 25 insertions(+), 61 deletions(-) diff --git a/src/coffea/dataset_tools/preprocess.py b/src/coffea/dataset_tools/preprocess.py index 488a1dbee..507930e75 100644 --- a/src/coffea/dataset_tools/preprocess.py +++ b/src/coffea/dataset_tools/preprocess.py @@ -511,39 +511,19 @@ def preprocess( return out_available, out_updated -def _normalize_parquet_file_info(file_info, return_form_or_metadata=False): +def _normalize_parquet_file_info(datasetspec: DatasetSpec, return_compressedform_or_metadata=False): """ Structure file info akin to _normalize_file_info for uproot files, which returns a list of (filename, object_path, steps, num_entries, uuid) tuples. 
""" - normed_files = None - form = None - metadata = None - if isinstance(file_info, list): - normed_files = [(file, None, None, None, None) for file in file_info] - elif isinstance(file_info, dict) and "files" not in file_info: - normed_files = [ - (file, object_path, None, None, None) - for file, object_path in file_info.items() - ] - elif isinstance(file_info, dict) and "files" in file_info: - form = file_info.get("form", None) - metadata = file_info.get("metadata", None) - normed_files = [] - for filename, maybe_nested in file_info["files"].items(): - if isinstance(maybe_nested, dict): - object_path = maybe_nested.get("object_path", None) - steps = maybe_nested.get("steps", None) - num_entries = maybe_nested.get("num_entries", None) - uuid = maybe_nested.get("uuid", None) - normed_files.append((filename, object_path, steps, num_entries, uuid)) - elif isinstance(maybe_nested, str) or maybe_nested is None: - normed_files.append((filename, maybe_nested, None, None, None)) - else: - raise ValueError( - f"The file_info dictionary must contain either a string, dictionary, or None as the value. _normalize_parquet_file_info got {file_info}" - ) - if return_form_or_metadata: - return normed_files, form, metadata + if not isinstance(datasetspec, DatasetSpec): + raise ValueError( + f"_normalize_parquet_file_info expects a DatasetSpec, got {type(datasetspec)}" + ) + normed_files = [] + for filename, fileinfo in datasetspec.files.items(): + normed_files.append((filename, fileinfo.object_path, fileinfo.steps, fileinfo.num_entries, fileinfo.uuid)) + if return_compressedform_or_metadata: + return normed_files, datasetspec.compressed_form, datasetspec.metadata return normed_files @@ -706,19 +686,19 @@ def get_parquet_form_uuid_steps( def _preprocess_parquet( - fileset: FilesetSpecOptional, + datagroupspec: DataGroupSpec, step_size: None | int = None, use_row_groups: bool = False, recalculate_steps: bool = False, files_per_batch: int = 1, skip_bad_files: bool = False, file_exceptions: Exception | Warning | tuple[Exception | Warning] = (OSError,), - save_form: bool = False, + save_form: bool = True, scheduler: None | Callable | str = None, parquet_options: dict = {}, step_size_safety_factor: float = 0.5, allow_empty_datasets: bool = False, -) -> tuple[FilesetSpec, FilesetSpecOptional]: +) -> tuple[DataGroupSpec, DataGroupSpec]: """ Given a list of normalized files, determine the form, steps, and add the metadata for each file according to the supplied processing options. @@ -756,12 +736,12 @@ def _preprocess_parquet( out_updated : FilesetSpecOptional The original set of datasets including files that were not accessible, updated to include the result of preprocessing where available. 
""" - out_updated = copy.deepcopy(fileset) - out_available = copy.deepcopy(fileset) + out_updated = datagroupspec.model_dump() + out_available = datagroupspec.model_dump() all_ak_norm_files = {} files_to_preprocess = {} - for name, info in fileset.items(): + for name, info in datagroupspec.items(): norm_files = _normalize_parquet_file_info(info) fields = ["file", "object_path", "steps", "num_entries", "uuid"] ak_norm_files = awkward.from_iter(norm_files) @@ -845,16 +825,16 @@ def _preprocess_parquet( ["file", "object_path", "steps", "num_entries", "uuid"] ] - forms = processed_files[["file", "form", "form_hash_md5", "num_entries"]][ + compressed_forms = processed_files[["file", "form", "form_hash_md5", "num_entries"]][ ~awkward.is_none(processed_files.form_hash_md5) ] _, unique_forms_idx = numpy.unique( - forms.form_hash_md5.to_numpy(), return_index=True + compressed_forms.form_hash_md5.to_numpy(), return_index=True ) dataset_forms = [] - unique_forms = forms[unique_forms_idx] + unique_forms = compressed_forms[unique_forms_idx] for thefile, formstr, num_entries in zip( unique_forms.file, unique_forms.form, unique_forms.num_entries ): @@ -935,28 +915,12 @@ def _preprocess_parquet( "uuid": item["uuid"], } - if "files" in out_updated[name]: - out_updated[name]["files"] = files_out - out_available[name]["files"] = files_available - else: - out_updated[name] = {"files": files_out, "metadata": None, "form": None} - out_available[name] = { - "files": files_available, - "metadata": None, - "form": None, - } + out_updated[name]["files"] = files_out + out_available[name]["files"] = files_available compressed_union_form = None - if union_form_jsonstr is not None: - compressed_union_form = compress_form(union_form_jsonstr) - out_updated[name]["form"] = compressed_union_form - out_available[name]["form"] = compressed_union_form - else: - out_updated[name]["form"] = None - out_available[name]["form"] = None - - if "metadata" not in out_updated[name]: - out_updated[name]["metadata"] = None - out_available[name]["metadata"] = None + compressed_union_form = compress_form(union_form_jsonstr) if union_form_jsonstr else None + out_updated[name]["compressed_form"] = compressed_union_form + out_available[name]["compressed_form"] = compressed_union_form - return out_available, out_updated + return DataGroupSpec.model_validate(out_available), DataGroupSpec.model_validate(out_updated) From d7ac36832cea18848cd3e85d6c38415d42af77eb Mon Sep 17 00:00:00 2001 From: Nick Manganelli Date: Sat, 22 Nov 2025 20:22:37 -0600 Subject: [PATCH 6/9] DataGroupSpec only input for parquet preprocessing --- src/coffea/dataset_tools/preprocess.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/coffea/dataset_tools/preprocess.py b/src/coffea/dataset_tools/preprocess.py index 507930e75..4e17e17a8 100644 --- a/src/coffea/dataset_tools/preprocess.py +++ b/src/coffea/dataset_tools/preprocess.py @@ -704,7 +704,7 @@ def _preprocess_parquet( Parameters ---------- - fileset: FilesetSpecOptional + fileset: DataGroupSpec The set of datasets whose files will be preprocessed. step_size: int | None, default None If specified, the size of the steps to make when analyzing the input files. @@ -731,11 +731,15 @@ def _preprocess_parquet( Toggle this argument to True to change this to warnings and allow incomplete returned filesets. Returns ------- - out_available : FilesetSpec + out_available : DataGroupSpec The subset of files in each dataset that were successfully preprocessed, organized by dataset. 
- out_updated : FilesetSpecOptional + out_updated : DataGroupSpec The original set of datasets including files that were not accessible, updated to include the result of preprocessing where available. """ + if not isinstance(datagroupspec, DataGroupSpec): + raise ValueError( + f"_preprocess_parquet expects a DataGroupSpec, got {type(datagroupspec)}" + ) out_updated = datagroupspec.model_dump() out_available = datagroupspec.model_dump() From cd52a4df127c9a59d218c3bec0351c06114c311c Mon Sep 17 00:00:00 2001 From: Nick Manganelli Date: Sat, 22 Nov 2025 20:28:50 -0600 Subject: [PATCH 7/9] pre-commit fixes --- src/coffea/dataset_tools/preprocess.py | 28 +++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/src/coffea/dataset_tools/preprocess.py b/src/coffea/dataset_tools/preprocess.py index 4e17e17a8..2136e53c4 100644 --- a/src/coffea/dataset_tools/preprocess.py +++ b/src/coffea/dataset_tools/preprocess.py @@ -511,7 +511,9 @@ def preprocess( return out_available, out_updated -def _normalize_parquet_file_info(datasetspec: DatasetSpec, return_compressedform_or_metadata=False): +def _normalize_parquet_file_info( + datasetspec: DatasetSpec, return_compressedform_or_metadata=False +): """ Structure file info akin to _normalize_file_info for uproot files, which returns a list of (filename, object_path, steps, num_entries, uuid) tuples. """ @@ -521,7 +523,15 @@ def _normalize_parquet_file_info(datasetspec: DatasetSpec, return_compressedform ) normed_files = [] for filename, fileinfo in datasetspec.files.items(): - normed_files.append((filename, fileinfo.object_path, fileinfo.steps, fileinfo.num_entries, fileinfo.uuid)) + normed_files.append( + ( + filename, + fileinfo.object_path, + fileinfo.steps, + fileinfo.num_entries, + fileinfo.uuid, + ) + ) if return_compressedform_or_metadata: return normed_files, datasetspec.compressed_form, datasetspec.metadata return normed_files @@ -829,9 +839,9 @@ def _preprocess_parquet( ["file", "object_path", "steps", "num_entries", "uuid"] ] - compressed_forms = processed_files[["file", "form", "form_hash_md5", "num_entries"]][ - ~awkward.is_none(processed_files.form_hash_md5) - ] + compressed_forms = processed_files[ + ["file", "form", "form_hash_md5", "num_entries"] + ][~awkward.is_none(processed_files.form_hash_md5)] _, unique_forms_idx = numpy.unique( compressed_forms.form_hash_md5.to_numpy(), return_index=True @@ -923,8 +933,12 @@ def _preprocess_parquet( out_available[name]["files"] = files_available compressed_union_form = None - compressed_union_form = compress_form(union_form_jsonstr) if union_form_jsonstr else None + compressed_union_form = ( + compress_form(union_form_jsonstr) if union_form_jsonstr else None + ) out_updated[name]["compressed_form"] = compressed_union_form out_available[name]["compressed_form"] = compressed_union_form - return DataGroupSpec.model_validate(out_available), DataGroupSpec.model_validate(out_updated) + return DataGroupSpec.model_validate(out_available), DataGroupSpec.model_validate( + out_updated + ) From 9e82c190c1d84607eb93af893ff5dfa467ac7974 Mon Sep 17 00:00:00 2001 From: Nick Manganelli Date: Sat, 22 Nov 2025 23:19:09 -0600 Subject: [PATCH 8/9] break preprocess into legacy, parquet, root, and general-dispatching versions, with appropriate options and doc strings --- src/coffea/dataset_tools/preprocess.py | 407 ++++++++++++++++++++----- 1 file changed, 331 insertions(+), 76 deletions(-) diff --git a/src/coffea/dataset_tools/preprocess.py b/src/coffea/dataset_tools/preprocess.py index 
2136e53c4..1d79d3d4f 100644 --- a/src/coffea/dataset_tools/preprocess.py +++ b/src/coffea/dataset_tools/preprocess.py @@ -18,9 +18,7 @@ from coffea.dataset_tools.filespec import ( DataGroupSpec, DatasetSpec, - InputFiles, ModelFactory, - PreprocessedFiles, ) from coffea.util import _is_interpretable, compress_form, decompress_form @@ -157,7 +155,7 @@ def get_steps( "steps": out_steps, "num_entries": num_entries, "uuid": out_uuid, - "compressed_form": form_json, + "form": form_json, "form_hash_md5": form_hash, } ) @@ -171,7 +169,7 @@ def get_steps( "steps": [[0, 0]], "num_entries": 0, "uuid": "junk", - "compressed_form": "junk", + "form": "junk", "form_hash_md5": "junk", }, None, @@ -226,8 +224,8 @@ def _normalize_file_info(file_info): _trivial_file_fields = {"run", "luminosityBlock", "event"} -def preprocess( - fileset: DataGroupSpec | dict, +def preprocess_legacy( + fileset: dict, step_size: None | int = None, align_clusters: bool = False, recalculate_steps: bool = False, @@ -239,58 +237,55 @@ def preprocess( uproot_options: dict = {}, step_size_safety_factor: float = 0.5, allow_empty_datasets: bool = False, -) -> tuple[DataGroupSpec, DataGroupSpec] | tuple[dict, dict]: +) -> tuple[dict, dict]: """ Given a list of normalized file and object paths (defined in uproot), determine the steps for each file according to the supplied processing options. Parameters ---------- - fileset : DataGroupSpec | dict + fileset: dict The set of datasets whose files will be preprocessed. - step_size : int or None, default None + step_size: int | None, default None If specified, the size of the steps to make when analyzing the input files. - align_clusters : bool, default False + align_clusters: bool, default False Round to the cluster size in a root file, when chunks are specified. Reduces data transfer in analysis. - recalculate_steps : bool, default False + recalculate_steps: bool, default False If steps are present in the input normed files, force the recalculation of those steps, instead of only recalculating the steps if the uuid has changed. - files_per_batch : int, default 1 + files_per_batch: int, default 1 The number of files to preprocess in a single batch. Large values will result in fewer dask tasks but each task will have to do more work. - skip_bad_files : bool, default False + skip_bad_files: bool, False Instead of failing, catch exceptions specified by file_exceptions and return null data. - file_exceptions : Exception or Warning or tuple[Exception or Warning], default (FileNotFoundError, OSError) + file_exceptions: Exception | Warning | tuple[Exception | Warning], default (FileNotFoundError, OSError) What exceptions to catch when skipping bad files. - save_form : bool, default False + save_form: bool, default False Extract the form of the TTree from each file in each dataset, creating the union of the forms over the dataset. - scheduler : None or Callable or str, default None + scheduler: None | Callable | str, default None Specifies the scheduler that dask should use to execute the preprocessing task graph. - uproot_options : dict, default {} + uproot_options: dict, default {} Options to pass to get_steps for opening files with uproot. - step_size_safety_factor : float, default 0.5 + step_size_safety_factor: float, default 0.5 When using align_clusters, if a resulting step is larger than step_size by this factor warn the user that the resulting steps may be highly irregular. 
- allow_empty_datasets : bool, default False + allow_empty_datasets: bool, default False When a dataset query comes back completely empty, this is normally considered a processing error. Toggle this argument to True to change this to warnings and allow incomplete returned filesets. Returns ------- - out_available : DataGroupSpec | dict + out_available : dict The subset of files in each dataset that were successfully preprocessed, organized by dataset. - out_updated : DataGroupSpec | dict + out_updated : dict The original set of datasets including files that were not accessible, updated to include the result of preprocessing where available. """ + out_updated = copy.deepcopy(fileset) out_available = copy.deepcopy(fileset) all_ak_norm_files = {} files_to_preprocess = {} - is_DataGroupSpec = isinstance(fileset, DataGroupSpec) for name, info in fileset.items(): - is_datasetspec = isinstance(info, DatasetSpec) - if is_datasetspec: - is_DataGroupSpec = True norm_files = _normalize_file_info(info) fields = ["file", "object_path", "steps", "num_entries", "uuid"] ak_norm_files = awkward.from_iter(norm_files) @@ -374,9 +369,9 @@ def preprocess( ["file", "object_path", "steps", "num_entries", "uuid"] ] - forms = processed_files[ - ["file", "compressed_form", "form_hash_md5", "num_entries"] - ][~awkward.is_none(processed_files.form_hash_md5)] + forms = processed_files[["file", "form", "form_hash_md5", "num_entries"]][ + ~awkward.is_none(processed_files.form_hash_md5) + ] _, unique_forms_idx = numpy.unique( forms.form_hash_md5.to_numpy(), return_index=True @@ -385,7 +380,7 @@ def preprocess( dataset_forms = [] unique_forms = forms[unique_forms_idx] for thefile, formstr, num_entries in zip( - unique_forms.file, unique_forms.compressed_form, unique_forms.num_entries + unique_forms.file, unique_forms.form, unique_forms.num_entries ): # skip trivially filled or empty files form = awkward.forms.from_json(decompress_form(formstr)) @@ -464,54 +459,34 @@ def preprocess( "uuid": item["uuid"], } - if is_datasetspec: - out_updated[name].files = InputFiles(files_out) - out_available[name].files = PreprocessedFiles(files_available) - elif "files" in out_updated[name]: + if "files" in out_updated[name]: out_updated[name]["files"] = files_out out_available[name]["files"] = files_available else: - out_updated[name] = { - "files": files_out, - "metadata": None, - "compressed_form": None, - } + out_updated[name] = {"files": files_out, "metadata": None, "form": None} out_available[name] = { "files": files_available, "metadata": None, - "compressed_form": None, + "form": None, } compressed_union_form = None if union_form_jsonstr is not None: compressed_union_form = compress_form(union_form_jsonstr) - if is_datasetspec: - out_updated[name].compressed_form = compressed_union_form - out_available[name].compressed_form = compressed_union_form - else: - out_updated[name]["compressed_form"] = compressed_union_form - out_available[name]["compressed_form"] = compressed_union_form + out_updated[name]["form"] = compressed_union_form + out_available[name]["form"] = compressed_union_form else: - if is_datasetspec: - out_updated[name].compressed_form = None - out_available[name].compressed_form = None - else: - out_updated[name]["compressed_form"] = None - out_available[name]["compressed_form"] = None + out_updated[name]["form"] = None + out_available[name]["form"] = None - if is_datasetspec: - pass - elif "metadata" not in out_updated[name]: + if "metadata" not in out_updated[name]: out_updated[name]["metadata"] = None 
out_available[name]["metadata"] = None - if is_DataGroupSpec: - out_available = DataGroupSpec(out_available) - out_updated = DataGroupSpec(out_updated) return out_available, out_updated -def _normalize_parquet_file_info( +def _normalize_pydantic_file_info( datasetspec: DatasetSpec, return_compressedform_or_metadata=False ): """ @@ -519,7 +494,7 @@ def _normalize_parquet_file_info( """ if not isinstance(datasetspec, DatasetSpec): raise ValueError( - f"_normalize_parquet_file_info expects a DatasetSpec, got {type(datasetspec)}" + f"_normalize_pydantic_file_info expects a DatasetSpec, got {type(datasetspec)}" ) normed_files = [] for filename, fileinfo in datasetspec.files.items(): @@ -695,7 +670,78 @@ def get_parquet_form_uuid_steps( return array -def _preprocess_parquet( +def preprocess_root( + datagroupspec: DataGroupSpec, + step_size: None | int = None, + align_clusters: bool = False, + recalculate_steps: bool = False, + files_per_batch: int = 1, + skip_bad_files: bool = False, + file_exceptions: Exception | Warning | tuple[Exception | Warning] = (OSError,), + save_form: bool = True, + scheduler: None | Callable | str = None, + uproot_options: dict = {}, + step_size_safety_factor: float = 0.5, + allow_empty_datasets: bool = False, +) -> tuple[DataGroupSpec, DataGroupSpec]: + """ + Given a list of normalized file and object paths (defined in uproot), determine the steps for each file according to the supplied processing options. + + Parameters + ---------- + datagroupspec : DataGroupSpec + The set of datasets whose files will be preprocessed. + step_size : int or None, default None + If specified, the size of the steps to make when analyzing the input files. + align_clusters : bool, default False + Round to the cluster size in a root file, when chunks are specified. Reduces data transfer in + analysis. + recalculate_steps : bool, default False + If steps are present in the input normed files, force the recalculation of those steps, + instead of only recalculating the steps if the uuid has changed. + files_per_batch : int, default 1 + The number of files to preprocess in a single batch. + Large values will result in fewer dask tasks but each task will have to do more work. + skip_bad_files : bool, default False + Instead of failing, catch exceptions specified by file_exceptions and return null data. + file_exceptions : Exception or Warning or tuple[Exception or Warning], default (FileNotFoundError, OSError) + What exceptions to catch when skipping bad files. + save_form : bool, default True + Extract the form of the TTree from each file in each dataset, creating the union of the forms over the dataset. + scheduler : None or Callable or str, default None + Specifies the scheduler that dask should use to execute the preprocessing task graph. + uproot_options : dict, default {} + Options to pass to get_steps for opening files with uproot. + step_size_safety_factor : float, default 0.5 + When using align_clusters, if a resulting step is larger than step_size by this factor + warn the user that the resulting steps may be highly irregular. + allow_empty_datasets : bool, default False + When a dataset query comes back completely empty, this is normally considered a processing error. + Toggle this argument to True to change this to warnings and allow incomplete returned filesets. + Returns + ------- + out_available : DataGroupSpec + The subset of files in each dataset that were successfully preprocessed, organized by dataset. 
+ out_updated : DataGroupSpec + The original set of datasets including files that were not accessible, updated to include the result of preprocessing where available. + """ + return _preprocess_pydantic( + datagroupspec=datagroupspec, + step_size=step_size, + use_alignment_boundaries=align_clusters, + recalculate_steps=recalculate_steps, + files_per_batch=files_per_batch, + skip_bad_files=skip_bad_files, + file_exceptions=file_exceptions, + save_form=save_form, + scheduler=scheduler, + filetype_options=uproot_options, + step_size_safety_factor=step_size_safety_factor, + allow_empty_datasets=allow_empty_datasets, + ) + + +def preprocess_parquet( datagroupspec: DataGroupSpec, step_size: None | int = None, use_row_groups: bool = False, @@ -727,7 +773,7 @@ def _preprocess_parquet( Instead of failing, catch exceptions specified by file_exceptions and return null data. file_exceptions: Exception | Warning | tuple[Exception | Warning], default (FileNotFoundError, OSError) What exceptions to catch when skipping bad files. - save_form: bool, default False + save_form: bool, default True Extract the form of the TTree from each file in each dataset, creating the union of the forms over the dataset. scheduler: None | Callable | str, default None Specifies the scheduler that dask should use to execute the preprocessing task graph. @@ -746,9 +792,44 @@ def _preprocess_parquet( out_updated : DataGroupSpec The original set of datasets including files that were not accessible, updated to include the result of preprocessing where available. """ + return _preprocess_pydantic( + datagroupspec=datagroupspec, + step_size=step_size, + use_alignment_boundaries=use_row_groups, + recalculate_steps=recalculate_steps, + files_per_batch=files_per_batch, + skip_bad_files=skip_bad_files, + file_exceptions=file_exceptions, + save_form=save_form, + scheduler=scheduler, + filetype_options=parquet_options, + step_size_safety_factor=step_size_safety_factor, + allow_empty_datasets=allow_empty_datasets, + ) + + +def _preprocess_pydantic( + datagroupspec: DataGroupSpec, + step_size: None | int = None, + use_alignment_boundaries: bool = False, + recalculate_steps: bool = False, + files_per_batch: int = 1, + skip_bad_files: bool = False, + file_exceptions: Exception | Warning | tuple[Exception | Warning] = (OSError,), + save_form: bool = True, + scheduler: None | Callable | str = None, + filetype_options: dict = {}, + step_size_safety_factor: float = 0.5, + allow_empty_datasets: bool = False, +) -> tuple[DataGroupSpec, DataGroupSpec]: + """ + Function to preprocess either root or parquet DatasetSpecs in a DataGroupSpec. + ROOT TTrees: Maps align_clusters and uproot_options to use_alignment_boundaries and filetype_options respectively. 
+ Parquet: Maps use_row_groups and parquet_options to use_alignment_boundaries and filetype_options respectively + """ if not isinstance(datagroupspec, DataGroupSpec): raise ValueError( - f"_preprocess_parquet expects a DataGroupSpec, got {type(datagroupspec)}" + f"_preprocess_pydantic expects a DataGroupSpec, got {type(datagroupspec)}" ) out_updated = datagroupspec.model_dump() out_available = datagroupspec.model_dump() @@ -756,7 +837,7 @@ def _preprocess_parquet( all_ak_norm_files = {} files_to_preprocess = {} for name, info in datagroupspec.items(): - norm_files = _normalize_parquet_file_info(info) + norm_files = _normalize_pydantic_file_info(info) fields = ["file", "object_path", "steps", "num_entries", "uuid"] ak_norm_files = awkward.from_iter(norm_files) ak_norm_files = awkward.Array( @@ -780,19 +861,38 @@ def _preprocess_parquet( files_trl_name = f"{files_trl_label}-{files_trl_token}" files_trl_tree_node_name = f"{files_trl_label}-tree-node-{files_trl_token}" - files_part = dask_awkward.map_partitions( - get_parquet_form_uuid_steps, - dak_norm_files, - step_size=step_size, - use_row_groups=use_row_groups, - recalculate_steps=recalculate_steps, - skip_bad_files=skip_bad_files, - file_exceptions=file_exceptions, - save_form=save_form, - step_size_safety_factor=step_size_safety_factor, - parquet_options=parquet_options, - meta=dask_awkward.lib.core.empty_typetracer(), - ) + if info.format == "root": + files_part = dask_awkward.map_partitions( + get_steps, + dak_norm_files, + step_size=step_size, + align_clusters=use_alignment_boundaries, + recalculate_steps=recalculate_steps, + skip_bad_files=skip_bad_files, + file_exceptions=file_exceptions, + save_form=save_form, + step_size_safety_factor=step_size_safety_factor, + uproot_options=filetype_options, + meta=dask_awkward.lib.core.empty_typetracer(), + ) + elif info.format == "parquet": + files_part = dask_awkward.map_partitions( + get_parquet_form_uuid_steps, + dak_norm_files, + step_size=step_size, + use_row_groups=use_alignment_boundaries, + recalculate_steps=recalculate_steps, + skip_bad_files=skip_bad_files, + file_exceptions=file_exceptions, + save_form=save_form, + step_size_safety_factor=step_size_safety_factor, + parquet_options=filetype_options, + meta=dask_awkward.lib.core.empty_typetracer(), + ) + else: + raise ValueError( + f"Dataset {name} has unsupported format {info.format}, supported formats are 'root' and 'parquet'." + ) files_trl = dask_awkward.layers.layers.AwkwardTreeReductionLayer( name=files_trl_name, @@ -942,3 +1042,158 @@ def _preprocess_parquet( return DataGroupSpec.model_validate(out_available), DataGroupSpec.model_validate( out_updated ) + + +def preprocess( + fileset: DataGroupSpec | dict, + step_size: None | int = None, + align_clusters: bool = False, + recalculate_steps: bool = False, + files_per_batch: int = 1, + skip_bad_files: bool = False, + file_exceptions: Exception | Warning | tuple[Exception | Warning] = (OSError,), + save_form: bool = True, + scheduler: None | Callable | str = None, + uproot_options: dict = {}, + step_size_safety_factor: float = 0.5, + allow_empty_datasets: bool = False, + preprocess_legacy_root: bool = False, + use_row_groups: bool = False, + parquet_options: dict = {}, +) -> tuple[DataGroupSpec, DataGroupSpec] | tuple[dict, dict]: + """ + Given a list of normalized file and object paths (defined in uproot), determine the steps for each file according to the supplied processing options. 
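A minimal usage sketch of the new dispatching entry point (not part of the patch; the variable names and option values are illustrative, and mixed_group is assumed to be an existing DataGroupSpec whose datasets carry format == "root" or format == "parquet"):

    from coffea.dataset_tools.preprocess import preprocess

    # ROOT datasets honour align_clusters, parquet datasets honour use_row_groups;
    # the per-format results are recombined in the original dataset order.
    available, updated = preprocess(
        mixed_group,
        step_size=100_000,
        align_clusters=True,
        use_row_groups=True,
        save_form=True,
        skip_bad_files=True,
    )

Plain dict filesets are still accepted but are converted to a DataGroupSpec first (the patch flags this path as deprecated), unless preprocess_legacy_root=True routes them through the legacy function.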
+ + Parameters + ---------- + fileset : DataGroupSpec | dict + The set of datasets whose files will be preprocessed. + step_size : int or None, default None + If specified, the size of the steps to make when analyzing the input files. + align_clusters : bool, default False + Round to the cluster size in a root file, when chunks are specified. Reduces data transfer in + analysis. + recalculate_steps : bool, default False + If steps are present in the input normed files, force the recalculation of those steps, + instead of only recalculating the steps if the uuid has changed. + files_per_batch : int, default 1 + The number of files to preprocess in a single batch. + Large values will result in fewer dask tasks but each task will have to do more work. + skip_bad_files : bool, default False + Instead of failing, catch exceptions specified by file_exceptions and return null data. + file_exceptions : Exception or Warning or tuple[Exception or Warning], default (FileNotFoundError, OSError) + What exceptions to catch when skipping bad files. + save_form : bool, default False + Extract the form of the TTree from each file in each dataset, creating the union of the forms over the dataset. + scheduler : None or Callable or str, default None + Specifies the scheduler that dask should use to execute the preprocessing task graph. + uproot_options : dict, default {} + Options to pass to get_steps for opening files with uproot. + step_size_safety_factor : float, default 0.5 + When using align_clusters, if a resulting step is larger than step_size by this factor + warn the user that the resulting steps may be highly irregular. + allow_empty_datasets : bool, default False + When a dataset query comes back completely empty, this is normally considered a processing error. + Toggle this argument to True to change this to warnings and allow incomplete returned filesets. + preprocess_legacy_root: bool, default False + Use the legacy root preprocessing function for all files, even if the fileset is a DataGroupSpec. + Not compatible with parquet files. + use_row_groups : bool, default False + Calculate steps according to the row_groups in the parquet files (only applies to DataGroupSpec datasets with parquet files). + parquet_options : dict, default {} + Options to pass to get_parquet_form_uuid_steps for opening parquet files (only applies to DataGroupSpec datasets with parquet files). + Returns + ------- + out_available : DataGroupSpec | dict + The subset of files in each dataset that were successfully preprocessed, organized by dataset. + out_updated : DataGroupSpec | dict + The original set of datasets including files that were not accessible, updated to include the result of preprocessing where available. + """ + if preprocess_legacy_root: + # use the legacy root TTree preprocessing function if requested + return preprocess_legacy( + fileset.model_dump() if isinstance(fileset, DataGroupSpec) else fileset, + step_size=step_size, + align_clusters=align_clusters, + recalculate_steps=recalculate_steps, + files_per_batch=files_per_batch, + skip_bad_files=skip_bad_files, + file_exceptions=file_exceptions, + save_form=save_form, + scheduler=scheduler, + uproot_options=uproot_options, + step_size_safety_factor=step_size_safety_factor, + allow_empty_datasets=allow_empty_datasets, + ) + else: + if isinstance(fileset, DataGroupSpec): + datasetspecs = fileset + else: + DeprecationWarning( + "Passing a dict to preprocess is deprecated. Converting to DataGroupSpec and proceeding." 
+ "To remove this warning, pass a DataGroupSpec object instead of a dict." + "To use the legacy preprocessing function, set preprocess_legacy_root=True." + "If automatic conversion is not possible, please submit your fileset to the coffea team." + ) + datasetspecs = DataGroupSpec.model_validate(fileset) + # split datasetspecs into uproot and parquet files, keeping track of original order + original_order = list(datasetspecs.keys()) + formats = [dss.format for dss in datasetspecs.values()] + if len(set(formats)) > 1 and align_clusters != use_row_groups: + warnings.warn( + "When preprocessing a mixed fileset, align_clusters and use_row_groups serve a similar function. If you didn't intend to treat root and parquet files' boundary alignments differently, set both to the same value." + ) + out_available_uproot, out_updated_uproot = preprocess_root( + datasetspecs.filter_datasets( + filter_callable=lambda ds: ds.format == "root" + ), + step_size=step_size, + align_clusters=align_clusters, + recalculate_steps=recalculate_steps, + files_per_batch=files_per_batch, + skip_bad_files=skip_bad_files, + file_exceptions=file_exceptions, + save_form=save_form, + scheduler=scheduler, + uproot_options=uproot_options, + step_size_safety_factor=step_size_safety_factor, + allow_empty_datasets=allow_empty_datasets, + ) + out_available_parquet, out_updated_parquet = preprocess_parquet( + datasetspecs.filter_datasets( + filter_callable=lambda ds: ds.format == "parquet" + ), + step_size=step_size, + use_row_groups=use_row_groups, + recalculate_steps=recalculate_steps, + files_per_batch=files_per_batch, + skip_bad_files=skip_bad_files, + file_exceptions=file_exceptions, + save_form=save_form, + scheduler=scheduler, + parquet_options=parquet_options, + step_size_safety_factor=step_size_safety_factor, + allow_empty_datasets=allow_empty_datasets, + ) + # recombine outputs in original order + out_available = DataGroupSpec( + { + k: ( + out_available_uproot[k] + if k in out_available_uproot + else out_available_parquet[k] + ) + for k in original_order + } + ) + out_updated = DataGroupSpec( + { + k: ( + out_updated_uproot[k] + if k in out_updated_uproot + else out_updated_parquet[k] + ) + for k in original_order + } + ) + return out_available, out_updated From c4ef71b83a41d72fbb3e7dc6f27a99fd0b482f14 Mon Sep 17 00:00:00 2001 From: Nick Manganelli Date: Sat, 22 Nov 2025 23:26:47 -0600 Subject: [PATCH 9/9] fixup formatting --- src/coffea/dataset_tools/preprocess.py | 67 +++++++++++++------------- 1 file changed, 33 insertions(+), 34 deletions(-) diff --git a/src/coffea/dataset_tools/preprocess.py b/src/coffea/dataset_tools/preprocess.py index 1d79d3d4f..bd4ce9411 100644 --- a/src/coffea/dataset_tools/preprocess.py +++ b/src/coffea/dataset_tools/preprocess.py @@ -189,8 +189,7 @@ def get_steps( def _normalize_file_info(file_info): normed_files = None - is_datasetspec = isinstance(file_info, DatasetSpec) - if is_datasetspec: + if isinstance(file_info, DatasetSpec): normed_files = uproot._util.regularize_files( ModelFactory.datasetspec_to_dict(file_info, coerce_filespec_to_dict=True)[ "files" @@ -243,33 +242,33 @@ def preprocess_legacy( Parameters ---------- - fileset: dict + fileset : dict The set of datasets whose files will be preprocessed. - step_size: int | None, default None + step_size : int | None, default None If specified, the size of the steps to make when analyzing the input files. 
- align_clusters: bool, default False + align_clusters : bool, default False Round to the cluster size in a root file, when chunks are specified. Reduces data transfer in analysis. - recalculate_steps: bool, default False + recalculate_steps : bool, default False If steps are present in the input normed files, force the recalculation of those steps, instead of only recalculating the steps if the uuid has changed. - files_per_batch: int, default 1 + files_per_batch : int, default 1 The number of files to preprocess in a single batch. Large values will result in fewer dask tasks but each task will have to do more work. - skip_bad_files: bool, False + skip_bad_files : bool, False Instead of failing, catch exceptions specified by file_exceptions and return null data. - file_exceptions: Exception | Warning | tuple[Exception | Warning], default (FileNotFoundError, OSError) + file_exceptions : Exception | Warning | tuple[Exception | Warning], default (FileNotFoundError, OSError) What exceptions to catch when skipping bad files. - save_form: bool, default False + save_form : bool, default False Extract the form of the TTree from each file in each dataset, creating the union of the forms over the dataset. - scheduler: None | Callable | str, default None + scheduler : None | Callable | str, default None Specifies the scheduler that dask should use to execute the preprocessing task graph. - uproot_options: dict, default {} + uproot_options : dict, default {} Options to pass to get_steps for opening files with uproot. - step_size_safety_factor: float, default 0.5 + step_size_safety_factor : float, default 0.5 When using align_clusters, if a resulting step is larger than step_size by this factor warn the user that the resulting steps may be highly irregular. - allow_empty_datasets: bool, default False + allow_empty_datasets : bool, default False When a dataset query comes back completely empty, this is normally considered a processing error. Toggle this argument to True to change this to warnings and allow incomplete returned filesets. Returns @@ -528,22 +527,22 @@ def get_parquet_form_uuid_steps( Parameters ---------- - normed_files: awkward.Array | dask_awkward.Array + normed_files : awkward.Array | dask_awkward.Array The list of normalized file descriptions to process for steps. - step_size: int | None, default None + step_size : int | None, default None If specified, the size of the steps to make when analyzing the input files. - use_row_groups: bool, default False + use_row_groups : bool, default False Calculate steps according to the row_groups in the parquet files. - recalculate_steps: bool, default False + recalculate_steps : bool, default False If steps are present in the input normed files, force the recalculation of those steps, instead of only recalculating the steps if the uuid has changed. - skip_bad_files: bool, False + skip_bad_files : bool, False Instead of failing, catch exceptions specified by file_exceptions and return null data. - file_exceptions: Exception | Warning | tuple[Exception | Warning], default (OSError,) + file_exceptions : Exception | Warning | tuple[Exception | Warning], default (OSError,) What exceptions to catch when skipping bad files. - save_form: bool, default False + save_form : bool, default False Extract the form of the TTree from the file so we can skip opening files later. 
- step_size_safety_factor: float, default 0.5 + step_size_safety_factor : float, default 0.5 When using align_clusters, if a resulting step is larger than step_size by this factor warn the user that the resulting steps may be highly irregular. @@ -760,29 +759,29 @@ def preprocess_parquet( Parameters ---------- - fileset: DataGroupSpec + fileset : DataGroupSpec The set of datasets whose files will be preprocessed. - step_size: int | None, default None + step_size : int | None, default None If specified, the size of the steps to make when analyzing the input files. - use_row_groups: bool, default False + use_row_groups : bool, default False Use the row groups in the parquet files to determine the steps. - recalculate_steps: bool, default False + recalculate_steps : bool, default False If steps are present in the input normed files, force the recalculation of those steps, instead of only recalculating the steps if the uuid has changed. - skip_bad_files: bool, False + skip_bad_files : bool, False Instead of failing, catch exceptions specified by file_exceptions and return null data. - file_exceptions: Exception | Warning | tuple[Exception | Warning], default (FileNotFoundError, OSError) + file_exceptions : Exception | Warning | tuple[Exception | Warning], default (FileNotFoundError, OSError) What exceptions to catch when skipping bad files. - save_form: bool, default True + save_form : bool, default True Extract the form of the TTree from each file in each dataset, creating the union of the forms over the dataset. - scheduler: None | Callable | str, default None + scheduler : None | Callable | str, default None Specifies the scheduler that dask should use to execute the preprocessing task graph. - parquet_options: dict, default {} + parquet_options : dict, default {} Options to pass to get_parquet_form_uuid_steps for opening files - step_size_safety_factor: float, default 0.5 + step_size_safety_factor : float, default 0.5 When using use_row_groups, if a resulting step is larger than step_size by this factor warn the user that the resulting steps may be highly irregular. - allow_empty_datasets: bool, default False + allow_empty_datasets : bool, default False When a dataset query comes back completely empty, this is normally considered a processing error. Toggle this argument to True to change this to warnings and allow incomplete returned filesets. Returns @@ -1095,7 +1094,7 @@ def preprocess( allow_empty_datasets : bool, default False When a dataset query comes back completely empty, this is normally considered a processing error. Toggle this argument to True to change this to warnings and allow incomplete returned filesets. - preprocess_legacy_root: bool, default False + preprocess_legacy_root : bool, default False Use the legacy root preprocessing function for all files, even if the fileset is a DataGroupSpec. Not compatible with parquet files. use_row_groups : bool, default False