Skip to content

Commit 7ca4ab3

Browse files
committed
Better handle flagged samples in trend cuts and dark templates
- In trend cuts, disregard the slope of chunks without a sufficient number of good samples. - In dark templates, handle cases where many samples are flagged, and also cases where there are no remaining dark detectors after cuts. - Other small cleanups
1 parent 860350b commit 7ca4ab3

File tree

4 files changed

+125
-53
lines changed

4 files changed

+125
-53
lines changed

sotodlib/scripts/so_batch_control.py

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,7 @@ def get_state_file(out_root, obs, name="state"):
5252

5353

5454
def get_obs_state(out_root, obs):
55-
"""Find the current processing state of an observation.
56-
"""
55+
"""Find the current processing state of an observation."""
5756
state_file = get_state_file(out_root, obs)
5857
if os.path.isfile(state_file):
5958
with open(state_file, "r") as f:
@@ -63,8 +62,7 @@ def get_obs_state(out_root, obs):
6362

6463

6564
def clear_obs_state(out_root, obs):
66-
"""Clear the current processing state of an observation.
67-
"""
65+
"""Clear the current processing state of an observation."""
6866
obs_dir = os.path.join(out_root, obs)
6967
if not os.path.isdir(obs_dir):
7068
return
@@ -74,8 +72,7 @@ def clear_obs_state(out_root, obs):
7472

7573

7674
def set_obs_state(out_root, obs, state):
77-
"""Set the current processing state of an observation.
78-
"""
75+
"""Set the current processing state of an observation."""
7976
obs_dir = os.path.join(out_root, obs)
8077
if not os.path.exists(obs_dir):
8178
os.makedirs(obs_dir)
@@ -175,6 +172,13 @@ def main():
175172
default=None,
176173
help="Set the state of this observation to 'done'",
177174
)
175+
parser.add_argument(
176+
"--set_state_failed",
177+
required=False,
178+
type=str,
179+
default=None,
180+
help="Set the state of this observation to 'failed'",
181+
)
178182
parser.add_argument(
179183
"--set_state_running",
180184
required=False,
@@ -196,13 +200,22 @@ def main():
196200
default=False,
197201
help="Remove the running state from all observations",
198202
)
203+
parser.add_argument(
204+
"--ignore_running",
205+
required=False,
206+
action="store_true",
207+
default=False,
208+
help="Ignore stale running state when considering observations",
209+
)
199210

200211
args = parser.parse_args()
201212

202213
if args.get_batch is not None:
203214
# We are getting the next batch of observations
204215
all_obs = load_obs_file(args.observations)
205-
batch_obs = find_obs(all_obs, args.get_batch, args.out_root)
216+
batch_obs = find_obs(
217+
all_obs, args.get_batch, args.out_root, ignore_running=args.ignore_running
218+
)
206219
if args.batch_list:
207220
batch_str = ",".join(batch_obs)
208221
print(f"{batch_str}", flush=True)
@@ -216,6 +229,8 @@ def main():
216229
clear_obs_state(args.out_root, args.clear_state)
217230
elif args.set_state_done is not None:
218231
set_obs_state(args.out_root, args.set_state_done, "done")
232+
elif args.set_state_failed is not None:
233+
set_obs_state(args.out_root, args.set_state_done, "failed")
219234
elif args.set_state_running is not None:
220235
set_obs_state(args.out_root, args.set_state_running, "running")
221236
elif args.cleanup:

sotodlib/toast/ops/dark_template.py

Lines changed: 86 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import toast
1818
from toast.traits import trait_docs, Unicode, Int, Instance, Bool, Float, List
1919
from toast.ops import Operator
20-
from toast.utils import Logger, Environment, rate_from_times
20+
from toast.utils import Logger, flagged_noise_fill
2121
from toast.timing import function_timer, Timer
2222
from toast.observation import default_values as defaults
2323

@@ -101,24 +101,48 @@ def __init__(self, **kwargs):
101101

102102
@function_timer
103103
def _gather_dark_tod(self, ob):
104-
""" Gather dark tod to root process
104+
"""Gather dark tod to root process
105105
106106
"""
107107
comm = ob.comm.comm_group
108+
rank = ob.comm.group_rank
108109
dark_tod = []
109110
fp = ob.telescope.focalplane
110111
for det in ob.select_local_detectors(flagmask=self.det_mask):
111112
if fp[det]["det_info:wafer:type"] != "DARK":
112113
continue
113-
tod = ob.detdata[self.det_data][det]
114+
tod = ob.detdata[self.det_data][det].copy()
115+
if self.shared_flags is not None:
116+
flags = ob.shared[self.shared_flags].data & self.shared_flag_mask
117+
else:
118+
flags = np.zeros(len(tod), dtype=np.uint8)
119+
if self.det_flags is not None:
120+
flags |= ob.detdata[self.det_flags] & self.det_flag_mask
114121
# Check for typical pathologies
115122
if np.any(np.isnan(tod)) or np.std(tod) == 0:
116123
continue
117-
# Subtract the mean
118-
dark_tod.append(tod - np.mean(tod))
124+
if np.count_nonzero(flags) < 0.5 * len(tod):
125+
continue
126+
# Subtract the mean and fill flagged samples with noise. For the gap
127+
# filling, base the buffer size on the lowpass kernel size.
128+
good = flags == 0
129+
avg = np.mean(tod[good])
130+
tod -= avg
131+
flagged_noise_fill(tod, flags, self.naverage // 2, poly_order=5)
132+
dark_tod.append(tod)
133+
fail = False
119134
if comm is not None:
120-
dark_tod = comm.gather(dark_tod)
121-
return dark_tod
135+
dark_tod = comm.gather(dark_tod, root=0)
136+
if rank == 0:
137+
total_dark = np.sum([len(x) for x in dark_tod])
138+
if total_dark == 0:
139+
fail = True
140+
if comm is not None:
141+
fail = comm.bcast(fail, root=0)
142+
if fail:
143+
return None
144+
else:
145+
return dark_tod
122146

123147
@function_timer
124148
def _derive_templates(self, dark_tod):
@@ -163,6 +187,7 @@ def _load_dark_templates(self, ob):
163187
"""
164188
log = Logger.get()
165189
comm = ob.comm.comm_group
190+
rank = ob.comm.group_rank
166191

167192
# See if the templates were already cached
168193

@@ -171,17 +196,25 @@ def _load_dark_templates(self, ob):
171196
self.cache_dir,
172197
f"dark_templates.{ob.name}.pck",
173198
)
174-
if os.path.isfile(fname_cache):
175-
if comm is None or comm.rank == 0:
199+
exists = False
200+
if rank == 0:
201+
if os.path.isfile(fname_cache):
202+
exists = True
203+
if comm is not None:
204+
exists = comm.bcast(exists, root=0)
205+
if exists:
206+
if rank == 0:
176207
with open(fname_cache, "rb") as f:
177208
templates = pickle.load(f)
178209
log.debug(f"Loaded dark templates from {fname_cache}")
179210
else:
180211
templates = None
181212
self._share_templates(ob, templates)
182213
return ob.shared[self.key]
183-
184-
return None
214+
else:
215+
return None
216+
else:
217+
return None
185218

186219
@function_timer
187220
def _share_templates(self, ob, templates):
@@ -196,8 +229,11 @@ def _share_templates(self, ob, templates):
196229
ntemplate, nsample = templates.shape
197230

198231
if comm is not None:
199-
ntemplate = comm.bcast(ntemplate)
200-
nsample = comm.bcast(nsample)
232+
ntemplate = comm.bcast(ntemplate, root=0)
233+
nsample = comm.bcast(nsample, root=0)
234+
235+
if ntemplate is None:
236+
return
201237

202238
ob.shared.create_column(
203239
self.key,
@@ -209,7 +245,6 @@ def _share_templates(self, ob, templates):
209245
else:
210246
temp_trans = None
211247
ob.shared[self.key].set(temp_trans, offset=(0, 0), fromrank=0)
212-
return
213248

214249
@function_timer
215250
def _save_dark_templates(self, ob, templates):
@@ -220,48 +255,68 @@ def _save_dark_templates(self, ob, templates):
220255

221256
log = Logger.get()
222257
comm = ob.comm.comm_group
258+
rank = ob.comm.group_rank
223259

224260
if self.cache_dir is None:
225261
return
226262

227-
if comm is None or comm.rank == 0:
228-
fname_cache = os.path.join(
229-
self.cache_dir,
230-
f"dark_templates.{ob.name}.pck",
231-
)
232-
with open(fname_cache, "wb") as f:
233-
pickle.dump(templates.data.T, f)
234-
log.debug(f"Wrote dark templates to {fname_cache}")
235-
236-
return
263+
if rank == 0:
264+
if templates is not None:
265+
fname_cache = os.path.join(
266+
self.cache_dir,
267+
f"dark_templates.{ob.name}.pck",
268+
)
269+
with open(fname_cache, "wb") as f:
270+
pickle.dump(templates.data.T, f)
271+
log.debug(f"Wrote dark templates to {fname_cache}")
272+
if comm is not None:
273+
comm.barrier()
237274

238275
@function_timer
239276
def _get_dark_templates(self, ob):
240-
""" Construct low-passed dark bolometer templates
277+
""" Construct low-passed dark bolometer templates.
278+
279+
If no good dark detector data is found, the return value is None.
241280
"""
242-
log = Logger.get()
243281
comm = ob.comm.comm_group
282+
rank = ob.comm.group_rank
244283

245284
# gather all dark TOD to the root process
246285

247286
dark_tod = self._gather_dark_tod(ob)
248287

249288
# Construct dark templates through PCA and lowpass
250289

251-
if comm is None or comm.rank == 0:
252-
templates = self._derive_templates(dark_tod)
290+
if rank == 0:
291+
if dark_tod is None:
292+
fail = True
293+
else:
294+
fail = False
295+
templates = self._derive_templates(dark_tod)
253296
else:
297+
fail = False
254298
templates = None
299+
if comm is not None:
300+
fail = comm.bcast(fail, root=0)
255301

256-
self._share_templates(ob, templates)
257-
258-
return ob.shared[self.key]
302+
if fail:
303+
return None
304+
else:
305+
self._share_templates(ob, templates)
306+
return ob.shared[self.key]
259307

260308
@function_timer
261309
def _project_dark_templates(self, ob, templates):
262310
""" Project the provided templates out of every optical detector
263311
264312
"""
313+
log = Logger.get()
314+
if templates is None:
315+
# No templates available
316+
msg = f"No dark templates available for {ob.name}"
317+
log.info_rank(msg, comm=ob.comm.comm_group)
318+
return
319+
265320
if self.shared_flags is not None:
266321
common_flags = ob.shared[self.shared_flags].data & self.shared_flag_mask
267322
else:
@@ -303,8 +358,6 @@ def _project_dark_templates(self, ob, templates):
303358
coeff = np.dot(cov, proj)
304359
tod[ind] -= np.dot(coeff, templates_ind.T)
305360

306-
return
307-
308361
@function_timer
309362
def _exec(self, data, detectors=None, **kwargs):
310363
log = Logger.get()
@@ -330,8 +383,6 @@ def _exec(self, data, detectors=None, **kwargs):
330383
self._save_dark_templates(ob, templates)
331384
self._project_dark_templates(ob, templates)
332385

333-
return
334-
335386
@function_timer
336387
def _finalize(self, data, **kwargs):
337388
pass

sotodlib/toast/ops/load_context_utils.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -637,14 +637,14 @@ def _process_flag(mask, inbuf, detdata, det_begin, det_end, invert):
637637
# temporary buffers of ranges flags.
638638
del send_data
639639

640-
# Every process checks its local data for NaN values. If any are found, a warning
640+
# Every process checks its local data for NaN values. If any are found, a message
641641
# is printed and the detector is cut.
642642
dflags = dict()
643643
for det in obs.local_detectors:
644644
nnan = np.count_nonzero(np.isnan(obs.detdata[field][det]))
645645
if nnan > 0:
646646
msg = f"{obs.name}:{det} has {nnan} NaN values. Cutting."
647-
log.warning(msg)
647+
log.debug(msg)
648648
dflags[det] = defaults.det_mask_invalid
649649
obs.update_local_detector_flags(dflags)
650650

sotodlib/toast/ops/trend_cut.py

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -164,15 +164,21 @@ def _compute_trend(self, obs, reltime, slices, det):
164164
slopes = list()
165165
for slc in slices:
166166
good = flags[slc] == 0
167-
t = reltime[slc][good]
168-
s = signal[slc][good]
169-
170-
t_mean = np.mean(t)
171-
t_var = np.var(t, mean=t_mean)
172-
s_mean = np.mean(s)
173-
ts_mean = np.mean(t * s)
174-
175-
slopes.append((ts_mean - t_mean * s_mean) / t_var)
167+
if np.count_nonzero(good) < 0.1 * len(flags[slc]):
168+
# Not enough good data
169+
slopes.append(0.0)
170+
else:
171+
t = reltime[slc][good]
172+
s = signal[slc][good]
173+
174+
t_mean = np.mean(t)
175+
t_var = np.var(t, mean=t_mean)
176+
s_mean = np.mean(s)
177+
ts_mean = np.mean(t * s)
178+
if t_var == 0:
179+
slopes.append(0.0)
180+
else:
181+
slopes.append((ts_mean - t_mean * s_mean) / t_var)
176182
return np.array(slopes)
177183

178184
def _finalize(self, data, **kwargs):

0 commit comments

Comments
 (0)