Commit 992dcf3

Toast data and metadata improvements: (#1522)
- Support loading raw data as int32 DAQ units.
- When loading metadata, ensure that byte arrays are converted to strings as the starting point. These get automatically converted to / from bytestrings during HDF5 save / load.
- The HKManager now excludes several large fields by default (WG / HWP encoders and ACU udp stream).
- HKManager converts all byte strings to unicode on load, and then separately converts to / from bytestrings during HDF5 save / load.
- Small fix to always use "so3g compatibility mode" when converting boresight Az/El to RA/DEC on load.
1 parent 6c2d7bf commit 992dcf3
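For context, here is a minimal usage sketch of the new `daq_units` option on the `LoadContext` operator described above. The import location and the context file name are assumptions for illustration; they are not part of this commit.

# Hypothetical usage sketch -- not part of this commit.
from sotodlib.toast.ops import LoadContext  # assumed import location

loader = LoadContext(
    context_file="context.yaml",  # hypothetical Context file
    daq_units=True,               # new trait: keep raw int32 DAQ units, no calibration
)
# `data` would be an existing toast.Data container:
# loader.apply(data)

Note that, per the new check in `_exec`, `daq_units=True` cannot be combined with a preprocessing config.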

4 files changed (+151, -52 lines changed)

sotodlib/toast/hkmanager.py

Lines changed: 58 additions & 17 deletions
@@ -2,13 +2,17 @@
 # Full license can be found in the top level "LICENSE" file.
 
 import copy
+import re
 from collections.abc import MutableMapping
 
 import numpy as np
 from scipy.interpolate import CubicSpline, PchipInterpolator
 
-from toast.utils import Logger
-from toast.io.hdf_utils import load_meta_object, save_meta_object
+from toast.utils import Logger, replace_byte_arrays, array_equal
+from toast.io.hdf_utils import (
+    load_meta_object,
+    save_meta_object,
+)
 
 from ..io import hkdb
 
@@ -49,6 +53,17 @@ class HKManager(MutableMapping):
 
     """
 
+    # Exclude the massive HWP and WG housekeeping data by default, since
+    # this information is only used to build the HWP angle and WG calibration
+    # metadata. Loading these fields dramatically increases the load time
+    # for normal use. These can still be forcibly loaded by specifying them
+    # in the `plat_fields` list.
+    exclude_by_default = [
+        r"^hwp-.*",
+        r"^wg-.*",
+        r"^acu.acu_udp_stream.*",
+    ]
+
     def __init__(
         self,
         comm=None,
@@ -83,7 +98,7 @@ def __init__(
         # Load data on just one process per group
         self._load_hk_data(site_root, site_db, site_fields, site_aliases)
 
-        # Load the plot HK data
+        # Load the platform HK data
         if plat_root is not None:
             if plat_db is None:
                 raise RuntimeError("If plat_root is specified, plat_db is required")
@@ -99,6 +114,7 @@ def __init__(
     def _load_hk_data(self, root, db, fields, aliases):
         """Helper function to do the actual loading."""
         log = Logger.get()
+        exclude_pat = [re.compile(x) for x in self.exclude_by_default]
         conf = hkdb.HkConfig.from_dict(
             {
                 "hk_root": root,
@@ -120,7 +136,17 @@ def _load_hk_data(self, root, db, fields, aliases):
         )
         all_fields = hkdb.get_feed_list(test_spec)
         if fields is None or len(fields) == 0:
-            selected = list(all_fields)
+            # The user is requesting "all" fields. We will prune this list to exclude
+            # fields related to the HWP and wire grid encoders, which are excessively
+            # large and rarely used for downstream analysis.
+            selected = list()
+            for fld in all_fields:
+                keep = True
+                for pat in exclude_pat:
+                    if pat.match(fld) is not None:
+                        keep = False
+                if keep:
+                    selected.append(fld)
         else:
             selected = fields
         # Load the data
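The default-exclusion logic added above is easy to reproduce in isolation. Here is a self-contained sketch using the regex patterns from this commit; the field names themselves are invented for illustration.

import re

# Patterns copied from the new HKManager.exclude_by_default attribute.
exclude_by_default = [
    r"^hwp-.*",
    r"^wg-.*",
    r"^acu.acu_udp_stream.*",
]
exclude_pat = [re.compile(x) for x in exclude_by_default]

# Invented field names, purely for illustration.
all_fields = [
    "hwp-bbb-e1.HWPEncoder.counter",
    "wg-encoder.position",
    "acu.acu_udp_stream.Corrected_Azimuth",
    "acu.acu_status.Azimuth_mode",
]

# Keep only fields that match none of the exclusion patterns.
selected = [
    fld for fld in all_fields
    if not any(pat.match(fld) for pat in exclude_pat)
]
print(selected)  # ['acu.acu_status.Azimuth_mode']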
@@ -133,7 +159,10 @@ def _load_hk_data(self, root, db, fields, aliases):
             hkdb=None,
         )
         result = hkdb.load_hk(lspec, show_pb=False)
-        self._internal.update(result.data)
+
+        converted = replace_byte_arrays(result.data)
+
+        self._internal.update(converted)
         if aliases is not None:
             for k, v in aliases.items():
                 if v not in self._internal:
@@ -159,12 +188,24 @@ def interp_cubic(self, field):
         itrp = CubicSpline(times, vals, extrapolate=True)
         return itrp(self._stamps)
 
-    def memory_use(self):
+    def memory_use(self, threshold=0):
+        """Return the total size of all fields and a string with the breakdown.
+
+        Args:
+            threshold (int): Only include in the string objects larger than this.
+
+        Returns:
+            (tuple): The (total size, info string)
+
+        """
         bytes = 0
+        fstr = ""
         for k, v in self._internal.items():
-            bytes += v[0].nbytes
-            bytes += v[1].nbytes
-        return bytes
+            fbytes = v[0].nbytes + v[1].nbytes
+            bytes += fbytes
+            if fbytes > threshold:
+                fstr += f"{k}: {fbytes} bytes\n"
+        return bytes, fstr
 
     # Mapping methods
 
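Since `memory_use` now returns a `(total, breakdown)` tuple instead of a single integer, existing callers need a small change. A hedged usage sketch, assuming `hk` is an already-populated `HKManager`:

# `hk` is assumed to be an HKManager instance that already holds data.
total, breakdown = hk.memory_use(threshold=1024 * 1024)
print(f"Housekeeping data uses {total} bytes in total")
print("Fields larger than 1 MB:")
print(breakdown)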
@@ -204,14 +245,14 @@ def __eq__(self, other):
             log.verbose(f" keys {self._internal} != {other._internal}")
             return False
         for k, v in self._internal.items():
-            try:
-                if not np.allclose(v, other._internal[k]):
-                    log.verbose(f" data array {v} != {other._internal[k]}")
-                    return False
-            except Exception:
-                if self._internal[k] != other._internal[k]:
-                    log.verbose(f" data value {v} != {other._internal[k]}")
-                    return False
+            times = v[0]
+            vals = v[1]
+            other_times = other._internal[k][0]
+            other_vals = other._internal[k][1]
+            if not array_equal(times, other_times, log_prefix=f"HKManager {k} times"):
+                return False
+            if not array_equal(vals, other_vals, log_prefix=f"HKManager {k} values"):
+                return False
         if self._aliases != other._aliases:
             log.verbose(f" aliases {self._aliases} != {other._aliases}")
             return False
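Each `_internal` entry is a `(times, values)` pair of arrays, which is why `__eq__` now compares the two components separately with `toast.utils.array_equal`. A rough numpy stand-in for that per-field check (the real helper also logs the supplied prefix on mismatch):

import numpy as np

def fields_equal(a, b):
    """Illustrative only: compare two (times, values) housekeeping entries."""
    a_times, a_vals = a
    b_times, b_vals = b
    return np.array_equal(a_times, b_times) and np.array_equal(a_vals, b_vals)

field_a = (np.array([0.0, 1.0]), np.array([10.0, 11.0]))
field_b = (np.array([0.0, 1.0]), np.array([10.0, 11.0]))
print(fields_equal(field_a, field_b))  # True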

sotodlib/toast/ops/load_context.py

Lines changed: 32 additions & 11 deletions
@@ -24,8 +24,7 @@
     Float,
 )
 from toast.ops.operator import Operator
-from toast.ops.pipeline import Pipeline
-from toast.utils import Logger
+from toast.utils import Logger, replace_byte_arrays
 from toast.dist import distribute_discrete
 from toast.observation import default_values as defaults
 
@@ -135,13 +134,9 @@ class LoadContext(Operator):
         help="Path to DB for site housekeeping",
     )
 
-    hk_site_fields = List(
-        list(), help="Restrict loading to only these site fields"
-    )
+    hk_site_fields = List(list(), help="Restrict loading to only these site fields")
 
-    hk_site_aliases = Dict(
-        dict(), help="Optional convenience aliases for site fields"
-    )
+    hk_site_aliases = Dict(dict(), help="Optional convenience aliases for site fields")
 
     hk_platform_root = Unicode(
         None,
@@ -330,6 +325,10 @@ class LoadContext(Operator):
 
     bandwidth = Float(0.2, help="Fractional bandwith used in analytic bandpass")
 
+    daq_units = Bool(
+        False, help="If True, convert raw data to original int32 DAQ units"
+    )
+
     def __init__(self, **kwargs):
         super().__init__(**kwargs)
 
@@ -355,6 +354,10 @@ def _exec(self, data, detectors=None, **kwargs):
             msg = "Only one of the context or context_file should be specified"
             raise RuntimeError(msg)
 
+        if self.daq_units and self.preprocess_config is not None:
+            msg = "Cannot convert raw signal to DAQ units if passing data through preprocessing"
+            raise RuntimeError(msg)
+
         # Build our detector selection dictionary. Merge our explicit traits
         # with any pre-existing detector selection.
         dets_select = None
@@ -635,8 +638,14 @@ def _exec(self, data, detectors=None, **kwargs):
             # Read and communicate data
            self._load_data(ob, have_pointing, preproc_conf)
 
+            # Now that all metadata has been loaded, ensure that all byte strings
+            # are converted to unicode arrays.
+            ob._internal = replace_byte_arrays(ob._internal)
+
             # Optionally load housekeeping data
             if self.hk_site_root is not None or self.hk_platform_root is not None:
+                hk_timer = Timer()
+                hk_timer.start()
                 ob.hk = HKManager(
                     ob.comm.comm_group,
                     ob.shared[self.times].data,
@@ -649,6 +658,11 @@ def _exec(self, data, detectors=None, **kwargs):
                     plat_fields=self.hk_platform_fields,
                     plat_aliases=self.hk_platform_aliases,
                 )
+                log.debug_rank(
+                    f"LoadContext {obs_name} load House Keeping in",
+                    comm=comm.comm_group,
+                    timer=hk_timer,
+                )
 
             # Compute the boresight pointing and observatory position
             if have_pointing:
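The `replace_byte_arrays` call a couple of hunks above normalizes numpy byte-string arrays in the observation metadata to unicode before any HDF5 round-trip. A conceptual stand-in (not the toast implementation) that decodes byte arrays in a flat dictionary:

import numpy as np

def decode_byte_arrays(meta):
    """Illustrative only: return a copy of `meta` with byte-string arrays decoded."""
    out = dict()
    for key, val in meta.items():
        if isinstance(val, np.ndarray) and val.dtype.kind == "S":
            out[key] = val.astype(str)  # bytes -> unicode
        else:
            out[key] = val
    return out

meta = {"wafer": np.array([b"w25", b"w26"]), "band": np.array([90, 150])}
print(decode_byte_arrays(meta)["wafer"])  # ['w25' 'w26']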
@@ -1052,9 +1066,14 @@ def _create_observation(
             dtype=np.float64,
         )
         if ax_det_signal is not None:
-            ob.detdata.create(
-                self.det_data, dtype=np.float64, units=self.det_data_units
-            )
+            if self.daq_units:
+                ob.detdata.create(
+                    self.det_data, dtype=np.int32, units=u.dimensionless_unscaled
+                )
+            else:
+                ob.detdata.create(
+                    self.det_data, dtype=np.float64, units=self.det_data_units
+                )
         ob.detdata.create(self.det_flags, dtype=np.uint8)
 
         if meta is not None:
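The branch above is where the new `daq_units` trait takes effect: raw data is stored as signed 32-bit DAQ counts with no physical unit, while the default path keeps calibrated float64 values. A small sketch of that choice in isolation, using invented values and `astropy.units`:

import numpy as np
import astropy.units as u

daq_units = True  # mirrors the new LoadContext trait

if daq_units:
    # Raw readout: int32 counts, dimensionless.
    dtype, units = np.int32, u.dimensionless_unscaled
else:
    # Calibrated signal: float64 in the configured data units (K as a stand-in).
    dtype, units = np.float64, u.K

signal = np.zeros(16, dtype=dtype)
print(signal.dtype, units)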
@@ -1118,6 +1137,7 @@ def _load_data(self, ob, have_pointing, pconf):
             ignore_preprocess_archive=self.ignore_preprocess_archive,
             context=self.context,
             context_file=self.context_file,
+            daq_units=self.daq_units,
         )
 
         log.debug_rank(
@@ -1315,6 +1335,7 @@ def _load_data(self, ob, have_pointing, pconf):
             is_flag=(mask is not None),
             flag_invert=do_invert,
             flag_mask=mask,
+            daq_units=self.daq_units,
         )
 
         # Original wafer data no longer needed. AxisManager does not seem to
