Skip to content

Commit 5bfc4d7

Browse files
committed
Merge branch 'gp/preproc_fault_toler' into c1f_and_common_mode
2 parents 9e97eb9 + 9812136 commit 5bfc4d7

File tree

1 file changed

+54
-21
lines changed

1 file changed

+54
-21
lines changed

sotodlib/site_pipeline/preprocess_tod.py

Lines changed: 54 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -376,34 +376,67 @@ def _main(executor: Union["MPICommExecutor", "ProcessPoolExecutor"],
376376
n_fail = 0
377377

378378
# Run write_block obs-ids in parallel at once then write all to the sqlite db.
379-
futures = [executor.submit(preprocess_tod, obs_id=r[0]['obs_id'],
380-
group_list=r[1], verbosity=verbosity,
381-
configs=configs,
382-
overwrite=overwrite, run_parallel=True) for r in run_list]
383-
for future in as_completed_callable(futures):
384-
logger.info('New future as_completed result')
385-
try:
386-
err, db_datasets = future.result()
379+
futures = []
380+
futures_kargs = {}
381+
for r in run_list:
382+
futures_kargs[r[0]["obs_id"]] = {
383+
"obs_id": r[0]["obs_id"],
384+
"group_list": r[1],
385+
"verbosity": verbosity,
386+
"configs": configs,
387+
"overwrite": overwrite,
388+
"run_parallel": True,
389+
"try": 1,
390+
}
391+
kwargs = {k: v for k, v in futures_kargs[r[0]["obs_id"]].items() if k != "try"}
392+
fut = executor.submit(
393+
preprocess_tod,
394+
**kwargs
395+
)
396+
fut.obs_id = r[0]["obs_id"]
397+
futures.append(fut)
398+
399+
while futures:
400+
future_to_check = futures.pop(0)
401+
kwargs = {k: v for k, v in futures_kargs[future_to_check.obs_id].items() if k != "try"}
402+
if future_to_check.done() and future_to_check.exception() is None:
403+
err, db_datasets = future_to_check.result()
387404
if err is not None:
388405
n_fail += 1
389-
except Exception as e:
390-
errmsg = f'{type(e)}: {e}'
391-
tb = ''.join(traceback.format_tb(e.__traceback__))
406+
if db_datasets:
407+
if err is None:
408+
logger.info(f'Processing future result db_dataset: {db_datasets}')
409+
for db_dataset in db_datasets:
410+
pp_util.cleanup_mandb(err, db_dataset, configs, logger)
411+
else:
412+
pp_util.cleanup_mandb(err, db_datasets, configs, logger)
413+
elif future_to_check.done() and (
414+
isinstance(future_to_check.exception(), OSError)
415+
and future_to_check.kargs["try"] <= 3
416+
):
417+
logger.info(f"Future raised an OSError: {future_to_check.exception()}, resubmitting")
418+
419+
new_future = executor.submit(
420+
preprocess_tod,
421+
**kwargs
422+
)
423+
new_future.obs_id = future_to_check.obs_id
424+
futures_kargs[new_future.obs_id]["try"] += 1
425+
futures.append(new_future)
426+
elif future_to_check.done() and ((
427+
isinstance(future_to_check.exception(), ValueError)
428+
and future_to_check.kargs["try"] > 3
429+
) or not isinstance(future_to_check.exception(), ValueError)):
430+
logger.info(f"Future failed after 3 attempts: {future_to_check.exception()}")
431+
errmsg = f'{type(future_to_check.exception())}: {future_to_check.exception()}'
432+
tb = ''.join(traceback.format_tb(future_to_check.exception().__traceback__))
392433
logger.info(f"ERROR: future.result()\n{errmsg}\n{tb}")
393434
f = open(errlog, 'a')
394435
f.write(f'\n{time.time()}, future.result() error\n{errmsg}\n{tb}\n')
395436
f.close()
396437
n_fail+=1
397-
continue
398-
futures.remove(future)
399-
400-
if db_datasets:
401-
if err is None:
402-
logger.info(f'Processing future result db_dataset: {db_datasets}')
403-
for db_dataset in db_datasets:
404-
pp_util.cleanup_mandb(err, db_dataset, configs, logger)
405-
else:
406-
pp_util.cleanup_mandb(err, db_datasets, configs, logger)
438+
else:
439+
futures.append(future_to_check)
407440

408441
if raise_error and n_fail > 0:
409442
raise RuntimeError(f"preprocess_tod: {n_fail}/{len(run_list)} obs_ids failed")

0 commit comments

Comments
 (0)