12 changes: 6 additions & 6 deletions src/coffea/nanoevents/trace.py
@@ -117,7 +117,7 @@ def _attempt_tracing(fun: Callable, tracer: ak.Array, throw: bool) -> None:


def trace_with_typetracer(
fun: Callable, events: ak.Array, throw: bool = True
fun: Callable, events: ak.Array, throw: bool = False
) -> frozenset[str]:
"""
Trace the execution of a function on NanoEvents using Awkward's typetracer to determine which buffers are touched.
@@ -145,7 +145,7 @@ def trace_with_typetracer(


def trace_with_length_zero_array(
fun: Callable, events: ak.Array, throw: bool = True
fun: Callable, events: ak.Array, throw: bool = False
) -> frozenset[str]:
"""
Trace the execution of a function on NanoEvents using a length-zero array to determine which buffers are touched.
@@ -173,7 +173,7 @@ def trace_with_length_zero_array(


def trace_with_length_one_array(
fun: Callable, events: ak.Array, throw: bool = True
fun: Callable, events: ak.Array, throw: bool = False
) -> frozenset[str]:
"""
Trace the execution of a function on NanoEvents using a length-one array to determine which buffers are touched.
@@ -225,7 +225,7 @@ def trace(fun: Callable, events: ak.Array) -> frozenset[str]:
touched = set()

try:
touched |= trace_with_typetracer(fun, events)
touched |= trace_with_typetracer(fun, events, throw=True)
return frozenset(touched)
except Exception as e1:
warnings.warn(
@@ -234,7 +234,7 @@ def trace(fun: Callable, events: ak.Array) -> frozenset[str]:
stacklevel=2,
)
try:
touched |= trace_with_length_zero_array(fun, events)
touched |= trace_with_length_zero_array(fun, events, throw=True)
return frozenset(touched)
except Exception as e2:
warnings.warn(
@@ -243,7 +243,7 @@ def trace(fun: Callable, events: ak.Array) -> frozenset[str]:
stacklevel=2,
)
try:
touched |= trace_with_length_one_array(fun, events)
touched |= trace_with_length_one_array(fun, events, throw=True)
return frozenset(touched)
except Exception as e3:
warnings.warn(
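Taken together, the new defaults make the standalone helpers forgiving (``throw=False``) while the ``trace`` cascade opts back into strict mode (``throw=True``) at each stage: first the typetracer, then a length-zero array, then a length-one array, warning and falling through whenever a stage raises. A minimal usage sketch (the analysis function and the pre-opened ``events`` array are illustrative, not part of this diff):

    from coffea.nanoevents.trace import trace

    def select_muons(events):
        # touches the Muon_pt and Muon_eta buffers
        muons = events.Muon
        return muons[(muons.pt > 30) & (abs(muons.eta) < 2.4)]

    # `events` is a NanoEvents array opened elsewhere (construction omitted)
    columns = trace(select_muons, events)  # frozenset of touched column names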
174 changes: 160 additions & 14 deletions src/coffea/processor/executor.py
@@ -26,6 +26,8 @@
import uproot
from cachetools import LRUCache

from coffea.nanoevents.trace import trace

from ..nanoevents import NanoEventsFactory, schemas
from ..util import _exception_chain, _hash, rich_bar
from .accumulator import Accumulatable, accumulate, set_accumulator
@@ -1384,6 +1386,7 @@ def _work_function(
processor_instance: ProcessorABC,
uproot_options: dict,
iteritems_options: dict,
preload_columns: dict,
) -> dict:
if "timeout" in uproot_options:
xrootdtimeout = uproot_options["timeout"]
@@ -1419,6 +1422,18 @@
if item.usermeta is not None:
metadata.update(item.usermeta)

# preload logic: restrict eager branch reads to the traced columns for this dataset
# Important: the lookup key must match the one built in TraceProcessor,
# so both sides go through the shared _key_for_tracing helper
_key = _key_for_tracing(metadata)
needed = None
if preload_columns is not None:
needed = preload_columns.get(_key, None)

if needed is not None:
preload = lambda b: b.name in needed # noqa
else:
preload = None
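# illustrative: if needed == {"Muon_pt", "Muon_eta"}, preload(branch) is True
# only for those two branches; everything else is still loaded lazily on access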

with filecontext as file:
if schema is None:
raise ValueError("Schema must be set")
@@ -1437,6 +1452,7 @@ def _work_function(
entry_start=item.entrystart,
entry_stop=item.entrystop,
iteritems_options=iteritems_options,
preload=preload,
)
events = factory.events()
elif format == "parquet":
@@ -1485,6 +1501,7 @@ def __call__(
treename: Optional[str] = None,
uproot_options: Optional[dict] = {},
iteritems_options: Optional[dict] = {},
preload_columns: Optional[dict] = None,
) -> Accumulatable:
"""Run the processor_instance on a given fileset

@@ -1503,9 +1520,16 @@ def __call__(
Any options to pass to ``uproot.open``
iteritems_options : dict, optional
Any options to pass to ``tree.iteritems``
preload_columns : dict, optional
A mapping from ``(dataset, treename)`` keys, as returned by
``trace_needed_columns``, to the set of column names to preload
"""
wrapped_out = self.run(
fileset, processor_instance, treename, uproot_options, iteritems_options
fileset,
processor_instance,
treename,
uproot_options,
iteritems_options,
preload_columns,
)
if self.use_dataframes:
return wrapped_out # not wrapped anymore
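For reference, the ``preload_columns`` mapping consumed here keys on the same ``(dataset, treename)`` tuples produced by ``_key_for_tracing``; a sketch with illustrative dataset and column names:

    preload_columns = {
        ("ttbar", "Events"): frozenset({"Muon_pt", "Muon_eta", "Muon_phi"}),
        ("wjets", "Events"): frozenset({"Jet_pt"}),
    }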
@@ -1549,13 +1573,139 @@ def preprocess(

return self._chunk_generator(fileset, treename)

def maybe_preprocess(
self,
fileset: dict,
treename: Optional[str] = None,
) -> Generator:
"""Run the processor_instance on a given fileset if needed

Parameters
----------
fileset : dict | str | List[WorkItem] | Generator
- A dictionary ``{dataset: [file, file], }``
Optionally, if some files' tree name differ, the dictionary can be specified:
``{dataset: {'treename': 'name', 'files': [file, file]}, }``
You can also define a different tree name per file in the dictionary:
``{dataset: {'files': {file: 'name'}}, }``
- A single file name
- File chunks for self.preprocess()
- Chunk generator
treename : str, optional
name of tree inside each root file, can be ``None``;
treename can also be defined in fileset, which will override the passed treename.
Not needed if processing premade chunks
"""
meta = False
if not isinstance(fileset, (Mapping, str)):
if isinstance(fileset, Generator) or isinstance(fileset[0], WorkItem):
meta = True
else:
raise ValueError(
"Expected fileset to be a mapping dataset: list(files) or filename"
)

if meta:
chunks = fileset
else:
chunks = self.preprocess(fileset, treename)
return chunks

def trace_needed_columns(
self,
fileset: Union[dict, str, list[WorkItem], Generator],
processor_instance: ProcessorABC,
treename="Events",
uproot_options: Optional[dict] = {},
iteritems_options: Optional[dict] = {},
trace_method: Callable = trace,
) -> Accumulatable:
"""Trace the processor_instance on a given fileset

Parameters
----------
fileset : dict | str | List[WorkItem] | Generator
- A dictionary ``{dataset: [file, file], }``
Optionally, if some files' tree name differ, the dictionary can be specified:
``{dataset: {'treename': 'name', 'files': [file, file]}, }``
You can also define a different tree name per file in the dictionary:
``{dataset: {'files': {file: 'name'}}, }``
- A single file name
- File chunks for self.preprocess()
- Chunk generator
processor_instance : ProcessorABC
An instance of a class deriving from ProcessorABC
treename : str, optional
name of tree inside each root file, can be ``None``;
treename can also be defined in fileset, which will override the passed treename.
Not needed if processing premade chunks
uproot_options : dict, optional
Any options to pass to ``uproot.open``
iteritems_options : dict, optional
Any options to pass to ``tree.iteritems``
trace_method : Callable, optional
A callable that takes a function and a NanoEventsArray and returns
the set of columns accessed during the function call.
Default is ``coffea.nanoevents.trace.trace``

Returns
-------
dict
A mapping from ``(dataset, treename)`` keys to the set of touched
columns, suitable for passing to ``run`` as ``preload_columns``
"""

class TraceProcessor(ProcessorABC):
"""Wraps a processor to trace which columns are accessed during processing."""

def __init__(self, processor: ProcessorABC):
self.processor = processor

@property
def accumulator(self):
return {}

def process(self, events):
accum = self.accumulator
key = _key_for_tracing(events.metadata)
accum[key] = set()
columns = trace_method(self.processor.process, events)
accum[key] |= columns
return accum

def postprocess(self, accumulator):
return accumulator

# only trace the first chunk of each dataset; later chunks read the same columns
def _first_item_per_dataset(gen: Generator) -> Generator:
seen = set()
for item in gen:
if item.dataset not in seen:
seen.add(item.dataset)
yield item
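# e.g. chunks ordered A, A, B, A, B yield only the first A and the first B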

chunks = _first_item_per_dataset(self.maybe_preprocess(fileset, treename))

# wrap
trace_processor_instance = TraceProcessor(processor_instance)

# patch the executor description
old_desc = self.executor.desc
self.executor.desc = "Tracing"
needed = self(
chunks,
trace_processor_instance,
treename,
uproot_options=uproot_options,
iteritems_options=iteritems_options,
preload_columns=None,
)
# restore the old description
self.executor.desc = old_desc
return needed

def run(
self,
fileset: Union[dict, str, list[WorkItem], Generator],
processor_instance: ProcessorABC,
treename: Optional[str] = None,
uproot_options: Optional[dict] = {},
iteritems_options: Optional[dict] = {},
preload_columns: Optional[dict] = None,
) -> Accumulatable:
"""Run the processor_instance on a given fileset

@@ -1580,23 +1730,13 @@ def run(
Any options to pass to ``uproot.open``
iteritems_options : dict, optional
Any options to pass to ``tree.iteritems``
preload_columns : dict, optional
A mapping from ``(dataset, treename)`` keys, as returned by
``trace_needed_columns``, to the set of column names to preload
"""

meta = False
if not isinstance(fileset, (Mapping, str)):
if isinstance(fileset, Generator) or isinstance(fileset[0], WorkItem):
meta = True
else:
raise ValueError(
"Expected fileset to be a mapping dataset: list(files) or filename"
)
if not isinstance(processor_instance, ProcessorABC):
raise ValueError("Expected processor_instance to derive from ProcessorABC")

if meta:
chunks = fileset
else:
chunks = self.preprocess(fileset, treename)
chunks = self.maybe_preprocess(fileset, treename)

if self.processor_compression is None:
pi_to_send = processor_instance
@@ -1618,6 +1758,7 @@ def run(
processor_instance="heavy",
uproot_options=uproot_options,
iteritems_options=iteritems_options,
preload_columns=preload_columns,
)
else:
closure = partial(
@@ -1630,6 +1771,7 @@ def run(
processor_instance=pi_to_send,
uproot_options=uproot_options,
iteritems_options=iteritems_options,
preload_columns=preload_columns,
)

chunks = list(chunks)
@@ -1664,3 +1806,7 @@ def run(
return wrapped_out["out"]
else:
return wrapped_out


def _key_for_tracing(metadata):
"""Shared lookup key used by both TraceProcessor and the preload logic in _work_function."""
return (metadata["dataset"], metadata["treename"])
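End to end, the intended workflow is to trace once and feed the result back in as ``preload_columns``. A hedged sketch (the Runner configuration, fileset, and processor are illustrative; only ``trace_needed_columns`` and ``preload_columns`` come from this diff):

    import awkward as ak
    from coffea import processor
    from coffea.nanoevents import NanoAODSchema

    class MyProcessor(processor.ProcessorABC):
        def process(self, events):
            # a stand-in analysis touching a couple of muon branches
            muons = events.Muon
            return {"n_selected": int(ak.count_nonzero(muons.pt > 30))}

        def postprocess(self, accumulator):
            return accumulator

    runner = processor.Runner(
        executor=processor.IterativeExecutor(),
        schema=NanoAODSchema,
    )
    fileset = {"ttbar": ["nano_ttbar.root"]}
    my_processor = MyProcessor()

    # one tracing pass (first chunk per dataset), then the real run with preloading
    needed = runner.trace_needed_columns(fileset, my_processor, treename="Events")
    out = runner(fileset, my_processor, treename="Events", preload_columns=needed)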