Skip to content

Commit 9ac2a56

Browse files
committed
First pass implementation
1 parent 6f770ee commit 9ac2a56

File tree

7 files changed

+316
-2
lines changed

7 files changed

+316
-2
lines changed

pyproject.toml

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,27 @@ dependencies = [
2727
"dask",
2828
"pyyaml",
2929
"tqdm",
30-
"requests"
30+
"requests",
31+
"parsl"
3132
]
3233
dynamic = ["version"]
3334

3435
[project.optional-dependencies]
36+
# pip install access_mopper[dashboard]
37+
dashboard = [
38+
"streamlit>=1.35.0"
39+
]
3540
test = [
3641
"pytest",
3742
"pytest-cov",
3843
"ruff"
3944
]
4045

46+
[project.scripts]
47+
mopper-cmorise = "access_mopper.batch_cmoriser:main"
48+
mopper-dashboard = "access_mopper.dashboard.cmor_dashboard:main"
49+
mopper-example-config = "access_mopper.examples.show_config:main"
50+
4151
[build-system]
4252
build-backend = "setuptools.build_meta"
4353
requires = [
@@ -53,7 +63,9 @@ include-package-data = true
5363

5464
[tool.setuptools.package-data]
5565
access_mopper = ["*.yml", "mappings/*.json",
56-
"vocabularies/**/*"]
66+
"vocabularies/**/*",
67+
"dashboard/*.py", "examples/*.yml"
68+
]
5769

5870
[tool.versioneer]
5971
VCS = "git"

src/access_mopper/batch_cmoriser

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
import sys
2+
import yaml
3+
from pathlib import Path
4+
import subprocess
5+
import os
6+
7+
from access_mopper import ACCESS_ESM_CMORiser
8+
from access_mopper.tracking import TaskTracker
9+
from parsl import python_app, Config, HighThroughputExecutor
10+
from parsl.providers import PBSProProvider
11+
from parsl.addresses import address_by_hostname
12+
from importlib.resources import files
13+
import parsl
14+
15+
16+
def start_dashboard(dashboard_path: str, db_path: str):
17+
env = os.environ.copy()
18+
env["CMOR_TRACKER_DB"] = db_path
19+
subprocess.Popen(
20+
["streamlit", "run", dashboard_path],
21+
env=env,
22+
stdout=subprocess.DEVNULL,
23+
stderr=subprocess.DEVNULL,
24+
)
25+
26+
27+
@python_app
28+
def run_cmor(variable, config, db_path):
29+
from access_mopper import ACCESS_ESM_CMORiser
30+
from access_mopper.tracking import TaskTracker
31+
from pathlib import Path
32+
33+
exp = config["experiment_id"]
34+
tracker = TaskTracker(Path(db_path))
35+
tracker.add_task(variable, exp)
36+
37+
if tracker.is_done(variable, exp):
38+
return f"Skipped: {variable} (already done)"
39+
40+
try:
41+
tracker.mark_running(variable, exp)
42+
cmoriser = ACCESS_ESM_CMORiser(
43+
input_paths=Path(config["input_folder"]),
44+
compound_name=variable,
45+
experiment_id=config["experiment_id"],
46+
source_id=config["source_id"],
47+
variant_label=config["variant_label"],
48+
grid_label=config["grid_label"],
49+
activity_id=config.get("activity_id"),
50+
output_path=config["output_folder"],
51+
drs_root=config.get("drs_root"),
52+
)
53+
cmoriser.run()
54+
tracker.mark_done(variable, exp)
55+
return f"Completed: {variable}"
56+
except Exception as e:
57+
tracker.mark_failed(variable, exp, str(e))
58+
raise
59+
60+
61+
def main():
62+
if len(sys.argv) != 2:
63+
print("Usage: mopper-cmorise path/to/batch_config.yml")
64+
sys.exit(1)
65+
66+
config_path = Path(sys.argv[1])
67+
if not config_path.exists():
68+
print(f"Error: config file not found: {config_path}")
69+
sys.exit(1)
70+
71+
with config_path.open() as f:
72+
config_data = yaml.safe_load(f)
73+
74+
tracker = TaskTracker()
75+
DB_PATH = tracker.db_path
76+
77+
# Start Streamlit dashboard
78+
DASHBOARD_SCRIPT = files("access_mopper.dashboard").joinpath("cmor_dashboard.py")
79+
start_dashboard(str(DASHBOARD_SCRIPT), str(DB_PATH))
80+
81+
# Configure Parsl
82+
parsl_config = Config(
83+
executors=[
84+
HighThroughputExecutor(
85+
label="htex_pbs",
86+
address=address_by_hostname(),
87+
max_workers=1,
88+
provider=PBSProProvider(
89+
queue="normal",
90+
launcher=None,
91+
walltime="01:00:00",
92+
select_options="1:ncpus=4:mem=16GB",
93+
scheduler_options="#PBS -P your_project",
94+
worker_init="module load netcdf-python",
95+
nodes_per_block=1,
96+
init_blocks=1,
97+
max_blocks=10,
98+
),
99+
)
100+
],
101+
strategy="simple",
102+
)
103+
104+
parsl.load(parsl_config)
105+
106+
futures = [run_cmor(var, config_data, str(DB_PATH)) for var in config_data["variables"]]
107+
results = [f.result() for f in futures]
108+
print("\n".join(results))
109+
110+
111+
if __name__ == "__main__":
112+
main()

src/access_mopper/dashboard/__init__.py

Whitespace-only changes.
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import os
2+
import sqlite3
3+
from pathlib import Path
4+
5+
import pandas as pd
6+
import streamlit as st
7+
8+
DB_PATH = Path(
9+
os.getenv("CMOR_TRACKER_DB", Path.home() / ".mopper" / "db" / "cmor_tasks.db")
10+
)
11+
12+
st.set_page_config(page_title="CMORisation Tracker", layout="wide")
13+
st.title("🧼 ACCESS CMORisation Dashboard")
14+
15+
16+
@st.cache_data(ttl=10)
17+
def load_data():
18+
conn = sqlite3.connect(DB_PATH)
19+
df = pd.read_sql_query("SELECT * FROM cmor_tasks", conn)
20+
conn.close()
21+
return df
22+
23+
24+
df = load_data()
25+
26+
# Sidebar filters
27+
with st.sidebar:
28+
st.header("Filters")
29+
statuses = df["status"].unique().tolist()
30+
selected_statuses = st.multiselect("Status", options=statuses, default=statuses)
31+
experiments = df["experiment"].unique().tolist()
32+
selected_experiments = st.multiselect(
33+
"Experiment", options=experiments, default=experiments
34+
)
35+
36+
# Apply filters
37+
filtered_df = df[
38+
df["status"].isin(selected_statuses) & df["experiment"].isin(selected_experiments)
39+
]
40+
41+
st.markdown(f"### Showing {len(filtered_df)} task(s)")
42+
st.dataframe(filtered_df, use_container_width=True)
43+
44+
# Summary stats
45+
st.markdown("### 📊 Summary")
46+
summary = df["status"].value_counts().rename_axis("status").reset_index(name="count")
47+
st.table(summary)
48+
49+
# Errors
50+
if "failed" in df["status"].values:
51+
st.markdown("### ❌ Failed Tasks")
52+
st.dataframe(
53+
df[df["status"] == "failed"][["variable", "experiment", "error_message"]],
54+
use_container_width=True,
55+
)
56+
57+
58+
def main():
59+
import os
60+
import subprocess
61+
from pathlib import Path
62+
63+
db_path = Path(
64+
os.getenv("CMOR_TRACKER_DB", Path.home() / ".mopper" / "db" / "cmor_tasks.db")
65+
)
66+
subprocess.run(
67+
["streamlit", "run", __file__],
68+
env={**os.environ, "CMOR_TRACKER_DB": str(db_path)},
69+
)
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# === General input/output paths
2+
input_folder: /g/data/xyz/experiment/piControl
3+
output_folder: /g/data/xyz/cmip6_output/piControl
4+
drs_root: /g/data/xyz/cmip6_output
5+
6+
# === CMIP6 metadata
7+
experiment_id: piControl
8+
source_id: ACCESS-ESM1-6
9+
variant_label: r1i1p1f1
10+
grid_label: gn
11+
activity_id: CMIP
12+
13+
# === List of variables to CMORise (table.variable)
14+
variables:
15+
- Amon.tas
16+
- Amon.ps
17+
- Lmon.smc
18+
- Omon.thetao
19+
- Omon.so
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import shutil
2+
import sys
3+
from importlib.resources import files
4+
from pathlib import Path
5+
6+
7+
def main():
8+
example_file = files("access_mopper.examples").joinpath("batch_config.yml")
9+
10+
if len(sys.argv) == 2:
11+
target_path = Path(sys.argv[1])
12+
shutil.copy(example_file, target_path)
13+
print(f"Example config copied to {target_path}")
14+
else:
15+
with example_file.open("r") as f:
16+
print(f.read())

src/access_mopper/tracking.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import sqlite3
2+
from pathlib import Path
3+
from typing import Optional
4+
5+
6+
class TaskTracker:
7+
def __init__(self, db_path: Optional[Path] = None):
8+
if db_path is None:
9+
db_path = Path.home() / ".mopper" / "db" / "cmor_tasks.db"
10+
self.db_path = Path(db_path)
11+
self.db_path.parent.mkdir(parents=True, exist_ok=True)
12+
self.conn = sqlite3.connect(self.db_path)
13+
self._init_db()
14+
15+
def _init_db(self):
16+
with self.conn:
17+
self.conn.execute(
18+
"""
19+
CREATE TABLE IF NOT EXISTS cmor_tasks (
20+
id INTEGER PRIMARY KEY AUTOINCREMENT,
21+
variable TEXT NOT NULL,
22+
experiment TEXT NOT NULL,
23+
status TEXT CHECK(status IN ('pending', 'running', 'done', 'failed')) NOT NULL DEFAULT 'pending',
24+
start_time TEXT,
25+
end_time TEXT,
26+
error_message TEXT
27+
)
28+
"""
29+
)
30+
self.conn.execute(
31+
"CREATE UNIQUE INDEX IF NOT EXISTS idx_var_exp ON cmor_tasks(variable, experiment)"
32+
)
33+
34+
def add_task(self, variable: str, experiment: str):
35+
with self.conn:
36+
self.conn.execute(
37+
"""
38+
INSERT OR IGNORE INTO cmor_tasks (variable, experiment)
39+
VALUES (?, ?)
40+
""",
41+
(variable, experiment),
42+
)
43+
44+
def mark_running(self, variable: str, experiment: str):
45+
with self.conn:
46+
self.conn.execute(
47+
"""
48+
UPDATE cmor_tasks
49+
SET status='running', start_time=datetime('now')
50+
WHERE variable=? AND experiment=?
51+
""",
52+
(variable, experiment),
53+
)
54+
55+
def mark_done(self, variable: str, experiment: str):
56+
with self.conn:
57+
self.conn.execute(
58+
"""
59+
UPDATE cmor_tasks
60+
SET status='done', end_time=datetime('now'), error_message=NULL
61+
WHERE variable=? AND experiment=?
62+
""",
63+
(variable, experiment),
64+
)
65+
66+
def mark_failed(self, variable: str, experiment: str, error_message: str):
67+
with self.conn:
68+
self.conn.execute(
69+
"""
70+
UPDATE cmor_tasks
71+
SET status='failed', end_time=datetime('now'), error_message=?
72+
WHERE variable=? AND experiment=?
73+
""",
74+
(error_message, variable, experiment),
75+
)
76+
77+
def is_done(self, variable: str, experiment: str) -> bool:
78+
cur = self.conn.cursor()
79+
cur.execute(
80+
"""
81+
SELECT status FROM cmor_tasks WHERE variable=? AND experiment=?
82+
""",
83+
(variable, experiment),
84+
)
85+
row = cur.fetchone()
86+
return row is not None and row[0] == "done"

0 commit comments

Comments
 (0)