|
| 1 | +# SPDX-License-Identifier: BSD-3-Clause |
| 2 | +# Copyright (c) 2025 Scipp contributors (https://github.com/scipp) |
| 3 | +"""Shrink a nexus file by removing all but the first N pulses.""" |
| 4 | + |
| 5 | +import shutil |
| 6 | +import subprocess |
| 7 | +from pathlib import Path |
| 8 | + |
| 9 | +import h5py as h5 |
| 10 | +import numpy as np |
| 11 | +import scippnexus as snx |
| 12 | + |
| 13 | +N_PULSE = 2 |
| 14 | + |
| 15 | +DIR = Path("data/coda") |
| 16 | +IN_FNAME = "977695_00068064.hdf" |
| 17 | +TMP_FNAME = f"TMP_{IN_FNAME}" |
| 18 | +OUT_FNAME = f"TEST_{IN_FNAME}" |
| 19 | + |
| 20 | + |
| 21 | +def main() -> None: |
| 22 | + shutil.copy(DIR / IN_FNAME, DIR / TMP_FNAME) |
| 23 | + with snx.File(DIR / TMP_FNAME, 'r') as f: |
| 24 | + entry = f['entry'] |
| 25 | + user_names = list(entry[snx.NXuser]) |
| 26 | + det_names = list(entry['instrument'][snx.NXdetector]) |
| 27 | + mon_names = list(entry['instrument'][snx.NXmonitor]) |
| 28 | + |
| 29 | + last_times = [] |
| 30 | + |
| 31 | + with h5.File(DIR / TMP_FNAME, 'a') as f: |
| 32 | + entry = f['entry'] |
| 33 | + for name in user_names: |
| 34 | + del entry[name] |
| 35 | + del entry['scripts'] |
| 36 | + |
| 37 | + instr = entry['instrument'] |
| 38 | + for name in det_names: |
| 39 | + det = instr[name] |
| 40 | + del det['pixel_shape'] |
| 41 | + |
| 42 | + event_data_name = next(name for name in det if name.endswith('event_data')) |
| 43 | + events = det[event_data_name] |
| 44 | + |
| 45 | + last_times.append( |
| 46 | + np.array( |
| 47 | + int(events['event_time_zero'][N_PULSE]), |
| 48 | + dtype=f'datetime64[{events["event_time_zero"].attrs["units"]}]', |
| 49 | + ) |
| 50 | + ) |
| 51 | + |
| 52 | + n_events = events['event_index'][N_PULSE] |
| 53 | + slice_dataset(events, 'event_index', N_PULSE) |
| 54 | + slice_dataset(events, 'event_time_zero', N_PULSE) |
| 55 | + slice_dataset(events, 'event_id', n_events) |
| 56 | + slice_dataset(events, 'event_time_offset', n_events) |
| 57 | + |
| 58 | + last_time = np.min(last_times) |
| 59 | + for name in mon_names: |
| 60 | + mon = instr[name] |
| 61 | + data = mon[f'{name}_events'] |
| 62 | + time = data['time'][()].astype(f'datetime64[{data["time"].attrs["units"]}]') |
| 63 | + if time.shape == (0,): |
| 64 | + continue |
| 65 | + n_times = np.argmax(time > last_time) |
| 66 | + slice_dataset(data, 'time', n_times) |
| 67 | + |
| 68 | + assert ( # noqa: S101 |
| 69 | + data.attrs['axes'][0] == 'time' |
| 70 | + ) # required to slice as below: |
| 71 | + slice_dataset(data, 'signal', n_times) |
| 72 | + |
| 73 | + subprocess.check_call(['h5repack', DIR / TMP_FNAME, DIR / OUT_FNAME]) # noqa: S603, S607 |
| 74 | + |
| 75 | + |
| 76 | +def slice_dataset(group, name, n): |
| 77 | + attrs = group[name].attrs |
| 78 | + group[name] = group.pop(name)[:n] |
| 79 | + group[name].attrs.update(attrs) |
| 80 | + |
| 81 | + |
| 82 | +if __name__ == "__main__": |
| 83 | + main() |
0 commit comments