From 25fa4f78b4fae66e15740efb27c9d8cb692d5c4a Mon Sep 17 00:00:00 2001
From: pfackeldey
Date: Thu, 25 Sep 2025 17:36:07 +0200
Subject: [PATCH 1/7] feat: add preload feature to Runner API

---
 src/coffea/processor/executor.py | 82 +++++++++++++++++++++++++++++++-
 1 file changed, 81 insertions(+), 1 deletion(-)

diff --git a/src/coffea/processor/executor.py b/src/coffea/processor/executor.py
index b3fd9ff32..93ef32407 100644
--- a/src/coffea/processor/executor.py
+++ b/src/coffea/processor/executor.py
@@ -1384,6 +1384,7 @@ def _work_function(
     processor_instance: ProcessorABC,
     uproot_options: dict,
     iteritems_options: dict,
+    preload_columns: dict,
 ) -> dict:
     if "timeout" in uproot_options:
         xrootdtimeout = uproot_options["timeout"]
@@ -1419,6 +1420,18 @@ def _work_function(
     if item.usermeta is not None:
         metadata.update(item.usermeta)
 
+    # preload logic
+    # Important: the lookup key must match the one computed in TraceProcessor, so we use a shared helper
+    hashed = _hash_for_tracing(metadata)
+    needed = None
+    if preload_columns is not None:
+        needed = preload_columns.get(hashed, None)
+
+    if needed is not None:
+        preload = lambda b: b.name in needed  # noqa
+    else:
+        preload = None
+
     with filecontext as file:
         if schema is None:
             raise ValueError("Schema must be set")
@@ -1437,6 +1450,7 @@ def _work_function(
                 entry_start=item.entrystart,
                 entry_stop=item.entrystop,
                 iteritems_options=iteritems_options,
+                preload=preload,
             )
             events = factory.events()
         elif format == "parquet":
@@ -1485,6 +1499,7 @@ def __call__(
         treename: Optional[str] = None,
         uproot_options: Optional[dict] = {},
         iteritems_options: Optional[dict] = {},
+        preload_columns: Optional[dict] = None,
     ) -> Accumulatable:
         """Run the processor_instance on a given fileset
@@ -1503,9 +1518,16 @@ def __call__(
             Any options to pass to ``uproot.open``
         iteritems_options : dict, optional
             Any options to pass to ``tree.iteritems``
+        preload_columns : dict, optional
+            A mapping from keys produced by ``trace_needed_columns`` to sets of columns to preload
         """
         wrapped_out = self.run(
-            fileset, processor_instance, treename, uproot_options, iteritems_options
+            fileset,
+            processor_instance,
+            treename,
+            uproot_options,
+            iteritems_options,
+            preload_columns,
         )
         if self.use_dataframes:
             return wrapped_out  # not wrapped anymore
@@ -1553,6 +1575,46 @@ def preprocess(
 
         return self._chunk_generator(fileset, treename)
 
+    def trace_needed_columns(
+        self,
+        fileset: Union[dict, str, list[WorkItem], Generator],
+        processor_instance: ProcessorABC,
+        treename="Events",
+    ) -> Accumulatable:
+        class TraceProcessor(ProcessorABC):
+            """Wraps a processor to trace which columns are accessed during processing."""
+
+            def __init__(self, processor: ProcessorABC):
+                self.processor = processor
+
+            @property
+            def accumulator(self):
+                return {}
+
+            def process(self, events):
+                from coffea.nanoevents.trace import trace
+
+                accum = self.accumulator
+                hashed = _hash_for_tracing(events.metadata)
+                accum[hashed] = set()
+                columns = trace(self.processor.process, events)
+                accum[hashed] |= columns
+                return accum
+
+            def postprocess(self, accumulator):
+                return accumulator
+
+        # wrap
+        trace_processor_instance = TraceProcessor(processor_instance)
+        return self(
+            fileset,
+            trace_processor_instance,
+            treename,
+            uproot_options={},
+            iteritems_options={},
+            preload_columns=None,
+        )
+
     def run(
         self,
         fileset: Union[dict, str, list[WorkItem], Generator],
@@ -1560,6 +1622,7 @@ def run(
         treename: Optional[str] = None,
         uproot_options: Optional[dict] = {},
         iteritems_options: Optional[dict] = {},
+        preload_columns: Optional[dict] = None,
     ) -> Accumulatable:
         """Run the processor_instance on a given fileset
@@ -1584,6 +1647,8 @@ def run(
             Any options to pass to ``uproot.open``
         iteritems_options : dict, optional
             Any options to pass to ``tree.iteritems``
+        preload_columns : dict, optional
+            A mapping from keys produced by ``trace_needed_columns`` to sets of columns to preload
         """
 
         meta = False
@@ -1622,6 +1687,7 @@ def run(
                 processor_instance="heavy",
                 uproot_options=uproot_options,
                 iteritems_options=iteritems_options,
+                preload_columns=preload_columns,
             )
         else:
             closure = partial(
@@ -1634,6 +1700,7 @@ def run(
                 processor_instance=pi_to_send,
                 uproot_options=uproot_options,
                 iteritems_options=iteritems_options,
+                preload_columns=preload_columns,
             )
 
         chunks = list(chunks)
@@ -1668,3 +1735,16 @@ def run(
             return wrapped_out["out"]
         else:
             return wrapped_out
+
+
+def _hash_for_tracing(metadata):
+    return _hash(
+        (
+            metadata["dataset"],
+            metadata["filename"],
+            metadata["treename"],
+            metadata["entrystart"],
+            metadata["entrystop"],
+            metadata["fileuuid"],
+        )
+    )
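The predicate built in `_work_function` above is the entire preload hook: the events factory receives a callable that answers, per branch, whether that branch was recorded during tracing. The same filter in a standalone toy (plain Python, no coffea; `Branch` is a stand-in for an uproot branch, of which only the `.name` attribute matters here):

# `needed` plays the role of the traced column set looked up for one chunk
needed = {"Muon_pt", "Muon_eta", "nMuon"}
preload = lambda b: b.name in needed  # noqa: E731


class Branch:
    # stand-in for an uproot branch with a `.name` attribute
    def __init__(self, name):
        self.name = name


branches = [Branch("Muon_pt"), Branch("Electron_pt"), Branch("nMuon")]
print([b.name for b in branches if preload(b)])  # -> ['Muon_pt', 'nMuon']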
From d98bb716ff90597e474614c02ad758e60e3d04aa Mon Sep 17 00:00:00 2001
From: pfackeldey
Date: Thu, 25 Sep 2025 18:15:09 +0200
Subject: [PATCH 2/7] add docs; forward {uproot,iteritems}_options

---
 src/coffea/processor/executor.py | 31 +++++++++++++++++++++++++++++--
 1 file changed, 29 insertions(+), 2 deletions(-)

diff --git a/src/coffea/processor/executor.py b/src/coffea/processor/executor.py
index 93ef32407..7be41c0ad 100644
--- a/src/coffea/processor/executor.py
+++ b/src/coffea/processor/executor.py
@@ -1580,7 +1580,34 @@ def trace_needed_columns(
         fileset: Union[dict, str, list[WorkItem], Generator],
         processor_instance: ProcessorABC,
         treename="Events",
+        uproot_options: Optional[dict] = {},
+        iteritems_options: Optional[dict] = {},
     ) -> Accumulatable:
+        """Trace the processor_instance on a given fileset
+
+        Parameters
+        ----------
+        fileset : dict | str | List[WorkItem] | Generator
+            - A dictionary ``{dataset: [file, file], }``
+              Optionally, if some files' tree name differ, the dictionary can be specified:
+              ``{dataset: {'treename': 'name', 'files': [file, file]}, }``
+              You can also define a different tree name per file in the dictionary:
+              ``{dataset: {'files': {file: 'name'}}, }``
+            - A single file name
+            - File chunks for self.preprocess()
+            - Chunk generator
+        processor_instance : ProcessorABC
+            An instance of a class deriving from ProcessorABC
+        treename : str, optional
+            name of tree inside each root file, can be ``None``;
+            treename can also be defined in fileset, which will override the passed treename
+            Not needed if processing premade chunks
+        uproot_options : dict, optional
+            Any options to pass to ``uproot.open``
+        iteritems_options : dict, optional
+            Any options to pass to ``tree.iteritems``
+        """
+
         class TraceProcessor(ProcessorABC):
             """Wraps a processor to trace which columns are accessed during processing."""
 
@@ -1610,8 +1637,8 @@ def postprocess(self, accumulator):
             fileset,
             trace_processor_instance,
             treename,
-            uproot_options={},
-            iteritems_options={},
+            uproot_options=uproot_options,
+            iteritems_options=iteritems_options,
             preload_columns=None,
         )
From c9369c65d229ab083768bb4e18ead9a5ed52f8e9 Mon Sep 17 00:00:00 2001
From: pfackeldey
Date: Thu, 25 Sep 2025 18:17:44 +0200
Subject: [PATCH 3/7] add option to choose the trace method

---
 src/coffea/processor/executor.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/src/coffea/processor/executor.py b/src/coffea/processor/executor.py
index 7be41c0ad..a8b0f1def 100644
--- a/src/coffea/processor/executor.py
+++ b/src/coffea/processor/executor.py
@@ -26,6 +26,8 @@
 import uproot
 from cachetools import LRUCache
 
+from coffea.nanoevents.trace import trace
+
 from ..nanoevents import NanoEventsFactory, schemas
 from ..util import _exception_chain, _hash, rich_bar
 from .accumulator import Accumulatable, accumulate, set_accumulator
@@ -1579,9 +1581,11 @@ def trace_needed_columns(
         self,
         fileset: Union[dict, str, list[WorkItem], Generator],
         processor_instance: ProcessorABC,
+        *,
         treename="Events",
         uproot_options: Optional[dict] = {},
         iteritems_options: Optional[dict] = {},
+        trace_method: Callable = trace,
     ) -> Accumulatable:
         """Trace the processor_instance on a given fileset
@@ -1619,12 +1623,10 @@ def accumulator(self):
                 return {}
 
             def process(self, events):
-                from coffea.nanoevents.trace import trace
-
                 accum = self.accumulator
                 hashed = _hash_for_tracing(events.metadata)
                 accum[hashed] = set()
-                columns = trace(self.processor.process, events)
+                columns = trace_method(self.processor.process, events)
                 accum[hashed] |= columns
                 return accum
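With `trace_method` injectable, anything that satisfies the contract `(fun, events) -> frozenset[str]` can drive the tracing pass. A sketch of a wrapper around the stock tracer that reports what it found (the commented-out call at the end uses placeholder `runner`, `fileset`, and `proc` names that are not part of the patch):

from coffea.nanoevents.trace import trace


def trace_with_report(fun, events):
    # delegate to the stock tracer, then report how many columns were touched
    columns = trace(fun, events)
    print(f"{events.metadata['dataset']}: traced {len(columns)} columns")
    return columns


# needed = runner.trace_needed_columns(fileset, proc, trace_method=trace_with_report)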
@@ -225,7 +225,7 @@ def trace(fun: Callable, events: ak.Array) -> frozenset[str]:
     touched = set()
 
     try:
-        touched |= trace_with_typetracer(fun, events)
+        touched |= trace_with_typetracer(fun, events, throw=True)
         return frozenset(touched)
     except Exception as e1:
         warnings.warn(
@@ -234,7 +234,7 @@
             stacklevel=2,
         )
     try:
-        touched |= trace_with_length_zero_array(fun, events)
+        touched |= trace_with_length_zero_array(fun, events, throw=True)
         return frozenset(touched)
     except Exception as e2:
         warnings.warn(
@@ -243,7 +243,7 @@
             stacklevel=2,
         )
     try:
-        touched |= trace_with_length_one_array(fun, events)
+        touched |= trace_with_length_one_array(fun, events, throw=True)
         return frozenset(touched)
     except Exception as e3:
         warnings.warn(
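The default flip only makes sense together with the explicit `throw=True` at the call sites in `trace()`: the driver needs each strategy to raise so it can fall through to the next one, while direct callers of the individual helpers now get the forgiving behaviour. The escalation pattern in miniature (a generic sketch, not the trace.py code; `strategies` would be the three tracing helpers):

import warnings


def first_that_works(fun, events, strategies):
    for strategy in strategies:
        try:
            # throw=True forces failures to surface so the next strategy runs
            return strategy(fun, events, throw=True)
        except Exception as err:
            warnings.warn(f"{strategy.__name__} failed: {err}", stacklevel=2)
    raise RuntimeError("all tracing strategies failed")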
From 23e02b576a25e18096de8a3ab5b83c3fdd0ea215 Mon Sep 17 00:00:00 2001
From: Iason Krommydas
Date: Thu, 25 Sep 2025 18:38:32 +0200
Subject: [PATCH 5/7] doc string for trace_method, no kwargs yet in this PR

---
 src/coffea/processor/executor.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/src/coffea/processor/executor.py b/src/coffea/processor/executor.py
index a8b0f1def..432ee8bf8 100644
--- a/src/coffea/processor/executor.py
+++ b/src/coffea/processor/executor.py
@@ -1581,7 +1581,6 @@ def trace_needed_columns(
         self,
         fileset: Union[dict, str, list[WorkItem], Generator],
         processor_instance: ProcessorABC,
-        *,
         treename="Events",
         uproot_options: Optional[dict] = {},
         iteritems_options: Optional[dict] = {},
@@ -1610,6 +1609,10 @@ def trace_needed_columns(
             Any options to pass to ``uproot.open``
         iteritems_options : dict, optional
             Any options to pass to ``tree.iteritems``
+        trace_method : Callable, optional
+            A callable that takes a function and a NanoEventsArray and returns
+            the set of columns accessed during the function call.
+            Default is ``coffea.nanoevents.trace.trace``
         """
"Expected fileset to be a mapping dataset: list(files) or filename" - ) if not isinstance(processor_instance, ProcessorABC): raise ValueError("Expected processor_instance to derive from ProcessorABC") - if meta: - chunks = fileset - else: - chunks = self.preprocess(fileset, treename) + chunks = self.maybe_preprocess(fileset, treename) if self.processor_compression is None: pi_to_send = processor_instance @@ -1765,14 +1801,5 @@ def run( return wrapped_out -def _hash_for_tracing(metadata): - return _hash( - ( - metadata["dataset"], - metadata["filename"], - metadata["treename"], - metadata["entrystart"], - metadata["entrystop"], - metadata["fileuuid"], - ) - ) +def _key_for_tracing(metadata): + return (metadata["dataset"], metadata["treename"]) From 0ac60d3f01670bda4196a2a825f622e2363b8b36 Mon Sep 17 00:00:00 2001 From: pfackeldey Date: Wed, 1 Oct 2025 13:57:38 +0200 Subject: [PATCH 7/7] change progressbar description during tracing to 'Tracing', so it's clear what is happening --- src/coffea/processor/executor.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/coffea/processor/executor.py b/src/coffea/processor/executor.py index 7ee800966..af93712a0 100644 --- a/src/coffea/processor/executor.py +++ b/src/coffea/processor/executor.py @@ -1682,7 +1682,11 @@ def _first_item_per_dataset(gen: Generator) -> Generator: # wrap trace_processor_instance = TraceProcessor(processor_instance) - return self( + + # patch the executor description + old_desc = self.executor.desc + self.executor.desc = "Tracing" + needed = self( chunks, trace_processor_instance, treename, @@ -1690,6 +1694,9 @@ def _first_item_per_dataset(gen: Generator) -> Generator: iteritems_options=iteritems_options, preload_columns=None, ) + # restore the old description + self.executor.desc = old_desc + return needed def run( self,