Skip to content

Commit f84639b

Browse files
authored
Add minimal Slurm api for submitting jobs (#219)
Add slurm zocalo config plugin. Initial pydantic models generated from the API using datamodel-code-generator.
1 parent 6e0eb17 commit f84639b

File tree

7 files changed

+853
-0
lines changed

7 files changed

+853
-0
lines changed

HISTORY.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ History
44

55
Unreleased
66
----------
7+
* Add minimal wrapper for the Slurm REST API to allow job submission
8+
* Add Slurm ``zocalo.configuration`` plugin
79

810
0.27.0 (2023-03-16)
911
-------------------

setup.cfg

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ zocalo.configuration.plugins =
7474
jmx = zocalo.configuration.plugin_jmx:JMX
7575
logging = zocalo.configuration.plugin_logging:Logging
7676
rabbitmqapi = zocalo.configuration.plugin_rabbitmqapi:RabbitAPI
77+
slurm = zocalo.configuration.plugin_slurm:Slurm
7778
smtp = zocalo.configuration.plugin_smtp:SMTP
7879
storage = zocalo.configuration.plugin_storage:Storage
7980
zocalo.wrappers =
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from __future__ import annotations
2+
3+
import os
4+
5+
from marshmallow import fields
6+
7+
from zocalo.configuration import PluginSchema
8+
9+
10+
class Slurm:
11+
class Schema(PluginSchema):
12+
url = fields.Str(required=True)
13+
user_token = fields.Str(required=False)
14+
user = fields.Str(required=False)
15+
api_version = fields.Str(required=True)
16+
17+
@staticmethod
18+
def activate(configuration):
19+
user_token = configuration.get("user_token")
20+
if user_token and os.path.isfile(user_token):
21+
with open(user_token, "r") as f:
22+
configuration["user_token"] = f.read().strip()
23+
return configuration

src/zocalo/util/slurm/__init__.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
from __future__ import annotations
2+
3+
from typing import Any, Optional
4+
5+
import requests
6+
7+
import zocalo.configuration
8+
9+
from . import models
10+
11+
12+
class SlurmRestApi:
13+
def __init__(
14+
self,
15+
url: str,
16+
version: str = "v0.0.36",
17+
user_name: Optional[str] = None,
18+
user_token: Optional[str] = None,
19+
):
20+
self._url = url
21+
self._version = version
22+
self._session = requests.Session()
23+
if user_name:
24+
self._session.headers["X-SLURM-USER-NAME"] = user_name
25+
if user_token:
26+
self._session.headers["X-SLURM-USER-TOKEN"] = user_token
27+
28+
@classmethod
29+
def from_zocalo_configuration(cls, zc: zocalo.configuration.Configuration):
30+
return cls(
31+
url=zc.slurm["url"],
32+
version=zc.slurm["api_version"],
33+
user_name=zc.slurm.get("user"),
34+
user_token=zc.slurm.get("user_token"),
35+
)
36+
37+
def get(
38+
self, endpoint: str, params: dict[str, Any] = None, timeout: float | None = None
39+
) -> requests.Response:
40+
response = self._session.get(
41+
f"{self._url}/{endpoint}", params=params, timeout=timeout
42+
)
43+
response.raise_for_status()
44+
return response
45+
46+
def put(
47+
self,
48+
endpoint: str,
49+
params: dict[str, Any] = None,
50+
json: dict[str, Any] = None,
51+
timeout: float | None = None,
52+
) -> requests.Response:
53+
response = self._session.put(
54+
f"{self._url}/{endpoint}", params=params, json=json, timeout=timeout
55+
)
56+
response.raise_for_status()
57+
return response
58+
59+
def post(
60+
self,
61+
endpoint: str,
62+
data: dict[str, Any] | None = None,
63+
json: dict[str, Any] | None = None,
64+
timeout: float | None = None,
65+
) -> requests.Response:
66+
response = self._session.post(
67+
f"{self._url}/{endpoint}", data=data, json=json, timeout=timeout
68+
)
69+
response.raise_for_status()
70+
return response
71+
72+
def delete(
73+
self, endpoint: str, params: dict[str, Any] = None, timeout: float | None = None
74+
) -> requests.Response:
75+
response = self._session.delete(
76+
f"{self._url}/{endpoint}", params=params, timeout=timeout
77+
)
78+
response.raise_for_status()
79+
return response
80+
81+
def get_jobs(self) -> models.JobsResponse:
82+
endpoint = f"slurm/{self._version}/jobs"
83+
response = self.get(endpoint)
84+
return models.JobsResponse(**response.json())
85+
86+
def get_job_info(self, job_id: int) -> models.JobsResponse:
87+
endpoint = f"slurm/{self._version}/job/{job_id}"
88+
response = self.get(endpoint)
89+
return models.JobsResponse(**response.json())
90+
91+
def submit_job(
92+
self, job_submission: models.JobSubmission
93+
) -> models.JobSubmissionResponse:
94+
endpoint = f"slurm/{self._version}/job/submit"
95+
response = self.post(endpoint, json=job_submission.dict(exclude_defaults=True))
96+
return models.JobSubmissionResponse(**response.json())

0 commit comments

Comments
 (0)