Skip to content

Commit 2aff4d5

Browse files
authored
Merge pull request #384 from ExaWorks/implement_list
A preliminary implementation for the list() method for batch schedule…
2 parents ba66bbb + 764c132 commit 2aff4d5

File tree

9 files changed

+83
-7
lines changed

9 files changed

+83
-7
lines changed

src/psij/executors/batch/batch_scheduler_executor.py

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,38 @@ def parse_status_output(self, exit_code: int, out: str) -> Dict[str, JobStatus]:
420420
"""
421421
pass
422422

423+
@abstractmethod
424+
def get_list_command(self) -> List[str]:
425+
"""Constructs a command to retrieve the list of jobs known to the LRM for the current user.
426+
427+
Concrete implementations of batch scheduler executors must override this method. Upon
428+
running the command, the output can be parsed with :func:`~parse_list_output`.
429+
430+
Returns
431+
-------
432+
A list of strings representing the executable and arguments to invoke in order to obtain
433+
the list of jobs the LRM knows for the current user.
434+
"""
435+
pass
436+
437+
def parse_list_output(self, out: str) -> List[str]:
438+
"""Parses the output of the command obtained from :func:`~get_list_command`.
439+
440+
The default implementation of this method assumes that the output has no header and
441+
consists of native IDs, one per line, possibly surrounded by whitespace. Concrete
442+
implementations should override this method if a different format is expected.
443+
444+
Parameters
445+
----------
446+
out
447+
The output from the "list" command as returned by :func:`~get_list_command`.
448+
Returns
449+
-------
450+
A list of strings representing the native IDs of the jobs known to the LRM for the current
451+
user.
452+
"""
453+
return [s.strip() for s in out.splitlines()]
454+
423455
def _create_script_context(self, job: Job) -> Dict[str, object]:
424456
launcher = self._get_launcher_from_job(job)
425457
if isinstance(launcher, ScriptBasedLauncher) and logger.isEnabledFor(logging.DEBUG):
@@ -551,7 +583,10 @@ def list(self) -> List[str]:
551583
Implementations are encouraged to restrict the results to jobs accessible by the current
552584
user.
553585
"""
554-
raise NotImplementedError()
586+
return self.parse_list_output(self._run_command(self.get_list_command()))
587+
588+
def _current_user(self) -> str:
589+
return os.getlogin()
555590

556591

557592
class _QueuePollThread(Thread):

src/psij/executors/batch/cobalt.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,10 @@ def parse_status_output(self, exit_code: int, out: str) -> Dict[str, JobStatus]:
111111
index += 1
112112
return job_statuses
113113

114+
def get_list_command(self) -> List[str]:
115+
"""See :meth:`~.BatchSchedulerExecutor.get_list_command`."""
116+
return [_QSTAT_COMMAND, '-u', self._current_user(), '--header', 'JobId']
117+
114118
def job_id_from_submit_output(self, out: str) -> str:
115119
"""See :meth:`~.BatchSchedulerExecutor.job_id_from_submit_output`."""
116120
match = _QSUB_REGEX.search(out)

src/psij/executors/batch/lsf.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,3 +127,7 @@ def job_id_from_submit_output(self, out: str) -> str:
127127
if match is None:
128128
raise SubmitException(out)
129129
return match.group(0)[5:-1]
130+
131+
def get_list_command(self) -> List[str]:
132+
"""See :meth:`~.BatchSchedulerExecutor.get_list_command`."""
133+
return [_BJOBS_COMMAND, '-a', '-noheader', '-o', 'jobid', '-u', self._current_user()]

src/psij/executors/batch/pbspro.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,14 @@ def parse_status_output(self, exit_code: int, out: str) -> Dict[str, JobStatus]:
134134

135135
return r
136136

137+
def get_list_command(self) -> List[str]:
138+
"""See :meth:`~.BatchSchedulerExecutor.get_list_command`."""
139+
return ['qstat', '-u', self._current_user()]
140+
141+
def parse_list_output(self, out: str) -> List[str]:
142+
"""See :meth:`~.BatchSchedulerExecutor.parse_list_output`."""
143+
return [s.split()[0].strip() for s in out.splitlines()[2:]]
144+
137145
def _get_state(self, state: str) -> JobState:
138146
assert state in _STATE_MAP, f"PBS state {state} is not known to PSI/J"
139147
return _STATE_MAP[state]

src/psij/executors/batch/slurm.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,10 @@ def parse_status_output(self, exit_code: int, out: str) -> Dict[str, JobStatus]:
160160

161161
return r
162162

163+
def get_list_command(self) -> List[str]:
164+
"""See :meth:`~.BatchSchedulerExecutor.get_list_command`."""
165+
return ['squeue', '--me', '-o', '%i', '-h', '-r', '-t', 'all']
166+
163167
def _get_state(self, state: str) -> JobState:
164168
assert state in SlurmJobExecutor._STATE_MAP
165169
return SlurmJobExecutor._STATE_MAP[state]

src/psij/job_executor.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ def submit(self, job: Job) -> None:
119119
has not been successfully sent to the underlying implementation, the job status remains
120120
unchanged, and no status notifications about the job will be fired.
121121
122+
A successful return of this method guarantees that the job's `native_id` property is set.
123+
122124
:raises ~psij.InvalidJobException: Thrown if the job specification cannot be understood.
123125
This exception is fatal in that submitting another job with the exact same details will
124126
also fail with an `~psij.InvalidJobException`. In principle, the underlying

tests/plugins1/_batch_test/_batch_test.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ def parse_status_output(self, exit_code: int, out: str) -> Dict[str, JobStatus]:
8484
r[native_id] = JobStatus(state, message=msg)
8585
return r
8686

87+
def get_list_command(self) -> List[str]:
88+
return [sys.executable, QSTAT_PATH]
89+
8790
def _get_state(self, state: str) -> JobState:
8891
assert state in _TestJobExecutor._STATE_MAP
8992
return _TestJobExecutor._STATE_MAP[state]

tests/plugins1/_batch_test/test/qstat

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,23 @@ from qlib import *
66
from filelock import FileLock
77

88

9-
ids = sys.argv[1]
9+
if len(sys.argv) > 1:
10+
ids = sys.argv[1]
11+
else:
12+
ids = None
1013

1114

1215
with FileLock(lock_file):
1316
if not state_file.exists():
1417
sys.exit(0)
1518
state = read_state()
1619

17-
idlist = ids.split(',')
18-
for id in idlist:
19-
if id in state['jobs']:
20-
j = state['jobs'][id]
21-
print('%s %s %s' % (id, j['state'], j['error']))
20+
if ids is not None:
21+
idlist = ids.split(',')
22+
for id in idlist:
23+
if id in state['jobs']:
24+
j = state['jobs'][id]
25+
print('%s %s %s' % (id, j['state'], j['error']))
26+
else:
27+
for id in state['jobs'].keys():
28+
print(id)

tests/test_executor.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,3 +152,12 @@ def test_stdin_redirect(execparams: ExecutorTestParams) -> None:
152152
contents = outf.read()
153153

154154
assert contents == rnd_str
155+
156+
157+
def test_list(execparams: ExecutorTestParams) -> None:
158+
job = Job(JobSpec(executable='/bin/sleep', arguments=['4']))
159+
ex = _get_executor_instance(execparams, job)
160+
ex.submit(job)
161+
assert job.native_id is not None
162+
ids = ex.list()
163+
assert job.native_id in ids

0 commit comments

Comments
 (0)