Skip to content

Commit 87787c6

Browse files
committed
Improve tracking for scaling beyond 50 jobs
1 parent 5862b04 commit 87787c6

File tree

2 files changed

+21
-1
lines changed

2 files changed

+21
-1
lines changed

src/access_mopper/templates/cmor_python_script.j2

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ def main():
5353
print(f'Processing {variable} with {len(input_files)} files')
5454

5555
# Initialize tracker
56-
tracker = TaskTracker(Path(db_path))
56+
db_name = f"cmor_tasks_{variable.replace('.', '_')}.db"
57+
db_path = Path(os.environ['CMOR_TRACKER_DB']).parent / db_name
58+
tracker = TaskTracker(db_path)
5759
tracker.add_task(variable, experiment_id)
5860

5961
if tracker.is_done(variable, experiment_id):

src/access_mopper/tracking.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
import random
12
import sqlite3
3+
import time
24
from pathlib import Path
35
from typing import Optional
46

@@ -13,6 +15,9 @@ def __init__(self, db_path: Optional[Path] = None):
1315
self._init_db()
1416

1517
def _init_db(self):
18+
# Enable WAL mode for better concurrent access
19+
self.conn.execute("PRAGMA journal_mode=WAL")
20+
self.conn.execute("PRAGMA synchronous=NORMAL")
1621
with self.conn:
1722
self.conn.execute(
1823
"""
@@ -84,3 +89,16 @@ def is_done(self, variable: str, experiment: str) -> bool:
8489
)
8590
row = cur.fetchone()
8691
return row is not None and row[0] == "done"
92+
93+
def _execute_with_retry(self, query, params=(), max_retries=5):
94+
for attempt in range(max_retries):
95+
try:
96+
with self.conn:
97+
return self.conn.execute(query, params)
98+
except sqlite3.OperationalError as e:
99+
if "database is locked" in str(e) and attempt < max_retries - 1:
100+
# Exponential backoff with jitter
101+
delay = (2**attempt) + random.uniform(0, 1)
102+
time.sleep(delay)
103+
continue
104+
raise

0 commit comments

Comments
 (0)