Skip to content

Commit 2d68551

Browse files
committed
init commit for OAR worker
1 parent a95e3f0 commit 2d68551

File tree

9 files changed

+654
-16
lines changed

9 files changed

+654
-16
lines changed

.github/workflows/ci-cd.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ jobs:
4040
- name: Install task package
4141
run: |
4242
pip install ${{ matrix.pip-flags }} ".[dev]"
43-
python -c "import pydra.workers.CHANGEME as m; print(f'{m.__name__} {m.__version__} @ {m.__file__}')"
43+
python -c "import pydra.workers.oar as m; print(f'{m.__name__} {m.__version__} @ {m.__file__}')"
4444
python -c "import pydra as m; print(f'{m.__name__} {m.__version__} @ {m.__file__}')"
4545
4646
test:
@@ -61,12 +61,12 @@ jobs:
6161
- name: Install task package
6262
run: |
6363
pip install ".[test]"
64-
python -c "import pydra.workers.CHANGEME as m; print(f'{m.__name__} {m.__version__} @ {m.__file__}')"
64+
python -c "import pydra.workers.oar as m; print(f'{m.__name__} {m.__version__} @ {m.__file__}')"
6565
python -c "import pydra as m; print(f'{m.__name__} {m.__version__} @ {m.__file__}')"
6666
- name: Test with pytest
6767
run: |
68-
pytest -sv --doctest-modules pydra/workers/CHANGEME \
69-
--cov pydra.workers.CHANGEME --cov-report xml
68+
pytest -sv --doctest-modules pydra/workers/oar \
69+
--cov pydra.workers.oar --cov-report xml
7070
- uses: codecov/codecov-action@v3
7171
if: ${{ always() }}
7272

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,4 +141,4 @@ dmypy.json
141141
.DS_store
142142

143143
# Generated files
144-
/pydra/workers/CHANGEME/_version.py
144+
/pydra/workers/oar/_version.py

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ workers, within the `pydra.workers.<yourworkerpackagename>` namespace.
88
1. Click on new repo.
99
1. Select this template from the repository template drop down list.
1010
1. Give your repo a name.
11-
1. Once the repo is created and cloned, search for CHANGEME (`grep -rn CHANGEME . `) and
11+
1. Once the repo is created and cloned, search for oar (`grep -rn oar . `) and
1212
replace with appropriate name.
13-
1. Rename the namespace package root directory to replace `CHANGEME` with the name of the package:
14-
* `pydra/workers/CHANGEME`
13+
1. Rename the namespace package root directory to replace `oar` with the name of the package:
14+
* `pydra/workers/oar`
1515
1. Add your new worker class to `pydra/workers/<package-name>/__init__.py`
1616
1. You may want to initialize a [Sphinx] docs directory.
1717
1. Review the workflow in `.github/workflows/pythonpackage.yml`. Testing editable installations

docs/conf.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
# -- Project information -----------------------------------------------------
1919

20-
project = "pydra-workers-CHANGEME"
20+
project = "pydra-workers-oar"
2121
copyright = "2020, Xihe Xie"
2222
author = "Xihe Xie"
2323

docs/index.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
Welcome to pydra-workers-CHANGEME's documentation!
1+
Welcome to pydra-workers-oar's documentation!
22
==================================================
33

44
.. toctree::
Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33
imported.
44
55
>>> import pydra.engine
6-
>>> import pydra.workers.CHANGEME
6+
>>> import pydra.workers.oar
77
"""
88

99
try:
1010
from ._version import __version__
1111
except ImportError:
1212
raise RuntimeError(
13-
"Pydra package 'CHANGEME' has not been installed, please use "
13+
"Pydra package 'oar' has not been installed, please use "
1414
"`pip install -e <path-to-repo>` to install development version"
1515
)
16+
17+
from .oar import OarWorker

pydra/workers/oar/oar.py

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
import asyncio
2+
import os
3+
import sys
4+
import json
5+
import re
6+
import typing as ty
7+
from tempfile import gettempdir
8+
from pathlib import Path
9+
from shutil import copyfile
10+
import logging
11+
import attrs
12+
from pydra.engine.job import Job, save
13+
from pydra.workers import base
14+
15+
16+
logger = logging.getLogger("pydra.worker")
17+
18+
if ty.TYPE_CHECKING:
19+
from pydra.engine.result import Result
20+
21+
22+
@attrs.define
23+
class OarWorker(base.Worker):
24+
"""A worker to execute tasks on OAR systems."""
25+
26+
_cmd = "oarsub"
27+
28+
poll_delay: int = attrs.field(default=1, converter=base.ensure_non_negative)
29+
oarsub_args: str = ""
30+
error: dict[str, ty.Any] = attrs.field(factory=dict)
31+
32+
def __getstate__(self) -> dict[str, ty.Any]:
33+
"""Return state for pickling."""
34+
state = super().__getstate__()
35+
del state["error"]
36+
return state
37+
38+
def __setstate__(self, state: dict[str, ty.Any]):
39+
"""Set state for unpickling."""
40+
state["error"] = {}
41+
super().__setstate__(state)
42+
43+
def _prepare_runscripts(self, job, interpreter="/bin/sh", rerun=False):
44+
if isinstance(job, Job):
45+
cache_root = job.cache_root
46+
ind = None
47+
uid = job.uid
48+
else:
49+
assert isinstance(job, tuple), f"Expecting a job or a tuple, not {job!r}"
50+
assert len(job) == 2, f"Expecting a tuple of length 2, not {job!r}"
51+
ind = job[0]
52+
cache_root = job[-1].cache_root
53+
uid = f"{job[-1].uid}_{ind}"
54+
55+
script_dir = cache_root / f"{self.plugin_name()}_scripts" / uid
56+
script_dir.mkdir(parents=True, exist_ok=True)
57+
if ind is None:
58+
if not (script_dir / "_job.pklz").exists():
59+
save(script_dir, job=job)
60+
else:
61+
copyfile(job[1], script_dir / "_job.pklz")
62+
63+
job_pkl = script_dir / "_job.pklz"
64+
if not job_pkl.exists() or not job_pkl.stat().st_size:
65+
raise Exception("Missing or empty job!")
66+
67+
batchscript = script_dir / f"batchscript_{uid}.sh"
68+
python_string = (
69+
f"""'from pydra.engine.job import load_and_run; """
70+
f"""load_and_run("{job_pkl}", rerun={rerun}) '"""
71+
)
72+
bcmd = "\n".join(
73+
(
74+
f"#!{interpreter}",
75+
f"{sys.executable} -c " + python_string,
76+
)
77+
)
78+
with batchscript.open("wt") as fp:
79+
fp.writelines(bcmd)
80+
os.chmod(batchscript, 0o544)
81+
return script_dir, batchscript
82+
83+
async def run(self, job: "Job[base.TaskType]", rerun: bool = False) -> "Result":
84+
"""Worker submission API."""
85+
script_dir, batch_script = self._prepare_runscripts(job, rerun=rerun)
86+
if (script_dir / script_dir.parts[1]) == gettempdir():
87+
logger.warning("Temporary directories may not be shared across computers")
88+
script_dir = job.cache_root / f"{self.plugin_name()}_scripts" / job.uid
89+
sargs = self.oarsub_args.split()
90+
jobname = re.search(r"(?<=-n )\S+|(?<=--name=)\S+", self.oarsub_args)
91+
if not jobname:
92+
jobname = ".".join((job.name, job.uid))
93+
sargs.append(f"--name={jobname}")
94+
output = re.search(r"(?<=-O )\S+|(?<=--stdout=)\S+", self.oarsub_args)
95+
if not output:
96+
output_file = str(script_dir / "oar-%jobid%.out")
97+
sargs.append(f"--stdout={output_file}")
98+
error = re.search(r"(?<=-E )\S+|(?<=--stderr=)\S+", self.oarsub_args)
99+
if not error:
100+
error_file = str(script_dir / "oar-%jobid%.err")
101+
sargs.append(f"--stderr={error_file}")
102+
else:
103+
error_file = None
104+
sargs.append(str(batch_script))
105+
# TO CONSIDER: add random sleep to avoid overloading calls
106+
rc, stdout, stderr = await base.read_and_display_async(
107+
self._cmd, *sargs, hide_display=True
108+
)
109+
jobid = re.search(r"OAR_JOB_ID=(\d+)", stdout)
110+
if rc:
111+
raise RuntimeError(f"Error returned from oarsub: {stderr}")
112+
elif not jobid:
113+
raise RuntimeError("Could not extract job ID")
114+
jobid = jobid.group(1)
115+
if error_file:
116+
error_file = error_file.replace("%jobid%", jobid)
117+
self.error[jobid] = error_file.replace("%jobid%", jobid)
118+
# intermittent polling
119+
while True:
120+
# 4 possibilities
121+
# False: job is still pending/working
122+
# Terminated: job is complete
123+
# Error + idempotent: job has been stopped and resubmited with another jobid
124+
# Error: Job failure
125+
done = await self._poll_job(jobid)
126+
if not done:
127+
await asyncio.sleep(self.poll_delay)
128+
elif done == "Terminated":
129+
return True
130+
elif done == "Error" and "idempotent" in self.oarsub_args:
131+
jobid = await self._handle_resubmission(jobid, job)
132+
continue
133+
else:
134+
error_file = self.error[jobid]
135+
if not Path(error_file).exists():
136+
logger.debug(
137+
f"No error file for job {jobid}. Checking if job was resubmitted by OAR..."
138+
)
139+
jobid = await self._handle_resubmission(jobid, job)
140+
if jobid:
141+
continue
142+
for _ in range(5):
143+
if Path(error_file).exists():
144+
break
145+
await asyncio.sleep(1)
146+
else:
147+
raise RuntimeError(
148+
f"OAR error file not found: {error_file}, and no resubmission detected."
149+
)
150+
error_line = Path(error_file).read_text().split("\n")[-2]
151+
if "Exception" in error_line:
152+
error_message = error_line.replace("Exception: ", "")
153+
elif "Error" in error_line:
154+
error_message = error_line.replace("Error: ", "")
155+
else:
156+
error_message = "Job failed (unknown reason - TODO)"
157+
raise Exception(error_message)
158+
return True
159+
160+
async def _poll_job(self, jobid):
161+
cmd = ("oarstat", "-J", "-s", "-j", jobid)
162+
logger.debug(f"Polling job {jobid}")
163+
_, stdout, _ = await base.read_and_display_async(*cmd, hide_display=True)
164+
if not stdout:
165+
raise RuntimeError("Job information not found")
166+
status = json.loads(stdout)[jobid]
167+
if status in ["Waiting", "Launching", "Running", "Finishing"]:
168+
return False
169+
return status
170+
171+
async def _handle_resubmission(self, jobid, job):
172+
logger.debug(f"Job {jobid} has been stopped. Looking for its resubmission...")
173+
# loading info about task with a specific uid
174+
info_file = job.cache_root / f"{job.uid}_info.json"
175+
if info_file.exists():
176+
checksum = json.loads(info_file.read_text())["checksum"]
177+
lock_file = job.cache_root / f"{checksum}.lock"
178+
if lock_file.exists():
179+
lock_file.unlink()
180+
cmd_re = ("oarstat", "-J", "--sql", f"resubmit_job_id='{jobid}'")
181+
_, stdout, _ = await base.read_and_display_async(*cmd_re, hide_display=True)
182+
if stdout:
183+
return next(iter(json.loads(stdout).keys()), None)
184+
else:
185+
return None
186+
187+
188+
# Alias so it can be referred to as oar.Worker
189+
Worker = OarWorker

pyproject.toml

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ requires = ["hatchling", "hatch-vcs"]
33
build-backend = "hatchling.build"
44

55
[project]
6-
name = "pydra-workers-CHANGEME"
7-
description = "Pydra workers package for CHANGEME"
6+
name = "pydra-workers-oar"
7+
description = "Pydra workers package for oar"
88
readme = "README.md"
99
requires-python = ">=3.11"
1010
dependencies = [
@@ -52,11 +52,11 @@ test = [
5252
source = "vcs"
5353

5454
[tool.hatch.build.hooks.vcs]
55-
version-file = "pydra/workers/CHANGEME/_version.py"
55+
version-file = "pydra/workers/oar/_version.py"
5656

5757
[tool.hatch.build.targets.wheel]
5858
packages = ["pydra"]
59-
include-only = ["pydra/workers/CHANGEME"]
59+
include-only = ["pydra/workers/oar"]
6060

6161
[tool.black]
6262
target-version = ["py38"]
@@ -73,3 +73,6 @@ per-file-ignores = [
7373
max-line-length = 88
7474
select = "C,E,F,W,B,B950"
7575
extend-ignore = ['E203', 'E501', 'E129', 'W503']
76+
77+
[project.entry-points."pydra.workers"]
78+
oar = pydra.workers.oar:OARWorker

0 commit comments

Comments
 (0)