Skip to content

Commit b4a0918

Browse files
authored
Merge pull request #315 from ExaWorks/fix/issue_314
add environment type check to `JobSpec`
2 parents 93c9f8b + 4838830 commit b4a0918

File tree

10 files changed

+123
-28
lines changed

10 files changed

+123
-28
lines changed

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
filelock
22
psutil
33
pystache>=0.6.0
4+
typeguard

src/psij/executors/batch/batch_scheduler_executor.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -194,9 +194,9 @@ def submit(self, job: Job) -> None:
194194
"""See :func:`~psij.JobExecutor.submit`."""
195195
logger.info('Job %s: submitting', job.id)
196196
self._ensure_work_dir()
197-
assert (job.spec)
198197

199-
job.executor = self
198+
self._check_job(job)
199+
200200
context = self._create_script_context(job)
201201

202202
# assumes job ids are unique
@@ -551,7 +551,7 @@ def list(self) -> List[str]:
551551
Implementations are encouraged to restrict the results to jobs accessible by the current
552552
user.
553553
"""
554-
pass
554+
raise NotImplementedError()
555555

556556

557557
class _QueuePollThread(Thread):

src/psij/executors/flux.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -124,14 +124,14 @@ def _add_flux_callbacks(self, job: Job, fut: flux.job.FluxExecutorFuture) -> Non
124124

125125
def submit(self, job: Job) -> None:
126126
"""See :func:`~psij.job_executor.JobExecutor.submit`."""
127-
assert job.spec
128-
assert job.spec.attributes
129-
job.executor = self
130-
if isinstance(job.spec.resources, ResourceSpecV1):
131-
resources = job.spec.resources
132-
elif isinstance(job.spec.resources, ResourceSpec):
127+
spec = self._check_job(job)
128+
129+
assert spec.attributes
130+
if isinstance(spec.resources, ResourceSpecV1):
131+
resources = spec.resources
132+
elif isinstance(spec.resources, ResourceSpec):
133133
raise InvalidJobException(
134-
f"ResourceSpec version {job.spec.resources.version} not supported"
134+
f"ResourceSpec version {spec.resources.version} not supported"
135135
)
136136
else:
137137
resources = ResourceSpecV1(process_count=1, cpu_cores_per_process=1)
@@ -142,24 +142,24 @@ def submit(self, job: Job) -> None:
142142
)
143143
if resources.processes_per_node:
144144
raise InvalidJobException("Flux does not support processes_per_node")
145-
if not job.spec.executable:
145+
if not spec.executable:
146146
raise InvalidJobException("Job must have an executable")
147-
argv = list(job.spec.arguments) if job.spec.arguments else []
148-
argv.insert(0, job.spec.executable)
147+
argv = list(spec.arguments) if spec.arguments else []
148+
argv.insert(0, spec.executable)
149149
flux_jobspec = flux.job.JobspecV1.from_command(
150150
argv,
151151
num_tasks=resources.process_count,
152152
cores_per_task=resources.cpu_cores_per_process,
153153
gpus_per_task=resources.gpu_cores_per_process,
154154
num_nodes=resources.node_count,
155155
)
156-
if job.spec.stdout_path:
157-
flux_jobspec.stdout = job.spec.stdout_path
158-
if job.spec.stdin_path:
159-
flux_jobspec.stdin = job.spec.stdin_path
160-
if job.spec.stderr_path:
161-
flux.jobspec.stderr = job.spec.stderr_path
162-
flux_jobspec.duration = job.spec.attributes.duration.total_seconds()
156+
if spec.stdout_path:
157+
flux_jobspec.stdout = spec.stdout_path
158+
if spec.stdin_path:
159+
flux_jobspec.stdin = spec.stdin_path
160+
if spec.stderr_path:
161+
flux.jobspec.stderr = spec.stderr_path
162+
flux_jobspec.duration = spec.attributes.duration.total_seconds()
163163
fut = self._flux_executor.submit(flux_jobspec)
164164
self._add_flux_callbacks(job, fut)
165165

src/psij/executors/local.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -193,10 +193,7 @@ def submit(self, job: Job) -> None:
193193
194194
:param job: The job to be submitted.
195195
"""
196-
spec = job.spec
197-
if not spec:
198-
raise InvalidJobException('Missing specification')
199-
job.executor = self
196+
spec = self._check_job(job)
200197

201198
p = _ChildProcessEntry(job, self, self._get_launcher(self._get_launcher_name(spec)))
202199
assert p.launcher

src/psij/executors/rp.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,11 +106,8 @@ def submit(self, job: Job) -> None:
106106
107107
:param job: The job to be submitted.
108108
"""
109-
spec = job.spec
110-
if not spec:
111-
raise InvalidJobException('Missing specification')
109+
self._check_job(job)
112110

113-
job.executor = self
114111
try:
115112
td = self._job_2_descr(job)
116113
task = self._tmgr.submit_tasks(td)

src/psij/job_attributes.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from datetime import timedelta
22
from typing import Optional, Dict
33

4+
from typeguard import check_argument_types
5+
46

57
class JobAttributes(object):
68
"""A class containing ancillary job information that describes how a job is to be run."""
@@ -27,6 +29,8 @@ def __init__(self, duration: timedelta = timedelta(minutes=10),
2729
:class:`~psij.JobExecutor` define and are responsible for interpreting custom
2830
attributes.
2931
"""
32+
assert check_argument_types()
33+
3034
self.duration = duration
3135
self.queue_name = queue_name
3236
self.project_name = project_name

src/psij/job_executor.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@
55
from typing import Optional, Dict, List, Type, cast, Union, Callable, Set
66

77
import psij
8+
from psij import InvalidJobException
89
from psij.descriptor import Descriptor, _VersionEntry
910
from psij._plugins import _register_plugin, _get_plugin_class, _print_plugin_status
1011
from psij.job_status import JobStatus
1112
from psij.job import Job, JobStatusCallback, FunctionJobStatusCallback
1213
from psij.job_executor_config import JobExecutorConfig
1314
from psij.job_launcher import Launcher
15+
from psij.job_spec import JobSpec
1416

1517

1618
logger = logging.getLogger(__name__)
@@ -57,6 +59,52 @@ def version(self) -> Version:
5759
"""Returns the version of this executor."""
5860
return cast(Version, getattr(self.__class__, '_VERSION_'))
5961

62+
def _check_job(self, job: Job) -> JobSpec:
63+
"""
64+
Checks a job for consistency and correctness.
65+
66+
Verifies that various aspects of the job are correctly specified. This includes precisely
67+
the following checks:
68+
* the job has a non-null specification
69+
* job.spec.environment is a Dict[str, str]
70+
71+
While this method makes a fair attempt at ensuring the validity of the job, it makes no
72+
such guarantees. Specifically, if an executor implementation requires checks not listed
73+
above, it should implement them explicitly.
74+
75+
These checks are meant to trigger common runtime type errors somewhat early and with clear
76+
error messages. In production software, these checks can be disabled by invoking Python
77+
with one of the optimization flags (`-O` or `-OO`).
78+
79+
Upon completion, this method sets the :attr:`~psij.Job.executor` attribute of the job and
80+
returns the job specification.
81+
82+
Parameters
83+
----------
84+
job
85+
The job to validate
86+
87+
Returns
88+
-------
89+
A non-null job specification
90+
"""
91+
spec = job.spec
92+
if not spec:
93+
raise InvalidJobException('Missing specification')
94+
95+
if __debug__:
96+
if spec.environment is not None:
97+
for k, v in spec.environment.items():
98+
if not isinstance(k, str):
99+
raise TypeError('environment key "%s" is not a string (%s)'
100+
% (k, type(k).__name__))
101+
if not isinstance(v, str):
102+
raise TypeError('environment key "%s" has non-string value (%s)'
103+
% (k, type(v).__name__))
104+
105+
job.executor = self
106+
return spec
107+
60108
@abstractmethod
61109
def submit(self, job: Job) -> None:
62110
"""

src/psij/job_spec.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
import sys
22
from pathlib import Path
33
from typing import Optional, List, Dict, Any
4+
5+
from typeguard import check_argument_types
6+
47
from psij.job_attributes import JobAttributes
58
from psij.resource_spec import ResourceSpec
69

@@ -53,6 +56,8 @@ def __init__(self, name: Optional[str] = None, executable: Optional[str] = None,
5356
:param launcher: The name of a launcher to use, such as "mpirun", "srun", "single", etc.
5457
For a list of available launchers,:ref:`launchers`
5558
"""
59+
assert check_argument_types()
60+
5661
self._name = name
5762
self.executable = executable
5863
self.arguments = arguments

src/psij/resource_spec.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from abc import ABC, abstractmethod
22
from typing import Optional, List
33

4+
from typeguard import check_argument_types
5+
46
from psij.exceptions import InvalidJobException
57

68

@@ -58,6 +60,8 @@ def __init__(self, node_count: Optional[int] = None,
5860
:param gpu_cores_per_process:
5961
:param exclusive_node_use:
6062
"""
63+
assert check_argument_types()
64+
6165
self.node_count = node_count
6266
self.process_count = process_count
6367
self.processes_per_node = processes_per_node

tests/test_job_spec.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
2+
import pytest
3+
4+
from psij import JobSpec, JobExecutor, Job
5+
6+
7+
def _test_spec(spec: JobSpec) -> None:
8+
ex = JobExecutor.get_instance('local')
9+
ex.submit(Job(spec))
10+
11+
12+
def test_environment_types() -> None:
13+
with pytest.raises(TypeError):
14+
_test_spec(JobSpec(environment={'foo': 1})) # type: ignore
15+
16+
with pytest.raises(TypeError):
17+
_test_spec(JobSpec(environment={1: 'foo'})) # type: ignore
18+
19+
with pytest.raises(TypeError):
20+
spec = JobSpec()
21+
spec.environment = {'foo': 'bar'}
22+
spec.environment['buz'] = 2 # type: ignore
23+
_test_spec(spec)
24+
25+
spec = JobSpec()
26+
assert spec.environment is None
27+
28+
spec = JobSpec(environment={'foo': 'bar'})
29+
assert spec.environment['foo'] == 'bar' # type: ignore
30+
31+
spec = JobSpec()
32+
spec.environment = {'foo': 'bar'}
33+
assert spec.environment['foo'] == 'bar'
34+
35+
spec.environment = {'foo': 'biz'}
36+
assert spec.environment['foo'] == 'biz'
37+
38+
39+
test_environment_types()

0 commit comments

Comments
 (0)