Skip to content

Commit a99e45d

Browse files
MoAly98pre-commit-ci[bot]ikrommyd
authored
feat: add file handle as an attribute of the events factory (#1502)
* feat: add file.source as metadata in processor * add flag to only run filesource test in executor workflows * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * track file handle instead of fsspecsource and delete reference before exiting file context * remove pixi dust added by mistake * add file handle to events factory instead of metadata, following implementaiton for access_log * fix precommit for test file * remove extra del (nicks comment) * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Iason Krommydas <iason.krom@gmail.com>
1 parent 90a50ce commit a99e45d

File tree

9 files changed

+71
-6
lines changed

9 files changed

+71
-6
lines changed

src/coffea/nanoevents/factory.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,13 +392,15 @@ def from_root(
392392

393393
if isinstance(file, uproot.reading.ReadOnlyDirectory):
394394
tree = file[treepath]
395+
file_handle = file
395396
elif "<class 'uproot.rootio.ROOTDirectory'>" == str(type(file)):
396397
raise RuntimeError(
397398
"The file instance (%r) is an uproot3 type, but this module is only compatible with uproot5 or higher"
398399
% file
399400
)
400401
else:
401402
tree = uproot.open(file, **uproot_options)
403+
file_handle = tree.file
402404

403405
# Get the typenames
404406
typenames = tree.typenames()
@@ -438,6 +440,7 @@ def from_root(
438440
entry_stop,
439441
cache={},
440442
access_log=access_log,
443+
file_handle=file_handle,
441444
use_ak_forth=use_ak_forth,
442445
virtual=mode == "virtual",
443446
preloaded_arrays=preloaded_arrays,
@@ -755,6 +758,11 @@ def access_log(self):
755758
"""List of accessed branches, populated when columns are lazily loaded."""
756759
return getattr(self._mapping, "_access_log", None)
757760

761+
@property
762+
def file_handle(self):
763+
"""The file handle used to open the source file, if available."""
764+
return getattr(self._mapping, "_file_handle", None)
765+
758766
def events(self):
759767
"""
760768
Build events

src/coffea/nanoevents/mapping/base.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,15 @@ def __init__(
3434
stop,
3535
cache=None,
3636
access_log=None,
37+
file_handle=None,
3738
use_ak_forth=False,
3839
virtual=False,
3940
buffer_cache=None,
4041
):
4142
self._fileopener = fileopener
4243
self._cache = cache
4344
self._access_log = access_log
45+
self._file_handle = file_handle
4446
self._start = start
4547
self._stop = stop
4648
self._use_ak_forth = use_ak_forth

src/coffea/nanoevents/mapping/uproot.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ def __init__(
123123
stop,
124124
cache=None,
125125
access_log=None,
126+
file_handle=None,
126127
use_ak_forth=False,
127128
virtual=False,
128129
decompression_executor=None,
@@ -136,6 +137,7 @@ def __init__(
136137
stop=stop,
137138
cache=cache,
138139
access_log=access_log,
140+
file_handle=file_handle,
139141
use_ak_forth=use_ak_forth,
140142
virtual=virtual,
141143
buffer_cache=buffer_cache,

src/coffea/processor/executor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1550,6 +1550,7 @@ def _work_function(
15501550
)
15511551
# save the output
15521552
checkpointer.save(out, metadata, processor_instance)
1553+
15531554
return out
15541555

15551556
def __call__(

src/coffea/processor/test_items/NanoEventsProcessor.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@
66

77

88
class NanoEventsProcessor(processor.ProcessorABC):
9-
def __init__(self, columns=[], canaries=[], mode="dask"):
9+
def __init__(self, columns=[], canaries=[], mode="dask", check_filehandle=False):
1010
self._columns = columns
1111
self._canaries = canaries
1212
self.mode = mode
13+
self.check_filehandle = check_filehandle
1314

1415
self.expected_usermeta = {
1516
"ZJets": ("someusermeta", "hello"),
@@ -56,6 +57,10 @@ def process(self, events):
5657
metaname, metavalue = self.expected_usermeta[dataset]
5758
assert metavalue == events.metadata[metaname]
5859

60+
if self.check_filehandle:
61+
factory = events.attrs["@events_factory"]
62+
assert factory.file_handle is not None
63+
5964
# mapping = events.behavior["__events_factory__"]._mapping
6065
muon_pt = events.Muon.pt
6166
# if isinstance(mapping, nanoevents.mapping.CachedMapping):

tests/test_local_executors.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@ def test_nanoevents_analysis(
2323
executor, compression, maxchunks, skipbadfiles, filetype, mode, processor_type
2424
):
2525
if processor_type == "Callable":
26-
processor_instance = NanoEventsProcessor(mode=mode)
26+
processor_instance = NanoEventsProcessor(mode=mode, check_filehandle=True)
2727
else:
28-
processor_instance = NanoEventsProcessor(mode=mode).process
28+
processor_instance = NanoEventsProcessor(
29+
mode=mode, check_filehandle=True
30+
).process
2931

3032
if filetype == "parquet":
3133
pytest.xfail("parquet nanoevents not supported yet")

tests/test_nanoevents.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import awkward as ak
44
import pytest
5+
import uproot
56
from distributed import Client
67

78
from coffea.nanoevents import NanoAODSchema, NanoEventsFactory
@@ -246,3 +247,47 @@ def test_access_log(tests_directory, mode):
246247
_ = ak.materialize(events.Muon.pt)
247248
branches = {entry.branch for entry in factory.access_log}
248249
assert branches == {"nMuon", "Muon_pt"}
250+
251+
252+
@pytest.mark.parametrize("mode", ["eager", "virtual"])
253+
def test_file_handle_from_path(tests_directory, mode):
254+
"""Test that file_handle is available when opening from path string."""
255+
path = f"{tests_directory}/samples/nano_dy.root:Events"
256+
257+
factory = NanoEventsFactory.from_root(
258+
path,
259+
schemaclass=NanoAODSchema,
260+
mode=mode,
261+
)
262+
263+
# file_handle should be ReadOnlyFile when opened from path
264+
assert factory.file_handle is not None
265+
assert isinstance(factory.file_handle, uproot.reading.ReadOnlyFile)
266+
267+
_ = factory.events()
268+
269+
# file_handle still accessible after events() call
270+
assert factory.file_handle is not None
271+
272+
273+
@pytest.mark.parametrize("mode", ["eager", "virtual"])
274+
def test_file_handle_from_directory(tests_directory, mode):
275+
"""Test that file_handle is available when passing ReadOnlyDirectory."""
276+
filepath = f"{tests_directory}/samples/nano_dy.root"
277+
278+
with uproot.open(filepath) as file:
279+
factory = NanoEventsFactory.from_root(
280+
file,
281+
treepath="Events",
282+
schemaclass=NanoAODSchema,
283+
mode=mode,
284+
)
285+
286+
# file_handle should be ReadOnlyDirectory when passed directly
287+
assert factory.file_handle is not None
288+
assert isinstance(factory.file_handle, uproot.ReadOnlyDirectory)
289+
290+
_ = factory.events()
291+
292+
# file_handle still accessible after events() call
293+
assert factory.file_handle is not None

tests/test_skyhook_job.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
"Data": "/mnt/cephfs/nanoevents/Data",
7070
},
7171
"Events",
72-
processor_instance=NanoEventsProcessor(),
72+
processor_instance=NanoEventsProcessor(check_filehandle=True),
7373
)
7474

7575
assert hists["cutflow"]["ZJets_pt"] == 108
@@ -93,7 +93,7 @@
9393
"Data": "/mnt/cephfs/nanoevents/Data",
9494
},
9595
"Events",
96-
processor_instance=NanoEventsProcessor(),
96+
processor_instance=NanoEventsProcessor(check_filehandle=True),
9797
)
9898

9999
assert hists["cutflow"]["ZJets_pt"] == 108

tests/test_taskvine_virtual.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def test_taskvine_executor_with_virtual_arrays():
3131
}
3232

3333
# Create the same processor as used in local executors
34-
processor = NanoEventsProcessor(mode="virtual")
34+
processor = NanoEventsProcessor(mode="virtual", check_filehandle=True)
3535

3636
# Create Runner with TaskVineExecutor
3737
from coffea.processor import Runner

0 commit comments

Comments
 (0)