Skip to content

Commit e323286

Browse files
committed
Added method in JobExecutor to do some job/spec validation.
1 parent 1aa7231 commit e323286

File tree

5 files changed

+69
-27
lines changed

5 files changed

+69
-27
lines changed

src/psij/executors/batch/batch_scheduler_executor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,9 +194,9 @@ def submit(self, job: Job) -> None:
194194
"""See :func:`~psij.JobExecutor.submit`."""
195195
logger.info('Job %s: submitting', job.id)
196196
self._ensure_work_dir()
197-
assert (job.spec)
198197

199-
job.executor = self
198+
self._check_job(job)
199+
200200
context = self._create_script_context(job)
201201

202202
# assumes job ids are unique

src/psij/executors/flux.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -124,14 +124,14 @@ def _add_flux_callbacks(self, job: Job, fut: flux.job.FluxExecutorFuture) -> Non
124124

125125
def submit(self, job: Job) -> None:
126126
"""See :func:`~psij.job_executor.JobExecutor.submit`."""
127-
assert job.spec
128-
assert job.spec.attributes
129-
job.executor = self
130-
if isinstance(job.spec.resources, ResourceSpecV1):
131-
resources = job.spec.resources
132-
elif isinstance(job.spec.resources, ResourceSpec):
127+
spec = self._check_job(job)
128+
129+
assert spec.attributes
130+
if isinstance(spec.resources, ResourceSpecV1):
131+
resources = spec.resources
132+
elif isinstance(spec.resources, ResourceSpec):
133133
raise InvalidJobException(
134-
f"ResourceSpec version {job.spec.resources.version} not supported"
134+
f"ResourceSpec version {spec.resources.version} not supported"
135135
)
136136
else:
137137
resources = ResourceSpecV1(process_count=1, cpu_cores_per_process=1)
@@ -142,24 +142,24 @@ def submit(self, job: Job) -> None:
142142
)
143143
if resources.processes_per_node:
144144
raise InvalidJobException("Flux does not support processes_per_node")
145-
if not job.spec.executable:
145+
if not spec.executable:
146146
raise InvalidJobException("Job must have an executable")
147-
argv = list(job.spec.arguments) if job.spec.arguments else []
148-
argv.insert(0, job.spec.executable)
147+
argv = list(spec.arguments) if spec.arguments else []
148+
argv.insert(0, spec.executable)
149149
flux_jobspec = flux.job.JobspecV1.from_command(
150150
argv,
151151
num_tasks=resources.process_count,
152152
cores_per_task=resources.cpu_cores_per_process,
153153
gpus_per_task=resources.gpu_cores_per_process,
154154
num_nodes=resources.node_count,
155155
)
156-
if job.spec.stdout_path:
157-
flux_jobspec.stdout = job.spec.stdout_path
158-
if job.spec.stdin_path:
159-
flux_jobspec.stdin = job.spec.stdin_path
160-
if job.spec.stderr_path:
161-
flux.jobspec.stderr = job.spec.stderr_path
162-
flux_jobspec.duration = job.spec.attributes.duration.total_seconds()
156+
if spec.stdout_path:
157+
flux_jobspec.stdout = spec.stdout_path
158+
if spec.stdin_path:
159+
flux_jobspec.stdin = spec.stdin_path
160+
if spec.stderr_path:
161+
flux.jobspec.stderr = spec.stderr_path
162+
flux_jobspec.duration = spec.attributes.duration.total_seconds()
163163
fut = self._flux_executor.submit(flux_jobspec)
164164
self._add_flux_callbacks(job, fut)
165165

src/psij/executors/local.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -193,10 +193,7 @@ def submit(self, job: Job) -> None:
193193
194194
:param job: The job to be submitted.
195195
"""
196-
spec = job.spec
197-
if not spec:
198-
raise InvalidJobException('Missing specification')
199-
job.executor = self
196+
spec = self._check_job(job)
200197

201198
p = _ChildProcessEntry(job, self, self._get_launcher(self._get_launcher_name(spec)))
202199
assert p.launcher

src/psij/executors/rp.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,11 +106,8 @@ def submit(self, job: Job) -> None:
106106
107107
:param job: The job to be submitted.
108108
"""
109-
spec = job.spec
110-
if not spec:
111-
raise InvalidJobException('Missing specification')
109+
self._check_job(job)
112110

113-
job.executor = self
114111
try:
115112
td = self._job_2_descr(job)
116113
task = self._tmgr.submit_tasks(td)

src/psij/job_executor.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@
55
from typing import Optional, Dict, List, Type, cast, Union, Callable, Set
66

77
import psij
8+
from psij import InvalidJobException
89
from psij.descriptor import Descriptor, _VersionEntry
910
from psij._plugins import _register_plugin, _get_plugin_class, _print_plugin_status
1011
from psij.job_status import JobStatus
1112
from psij.job import Job, JobStatusCallback, FunctionJobStatusCallback
1213
from psij.job_executor_config import JobExecutorConfig
1314
from psij.job_launcher import Launcher
15+
from psij.job_spec import JobSpec
1416

1517

1618
logger = logging.getLogger(__name__)
@@ -57,6 +59,52 @@ def version(self) -> Version:
5759
"""Returns the version of this executor."""
5860
return cast(Version, getattr(self.__class__, '_VERSION_'))
5961

62+
def _check_job(self, job: Job) -> JobSpec:
63+
"""
64+
Checks a job for consistency and correctness.
65+
66+
Verifies that various aspects of the job are correctly specified. This includes precisely
67+
the following checks:
68+
* the job has a non-null specification
69+
* job.spec.environment is a Dict[str, str]
70+
71+
While this method makes a fair attempt at ensuring the validity of the job, it makes no
72+
such guarantees. Specifically, if an executor implementation requires checks not listed
73+
above, it should implement them explicitly.
74+
75+
These checks are meant to trigger common runtime type errors somewhat early and with clear
76+
error messages. In production software, these checks can be disabled by invoking Python
77+
with one of the optimization flags (`-O` or `-OO`).
78+
79+
Upon completion, this method sets the :attr:`~psij.Job.executor` attribute of the job and
80+
returns the job specification.
81+
82+
Parameters
83+
----------
84+
job
85+
The job to validate
86+
87+
Returns
88+
-------
89+
A non-null job specification
90+
"""
91+
spec = job.spec
92+
if not spec:
93+
raise InvalidJobException('Missing specification')
94+
95+
if __debug__:
96+
if spec.environment is not None:
97+
for k, v in spec.environment.items():
98+
if not isinstance(k, str):
99+
raise TypeError('environment key "%s" is not a string (%s)'
100+
% (k, type(k).__name__))
101+
if not isinstance(v, str):
102+
raise TypeError('environment key "%s" has non-string value (%s)'
103+
% (k, type(v).__name__))
104+
105+
job.executor = self
106+
return spec
107+
60108
@abstractmethod
61109
def submit(self, job: Job) -> None:
62110
"""

0 commit comments

Comments
 (0)