Skip to content

Commit b4e2c43

Browse files
committed
Remove Parsl
1 parent 56962c2 commit b4e2c43

File tree

3 files changed

+233
-111
lines changed

3 files changed

+233
-111
lines changed

src/access_mopper/batch_cmoriser.py

Lines changed: 110 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
11
import os
22
import subprocess
33
import sys
4+
import time
45
from importlib.resources import files
56
from pathlib import Path
67

7-
import parsl
88
import yaml
9-
from parsl import Config, HighThroughputExecutor, python_app
10-
from parsl.addresses import address_by_hostname
9+
from jinja2 import Template
1110

12-
from access_mopper.executors.pbs_scheduler import SmartPBSProvider
1311
from access_mopper.tracking import TaskTracker
1412

1513

@@ -24,58 +22,85 @@ def start_dashboard(dashboard_path: str, db_path: str):
2422
)
2523

2624

27-
@python_app
28-
def run_cmor(variable, config, db_path):
29-
import glob
30-
from pathlib import Path
25+
def create_job_script(variable, config, db_path, script_dir):
26+
"""Create a PBS job script for processing a single variable using Jinja2 template."""
3127

32-
from access_mopper import ACCESS_ESM_CMORiser
33-
from access_mopper.tracking import TaskTracker
28+
# Load the template from the templates directory
29+
template_path = files("access_mopper.templates").joinpath("cmor_job_script.j2")
3430

35-
input_folder = config["input_folder"]
36-
pattern = config.get("file_patterns", {}).get(variable)
37-
full_pattern = str(input_folder + pattern)
38-
input_files = glob.glob(full_pattern)
39-
if not input_files:
40-
raise ValueError(f"No files found for pattern {pattern}")
31+
with template_path.open() as f:
32+
template_content = f.read()
4133

34+
job_template = Template(template_content)
35+
36+
# Get the package path for sys.path.insert
37+
package_path = Path(__file__).parent.parent
38+
39+
script_content = job_template.render(
40+
variable=variable,
41+
config=config,
42+
db_path=db_path,
43+
script_dir=script_dir,
44+
package_path=package_path,
45+
)
46+
47+
script_path = script_dir / f"cmor_{variable}.sh"
48+
with open(script_path, "w") as f:
49+
f.write(script_content)
50+
51+
os.chmod(script_path, 0o755)
52+
return script_path
53+
54+
55+
def submit_job(script_path):
56+
"""Submit a PBS job and return the job ID."""
4257
try:
43-
exp = config["experiment_id"]
44-
tracker = TaskTracker(Path(db_path))
45-
tracker.add_task(variable, exp)
46-
47-
if tracker.is_done(variable, exp):
48-
return f"Skipped: {variable} (already done)"
49-
50-
tracker.mark_running(variable, exp)
51-
52-
# Create CMORiser without Dask client
53-
cmoriser = ACCESS_ESM_CMORiser(
54-
input_paths=input_files,
55-
compound_name=variable,
56-
experiment_id=config["experiment_id"],
57-
source_id=config["source_id"],
58-
variant_label=config["variant_label"],
59-
grid_label=config["grid_label"],
60-
activity_id=config.get("activity_id"),
61-
output_path=config["output_folder"],
62-
drs_root=config.get("drs_root"),
58+
result = subprocess.run(
59+
["qsub", str(script_path)], capture_output=True, text=True, check=True
6360
)
64-
cmoriser.run()
65-
tracker.mark_done(variable, exp)
61+
job_id = result.stdout.strip()
62+
return job_id
63+
except subprocess.CalledProcessError as e:
64+
print(f"Failed to submit job {script_path}: {e}")
65+
return None
66+
67+
68+
def wait_for_jobs(job_ids, poll_interval=30):
69+
"""Wait for all jobs to complete and report status."""
70+
print(f"Waiting for {len(job_ids)} jobs to complete...")
71+
72+
while job_ids:
73+
time.sleep(poll_interval)
6674

67-
return f"Completed: {variable}"
68-
except Exception as e:
69-
# Mark as failed
75+
# Check job status
7076
try:
71-
exp = config["experiment_id"]
72-
tracker = TaskTracker(Path(db_path))
73-
tracker.mark_failed(variable, exp, str(e))
74-
except Exception:
75-
pass # Don't let tracker errors mask the original error
77+
result = subprocess.run(
78+
["qstat", "-x"] + job_ids,
79+
capture_output=True,
80+
text=True,
81+
check=False, # qstat returns non-zero when jobs complete
82+
)
83+
84+
# Parse qstat output to see which jobs are still running
85+
still_running = []
86+
for line in result.stdout.split("\n"):
87+
for job_id in job_ids:
88+
if job_id in line and any(
89+
status in line for status in ["Q", "R", "H"]
90+
):
91+
still_running.append(job_id)
92+
break
93+
94+
completed = [job_id for job_id in job_ids if job_id not in still_running]
95+
if completed:
96+
print(f"Completed jobs: {completed}")
97+
job_ids = still_running
7698

77-
# Re-raise with just the error message to avoid serialization issues
78-
raise RuntimeError(f"Failed processing {variable}: {str(e)}")
99+
except subprocess.CalledProcessError:
100+
# If qstat fails, assume all jobs are done
101+
break
102+
103+
print("All jobs completed!")
79104

80105

81106
def main():
@@ -98,48 +123,43 @@ def main():
98123
DASHBOARD_SCRIPT = files("access_mopper.dashboard").joinpath("cmor_dashboard.py")
99124
start_dashboard(str(DASHBOARD_SCRIPT), str(DB_PATH))
100125

101-
# Read resource settings from config_data, with defaults
102-
cpus_per_node = config_data.get("cpus_per_node", 4)
103-
mem = config_data.get("mem", "16GB")
104-
walltime = config_data.get("walltime", "01:00:00")
105-
storage = config_data.get("storage", None)
106-
nodes_per_block = config_data.get("nodes_per_block", 1)
107-
init_blocks = config_data.get("init_blocks", 1)
108-
max_blocks = config_data.get("max_blocks", 10)
109-
queue = config_data.get("queue", "normal")
110-
scheduler_options = config_data.get("scheduler_options", "#PBS -P your_project")
111-
worker_init = config_data.get("worker_init", "module load netcdf-python")
112-
113-
# Configure Parsl
114-
parsl_config = Config(
115-
executors=[
116-
HighThroughputExecutor(
117-
label="htex_pbs",
118-
address=address_by_hostname(),
119-
provider=SmartPBSProvider(
120-
queue=queue,
121-
scheduler_options=scheduler_options,
122-
worker_init=worker_init,
123-
nodes_per_block=nodes_per_block,
124-
cpus_per_node=cpus_per_node,
125-
mem=mem,
126-
storage=storage,
127-
walltime=walltime,
128-
init_blocks=init_blocks,
129-
max_blocks=max_blocks,
130-
),
131-
)
132-
],
133-
strategy="simple",
134-
)
135-
136-
parsl.load(parsl_config)
137-
138-
futures = [
139-
run_cmor(var, config_data, str(DB_PATH)) for var in config_data["variables"]
140-
]
141-
results = [f.result() for f in futures]
142-
print("\n".join(results))
126+
# Create directory for job scripts
127+
script_dir = Path("cmor_job_scripts")
128+
script_dir.mkdir(exist_ok=True)
129+
130+
# Create and submit job scripts for each variable
131+
job_ids = []
132+
variables = config_data["variables"]
133+
134+
print(f"Submitting {len(variables)} CMORisation jobs...")
135+
136+
for variable in variables:
137+
# Create job script
138+
script_path = create_job_script(variable, config_data, str(DB_PATH), script_dir)
139+
print(f"Created job script: {script_path}")
140+
141+
# Submit job
142+
job_id = submit_job(script_path)
143+
if job_id:
144+
job_ids.append(job_id)
145+
print(f"Submitted job {job_id} for variable {variable}")
146+
else:
147+
print(f"Failed to submit job for variable {variable}")
148+
149+
if job_ids:
150+
print(f"\nSubmitted {len(job_ids)} jobs successfully:")
151+
for i, (var, job_id) in enumerate(zip(variables[: len(job_ids)], job_ids)):
152+
print(f" {var}: {job_id}")
153+
154+
print(f"\nMonitor jobs with: qstat {' '.join(job_ids)}")
155+
print("Dashboard available at: http://localhost:8501")
156+
157+
# Optionally wait for all jobs to complete
158+
if config_data.get("wait_for_completion", False):
159+
wait_for_jobs(job_ids)
160+
else:
161+
print("No jobs were submitted successfully")
162+
sys.exit(1)
143163

144164

145165
if __name__ == "__main__":
Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,7 @@
1-
# === General input/output paths
2-
input_folder: /g/data/p73/archive/CMIP7/ACCESS-ESM1-6/spinup/JuneSpinUp-JuneSpinUp-bfaa9c5b
3-
output_folder: /scratch/tm70/rb5533/mopper_output
4-
drs_root: /scratch/tm70/rb5533/mopper_output/CMIP
1+
# Example configuration for batch CMORisation using PBS
2+
# Each variable will be processed in a separate PBS job with its own Dask cluster
53

6-
# === CMIP6 metadata
7-
experiment_id: piControl
8-
source_id: ACCESS-ESM1-5
9-
variant_label: r1i1p1f1
10-
grid_label: gn
11-
activity_id: CMIP
12-
13-
# === List of variables to CMORise (table.variable)
4+
# Required: List of variables to process
145
variables:
156
- Amon.pr
167
- Omon.tos
@@ -23,6 +14,20 @@ variables:
2314
- Amon.rsds
2415
- Amon.rsus
2516

17+
# Required: CMIP6 metadata
18+
experiment_id: piControl
19+
source_id: ACCESS-ESM1-5
20+
variant_label: r1i1p1f1
21+
grid_label: gn
22+
activity_id: CMIP
23+
24+
# Required: Input and output paths
25+
input_folder: /g/data/p73/archive/CMIP7/ACCESS-ESM1-6/spinup/JuneSpinUp-JuneSpinUp-bfaa9c5b
26+
output_folder: /scratch/tm70/rb5533/mopper_output
27+
28+
29+
# Optional: File patterns for each variable
30+
# If not specified, will use default patterns
2631
file_patterns:
2732
Amon.pr: "/output[0-4][0-9][0-9]/atmosphere/netCDF/*mon.nc"
2833
Omon.tos: "/output[0-4][0-9][0-9]/ocean/ocean-2d-surface_temp-1monthly-mean*.nc"
@@ -35,14 +40,22 @@ file_patterns:
3540
Amon.rsds: "/output[0-4][0-9][0-9]/atmosphere/netCDF/*mon.nc"
3641
Amon.rsus: "/output[0-4][0-9][0-9][0-9]/atmosphere/netCDF/*mon.nc"
3742

38-
# === Job submission settings
43+
44+
# Optional: DRS root (if you want to organize output in CMIP6 DRS structure)
45+
# drs_root: /scratch/tm70/rb5533/mopper_output/CMIP
46+
47+
# PBS job configuration
48+
queue: "normal"
3949
cpus_per_node: 14
40-
mem: 32GB
41-
walltime: 01:00:00
42-
storage: gdata/p73+gdata/tm70+scratch/tm70
43-
nodes_per_block: 2
44-
init_blocks: 2
45-
max_blocks: 20
46-
queue: normal
50+
mem: "32GB"
51+
walltime: "02:00:00"
4752
scheduler_options: "#PBS -P tm70"
48-
worker_init: "source /g/data/tm70/rb5533/miniforge3/bin/activate && conda activate esmvaltool_dev"
53+
storage: "gdata/p73+gdata/tm70+scratch/tm70"
54+
55+
# Environment setup for each job
56+
worker_init: |
57+
source /g/data/tm70/rb5533/miniforge3/bin/activate
58+
conda activate esmvaltool_dev
59+
60+
# Optional: Wait for all jobs to complete before exiting
61+
wait_for_completion: false
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
#!/bin/bash
2+
#PBS -N cmor_{{ variable }}
3+
#PBS -q {{ config.get('queue', 'normal') }}
4+
#PBS -l ncpus={{ config.get('cpus_per_node', 4) }}
5+
#PBS -l mem={{ config.get('mem', '16GB') }}
6+
#PBS -l walltime={{ config.get('walltime', '01:00:00') }}
7+
#PBS -o {{ script_dir }}/cmor_{{ variable }}.out
8+
#PBS -e {{ script_dir }}/cmor_{{ variable }}.err
9+
{{ config.get('scheduler_options', '#PBS -P your_project') }}
10+
{% if config.get('storage') %}#PBS -l storage={{ config.get('storage') }}{% endif %}
11+
12+
# Initialize environment
13+
{{ config.get('worker_init', 'module load netcdf-python') }}
14+
15+
# Set environment variables for this job
16+
export CMOR_TRACKER_DB="{{ db_path }}"
17+
export VARIABLE="{{ variable }}"
18+
19+
# Run the CMORisation for this variable
20+
python -c "
21+
import os
22+
import glob
23+
import sys
24+
from pathlib import Path
25+
26+
# Add any necessary paths
27+
sys.path.insert(0, '{{ package_path }}')
28+
29+
from access_mopper import ACCESS_ESM_CMORiser
30+
from access_mopper.tracking import TaskTracker
31+
32+
# Configuration
33+
config = {{ config|tojson }}
34+
variable = os.environ['VARIABLE']
35+
db_path = os.environ['CMOR_TRACKER_DB']
36+
37+
# Find input files
38+
input_folder = config['input_folder']
39+
pattern = config.get('file_patterns', {}).get(variable)
40+
if not pattern:
41+
raise ValueError(f'No pattern found for variable {variable}')
42+
43+
full_pattern = str(Path(input_folder) / pattern)
44+
input_files = glob.glob(full_pattern)
45+
if not input_files:
46+
raise ValueError(f'No files found for pattern {pattern}')
47+
48+
print(f'Processing {variable} with {len(input_files)} files')
49+
50+
try:
51+
# Initialize tracking
52+
exp = config['experiment_id']
53+
tracker = TaskTracker(Path(db_path))
54+
tracker.add_task(variable, exp)
55+
56+
if tracker.is_done(variable, exp):
57+
print(f'Skipped: {variable} (already done)')
58+
sys.exit(0)
59+
60+
tracker.mark_running(variable, exp)
61+
62+
# Create CMORiser with Dask parallelization
63+
cmoriser = ACCESS_ESM_CMORiser(
64+
input_paths=input_files,
65+
compound_name=variable,
66+
experiment_id=config['experiment_id'],
67+
source_id=config['source_id'],
68+
variant_label=config['variant_label'],
69+
grid_label=config['grid_label'],
70+
activity_id=config.get('activity_id'),
71+
output_path=config['output_folder'],
72+
drs_root=config.get('drs_root'),
73+
)
74+
75+
# Run the CMORisation
76+
cmoriser.run()
77+
tracker.mark_done(variable, exp)
78+
79+
print(f'Completed: {variable}')
80+
81+
except Exception as e:
82+
print(f'Error processing {variable}: {e}', file=sys.stderr)
83+
try:
84+
exp = config['experiment_id']
85+
tracker = TaskTracker(Path(db_path))
86+
tracker.mark_failed(variable, exp, str(e))
87+
except:
88+
pass
89+
sys.exit(1)

0 commit comments

Comments (0)