diff --git a/src/coffea/nanoevents/trace.py b/src/coffea/nanoevents/trace.py
index aba1ac7f0..653fd43d4 100644
--- a/src/coffea/nanoevents/trace.py
+++ b/src/coffea/nanoevents/trace.py
@@ -117,7 +117,7 @@ def _attempt_tracing(fun: Callable, tracer: ak.Array, throw: bool) -> None:
 
 
 def trace_with_typetracer(
-    fun: Callable, events: ak.Array, throw: bool = True
+    fun: Callable, events: ak.Array, throw: bool = False
 ) -> frozenset[str]:
     """
     Trace the execution of a function on NanoEvents using Awkward's typetracer to determine which buffers are touched.
@@ -145,7 +145,7 @@ def trace_with_typetracer(
 
 
 def trace_with_length_zero_array(
-    fun: Callable, events: ak.Array, throw: bool = True
+    fun: Callable, events: ak.Array, throw: bool = False
 ) -> frozenset[str]:
     """
     Trace the execution of a function on NanoEvents using a length-zero array to determine which buffers are touched.
@@ -173,7 +173,7 @@ def trace_with_length_zero_array(
 
 
 def trace_with_length_one_array(
-    fun: Callable, events: ak.Array, throw: bool = True
+    fun: Callable, events: ak.Array, throw: bool = False
 ) -> frozenset[str]:
     """
     Trace the execution of a function on NanoEvents using a length-one array to determine which buffers are touched.
@@ -225,7 +225,7 @@ def trace(fun: Callable, events: ak.Array) -> frozenset[str]:
     touched = set()
 
     try:
-        touched |= trace_with_typetracer(fun, events)
+        touched |= trace_with_typetracer(fun, events, throw=True)
         return frozenset(touched)
     except Exception as e1:
         warnings.warn(
@@ -234,7 +234,7 @@ def trace(fun: Callable, events: ak.Array) -> frozenset[str]:
             stacklevel=2,
         )
         try:
-            touched |= trace_with_length_zero_array(fun, events)
+            touched |= trace_with_length_zero_array(fun, events, throw=True)
             return frozenset(touched)
         except Exception as e2:
             warnings.warn(
@@ -243,7 +243,7 @@ def trace(fun: Callable, events: ak.Array) -> frozenset[str]:
                 stacklevel=2,
             )
             try:
-                touched |= trace_with_length_one_array(fun, events)
+                touched |= trace_with_length_one_array(fun, events, throw=True)
                 return frozenset(touched)
             except Exception as e3:
                 warnings.warn(
diff --git a/src/coffea/processor/executor.py b/src/coffea/processor/executor.py
index 8504e3890..af93712a0 100644
--- a/src/coffea/processor/executor.py
+++ b/src/coffea/processor/executor.py
@@ -26,6 +26,8 @@ import uproot
 
 from cachetools import LRUCache
 
+from coffea.nanoevents.trace import trace
+
 from ..nanoevents import NanoEventsFactory, schemas
 from ..util import _exception_chain, _hash, rich_bar
 from .accumulator import Accumulatable, accumulate, set_accumulator
@@ -1384,6 +1386,7 @@ def _work_function(
         processor_instance: ProcessorABC,
         uproot_options: dict,
         iteritems_options: dict,
+        preload_columns: Optional[dict] = None,
     ) -> dict:
         if "timeout" in uproot_options:
             xrootdtimeout = uproot_options["timeout"]
@@ -1419,6 +1422,18 @@ def _work_function(
         if item.usermeta is not None:
             metadata.update(item.usermeta)
 
+        # preload logic
+        # Important: same key as in the TraceProcessor cls, so we use a common method
+        _key = _key_for_tracing(metadata)
+        needed = None
+        if preload_columns is not None:
+            needed = preload_columns.get(_key, None)
+
+        if needed is not None:
+            preload = lambda b: b.name in needed  # noqa: E731
+        else:
+            preload = None
+
         with filecontext as file:
             if schema is None:
                 raise ValueError("Schema must be set")
@@ -1437,6 +1452,7 @@ def _work_function(
                     entry_start=item.entrystart,
                     entry_stop=item.entrystop,
                     iteritems_options=iteritems_options,
+                    preload=preload,
                 )
                 events = factory.events()
             elif format == "parquet":
@@ -1485,6 +1501,7 @@ def __call__(
         treename: Optional[str] = None,
         uproot_options: Optional[dict] = {},
         iteritems_options: Optional[dict] = {},
+        preload_columns: Optional[dict] = None,
     ) -> Accumulatable:
         """Run the processor_instance on a given fileset
 
@@ -1503,9 +1520,16 @@ def __call__(
                 Any options to pass to ``uproot.open``
             iteritems_options : dict, optional
                 Any options to pass to ``tree.iteritems``
+            preload_columns : dict, optional
+                A mapping of fileset metadata to columns to preload
         """
         wrapped_out = self.run(
-            fileset, processor_instance, treename, uproot_options, iteritems_options
+            fileset,
+            processor_instance,
+            treename,
+            uproot_options,
+            iteritems_options,
+            preload_columns,
         )
         if self.use_dataframes:
             return wrapped_out  # not wrapped anymore
@@ -1549,6 +1573,149 @@ def preprocess(
 
         return self._chunk_generator(fileset, treename)
 
+    def maybe_preprocess(
+        self,
+        fileset: Union[dict, str, list[WorkItem], Generator],
+        treename: Optional[str] = None,
+    ) -> Union[list[WorkItem], Generator]:
+        """Preprocess a fileset into chunks unless it already consists of chunks
+
+        Parameters
+        ----------
+            fileset : dict | str | List[WorkItem] | Generator
+                - A dictionary ``{dataset: [file, file], }``
+                  Optionally, if some files' tree name differ, the dictionary can be specified:
+                  ``{dataset: {'treename': 'name', 'files': [file, file]}, }``
+                  You can also define a different tree name per file in the dictionary:
+                  ``{dataset: {'files': {file: 'name'}}, }``
+                - A single file name
+                - File chunks for self.preprocess()
+                - Chunk generator
+            treename : str, optional
+                name of tree inside each root file, can be ``None``;
+                treename can also be defined in fileset, which will override the passed treename
+                Not needed if processing premade chunks
+
+        Returns
+        -------
+            chunks : List[WorkItem] | Generator
+                ``fileset`` itself if it is a generator or a sequence starting with a
+                ``WorkItem``, otherwise the result of ``self.preprocess(fileset, treename)``
+
+        Raises
+        ------
+            ValueError
+                If ``fileset`` is not a mapping, a string, a generator, or a sequence
+                whose first element is a ``WorkItem``
+        """
+        if isinstance(fileset, (Mapping, str)):
+            return self.preprocess(fileset, treename)
+        # premade chunks are passed through untouched
+        if isinstance(fileset, Generator) or isinstance(fileset[0], WorkItem):
+            return fileset
+        raise ValueError(
+            "Expected fileset to be a mapping dataset: list(files) or filename"
+        )
+
+    def trace_needed_columns(
+        self,
+        fileset: Union[dict, str, list[WorkItem], Generator],
+        processor_instance: ProcessorABC,
+        treename: Optional[str] = "Events",
+        uproot_options: Optional[dict] = {},
+        iteritems_options: Optional[dict] = {},
+        trace_method: Callable = trace,
+    ) -> Accumulatable:
+        """Trace the processor_instance on a given fileset
+
+        Only the first chunk of each dataset is traced.
+
+        Parameters
+        ----------
+            fileset : dict | str | List[WorkItem] | Generator
+                - A dictionary ``{dataset: [file, file], }``
+                  Optionally, if some files' tree name differ, the dictionary can be specified:
+                  ``{dataset: {'treename': 'name', 'files': [file, file]}, }``
+                  You can also define a different tree name per file in the dictionary:
+                  ``{dataset: {'files': {file: 'name'}}, }``
+                - A single file name
+                - File chunks for self.preprocess()
+                - Chunk generator
+            processor_instance : ProcessorABC
+                An instance of a class deriving from ProcessorABC
+            treename : str, optional
+                name of tree inside each root file, can be ``None``;
+                treename can also be defined in fileset, which will override the passed treename
+                Not needed if processing premade chunks
+            uproot_options : dict, optional
+                Any options to pass to ``uproot.open``
+            iteritems_options : dict, optional
+                Any options to pass to ``tree.iteritems``
+            trace_method : Callable, optional
+                A callable that takes a function and a NanoEventsArray and returns
+                the set of columns accessed during the function call.
+                Default is ``coffea.nanoevents.trace.trace``
+
+        Returns
+        -------
+            needed : Accumulatable
+                The output of running the tracing processor via ``self(...)``.
+                Each traced chunk contributes ``{(dataset, treename): columns}``,
+                the mapping ``_work_function`` looks up through ``preload_columns``.
+
+        Raises
+        ------
+            ValueError
+                If ``processor_instance`` does not derive from ProcessorABC
+        """
+        if not isinstance(processor_instance, ProcessorABC):
+            raise ValueError("Expected processor_instance to derive from ProcessorABC")
+
+        class TraceProcessor(ProcessorABC):
+            """Wraps a processor to trace which columns are accessed during processing."""
+
+            def __init__(self, processor: ProcessorABC):
+                self.processor = processor
+
+            @property
+            def accumulator(self):
+                return {}
+
+            def process(self, events):
+                # Important: same key as in _work_function, so we use a common method
+                key = _key_for_tracing(events.metadata)
+                columns = trace_method(self.processor.process, events)
+                return {key: set(columns)}
+
+            def postprocess(self, accumulator):
+                return accumulator
+
+        # only trace first chunk per dataset
+        def _first_item_per_dataset(gen: Generator) -> Generator:
+            seen = set()
+            for item in gen:
+                if item.dataset not in seen:
+                    seen.add(item.dataset)
+                    yield item
+
+        chunks = _first_item_per_dataset(self.maybe_preprocess(fileset, treename))
+        trace_processor_instance = TraceProcessor(processor_instance)
+
+        # patch the executor description, restoring it even if tracing fails
+        old_desc = self.executor.desc
+        self.executor.desc = "Tracing"
+        try:
+            return self(
+                chunks,
+                trace_processor_instance,
+                treename,
+                uproot_options=uproot_options,
+                iteritems_options=iteritems_options,
+                preload_columns=None,
+            )
+        finally:
+            self.executor.desc = old_desc
+
     def run(
         self,
         fileset: Union[dict, str, list[WorkItem], Generator],
@@ -1556,6 +1723,7 @@ def run(
         treename: Optional[str] = None,
         uproot_options: Optional[dict] = {},
         iteritems_options: Optional[dict] = {},
+        preload_columns: Optional[dict] = None,
     ) -> Accumulatable:
         """Run the processor_instance on a given fileset
 
@@ -1580,23 +1748,13 @@ def run(
                 Any options to pass to ``uproot.open``
             iteritems_options : dict, optional
                 Any options to pass to ``tree.iteritems``
+            preload_columns : dict, optional
+                A mapping of fileset metadata to columns to preload
         """
-
-        meta = False
-        if not isinstance(fileset, (Mapping, str)):
-            if isinstance(fileset, Generator) or isinstance(fileset[0], WorkItem):
-                meta = True
-            else:
-                raise ValueError(
-                    "Expected fileset to be a mapping dataset: list(files) or filename"
-                )
         if not isinstance(processor_instance, ProcessorABC):
             raise ValueError("Expected processor_instance to derive from ProcessorABC")
 
-        if meta:
-            chunks = fileset
-        else:
-            chunks = self.preprocess(fileset, treename)
+        chunks = self.maybe_preprocess(fileset, treename)
 
         if self.processor_compression is None:
             pi_to_send = processor_instance
@@ -1618,6 +1776,7 @@ def run(
                 processor_instance="heavy",
                 uproot_options=uproot_options,
                 iteritems_options=iteritems_options,
+                preload_columns=preload_columns,
             )
         else:
             closure = partial(
@@ -1630,6 +1789,7 @@ def run(
                 processor_instance=pi_to_send,
                 uproot_options=uproot_options,
                 iteritems_options=iteritems_options,
+                preload_columns=preload_columns,
             )
 
         chunks = list(chunks)
@@ -1664,3 +1824,12 @@ def run(
             return wrapped_out["out"]
         else:
             return wrapped_out
+
+
+def _key_for_tracing(metadata: Mapping) -> tuple:
+    """Return the ``(dataset, treename)`` key under which traced columns are stored.
+
+    Shared by the tracing processor and ``_work_function`` so that lookups of
+    ``preload_columns`` use exactly the key the trace results were stored under.
+    """
+    return (metadata["dataset"], metadata["treename"])