75 changes: 54 additions & 21 deletions sotodlib/site_pipeline/preprocess_tod.py
@@ -371,34 +371,67 @@ def _main(executor: Union["MPICommExecutor", "ProcessPoolExecutor"],
n_fail = 0

# Run write_block obs-ids in parallel at once then write all to the sqlite db.
futures = [executor.submit(preprocess_tod, obs_id=r[0]['obs_id'],
group_list=r[1], verbosity=verbosity,
configs=configs,
overwrite=overwrite, run_parallel=True) for r in run_list]
for future in as_completed_callable(futures):
logger.info('New future as_completed result')
try:
err, db_datasets = future.result()
futures = []
futures_kargs = {}
for r in run_list:
futures_kargs[r[0]["obs_id"]] = {
"obs_id": r[0]["obs_id"],
"group_list": r[1],
"verbosity": verbosity,
"configs": configs,
"overwrite": overwrite,
"run_parallel": True,
"try": 1,
}
kwargs = {k: v for k, v in futures_kargs[r[0]["obs_id"]].items() if k != "try"}
fut = executor.submit(
preprocess_tod,
**kwargs
)
fut.obs_id = r[0]["obs_id"]
futures.append(fut)
Member Author
I'm introducing this dictionary to hold the kwargs of each future. I want to avoid using the future object itself as the key, because that would create an ever-growing dictionary with a worst case of nobs*retries entries.

Also, there is no way to retrieve a description of a future from the object itself, which is why the object is extended with an obs_id attribute.
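As a minimal sketch of the "extend the future with obs_id" idea: Future instances are ordinary Python objects, so an arbitrary attribute can be attached after submission. The `work` function, the label value, and the use of the standard-library ThreadPoolExecutor (standing in for the MPI/process executors used by the pipeline) are all illustrative assumptions here.

```python
from concurrent.futures import ThreadPoolExecutor

def work(x):
    # Hypothetical stand-in for preprocess_tod.
    return x * 2

with ThreadPoolExecutor(max_workers=2) as executor:
    fut = executor.submit(work, 21)
    # Future has no __slots__, so a descriptive attribute can be
    # attached to the local Future object after submission.
    fut.obs_id = "obs_example"  # illustrative label, not a real obs_id
    result = fut.result()
```

The attribute lives only on the driver-side Future object, which is all the retry bookkeeping needs; nothing is shipped to the worker.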


while futures:
Contributor
Ok, so futures are now checked in a loop for completion and errors. If a future raised an OSError, it is retried up to 3 times before being logged as failed. I think this is great!

One comment on lines 395-398: I think this repeatedly polls a future's status in a loop without any delay. I wonder if there's a more CPU-efficient way?

Would it be possible to use something like as_completed to process futures as soon as they finish? Something like:

from concurrent.futures import as_completed

# Submit all tasks and attach kargs for retry tracking.
future_to_kargs = {}
for r in run_list:
    fut = executor.submit(
        preprocess_tod,
        obs_id=r[0]["obs_id"],
        group_list=r[1],
        verbosity=verbosity,
        configs=configs,
        overwrite=overwrite,
        run_parallel=True,
    )
    future_to_kargs[fut] = {
        "obs_id": r[0]["obs_id"],
        "group_list": r[1],
        "verbosity": verbosity,
        "configs": configs,
        "overwrite": overwrite,
        "run_parallel": True,
        "try": 1,
    }

n_fail = 0
pending = list(future_to_kargs.keys())
while pending:
    for fut in as_completed(pending):
        kargs = future_to_kargs[fut]
        try:
            err, db_datasets = fut.result()
            if err is not None:
                n_fail += 1
            # ... handle db_datasets and cleanup as before ...
        except OSError as e:
            if kargs["try"] < 3:
                logger.info(f"Retrying {kargs['obs_id']} due to OSError (attempt {kargs['try']+1})")
                new_fut = executor.submit(
                    preprocess_tod,
                    obs_id=kargs["obs_id"],
                    group_list=kargs["group_list"],
                    verbosity=kargs["verbosity"],
                    configs=kargs["configs"],
                    overwrite=kargs["overwrite"],
                    run_parallel=kargs["run_parallel"],
                )
                kargs["try"] += 1
                future_to_kargs[new_fut] = kargs
                pending.append(new_fut)
            else:
                logger.info(f"Failed after 3 attempts: {e}")
                n_fail += 1
        except Exception as e:
            logger.info(f"Unhandled error: {e}")
            n_fail += 1
        pending.remove(fut)

if raise_error and n_fail > 0:
    raise RuntimeError(f"preprocess_tod: {n_fail}/{len(run_list)} obs_ids failed")

Member Author
We do not need a new dictionary for the future's arguments; a future can be extended with attributes, and those can be probed from the object itself. I do think that creating a separate dictionary increases the complexity of the code.

As for the two loops, there is no reason to have both, since the inner loop does what the outer loop was supposed to do. In addition, changing the iterable of a for loop at runtime can lead to surprising behavior, as for loops are expected to run for a fixed number of iterations. I think the while loop is a bit cleaner, as it is based on the state of the futures list.

as_completed does something very similar to future_to_check.done(): it adds a callback that is triggered when a future finishes. I can see if I can make this loop avoid a busy wait, but in the end we are losing a single core that is in any case not used when the MPIPool executor is running.
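On the busy-wait point, one standard-library option is concurrent.futures.wait with return_when=FIRST_COMPLETED, which blocks the driver until something finishes while still allowing resubmitted futures to be added to the pending set between iterations. This is a sketch against the stdlib ThreadPoolExecutor; whether mpi4py's MPICommExecutor futures interoperate with concurrent.futures.wait would need checking.

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def work(x):
    # Hypothetical stand-in for preprocess_tod.
    return x * x

results = []
with ThreadPoolExecutor(max_workers=2) as executor:
    pending = {executor.submit(work, i) for i in range(4)}
    while pending:
        # Blocks until at least one future finishes, so the driver
        # sleeps instead of busy-polling done(); a retried future
        # would simply be added back into `pending` here.
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        for fut in done:
            results.append(fut.result())
```

Unlike iterating as_completed over a list that is mutated mid-iteration, the while/wait shape makes the "resubmit on failure" step explicit: the new future joins `pending` and is picked up on the next pass.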

future_to_check = futures.pop(0)
kwargs = {k: v for k, v in futures_kargs[future_to_check.obs_id].items() if k != "try"}
if future_to_check.done() and future_to_check.exception() is None:
Member Author
The if is being extended a bit. First, in all cases, it checks whether the future has finished running, and then it checks its exception status. If everything went well, it proceeds as before; otherwise it checks the type of the exception. If it is an OSError, it resubmits (executor.submit). If the OSError has appeared more times than the limit, it stops. If the error is not an OSError, it stops as well. When none of these apply, the future is just put back at the end of the list to be checked later.

Member Author
I'm sorry, but the fault tolerance is a bit ugly. We may want to create an executor class that extends the other two and includes fault tolerance.
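A sketch of what such an executor class could look like: a thin wrapper whose submit retries the call on selected exceptions inside the worker task, keeping the driver loop free of retry bookkeeping. All names and the retry policy here are illustrative, not the sotodlib implementation, and because the wrapper submits a closure it would need a picklable callable to work with a ProcessPoolExecutor.

```python
from concurrent.futures import ThreadPoolExecutor

class RetryingExecutor:
    """Illustrative fault-tolerant wrapper around any executor
    with a concurrent.futures-style submit()."""

    def __init__(self, executor, retry_on=(OSError,), max_tries=3):
        self._executor = executor
        self._retry_on = retry_on
        self._max_tries = max_tries

    def submit(self, fn, *args, **kwargs):
        def call():
            for attempt in range(1, self._max_tries + 1):
                try:
                    return fn(*args, **kwargs)
                except self._retry_on:
                    # Re-raise only once the retry budget is spent.
                    if attempt == self._max_tries:
                        raise
        return self._executor.submit(call)

calls = {"n": 0}

def flaky():
    # Fails twice with a retryable error, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise OSError("transient failure")
    return "ok"

with ThreadPoolExecutor(max_workers=1) as pool:
    result = RetryingExecutor(pool).submit(flaky).result()
```

The trade-off versus resubmission from the driver: retries stay on the same worker and are invisible to the scheduler, but callers get a single future per task regardless of how many attempts it took.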

err, db_datasets = future_to_check.result()
if err is not None:
n_fail += 1
except Exception as e:
errmsg = f'{type(e)}: {e}'
tb = ''.join(traceback.format_tb(e.__traceback__))
if db_datasets:
if err is None:
logger.info(f'Processing future result db_dataset: {db_datasets}')
for db_dataset in db_datasets:
pp_util.cleanup_mandb(err, db_dataset, configs, logger)
else:
pp_util.cleanup_mandb(err, db_datasets, configs, logger)
elif future_to_check.done() and (
isinstance(future_to_check.exception(), OSError)
and future_to_check.kargs["try"] <= 3
):
logger.info(f"Future raised an OSError: {future_to_check.exception()}, resubmitting")

new_future = executor.submit(
preprocess_tod,
**kwargs
)
new_future.obs_id = future_to_check.obs_id
futures_kargs[new_future.obs_id]["try"] += 1
futures.append(new_future)
elif future_to_check.done() and ((
isinstance(future_to_check.exception(), ValueError)
and future_to_check.kargs["try"] > 3
) or not isinstance(future_to_check.exception(), ValueError)):
logger.info(f"Future failed after 3 attempts: {future_to_check.exception()}")
errmsg = f'{type(future_to_check.exception())}: {future_to_check.exception()}'
tb = ''.join(traceback.format_tb(future_to_check.exception().__traceback__))
logger.info(f"ERROR: future.result()\n{errmsg}\n{tb}")
f = open(errlog, 'a')
f.write(f'\n{time.time()}, future.result() error\n{errmsg}\n{tb}\n')
f.close()
n_fail+=1
Member
It looks like you haven't handled the case where isinstance(ex, ValueError) and total_tries <= 3. It seems like that should be tested explicitly, and the rest handled with an else.

The logic is complex enough that this should have a test.

I would also like to see an explanation of why OSError and ValueError are the two exceptions being tracked specially here. What does each kind of error mean, in this context?

continue
futures.remove(future)

if db_datasets:
if err is None:
logger.info(f'Processing future result db_dataset: {db_datasets}')
for db_dataset in db_datasets:
pp_util.cleanup_mandb(err, db_dataset, configs, logger)
else:
pp_util.cleanup_mandb(err, db_datasets, configs, logger)
else:
futures.append(future_to_check)
Member
Do I read this right that if the job fails with any exception other than OSError, it will get rescheduled infinitely? If so that's probably not what we want!

Member Author
@iparask, Sep 5, 2025
The futures list holds the already-submitted jobs. Line 396 does a

future_to_check = futures.pop(0)

and this branch just puts the future back in the list to be checked later. There is, though, a different bug: this if does not treat any other exception as fatal. I will add that.


if raise_error and n_fail > 0:
raise RuntimeError(f"preprocess_tod: {n_fail}/{len(run_list)} obs_ids failed")