@@ -28,9 +28,15 @@ def start_dashboard(dashboard_path: str, db_path: str):
2828def run_cmor (variable , config , db_path ):
2929 from pathlib import Path
3030
31+ # Start a Dask cluster for this worker
32+ import dask .distributed as dask
33+
3134 from access_mopper import ACCESS_ESM_CMORiser
3235 from access_mopper .tracking import TaskTracker
3336
37+ client = dask .Client (threads_per_worker = 1 )
38+ print (f"Dask dashboard for { variable } : { client .dashboard_link } " )
39+
3440 exp = config ["experiment_id" ]
3541 tracker = TaskTracker (Path (db_path ))
3642 tracker .add_task (variable , exp )
@@ -53,9 +59,11 @@ def run_cmor(variable, config, db_path):
5359 )
5460 cmoriser .run ()
5561 tracker .mark_done (variable , exp )
62+ client .close () # Clean up the Dask client
5663 return f"Completed: { variable } "
5764 except Exception as e :
5865 tracker .mark_failed (variable , exp , str (e ))
66+ client .close ()
5967 raise
6068
6169
@@ -96,7 +104,6 @@ def main():
96104 HighThroughputExecutor (
97105 label = "htex_pbs" ,
98106 address = address_by_hostname (),
99- max_workers = 1 ,
100107 provider = SmartPBSProvider (
101108 queue = queue ,
102109 scheduler_options = scheduler_options ,
0 commit comments