Skip to content

Commit 764c132

Browse files
committed
A preliminary implementation for the list() method for batch scheduler type
executors. The test should probably attempt to detect if headers are incorrectly being parsed as normal data lines, but it doesn't.
1 parent ba66bbb commit 764c132

File tree

9 files changed

+83
-7
lines changed

9 files changed

+83
-7
lines changed

src/psij/executors/batch/batch_scheduler_executor.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,38 @@ def parse_status_output(self, exit_code: int, out: str) -> Dict[str, JobStatus]:
420420
"""
421421
pass
422422

423+
@abstractmethod
424+
def get_list_command(self) -> List[str]:
425+
"""Constructs a command to retrieve the list of jobs known to the LRM for the current user.
426+
427+
Concrete implementations of batch scheduler executors must override this method. Upon
428+
running the command, the output can be parsed with :func:`~parse_list_output`.
429+
430+
Returns
431+
-------
432+
A list of strings representing the executable and arguments to invoke in order to obtain
433+
the list of jobs the LRM knows for the current user.
434+
"""
435+
pass
436+
437+
def parse_list_output(self, out: str) -> List[str]:
438+
"""Parses the output of the command obtained from :func:`~get_list_command`.
439+
440+
The default implementation of this method assumes that the output has no header and
441+
consists of native IDs, one per line, possibly surrounded by whitespace. Concrete
442+
implementations should override this method if a different format is expected.
443+
444+
Parameters
445+
----------
446+
out
447+
The output from the "list" command as returned by :func:`~get_list_command`.
448+
Returns
449+
-------
450+
A list of strings representing the native IDs of the jobs known to the LRM for the current
451+
user.
452+
"""
453+
return [s.strip() for s in out.splitlines()]
454+
423455
def _create_script_context(self, job: Job) -> Dict[str, object]:
424456
launcher = self._get_launcher_from_job(job)
425457
if isinstance(launcher, ScriptBasedLauncher) and logger.isEnabledFor(logging.DEBUG):
@@ -551,7 +583,10 @@ def list(self) -> List[str]:
551583
Implementations are encouraged to restrict the results to jobs accessible by the current
552584
user.
553585
"""
554-
raise NotImplementedError()
586+
return self.parse_list_output(self._run_command(self.get_list_command()))
587+
588+
def _current_user(self) -> str:
589+
return os.getlogin()
555590

556591

557592
class _QueuePollThread(Thread):

src/psij/executors/batch/cobalt.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,10 @@ def parse_status_output(self, exit_code: int, out: str) -> Dict[str, JobStatus]:
111111
index += 1
112112
return job_statuses
113113

114+
def get_list_command(self) -> List[str]:
115+
"""See :meth:`~.BatchSchedulerExecutor.get_list_command`."""
116+
return [_QSTAT_COMMAND, '-u', self._current_user(), '--header', 'JobId']
117+
114118
def job_id_from_submit_output(self, out: str) -> str:
115119
"""See :meth:`~.BatchSchedulerExecutor.job_id_from_submit_output`."""
116120
match = _QSUB_REGEX.search(out)

src/psij/executors/batch/lsf.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,3 +127,7 @@ def job_id_from_submit_output(self, out: str) -> str:
127127
if match is None:
128128
raise SubmitException(out)
129129
return match.group(0)[5:-1]
130+
131+
def get_list_command(self) -> List[str]:
132+
"""See :meth:`~.BatchSchedulerExecutor.get_list_command`."""
133+
return [_BJOBS_COMMAND, '-a', '-noheader', '-o', 'jobid', '-u', self._current_user()]

src/psij/executors/batch/pbspro.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,14 @@ def parse_status_output(self, exit_code: int, out: str) -> Dict[str, JobStatus]:
134134

135135
return r
136136

137+
def get_list_command(self) -> List[str]:
138+
"""See :meth:`~.BatchSchedulerExecutor.get_list_command`."""
139+
return ['qstat', '-u', self._current_user()]
140+
141+
def parse_list_output(self, out: str) -> List[str]:
142+
"""See :meth:`~.BatchSchedulerExecutor.parse_list_output`."""
143+
return [s.split()[0].strip() for s in out.splitlines()[2:]]
144+
137145
def _get_state(self, state: str) -> JobState:
138146
assert state in _STATE_MAP, f"PBS state {state} is not known to PSI/J"
139147
return _STATE_MAP[state]

src/psij/executors/batch/slurm.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,10 @@ def parse_status_output(self, exit_code: int, out: str) -> Dict[str, JobStatus]:
160160

161161
return r
162162

163+
def get_list_command(self) -> List[str]:
164+
"""See :meth:`~.BatchSchedulerExecutor.get_list_command`."""
165+
return ['squeue', '--me', '-o', '%i', '-h', '-r', '-t', 'all']
166+
163167
def _get_state(self, state: str) -> JobState:
164168
assert state in SlurmJobExecutor._STATE_MAP
165169
return SlurmJobExecutor._STATE_MAP[state]

src/psij/job_executor.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ def submit(self, job: Job) -> None:
119119
has not been successfully sent to the underlying implementation, the job status remains
120120
unchanged, and no status notifications about the job will be fired.
121121
122+
A successful return of this method guarantees that the job's `native_id` property is set.
123+
122124
:raises ~psij.InvalidJobException: Thrown if the job specification cannot be understood.
123125
This exception is fatal in that submitting another job with the exact same details will
124126
also fail with an `~psij.InvalidJobException`. In principle, the underlying

tests/plugins1/_batch_test/_batch_test.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ def parse_status_output(self, exit_code: int, out: str) -> Dict[str, JobStatus]:
8484
r[native_id] = JobStatus(state, message=msg)
8585
return r
8686

87+
def get_list_command(self) -> List[str]:
88+
return [sys.executable, QSTAT_PATH]
89+
8790
def _get_state(self, state: str) -> JobState:
8891
assert state in _TestJobExecutor._STATE_MAP
8992
return _TestJobExecutor._STATE_MAP[state]

tests/plugins1/_batch_test/test/qstat

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,23 @@ from qlib import *
66
from filelock import FileLock
77

88

9-
ids = sys.argv[1]
9+
if len(sys.argv) > 1:
10+
ids = sys.argv[1]
11+
else:
12+
ids = None
1013

1114

1215
with FileLock(lock_file):
1316
if not state_file.exists():
1417
sys.exit(0)
1518
state = read_state()
1619

17-
idlist = ids.split(',')
18-
for id in idlist:
19-
if id in state['jobs']:
20-
j = state['jobs'][id]
21-
print('%s %s %s' % (id, j['state'], j['error']))
20+
if ids is not None:
21+
idlist = ids.split(',')
22+
for id in idlist:
23+
if id in state['jobs']:
24+
j = state['jobs'][id]
25+
print('%s %s %s' % (id, j['state'], j['error']))
26+
else:
27+
for id in state['jobs'].keys():
28+
print(id)

tests/test_executor.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,3 +152,12 @@ def test_stdin_redirect(execparams: ExecutorTestParams) -> None:
152152
contents = outf.read()
153153

154154
assert contents == rnd_str
155+
156+
157+
def test_list(execparams: ExecutorTestParams) -> None:
158+
job = Job(JobSpec(executable='/bin/sleep', arguments=['4']))
159+
ex = _get_executor_instance(execparams, job)
160+
ex.submit(job)
161+
assert job.native_id is not None
162+
ids = ex.list()
163+
assert job.native_id in ids

0 commit comments

Comments
 (0)