diff --git a/src/coffea/dataset_tools/preprocess.py b/src/coffea/dataset_tools/preprocess.py
index 7bea0699d..bd4ce9411 100644
--- a/src/coffea/dataset_tools/preprocess.py
+++ b/src/coffea/dataset_tools/preprocess.py
@@ -18,9 +18,7 @@
 from coffea.dataset_tools.filespec import (
     DataGroupSpec,
     DatasetSpec,
-    InputFiles,
     ModelFactory,
-    PreprocessedFiles,
 )
 from coffea.util import _is_interpretable, compress_form, decompress_form
@@ -157,7 +155,7 @@ def get_steps(
                 "steps": out_steps,
                 "num_entries": num_entries,
                 "uuid": out_uuid,
-                "compressed_form": form_json,
+                "form": form_json,
                 "form_hash_md5": form_hash,
             }
         )
@@ -171,7 +169,7 @@ def get_steps(
                     "steps": [[0, 0]],
                     "num_entries": 0,
                     "uuid": "junk",
-                    "compressed_form": "junk",
+                    "form": "junk",
                     "form_hash_md5": "junk",
                 },
                 None,
@@ -191,8 +189,7 @@ def get_steps(
 def _normalize_file_info(file_info):
     normed_files = None
-    is_datasetspec = isinstance(file_info, DatasetSpec)
-    if is_datasetspec:
+    if isinstance(file_info, DatasetSpec):
         normed_files = uproot._util.regularize_files(
             ModelFactory.datasetspec_to_dict(file_info, coerce_filespec_to_dict=True)[
                 "files"
@@ -226,8 +223,8 @@
 _trivial_file_fields = {"run", "luminosityBlock", "event"}


-def preprocess(
-    fileset: DataGroupSpec | dict,
+def preprocess_legacy(
+    fileset: dict,
     step_size: None | int = None,
     align_clusters: bool = False,
     recalculate_steps: bool = False,
@@ -239,15 +236,15 @@
     uproot_options: dict = {},
     step_size_safety_factor: float = 0.5,
     allow_empty_datasets: bool = False,
-) -> tuple[DataGroupSpec, DataGroupSpec] | tuple[dict, dict]:
+) -> tuple[dict, dict]:
     """
     Given a list of normalized file and object paths (defined in uproot), determine the steps for each file according to the supplied processing options.

     Parameters
     ----------
-    fileset : DataGroupSpec | dict
+    fileset : dict
         The set of datasets whose files will be preprocessed.
-    step_size : int or None, default None
+    step_size : int | None, default None
         If specified, the size of the steps to make when analyzing the input files.
     align_clusters : bool, default False
         Round to the cluster size in a root file, when chunks are specified. Reduces data transfer in
         analysis.
     recalculate_steps : bool, default False
         If steps are present in the input normed files, force the recalculation of those steps,
         instead of only recalculating the steps if the uuid has changed.
@@ -258,13 +255,13 @@
     files_per_batch : int, default 1
         The number of files to preprocess in a single batch.
         Large values will result in fewer dask tasks but each task will have to do more work.
     skip_bad_files : bool, default False
         Instead of failing, catch exceptions specified by file_exceptions and return null data.
-    file_exceptions : Exception or Warning or tuple[Exception or Warning], default (FileNotFoundError, OSError)
+    file_exceptions : Exception | Warning | tuple[Exception | Warning], default (OSError,)
         What exceptions to catch when skipping bad files.
     save_form : bool, default False
         Extract the form of the TTree from each file in each dataset, creating the union of the forms over the dataset.
-    scheduler : None or Callable or str, default None
+    scheduler : None | Callable | str, default None
         Specifies the scheduler that dask should use to execute the preprocessing task graph.
     uproot_options : dict, default {}
         Options to pass to get_steps for opening files with uproot.
     step_size_safety_factor : float, default 0.5
         When using align_clusters, if a resulting step is larger than step_size by this factor
         warn the user that the resulting steps may be highly irregular.
     allow_empty_datasets : bool, default False
         When a dataset query comes back completely empty, this is normally considered a processing error.
@@ -276,21 +273,18 @@
         Toggle this argument to True to change this to warnings and allow incomplete returned filesets.
Returns ------- - out_available : DataGroupSpec | dict + out_available : dict The subset of files in each dataset that were successfully preprocessed, organized by dataset. - out_updated : DataGroupSpec | dict + out_updated : dict The original set of datasets including files that were not accessible, updated to include the result of preprocessing where available. """ + out_updated = copy.deepcopy(fileset) out_available = copy.deepcopy(fileset) all_ak_norm_files = {} files_to_preprocess = {} - is_DataGroupSpec = isinstance(fileset, DataGroupSpec) for name, info in fileset.items(): - is_datasetspec = isinstance(info, DatasetSpec) - if is_datasetspec: - is_DataGroupSpec = True norm_files = _normalize_file_info(info) fields = ["file", "object_path", "steps", "num_entries", "uuid"] ak_norm_files = awkward.from_iter(norm_files) @@ -374,9 +368,9 @@ def preprocess( ["file", "object_path", "steps", "num_entries", "uuid"] ] - forms = processed_files[ - ["file", "compressed_form", "form_hash_md5", "num_entries"] - ][~awkward.is_none(processed_files.form_hash_md5)] + forms = processed_files[["file", "form", "form_hash_md5", "num_entries"]][ + ~awkward.is_none(processed_files.form_hash_md5) + ] _, unique_forms_idx = numpy.unique( forms.form_hash_md5.to_numpy(), return_index=True @@ -385,7 +379,7 @@ def preprocess( dataset_forms = [] unique_forms = forms[unique_forms_idx] for thefile, formstr, num_entries in zip( - unique_forms.file, unique_forms.compressed_form, unique_forms.num_entries + unique_forms.file, unique_forms.form, unique_forms.num_entries ): # skip trivially filled or empty files form = awkward.forms.from_json(decompress_form(formstr)) @@ -464,48 +458,741 @@ def preprocess( "uuid": item["uuid"], } - if is_datasetspec: - out_updated[name].files = InputFiles(files_out) - out_available[name].files = PreprocessedFiles(files_available) - elif "files" in out_updated[name]: + if "files" in out_updated[name]: out_updated[name]["files"] = files_out out_available[name]["files"] = files_available else: - out_updated[name] = { - "files": files_out, - "metadata": None, - "compressed_form": None, - } + out_updated[name] = {"files": files_out, "metadata": None, "form": None} out_available[name] = { "files": files_available, "metadata": None, - "compressed_form": None, + "form": None, } compressed_union_form = None if union_form_jsonstr is not None: compressed_union_form = compress_form(union_form_jsonstr) - if is_datasetspec: - out_updated[name].compressed_form = compressed_union_form - out_available[name].compressed_form = compressed_union_form - else: - out_updated[name]["compressed_form"] = compressed_union_form - out_available[name]["compressed_form"] = compressed_union_form + out_updated[name]["form"] = compressed_union_form + out_available[name]["form"] = compressed_union_form else: - if is_datasetspec: - out_updated[name].compressed_form = None - out_available[name].compressed_form = None - else: - out_updated[name]["compressed_form"] = None - out_available[name]["compressed_form"] = None + out_updated[name]["form"] = None + out_available[name]["form"] = None - if is_datasetspec: - pass - elif "metadata" not in out_updated[name]: + if "metadata" not in out_updated[name]: out_updated[name]["metadata"] = None out_available[name]["metadata"] = None - if is_DataGroupSpec: - out_available = DataGroupSpec(out_available) - out_updated = DataGroupSpec(out_updated) return out_available, out_updated + + +def _normalize_pydantic_file_info( + datasetspec: DatasetSpec, return_compressedform_or_metadata=False 
+):
+    """
+    Structure file info akin to _normalize_file_info for uproot files, returning a list of (filename, object_path, steps, num_entries, uuid) tuples.
+    """
+    if not isinstance(datasetspec, DatasetSpec):
+        raise ValueError(
+            f"_normalize_pydantic_file_info expects a DatasetSpec, got {type(datasetspec)}"
+        )
+    normed_files = []
+    for filename, fileinfo in datasetspec.files.items():
+        normed_files.append(
+            (
+                filename,
+                fileinfo.object_path,
+                fileinfo.steps,
+                fileinfo.num_entries,
+                fileinfo.uuid,
+            )
+        )
+    if return_compressedform_or_metadata:
+        return normed_files, datasetspec.compressed_form, datasetspec.metadata
+    return normed_files
+
+
+def get_parquet_form_uuid_steps(
+    normed_files: awkward.Array | dask_awkward.Array,
+    step_size: int | None = None,
+    use_row_groups: bool = False,
+    recalculate_steps: bool = False,
+    skip_bad_files: bool = False,
+    file_exceptions: Exception | Warning | tuple[Exception | Warning] = (OSError,),
+    save_form: bool = False,
+    step_size_safety_factor: float = 0.5,
+    parquet_options: dict = {},
+) -> awkward.Array | dask_awkward.Array:
+    """
+    Given a list of normalized file and object paths, determine the form, steps, and uuid for each file according to the supplied processing options.
+
+    Parameters
+    ----------
+    normed_files : awkward.Array | dask_awkward.Array
+        The list of normalized file descriptions to process for steps.
+    step_size : int | None, default None
+        If specified, the size of the steps to make when analyzing the input files.
+    use_row_groups : bool, default False
+        Calculate steps according to the row_groups in the parquet files.
+    recalculate_steps : bool, default False
+        If steps are present in the input normed files, force the recalculation of those steps, instead
+        of only recalculating the steps if the uuid has changed.
+    skip_bad_files : bool, default False
+        Instead of failing, catch exceptions specified by file_exceptions and return null data.
+    file_exceptions : Exception | Warning | tuple[Exception | Warning], default (OSError,)
+        What exceptions to catch when skipping bad files.
+    save_form : bool, default False
+        Extract the form of the data from the file so we can skip opening files later.
+    step_size_safety_factor : float, default 0.5
+        When using use_row_groups, if a resulting step is larger than step_size by this factor
+        warn the user that the resulting steps may be highly irregular.
+    parquet_options : dict, default {}
+        Options to pass to awkward.metadata_from_parquet when opening files.
+
+    Returns
+    -------
+    array : awkward.Array | dask_awkward.Array
+        The normalized file descriptions, appended with the calculated steps for those files.
+ """ + nf_backend = awkward.backend(normed_files) + lz_or_nf = awkward.typetracer.length_zero_if_typetracer(normed_files) + + array = [] if nf_backend != "typetracer" else lz_or_nf + for arg in lz_or_nf: + try: + the_file = awkward.metadata_from_parquet(arg.file, **parquet_options) + except file_exceptions as e: + if skip_bad_files: + array.append(None) + continue + else: + raise e + + num_entries = the_file["num_rows"] + + form_json = None + form_hash = None + if save_form: + form = the_file["form"] + form_str = form.to_json() + # the function cache needs to be popped if present to prevent memory growth + if hasattr(dask.base, "function_cache"): + dask.base.function_cache.popitem() + + form_hash = hashlib.md5(form_str.encode("utf-8")).hexdigest() + form_json = compress_form(form_str) + + target_step_size = num_entries if step_size is None else step_size + + file_uuid = the_file.get("uuid", None) + + out_uuid = arg.uuid + out_steps = arg.steps + + if out_uuid != file_uuid or recalculate_steps: + if use_row_groups: + row_group_entries = the_file["col_counts"] + out = [0] + this_offset = 0 + for c in row_group_entries: + this_offset += c + if this_offset >= out[-1] + target_step_size: + out.append(this_offset) + if this_offset != out[-1]: + out.append(this_offset) + out = numpy.array(out, dtype="int64") + out = numpy.stack((out[:-1], out[1:]), axis=1) + + step_mask = ( + out[:, 1] - out[:, 0] + > (1 + step_size_safety_factor) * target_step_size + ) + if numpy.any(step_mask): + warnings.warn( + f"In file {arg.file}, steps: {out[step_mask]} with use_row_groups=True are " + f"{step_size_safety_factor*100:.0f}% larger than target " + f"step size: {target_step_size}!" + ) + else: + n_steps_target = max(round(num_entries / target_step_size), 1) + actual_step_size = math.ceil(num_entries / n_steps_target) + out = numpy.array( + [ + [ + i * actual_step_size, + min((i + 1) * actual_step_size, num_entries), + ] + for i in range(n_steps_target) + ], + dtype="int64", + ) + + out_uuid = file_uuid + out_steps = out.tolist() + + if out_steps is not None and len(out_steps) == 0: + out_steps = [[0, 0]] + + array.append( + { + "file": arg.file, + "object_path": arg.object_path, + "steps": out_steps, + "num_entries": num_entries, + "uuid": out_uuid, + "form": form_json, + "form_hash_md5": form_hash, + } + ) + + if len(array) == 0: + array = awkward.Array( + [ + { + "file": "junk", + "object_path": "junk", + "steps": [[0, 0]], + "num_entries": 0, + "uuid": "junk", + "form": "junk", + "form_hash_md5": "junk", + }, + None, + ] + ) + array = awkward.Array(array.layout.form.length_zero_array(highlevel=False)) + else: + array = awkward.Array(array) + + if nf_backend == "typetracer": + array = awkward.Array( + array.layout.to_typetracer(forget_length=True), + ) + + return array + + +def preprocess_root( + datagroupspec: DataGroupSpec, + step_size: None | int = None, + align_clusters: bool = False, + recalculate_steps: bool = False, + files_per_batch: int = 1, + skip_bad_files: bool = False, + file_exceptions: Exception | Warning | tuple[Exception | Warning] = (OSError,), + save_form: bool = True, + scheduler: None | Callable | str = None, + uproot_options: dict = {}, + step_size_safety_factor: float = 0.5, + allow_empty_datasets: bool = False, +) -> tuple[DataGroupSpec, DataGroupSpec]: + """ + Given a list of normalized file and object paths (defined in uproot), determine the steps for each file according to the supplied processing options. 
+
+    Parameters
+    ----------
+    datagroupspec : DataGroupSpec
+        The set of datasets whose files will be preprocessed.
+    step_size : int | None, default None
+        If specified, the size of the steps to make when analyzing the input files.
+    align_clusters : bool, default False
+        Round to the cluster size in a root file, when chunks are specified. Reduces data transfer in
+        analysis.
+    recalculate_steps : bool, default False
+        If steps are present in the input normed files, force the recalculation of those steps,
+        instead of only recalculating the steps if the uuid has changed.
+    files_per_batch : int, default 1
+        The number of files to preprocess in a single batch.
+        Large values will result in fewer dask tasks but each task will have to do more work.
+    skip_bad_files : bool, default False
+        Instead of failing, catch exceptions specified by file_exceptions and return null data.
+    file_exceptions : Exception | Warning | tuple[Exception | Warning], default (OSError,)
+        What exceptions to catch when skipping bad files.
+    save_form : bool, default True
+        Extract the form of the TTree from each file in each dataset, creating the union of the forms over the dataset.
+    scheduler : None | Callable | str, default None
+        Specifies the scheduler that dask should use to execute the preprocessing task graph.
+    uproot_options : dict, default {}
+        Options to pass to get_steps for opening files with uproot.
+    step_size_safety_factor : float, default 0.5
+        When using align_clusters, if a resulting step is larger than step_size by this factor
+        warn the user that the resulting steps may be highly irregular.
+    allow_empty_datasets : bool, default False
+        When a dataset query comes back completely empty, this is normally considered a processing error.
+        Toggle this argument to True to change this to warnings and allow incomplete returned filesets.
+
+    Returns
+    -------
+    out_available : DataGroupSpec
+        The subset of files in each dataset that were successfully preprocessed, organized by dataset.
+    out_updated : DataGroupSpec
+        The original set of datasets including files that were not accessible, updated to include the result of preprocessing where available.
+    """
+    return _preprocess_pydantic(
+        datagroupspec=datagroupspec,
+        step_size=step_size,
+        use_alignment_boundaries=align_clusters,
+        recalculate_steps=recalculate_steps,
+        files_per_batch=files_per_batch,
+        skip_bad_files=skip_bad_files,
+        file_exceptions=file_exceptions,
+        save_form=save_form,
+        scheduler=scheduler,
+        filetype_options=uproot_options,
+        step_size_safety_factor=step_size_safety_factor,
+        allow_empty_datasets=allow_empty_datasets,
+    )
+
+
+def preprocess_parquet(
+    datagroupspec: DataGroupSpec,
+    step_size: None | int = None,
+    use_row_groups: bool = False,
+    recalculate_steps: bool = False,
+    files_per_batch: int = 1,
+    skip_bad_files: bool = False,
+    file_exceptions: Exception | Warning | tuple[Exception | Warning] = (OSError,),
+    save_form: bool = True,
+    scheduler: None | Callable | str = None,
+    parquet_options: dict = {},
+    step_size_safety_factor: float = 0.5,
+    allow_empty_datasets: bool = False,
+) -> tuple[DataGroupSpec, DataGroupSpec]:
+    """
+    Given a list of normalized files, determine the form and steps, and add the metadata for each file according to the supplied processing options.
+
+    Parameters
+    ----------
+    datagroupspec : DataGroupSpec
+        The set of datasets whose files will be preprocessed.
+    step_size : int | None, default None
+        If specified, the size of the steps to make when analyzing the input files.
+    use_row_groups : bool, default False
+        Use the row groups in the parquet files to determine the steps.
+    recalculate_steps : bool, default False
+        If steps are present in the input normed files, force the recalculation of those steps,
+        instead of only recalculating the steps if the uuid has changed.
+    files_per_batch : int, default 1
+        The number of files to preprocess in a single batch.
+        Large values will result in fewer dask tasks but each task will have to do more work.
+    skip_bad_files : bool, default False
+        Instead of failing, catch exceptions specified by file_exceptions and return null data.
+    file_exceptions : Exception | Warning | tuple[Exception | Warning], default (OSError,)
+        What exceptions to catch when skipping bad files.
+    save_form : bool, default True
+        Extract the form of the data from each file in each dataset, creating the union of the forms over the dataset.
+    scheduler : None | Callable | str, default None
+        Specifies the scheduler that dask should use to execute the preprocessing task graph.
+    parquet_options : dict, default {}
+        Options to pass to get_parquet_form_uuid_steps for opening files.
+    step_size_safety_factor : float, default 0.5
+        When using use_row_groups, if a resulting step is larger than step_size by this factor
+        warn the user that the resulting steps may be highly irregular.
+    allow_empty_datasets : bool, default False
+        When a dataset query comes back completely empty, this is normally considered a processing error.
+        Toggle this argument to True to change this to warnings and allow incomplete returned filesets.
+
+    Returns
+    -------
+    out_available : DataGroupSpec
+        The subset of files in each dataset that were successfully preprocessed, organized by dataset.
+    out_updated : DataGroupSpec
+        The original set of datasets including files that were not accessible, updated to include the result of preprocessing where available.
+    """
+    return _preprocess_pydantic(
+        datagroupspec=datagroupspec,
+        step_size=step_size,
+        use_alignment_boundaries=use_row_groups,
+        recalculate_steps=recalculate_steps,
+        files_per_batch=files_per_batch,
+        skip_bad_files=skip_bad_files,
+        file_exceptions=file_exceptions,
+        save_form=save_form,
+        scheduler=scheduler,
+        filetype_options=parquet_options,
+        step_size_safety_factor=step_size_safety_factor,
+        allow_empty_datasets=allow_empty_datasets,
+    )
+
+
+def _preprocess_pydantic(
+    datagroupspec: DataGroupSpec,
+    step_size: None | int = None,
+    use_alignment_boundaries: bool = False,
+    recalculate_steps: bool = False,
+    files_per_batch: int = 1,
+    skip_bad_files: bool = False,
+    file_exceptions: Exception | Warning | tuple[Exception | Warning] = (OSError,),
+    save_form: bool = True,
+    scheduler: None | Callable | str = None,
+    filetype_options: dict = {},
+    step_size_safety_factor: float = 0.5,
+    allow_empty_datasets: bool = False,
+) -> tuple[DataGroupSpec, DataGroupSpec]:
+    """
+    Function to preprocess either root or parquet DatasetSpecs in a DataGroupSpec.
+    ROOT TTrees: Maps align_clusters and uproot_options to use_alignment_boundaries and filetype_options, respectively.
+ Parquet: Maps use_row_groups and parquet_options to use_alignment_boundaries and filetype_options respectively + """ + if not isinstance(datagroupspec, DataGroupSpec): + raise ValueError( + f"_preprocess_pydantic expects a DataGroupSpec, got {type(datagroupspec)}" + ) + out_updated = datagroupspec.model_dump() + out_available = datagroupspec.model_dump() + + all_ak_norm_files = {} + files_to_preprocess = {} + for name, info in datagroupspec.items(): + norm_files = _normalize_pydantic_file_info(info) + fields = ["file", "object_path", "steps", "num_entries", "uuid"] + ak_norm_files = awkward.from_iter(norm_files) + ak_norm_files = awkward.Array( + {field: ak_norm_files[str(ifield)] for ifield, field in enumerate(fields)} + ) + all_ak_norm_files[name] = ak_norm_files + + dak_norm_files = dask_awkward.from_awkward( + ak_norm_files, math.ceil(len(ak_norm_files) / files_per_batch) + ) + + concat_fn = partial( + awkward.concatenate, + axis=0, + ) + + split_every = 8 + + files_trl_label = f"preprocess-{name}" + files_trl_token = dask.base.tokenize(dak_norm_files, concat_fn, split_every) + files_trl_name = f"{files_trl_label}-{files_trl_token}" + files_trl_tree_node_name = f"{files_trl_label}-tree-node-{files_trl_token}" + + if info.format == "root": + files_part = dask_awkward.map_partitions( + get_steps, + dak_norm_files, + step_size=step_size, + align_clusters=use_alignment_boundaries, + recalculate_steps=recalculate_steps, + skip_bad_files=skip_bad_files, + file_exceptions=file_exceptions, + save_form=save_form, + step_size_safety_factor=step_size_safety_factor, + uproot_options=filetype_options, + meta=dask_awkward.lib.core.empty_typetracer(), + ) + elif info.format == "parquet": + files_part = dask_awkward.map_partitions( + get_parquet_form_uuid_steps, + dak_norm_files, + step_size=step_size, + use_row_groups=use_alignment_boundaries, + recalculate_steps=recalculate_steps, + skip_bad_files=skip_bad_files, + file_exceptions=file_exceptions, + save_form=save_form, + step_size_safety_factor=step_size_safety_factor, + parquet_options=filetype_options, + meta=dask_awkward.lib.core.empty_typetracer(), + ) + else: + raise ValueError( + f"Dataset {name} has unsupported format {info.format}, supported formats are 'root' and 'parquet'." + ) + + files_trl = dask_awkward.layers.layers.AwkwardTreeReductionLayer( + name=files_trl_name, + name_input=files_part.name, + npartitions_input=files_part.npartitions, + concat_func=concat_fn, + tree_node_func=lambda x: x, + finalize_func=lambda x: x, + split_every=split_every, + tree_node_name=files_trl_tree_node_name, + ) + + files_graph = dask.highlevelgraph.HighLevelGraph.from_collections( + files_trl_name, files_trl, dependencies=[files_part] + ) + + files_to_preprocess[name] = dask_awkward.lib.core.new_array_object( + files_graph, + files_trl_name, + meta=dask_awkward.lib.core.empty_typetracer(), + npartitions=len(files_trl.output_partitions), + ) + + (all_processed_files,) = dask.compute(files_to_preprocess, scheduler=scheduler) + + for name, processed_files in all_processed_files.items(): + + if len(awkward.drop_none(processed_files, axis=0)) == 0: + ds_empty_msg = ( + "There was no populated list of files returned from querying your input dataset." + "\nPlease check your xrootd endpoints, and avoid redirectors." 
+ f"\nInput dataset: {name}" + f"\nAs parsed for querying: {awkward.to_list(all_ak_norm_files[name])}" + ) + + if not allow_empty_datasets: + raise Exception(ds_empty_msg) + + warnings.warn(ds_empty_msg) + del out_available[name] + continue + + processed_files_without_forms = processed_files[ + ["file", "object_path", "steps", "num_entries", "uuid"] + ] + + compressed_forms = processed_files[ + ["file", "form", "form_hash_md5", "num_entries"] + ][~awkward.is_none(processed_files.form_hash_md5)] + + _, unique_forms_idx = numpy.unique( + compressed_forms.form_hash_md5.to_numpy(), return_index=True + ) + + dataset_forms = [] + unique_forms = compressed_forms[unique_forms_idx] + for thefile, formstr, num_entries in zip( + unique_forms.file, unique_forms.form, unique_forms.num_entries + ): + # skip trivially filled or empty files + form = awkward.forms.from_json(decompress_form(formstr)) + if num_entries >= 0 and set(form.fields) != _trivial_file_fields: + dataset_forms.append(form) + else: + warnings.warn( + f"{thefile} has fields {form.fields} and num_entries={num_entries} " + "and has been skipped during form-union determination. You will need " + "to skip this file when processing. You can either manually remove it " + "or, if it is an empty file, dynamically remove it with the function " + "dataset_tools.filter_files which takes the output of preprocess and " + ", by default, removes empty files each dataset in a fileset." + ) + + union_array = None + union_form_jsonstr = None + while len(dataset_forms): + new_array = awkward.Array(dataset_forms.pop().length_zero_array()) + if union_array is None: + union_array = new_array + else: + union_array = awkward.to_packed( + awkward.merge_union_of_records( + awkward.concatenate([union_array, new_array]), axis=0 + ) + ) + union_array.layout.parameters.update(new_array.layout.parameters) + if union_array is not None: + union_form = union_array.layout.form + + for icontent, content in enumerate(union_form.contents): + if isinstance(content, awkward.forms.IndexedOptionForm): + if ( + not isinstance(content.content, awkward.forms.NumpyForm) + or content.content.primitive != "bool" + ): + raise ValueError( + "IndexedOptionArrays can only contain NumpyArrays of " + "bools in mergers of flat-tuple-like schemas!" 
+                        )
+                    parameters = (
+                        content.content.parameters.copy()
+                        if content.content.parameters is not None
+                        else {}
+                    )
+                    # re-create the IndexedOptionForm with the parameters of the lower-level array
+                    union_form.contents[icontent] = awkward.forms.IndexedOptionForm(
+                        content.index,
+                        content.content,
+                        parameters=parameters,
+                        form_key=content.form_key,
+                    )
+
+            union_form_jsonstr = union_form.to_json()
+
+        files_available = {
+            item["file"]: {
+                "object_path": item["object_path"],
+                "steps": item["steps"],
+                "num_entries": item["num_entries"],
+                "uuid": item["uuid"],
+            }
+            for item in awkward.drop_none(processed_files_without_forms).to_list()
+        }
+
+        files_out = {}
+        for proc_item, orig_item in zip(
+            processed_files_without_forms.to_list(), all_ak_norm_files[name].to_list()
+        ):
+            item = orig_item if proc_item is None else proc_item
+            files_out[item["file"]] = {
+                "object_path": item["object_path"],
+                "steps": item["steps"],
+                "num_entries": item["num_entries"],
+                "uuid": item["uuid"],
+            }
+
+        out_updated[name]["files"] = files_out
+        out_available[name]["files"] = files_available
+
+        compressed_union_form = (
+            compress_form(union_form_jsonstr) if union_form_jsonstr else None
+        )
+        out_updated[name]["compressed_form"] = compressed_union_form
+        out_available[name]["compressed_form"] = compressed_union_form
+
+    return DataGroupSpec.model_validate(out_available), DataGroupSpec.model_validate(
+        out_updated
+    )
+
+
+def preprocess(
+    fileset: DataGroupSpec | dict,
+    step_size: None | int = None,
+    align_clusters: bool = False,
+    recalculate_steps: bool = False,
+    files_per_batch: int = 1,
+    skip_bad_files: bool = False,
+    file_exceptions: Exception | Warning | tuple[Exception | Warning] = (OSError,),
+    save_form: bool = True,
+    scheduler: None | Callable | str = None,
+    uproot_options: dict = {},
+    step_size_safety_factor: float = 0.5,
+    allow_empty_datasets: bool = False,
+    preprocess_legacy_root: bool = False,
+    use_row_groups: bool = False,
+    parquet_options: dict = {},
+) -> tuple[DataGroupSpec, DataGroupSpec] | tuple[dict, dict]:
+    """
+    Given a list of normalized file and object paths (ROOT or parquet), determine the steps for each file according to the supplied processing options.
+
+    Parameters
+    ----------
+    fileset : DataGroupSpec | dict
+        The set of datasets whose files will be preprocessed.
+    step_size : int | None, default None
+        If specified, the size of the steps to make when analyzing the input files.
+    align_clusters : bool, default False
+        Round to the cluster size in a root file, when chunks are specified. Reduces data transfer in
+        analysis.
+    recalculate_steps : bool, default False
+        If steps are present in the input normed files, force the recalculation of those steps,
+        instead of only recalculating the steps if the uuid has changed.
+    files_per_batch : int, default 1
+        The number of files to preprocess in a single batch.
+        Large values will result in fewer dask tasks but each task will have to do more work.
+    skip_bad_files : bool, default False
+        Instead of failing, catch exceptions specified by file_exceptions and return null data.
+    file_exceptions : Exception | Warning | tuple[Exception | Warning], default (OSError,)
+        What exceptions to catch when skipping bad files.
+    save_form : bool, default True
+        Extract the form of the TTree from each file in each dataset, creating the union of the forms over the dataset.
+    scheduler : None | Callable | str, default None
+        Specifies the scheduler that dask should use to execute the preprocessing task graph.
+    uproot_options : dict, default {}
+        Options to pass to get_steps for opening files with uproot.
+    step_size_safety_factor : float, default 0.5
+        When using align_clusters, if a resulting step is larger than step_size by this factor
+        warn the user that the resulting steps may be highly irregular.
+    allow_empty_datasets : bool, default False
+        When a dataset query comes back completely empty, this is normally considered a processing error.
+        Toggle this argument to True to change this to warnings and allow incomplete returned filesets.
+    preprocess_legacy_root : bool, default False
+        Use the legacy root preprocessing function for all files, even if the fileset is a DataGroupSpec.
+        Not compatible with parquet files.
+    use_row_groups : bool, default False
+        Calculate steps according to the row_groups in the parquet files (only applies to DataGroupSpec datasets with parquet files).
+    parquet_options : dict, default {}
+        Options to pass to get_parquet_form_uuid_steps for opening parquet files (only applies to DataGroupSpec datasets with parquet files).
+
+    Returns
+    -------
+    out_available : DataGroupSpec | dict
+        The subset of files in each dataset that were successfully preprocessed, organized by dataset.
+    out_updated : DataGroupSpec | dict
+        The original set of datasets including files that were not accessible, updated to include the result of preprocessing where available.
+    """
+    if preprocess_legacy_root:
+        # use the legacy root TTree preprocessing function if requested
+        return preprocess_legacy(
+            fileset.model_dump() if isinstance(fileset, DataGroupSpec) else fileset,
+            step_size=step_size,
+            align_clusters=align_clusters,
+            recalculate_steps=recalculate_steps,
+            files_per_batch=files_per_batch,
+            skip_bad_files=skip_bad_files,
+            file_exceptions=file_exceptions,
+            save_form=save_form,
+            scheduler=scheduler,
+            uproot_options=uproot_options,
+            step_size_safety_factor=step_size_safety_factor,
+            allow_empty_datasets=allow_empty_datasets,
+        )
+    else:
+        if isinstance(fileset, DataGroupSpec):
+            datasetspecs = fileset
+        else:
+            warnings.warn(
+                "Passing a dict to preprocess is deprecated. Converting to DataGroupSpec and proceeding. "
+                "To remove this warning, pass a DataGroupSpec object instead of a dict. "
+                "To use the legacy preprocessing function, set preprocess_legacy_root=True. "
+                "If automatic conversion is not possible, please submit your fileset to the coffea team.",
+                DeprecationWarning,
+            )
+            datasetspecs = DataGroupSpec.model_validate(fileset)
+        # split datasetspecs into uproot and parquet files, keeping track of original order
+        original_order = list(datasetspecs.keys())
+        formats = [dss.format for dss in datasetspecs.values()]
+        if len(set(formats)) > 1 and align_clusters != use_row_groups:
+            warnings.warn(
+                "When preprocessing a mixed fileset, align_clusters and use_row_groups serve a similar function. If you didn't intend to treat root and parquet files' boundary alignments differently, set both to the same value."
+ ) + out_available_uproot, out_updated_uproot = preprocess_root( + datasetspecs.filter_datasets( + filter_callable=lambda ds: ds.format == "root" + ), + step_size=step_size, + align_clusters=align_clusters, + recalculate_steps=recalculate_steps, + files_per_batch=files_per_batch, + skip_bad_files=skip_bad_files, + file_exceptions=file_exceptions, + save_form=save_form, + scheduler=scheduler, + uproot_options=uproot_options, + step_size_safety_factor=step_size_safety_factor, + allow_empty_datasets=allow_empty_datasets, + ) + out_available_parquet, out_updated_parquet = preprocess_parquet( + datasetspecs.filter_datasets( + filter_callable=lambda ds: ds.format == "parquet" + ), + step_size=step_size, + use_row_groups=use_row_groups, + recalculate_steps=recalculate_steps, + files_per_batch=files_per_batch, + skip_bad_files=skip_bad_files, + file_exceptions=file_exceptions, + save_form=save_form, + scheduler=scheduler, + parquet_options=parquet_options, + step_size_safety_factor=step_size_safety_factor, + allow_empty_datasets=allow_empty_datasets, + ) + # recombine outputs in original order + out_available = DataGroupSpec( + { + k: ( + out_available_uproot[k] + if k in out_available_uproot + else out_available_parquet[k] + ) + for k in original_order + } + ) + out_updated = DataGroupSpec( + { + k: ( + out_updated_uproot[k] + if k in out_updated_uproot + else out_updated_parquet[k] + ) + for k in original_order + } + ) + return out_available, out_updated diff --git a/src/coffea/nanoevents/factory.py b/src/coffea/nanoevents/factory.py index 51ad4556c..14a899f19 100644 --- a/src/coffea/nanoevents/factory.py +++ b/src/coffea/nanoevents/factory.py @@ -764,7 +764,10 @@ def events(self): """ if self._mode == "dask": dask_awkward.lib.core.dak_cache.clear() - events = self._mapping(form_mapping=self._schema) + try: + events = self._mapping(form_mapping=self._schema) + except TypeError: + events = self._mapping() report = None if isinstance(events, tuple): events, report = events
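Reviewer note on the `nanoevents/factory.py` hunk: the `try`/`except TypeError` feature-detects whether the mapping accepts `form_mapping`, but a `TypeError` raised *inside* a mapping that does accept the keyword would be silently retried without it. A signature-based check avoids that ambiguity; this is a sketch of the alternative, not what the patch does:

```python
import inspect


def call_mapping(mapping, schema):
    # Only pass form_mapping if the callable's signature accepts it (directly
    # or via **kwargs), instead of catching TypeError from the call itself.
    params = inspect.signature(mapping).parameters
    accepts_kw = "form_mapping" in params or any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()
    )
    return mapping(form_mapping=schema) if accepts_kw else mapping()
```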
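To make the new dispatch concrete, here is a minimal usage sketch. The dataset names and file paths are hypothetical, and the exact file-spec keys (notably the per-dataset `format` field) are assumptions inferred from how this patch calls `DataGroupSpec.model_validate` and reads `ds.format`, not a confirmed schema:

```python
# Minimal sketch: a mixed ROOT/parquet fileset through the new preprocess().
from coffea.dataset_tools import preprocess

fileset = {
    "ttbar": {
        "files": {"/data/ttbar_0.root": {"object_path": "Events"}},
        "format": "root",
    },
    "signal": {
        "files": {"/data/signal_0.parquet": {"object_path": ""}},
        "format": "parquet",
    },
}

# Mixed filesets are split by format, preprocessed separately, then recombined
# in the original dataset order; both return values are DataGroupSpec instances.
out_available, out_updated = preprocess(
    fileset,
    step_size=100_000,
    align_clusters=True,  # ROOT: align steps to TBasket cluster boundaries
    use_row_groups=True,  # parquet: align steps to row-group boundaries
    skip_bad_files=True,
)
```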
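The uniform stepping used when no boundary alignment is requested is the same arithmetic in `get_steps` and the new `get_parquet_form_uuid_steps`: pick the integer number of steps closest to `num_entries / step_size`, then split as evenly as possible. A self-contained sketch of just that arithmetic (the function name is ours, for illustration):

```python
import math


def uniform_steps(num_entries: int, step_size: int | None) -> list[list[int]]:
    # Mirrors the no-alignment branch: step_size=None means one step per file.
    target = num_entries if step_size is None else step_size
    n_steps = max(round(num_entries / target), 1)
    actual = math.ceil(num_entries / n_steps)
    return [[i * actual, min((i + 1) * actual, num_entries)] for i in range(n_steps)]


assert uniform_steps(1000, 300) == [[0, 334], [334, 668], [668, 1000]]
assert uniform_steps(10, None) == [[0, 10]]
```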
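And the `use_row_groups` branch, which may only end a step on a row-group boundary: each step greedily absorbs row groups until it reaches the target size, which is why the safety-factor warning exists for very large row groups. A standalone sketch of that accumulation:

```python
def row_group_steps(col_counts: list[int], target_step_size: int) -> list[list[int]]:
    # Mirrors the use_row_groups branch: accumulate row-group entry counts and
    # cut a step edge whenever the running offset crosses the target size.
    edges = [0]
    offset = 0
    for count in col_counts:
        offset += count
        if offset >= edges[-1] + target_step_size:
            edges.append(offset)
    if offset != edges[-1]:  # close out a final partial step
        edges.append(offset)
    return [[lo, hi] for lo, hi in zip(edges[:-1], edges[1:])]


# Four row groups of 250 entries with a 300-entry target -> two 500-entry steps.
assert row_group_steps([250, 250, 250, 250], 300) == [[0, 500], [500, 1000]]
```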