78 changes: 57 additions & 21 deletions sotodlib/site_pipeline/preprocess_tod.py
@@ -1,4 +1,5 @@
import os
from pdb import run
Contributor:
I don't think you're using this anywhere, please remove

import yaml
import time
import logging
@@ -371,34 +372,69 @@ def _main(executor: Union["MPICommExecutor", "ProcessPoolExecutor"],
n_fail = 0

# Run write_block obs-ids in parallel at once then write all to the sqlite db.
futures = [executor.submit(preprocess_tod, obs_id=r[0]['obs_id'],
group_list=r[1], verbosity=verbosity,
configs=configs,
overwrite=overwrite, run_parallel=True) for r in run_list]
for future in as_completed_callable(futures):
logger.info('New future as_completed result')
try:
err, db_datasets = future.result()
futures = []
for r in run_list:
fut = executor.submit(
preprocess_tod,
obs_id=r[0]["obs_id"],
group_list=r[1],
verbosity=verbosity,
configs=configs,
overwrite=overwrite,
run_parallel=True,
Member:
This set of kwargs is repeated 4 times in this edit. Instead, store it as a dict, for re-use.

Member Author:
I agree the repetition is a problem, but I also think that adding a dict increases the complexity, and keeping the kwargs of the future with the future itself is better. I will try to find a way to reduce the complexity, otherwise I'll introduce a dict.

)
fut.kargs = {"obs_id": r[0]["obs_id"],
"group_list": r[1],
"verbosity": verbosity,
"configs": configs,
"overwrite": overwrite,
"run_parallel": True,
"try": 1}
futures.append(fut)
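
As an aside on the thread above about the repeated keyword arguments: a minimal sketch (not part of this PR) of a small helper that removes the repetition while still attaching the arguments to the future object, so the retry logic below can keep reading fut.kargs. The helper name submit_with_kargs is hypothetical.

# Sketch only: hypothetical helper combining the two suggestions above.
def submit_with_kargs(executor, **kargs):
    # Submit the task and remember its kwargs on the future for later resubmission.
    fut = executor.submit(preprocess_tod, **kargs)
    fut.kargs = {**kargs, "try": 1}
    return fut

futures = [
    submit_with_kargs(
        executor,
        obs_id=r[0]["obs_id"],
        group_list=r[1],
        verbosity=verbosity,
        configs=configs,
        overwrite=overwrite,
        run_parallel=True,
    )
    for r in run_list
]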

while futures:
Contributor:
OK, so now futures are checked in a loop for completion and errors. If a future raised an OSError, it is retried up to 3 times before being logged as failed. I think this is great!

One comment: lines 395-398 - I think this repeatedly checks whether a task (future) has completed by polling its status in a loop without any delay between checks. I wonder if there's a more efficient way in terms of CPU usage?

Would it be possible to use something like as_completed to process futures as soon as they finish? Something like:

from concurrent.futures import as_completed

# Submit all tasks and attach kargs for retry tracking.
future_to_kargs = {}
for r in run_list:
    fut = executor.submit(
        preprocess_tod,
        obs_id=r[0]["obs_id"],
        group_list=r[1],
        verbosity=verbosity,
        configs=configs,
        overwrite=overwrite,
        run_parallel=True,
    )
    future_to_kargs[fut] = {
        "obs_id": r[0]["obs_id"],
        "group_list": r[1],
        "verbosity": verbosity,
        "configs": configs,
        "overwrite": overwrite,
        "run_parallel": True,
        "try": 1,
    }

n_fail = 0
pending = list(future_to_kargs.keys())
while pending:
    for fut in as_completed(pending):
        kargs = future_to_kargs[fut]
        try:
            err, db_datasets = fut.result()
            if err is not None:
                n_fail += 1
            # ... handle db_datasets and cleanup as before ...
        except OSError as e:
            if kargs["try"] < 3:
                logger.info(f"Retrying {kargs['obs_id']} due to OSError (attempt {kargs['try']+1})")
                new_fut = executor.submit(
                    preprocess_tod,
                    obs_id=kargs["obs_id"],
                    group_list=kargs["group_list"],
                    verbosity=kargs["verbosity"],
                    configs=kargs["configs"],
                    overwrite=kargs["overwrite"],
                    run_parallel=kargs["run_parallel"],
                )
                kargs["try"] += 1
                future_to_kargs[new_fut] = kargs
                pending.append(new_fut)
            else:
                logger.info(f"Failed after 3 attempts: {e}")
                n_fail += 1
        except Exception as e:
            logger.info(f"Unhandled error: {e}")
            n_fail += 1
        pending.remove(fut)

if raise_error and n_fail > 0:
    raise RuntimeError(f"preprocess_tod: {n_fail}/{len(run_list)} obs_ids failed")

Member Author:
We do not need a new dictionary mapping futures to their arguments. A future object can be extended with extra attributes, so the arguments can be attached to it and probed directly from the object. I do think that creating a separate dictionary increases the complexity of the code.

As far as the two loops go, there is no reason to have both, since the inner loop does what the outer loop was supposed to do. In addition, changing the iterable of a for loop at runtime can lead to surprising behavior, as a for loop is expected to run over a predefined set of items. I think the while loop is a bit cleaner, as it is driven by the state of the futures list.

as_completed does something very similar to future_to_check.done(): it adds a callback that gets triggered when a future finishes. I can see if I can make this loop not do a busy wait, but in the end we are only losing a single core that is not used anyway while the MPIPool executor is running.
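
On the busy-wait point, a minimal sketch of how the same while loop could back off when the head of the queue is not finished, assuming the futures expose the standard done() interface; the 0.5 s interval is an arbitrary illustration.

# Sketch only: same structure as the loop below, but sleep briefly when the
# next future is still running so the loop does not spin at 100% CPU.
# `time` is already imported at the top of this module.
while futures:
    future_to_check = futures.pop(0)
    if not future_to_check.done():
        futures.append(future_to_check)   # not finished yet, check again later
        time.sleep(0.5)                   # arbitrary polling interval
        continue
    # ... handle the result / exceptions exactly as in the code below ...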

future_to_check = futures.pop(0)
if future_to_check.done():
err, db_datasets = future_to_check.result()
if err is not None:
n_fail += 1
except Exception as e:
errmsg = f'{type(e)}: {e}'
tb = ''.join(traceback.format_tb(e.__traceback__))
if db_datasets:
if err is None:
logger.info(f'Processing future result db_dataset: {db_datasets}')
for db_dataset in db_datasets:
pp_util.cleanup_mandb(err, db_dataset, configs, logger)
else:
pp_util.cleanup_mandb(err, db_datasets, configs, logger)
elif isinstance(future_to_check.exception(), OSError) and future_to_check.kargs["try"] <= 3:
logger.info(f"Future raised an OSError: {future_to_check.exception()}, resubmitting")
new_future = executor.submit(
preprocess_tod,
obs_id=future_to_check.kargs["obs_id"],
group_list=future_to_check.kargs["group_list"],
verbosity=future_to_check.kargs["verbosity"],
configs=future_to_check.kargs["configs"],
overwrite=future_to_check.kargs["overwrite"],
run_parallel=future_to_check.kargs["run_parallel"],
)
new_future.kargs = {"obs_id": future_to_check.kargs["obs_id"],
"group_list": future_to_check.kargs["group_list"],
"verbosity": future_to_check.kargs["verbosity"],
"configs": future_to_check.kargs["configs"],
"overwrite": future_to_check.kargs["overwrite"],
"run_parallel": future_to_check.kargs["run_parallel"],
"try": future_to_check.kargs["try"] + 1}
futures.append(new_future)
elif isinstance(future_to_check.exception(), OSError) and future_to_check.kargs["try"] > 3:
Contributor:
Now we retry tasks up to 3 times if an OSError occurs. However, if the underlying issue persists during these attempts, the task will still fail. The choice of 3 retries is arbitrary, so I wonder if it would make sense to e.g. increase the wait time between retries, or to allow retries until a total timeout is reached?

Member:
@iparask do the retries go to the back of the queue? If that's the case I think it is fine to keep 3 retries as the limit.

Member Author:
@chervias yes it does go to the end of the queue, and as a result some time will pass before it gets scheduled again.

@susannaaz, 3 is definitely arbitrary and we can set it as a parameter if you prefer. For some reason, though, 3 seems to be the default of every system that does retries. I do not want to introduce a sleep there to wait before retrying. What I can do is create our own resubmit function that introduces an exponentially increasing delay.

That would also be a good solution if we introduced the retry at the level of `get_meta`. However, that would require the whole preprocess function to be written as a series of smaller functions executed as a chain of futures, which is a lot of work and a completely different approach from the one currently used.
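
A minimal sketch of the exponential-backoff resubmission described above, reusing the existing futures/kargs bookkeeping; the helper names, the retry_queue list, and the 10 s base delay are all hypothetical.

# Sketch only (not in this PR): park failed tasks with a deadline instead of
# resubmitting them immediately, doubling the delay on every attempt.
RETRY_BASE_DELAY = 10.0   # seconds; arbitrary illustration
retry_queue = []          # list of (resubmit_after_timestamp, kargs)

def schedule_retry(kargs):
    # 10 s after the 1st failure, 20 s after the 2nd, 40 s after the 3rd, ...
    delay = RETRY_BASE_DELAY * 2 ** (kargs["try"] - 1)
    retry_queue.append((time.time() + delay, kargs))

def resubmit_due_retries(executor, futures):
    # Called once per iteration of the main while loop: submit every parked
    # task whose backoff period has expired and keep the rest waiting.
    now = time.time()
    still_waiting = []
    for resubmit_after, kargs in retry_queue:
        if now >= resubmit_after:
            submit_kwargs = {k: v for k, v in kargs.items() if k != "try"}
            fut = executor.submit(preprocess_tod, **submit_kwargs)
            fut.kargs = {**kargs, "try": kargs["try"] + 1}
            futures.append(fut)
        else:
            still_waiting.append((resubmit_after, kargs))
    retry_queue[:] = still_waiting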

logger.info(f"Future failed after 3 attempts: {future_to_check.exception()}")
errmsg = f'{type(future_to_check.exception())}: {future_to_check.exception()}'
tb = ''.join(traceback.format_tb(future_to_check.exception().__traceback__))
logger.info(f"ERROR: future.result()\n{errmsg}\n{tb}")
f = open(errlog, 'a')
f.write(f'\n{time.time()}, future.result() error\n{errmsg}\n{tb}\n')
f.close()
n_fail+=1
Member:
It looks like you haven't handled the case where isinstance(ex, ValueError) and total_tries <= 3. Seems like that should be tested explicitly, and the rest handled with "else".

Seems that the logic is complex enough that this should have a test.

I would also like to see an explanation for why OSError and ValueError are the two things being tracked specially here. What does each kind of error mean, in this context?
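
A minimal sketch of making the retryable/fatal split explicit, as suggested; whether ValueError belongs in the retryable set is precisely the open question here, so its inclusion (and the MAX_TRIES constant) is purely illustrative.

# Sketch only: a single predicate naming which exceptions are considered
# transient (e.g. a filesystem or NFS hiccup raising OSError) and bounding the
# number of attempts, so the branch structure becomes
#   if exc is None / elif should_retry(...) / else fatal
# and can be unit-tested in isolation.
RETRYABLE_EXCEPTIONS = (OSError, ValueError)   # ValueError inclusion is illustrative
MAX_TRIES = 3

def should_retry(exc, n_tries):
    return isinstance(exc, RETRYABLE_EXCEPTIONS) and n_tries <= MAX_TRIES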

continue
futures.remove(future)

if db_datasets:
if err is None:
logger.info(f'Processing future result db_dataset: {db_datasets}')
for db_dataset in db_datasets:
pp_util.cleanup_mandb(err, db_dataset, configs, logger)
else:
pp_util.cleanup_mandb(err, db_datasets, configs, logger)
else:
futures.append(future_to_check)
Member:
Do I read this right that if the job fails with any exception other than OSError, it will get rescheduled infinitely? If so that's probably not what we want!

Member Author (@iparask, Sep 5, 2025):
The futures list holds the already submitted jobs. If you look at line 396, it does a

future_to_check = futures.pop(0)

and this just puts it back in the list for checking later. There is, though, a different bug, as this if does not handle any other exception as fatal. I will add that.
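
A minimal sketch of the fix described above, assuming the same futures/kargs bookkeeping: only futures that are genuinely still running are re-appended, and anything other than a retryable OSError is counted as fatal.

# Sketch only: skeleton of the corrected loop tail; cleanup and resubmission
# details are as in the PR.
while futures:
    future_to_check = futures.pop(0)
    if not future_to_check.done():
        futures.append(future_to_check)   # still running, check again later
        continue
    exc = future_to_check.exception()
    if exc is None:
        err, db_datasets = future_to_check.result()
        if err is not None:
            n_fail += 1
        # ... cleanup_mandb handling as in the PR ...
    elif isinstance(exc, OSError) and future_to_check.kargs["try"] <= 3:
        pass  # resubmit with try + 1, as in the PR
    else:
        # Any other exception, or an OSError past its retry budget, is fatal.
        logger.info(f"Future failed: {exc}")
        n_fail += 1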


if raise_error and n_fail > 0:
raise RuntimeError(f"preprocess_tod: {n_fail}/{len(run_list)} obs_ids failed")