Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions sotodlib/mapmaking/demod_mapmaker.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,12 +604,13 @@ def make_demod_map(context, obslist, noise_model, info,
for oi in range(len(obslist)):
obs_id, detset, band = obslist[oi][:3]
name = "%s:%s:%s" % (obs_id, detset, band)
error, output_init, output_proc, obs = preprocess_util.preproc_or_load_group(obs_id,
configs_init=preproc_init,
configs_proc=preproc_proc,
dets={'wafer_slot':detset, 'wafer.bandpass':band},
logger=L,
overwrite=False)
error, output_init, output_proc, obs, _ = preprocess_util.preproc_or_load_group(obs_id,
configs_init=preproc_init,
configs_proc=preproc_proc,
dets={'wafer_slot':detset, 'wafer.bandpass':band},
logger=L,
overwrite=False,
return_proc_aman=False)
errors.append(error) ; outputs.append((output_init, output_proc)) ;
if error not in [None,'load_success']:
L.info('tod %s:%s:%s failed in the preproc database'%(obs_id,detset,band))
Expand Down
41 changes: 26 additions & 15 deletions sotodlib/preprocess/pcore.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,8 @@ def extend(self, index, other):
def __setitem__(self, index, item):
super().__setitem__(index, self._check_item(item))

def run(self, aman, proc_aman=None, select=True, sim=False, update_plot=False):
def run(self, aman, proc_aman=None, full_aman=None,
select=True, sim=False, update_plot=False):
"""
The main workhorse function for the pipeline class. This function takes
an AxisManager TOD and successively runs the pipeline of preprocessing
Expand All @@ -460,6 +461,12 @@ def run(self, aman, proc_aman=None, select=True, sim=False, update_plot=False):
returned this preprocess axismanager. In this case, calls to
``process.calc_and_save()`` are skipped as the information is
expected to be present in this AxisManager.
full_aman: AxisManager (Optional)
A preprocess axismanager. This axis manager stores the outputs of
preprocessing functions (proc_aman) but without any of the detector
or samps restrictions applied, thus maintaining its original shape.
This is returned at the end of the pipeline. If not passed it is
instantiated with the same number of dets and samps as aman.
select: boolean (Optional)
if True, the aman detector axis is restricted as described in
each preprocess module. Most pipelines are developed with
Expand All @@ -475,18 +482,22 @@ def run(self, aman, proc_aman=None, select=True, sim=False, update_plot=False):

Returns
-------
proc_aman: AxisManager
full_aman: AxisManager
A preprocess axismanager that contains all data products calculated
throughout the running of the pipeline

throughout the running of the pipeline.
success: str
A string that stores the name of the last process step that the pipeline
completed. If the pipeline successfully finishes all steps, success = 'end'.
"""
if proc_aman is None:
if 'preprocess' in aman:
proc_aman = aman.preprocess.copy()
full = aman.preprocess.copy()
if full_aman is None:
full_aman = aman.preprocess.copy()
else:
proc_aman = core.AxisManager(aman.dets, aman.samps)
full = core.AxisManager( aman.dets, aman.samps)
if full_aman is None:
full_aman = core.AxisManager( aman.dets, aman.samps)
run_calc = True
update_plot = False
else:
Expand All @@ -495,7 +506,8 @@ def run(self, aman, proc_aman=None, select=True, sim=False, update_plot=False):
det_list = [det for det in proc_aman.dets.vals if det in aman.dets.vals]
aman.restrict('dets', det_list)
proc_aman.restrict('dets', det_list)
full = proc_aman.copy()
if full_aman is None:
full_aman = proc_aman.copy()
run_calc = False

if 'frequency_cutoffs' not in proc_aman:
Expand Down Expand Up @@ -527,7 +539,7 @@ def run(self, aman, proc_aman=None, select=True, sim=False, update_plot=False):
if run_calc:
aman, proc_aman = process.calc_and_save(aman, proc_aman)
process.plot(aman, proc_aman, filename=os.path.join(self.plot_dir, '{ctime}/{obsid}', f'{step+1}_{{name}}.png'))
update_full_aman( proc_aman, full, self.wrap_valid)
update_full_aman( proc_aman, full_aman, self.wrap_valid)
if update_plot:
process.plot(aman, proc_aman, filename=os.path.join(self.plot_dir, '{ctime}/{obsid}', f'{step+1}_{{name}}.png'))
plt.close()
Expand All @@ -540,16 +552,15 @@ def run(self, aman, proc_aman=None, select=True, sim=False, update_plot=False):
success = process.name
break

# copy updated frequency cutoffs to full_aman
if "frequency_cutoffs" in full_aman:
full_aman.move("frequency_cutoffs", None)
full_aman.wrap("frequency_cutoffs", proc_aman["frequency_cutoffs"])
if run_calc:
_wrap_valid_ranges(proc_aman, full, valid_name='valid_data',
_wrap_valid_ranges(proc_aman, full_aman, valid_name='valid_data',
wrap_name='valid_data')

# copy updated frequency cutoffs to full
if "frequency_cutoffs" in full:
full.move("frequency_cutoffs", None)
full.wrap("frequency_cutoffs", proc_aman["frequency_cutoffs"])

return full, success
return full_aman, success


class _FracFlaggedMixIn(object):
Expand Down
Loading