Skip to content

Commit 361ec47

Browse files
committed
Add hold,release,info function for open ondemand support
1 parent 6e3c040 commit 361ec47

File tree

6 files changed

+565
-5
lines changed

6 files changed

+565
-5
lines changed

src/psij/executors/batch/batch_scheduler_executor.py

Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,50 @@ def cancel(self, job: Job) -> None:
281281
# re-raise
282282
raise
283283

284+
def hold(self, job: Job) -> None:
285+
"""Holds a job if it has not otherwise executed.
286+
287+
A command is constructed using :func:`~get_hold_command` and executed in order to hold
288+
the job. Also see :func:`~psij.JobExecutor.hold`.
289+
"""
290+
if job.native_id is None:
291+
raise SubmitException('Job does not have a native ID.')
292+
if job.status.state.final:
293+
return
294+
try:
295+
self._run_command(self.get_hold_command(job.native_id))
296+
except subprocess.CalledProcessError as ex:
297+
try:
298+
self.process_hold_command_output(ex.returncode, ex.output)
299+
except InvalidJobStateError:
300+
# do nothing; the job has started or completed anyway
301+
pass
302+
except SubmitException:
303+
# re-raise
304+
raise
305+
306+
def release(self, job: Job) -> None:
307+
"""Releases a job if it has been held.
308+
309+
A command is constructed using :func:`~get_release_command` and executed in order to
310+
release the job. Also see :func:`~psij.JobExecutor.release`.
311+
"""
312+
if job.native_id is None:
313+
raise SubmitException('Job does not have a native ID.')
314+
if job.status.state.final:
315+
return
316+
try:
317+
self._run_command(self.get_release_command(job.native_id))
318+
except subprocess.CalledProcessError as ex:
319+
try:
320+
self.process_release_command_output(ex.returncode, ex.output)
321+
except InvalidJobStateError:
322+
# do nothing; the job has started or completed anyway
323+
pass
324+
except SubmitException:
325+
# re-raise
326+
raise
327+
284328
def attach(self, job: Job, native_id: str) -> None:
285329
"""Attaches a job to a native job.
286330
@@ -302,6 +346,37 @@ def attach(self, job: Job, native_id: str) -> None:
302346
job.executor = self
303347
self._queue_poll_thread.register_job(job)
304348

349+
def info(self, jobs: Optional[List[Job]] = None, owner: Optional[str] = None) -> List[Job]:
350+
"""Retrieves information about jobs.
351+
352+
If `jobs` is not specified, the information for all jobs known to the executor is
353+
retrieved. Otherwise, the information for the specified job is retrieved.
354+
The information is obtained by executing the command returned by
355+
:func:`~get_info_command` and parsing the output using
356+
:func:`~parse_info_output`.
357+
Parameters
358+
----------
359+
job
360+
The job to retrieve information for. If `None`, all jobs are retrieved.
361+
owner
362+
The owner of the job. If `None`, the current user is used.
363+
"""
364+
try:
365+
id_list = []
366+
# create a list of native ids to query
367+
if jobs is not None:
368+
for job in jobs:
369+
if job.native_id is not None:
370+
id_list.append(job.native_id)
371+
out = self._run_command(self.get_info_command(native_ids=id_list, owner=owner))
372+
except subprocess.CalledProcessError as ex:
373+
out = ex.output
374+
exit_code = ex.returncode
375+
else:
376+
exit_code = 0
377+
378+
return self.parse_info_command_output(exit_code, out, jobs)
379+
305380
@abstractmethod
306381
def generate_submit_script(self, job: Job, context: Dict[str, object],
307382
submit_file: IO[str]) -> None:
@@ -415,6 +490,112 @@ def process_cancel_command_output(self, exit_code: int, out: str) -> None:
415490
"""
416491
pass
417492

493+
@abstractmethod
494+
def get_hold_command(self, native_id: str) -> List[str]:
495+
"""Constructs a command to hold a batch scheduler job.
496+
497+
Concrete implementations of batch scheduler executors must override this method.
498+
499+
Parameters
500+
----------
501+
native_id
502+
The native id of the job being held.
503+
504+
Returns
505+
-------
506+
A list of strings representing the command and arguments to execute in order to hold
507+
the job, such as, e.g., `['qrls', native_id]`.
508+
"""
509+
pass
510+
511+
@abstractmethod
512+
def process_hold_command_output(self, exit_code: int, out: str) -> str:
513+
"""Handle output from a failed hold command.
514+
515+
The main purpose of this method is to help distinguish between the cancel command
516+
failing due to an invalid job state (such as the job having completed before the hold
517+
command was invoked) and other types of errors. Since job state errors are ignored, there
518+
are two options:
519+
520+
1. Instruct the hold command to not fail on invalid state errors and have this
521+
method always raise a :class:`~psij.exceptions.SubmitException`, since it is only invoked
522+
on "other" errors.
523+
524+
2. Have the hold command fail on both invalid state errors and other errors and
525+
interpret the output from the hold command to distinguish between the two and raise
526+
the appropriate exception.
527+
528+
Parameters
529+
----------
530+
exit_code
531+
The exit code from the hold command.
532+
out
533+
The output from the hold command.
534+
535+
Raises
536+
------
537+
InvalidJobStateError
538+
Raised if the job holding has failed because the job was in a completed or failed
539+
state at the time when the holding command was invoked.
540+
SubmitException
541+
Raised for all other reasons.
542+
543+
"""
544+
pass
545+
546+
@abstractmethod
547+
def get_release_command(self, native_id: str) -> List[str]:
548+
"""Constructs a command to release a batch scheduler job.
549+
550+
Concrete implementations of batch scheduler executors must override this method.
551+
552+
Parameters
553+
----------
554+
native_id
555+
The native id of the job being released.
556+
557+
Returns
558+
-------
559+
A list of strings representing the command and arguments to execute in order to release
560+
the job, such as, e.g.,`['qrls', native_id]`.
561+
"""
562+
pass
563+
564+
@abstractmethod
565+
def process_release_command_output(self, exit_code: int, out: str) -> str:
566+
"""Handle output from a failed release command.
567+
568+
The main purpose of this method is to help distinguish between the release command
569+
failing due to an invalid job state (such as the job having completed before the release
570+
command was invoked) and other types of errors. Since job state errors are ignored, there
571+
are two options:
572+
573+
1. Instruct the release command to not fail on invalid state errors and have this
574+
method always raise a :class:`~psij.exceptions.SubmitException`, since it is only invoked
575+
on "other" errors.
576+
577+
2. Have the release command fail on both invalid state errors and other errors and
578+
interpret the output from the release command to distinguish between the two and raise
579+
the appropriate exception.
580+
581+
Parameters
582+
----------
583+
exit_code
584+
The exit code from the release command.
585+
586+
out
587+
The output from the release command.
588+
589+
Raises
590+
------
591+
InvalidJobStateError
592+
Raised if the job releasing has failed because the job was in a completed or failed
593+
state at the time when the releasing command was invoked.
594+
SubmitException
595+
Raised for all other reasons.
596+
"""
597+
pass
598+
418599
@abstractmethod
419600
def get_status_command(self, native_ids: Collection[str]) -> List[str]:
420601
"""Constructs a command to retrieve the status of a list of jobs.
@@ -488,6 +669,49 @@ def parse_list_output(self, out: str) -> List[str]:
488669
"""
489670
return [s.strip() for s in out.splitlines()]
490671

672+
@abstractmethod
673+
def get_info_command(self, native_ids: Optional[List[str]] = None,
674+
owner: Optional[str] = None) -> List[str]:
675+
"""Constructs a command to retrieve information about a job.
676+
677+
Concrete implementations of batch scheduler executors must override this method. The
678+
command should be able to retrieve information about the job with the specified native id
679+
and, optionally, the owner of the job.
680+
681+
Parameters
682+
----------
683+
native_ids
684+
The native id of the jobs to retrieve information for.
685+
owner
686+
The owner of the job. If `None`, the current user is used.
687+
688+
Returns
689+
-------
690+
A list of strings representing the command and arguments to execute in order to get
691+
information about the job, such as `['qstat', '-u', owner, native_id]`.
692+
"""
693+
pass
694+
695+
@abstractmethod
696+
def parse_info_command_output(self, exit_code: int, out: str,
697+
jobs: Optional[List[Job]] = None) -> List[Job]:
698+
"""Parses the output of a job information command.
699+
700+
Concrete implementations of batch scheduler executors must override this method. The output
701+
is meant to have been produced by the command generated by :func:`~get_info_command`.
702+
703+
Parameters
704+
----------
705+
out
706+
The string output of the information command as prescribed by :func:`~get_info_command`.
707+
jobs
708+
A list of jobs to retrieve information for. If `None`, all jobs are retrieved.
709+
Returns
710+
-------
711+
A list of dictionaries containing information about the job, if jobs are not specified.
712+
"""
713+
pass
714+
491715
def _create_script_context(self, job: Job) -> Dict[str, object]:
492716
launcher = self._get_launcher_from_job(job)
493717
if isinstance(launcher, ScriptBasedLauncher) and logger.isEnabledFor(logging.DEBUG):

0 commit comments

Comments
 (0)