|
| 1 | +from pathlib import Path |
| 2 | +from psij import Job, JobState, JobStatus, SubmitException |
| 3 | +from typing import IO, Optional, List, Dict, Collection, Union, Sequence, Any, cast |
| 4 | +from psij.executors.batch.script_generator import TemplatedScriptGenerator |
| 5 | +from psij.executors.batch.batch_scheduler_executor import BatchSchedulerExecutor |
| 6 | +from psij.executors.batch.batch_scheduler_executor import BatchSchedulerExecutorConfig |
| 7 | +from psij.executors.batch.batch_scheduler_executor import check_status_exit_code |
| 8 | + |
| 9 | +import re |
| 10 | +import subprocess |
| 11 | +from threading import Thread |
| 12 | +from datetime import timedelta |
| 13 | + |
| 14 | +_NQSV_DIR = '/opt/nec/nqsv/bin/' |
| 15 | +_QDEL_COMMAND = _NQSV_DIR + 'qdel' |
| 16 | +_QSUB_COMMAND = _NQSV_DIR + 'qsub' |
| 17 | +_QSTAT_COMMAND = _NQSV_DIR + 'qstat' |
| 18 | +_QWAIT_COMMAND = _NQSV_DIR + 'qwait' |
| 19 | + |
| 20 | +LARGE_TIMEOUT = timedelta(days=3650) |
| 21 | + |
| 22 | + |
| 23 | +class _NQSJobWaitingThread(Thread): |
| 24 | + """A thread that waits for a job to finish and updates its status.""" |
| 25 | + |
| 26 | + def __init__(self, job: Job, ex: Any) -> None: |
| 27 | + super().__init__() |
| 28 | + self._job = job |
| 29 | + self._ex = ex |
| 30 | + |
| 31 | + def run(self) -> None: |
| 32 | + """Wait for the job to finish and update its status.""" |
| 33 | + st = self._wait() |
| 34 | + self._ex._set_job_status(self._job, st) |
| 35 | + |
| 36 | + def _enable_wait_status(self, |
| 37 | + target_states: Optional[Union[JobState, Sequence[JobState]]] = None) \ |
| 38 | + -> bool: |
| 39 | + """Check if the target states are valid for waiting.""" |
| 40 | + if target_states is None: |
| 41 | + return True |
| 42 | + if isinstance(target_states, JobState): |
| 43 | + target_states = [target_states] |
| 44 | + # NQSV's qwait command is not support ACTIVE/QUEUED state |
| 45 | + for state1 in target_states: |
| 46 | + if state1 is JobState.ACTIVE or state1 is JobState.QUEUED: |
| 47 | + return False |
| 48 | + return True |
| 49 | + |
| 50 | + def _parse_wait_output(self, out: str) -> JobStatus: |
| 51 | + """Parse the output of the qwait command.""" |
| 52 | + state = JobState.FAILED |
| 53 | + exit_code = None |
| 54 | + if 'exited' in out: |
| 55 | + s = out.split(' ') |
| 56 | + if int(s[1]) == 0: |
| 57 | + state = JobState.COMPLETED |
| 58 | + else: |
| 59 | + state = JobState.FAILED |
| 60 | + exit_code = int(s[1]) |
| 61 | + elif 'deleted' in out: |
| 62 | + state = JobState.CANCELED |
| 63 | + elif 'error' in out or 'time out' in out or 'qwait error' in out: |
| 64 | + state = JobState.FAILED |
| 65 | + # killed by signal or rerun or system failure or resource limit exceeded |
| 66 | + else: |
| 67 | + # The job has already finished... |
| 68 | + state = JobState.COMPLETED |
| 69 | + r = JobStatus(state=state, exit_code=exit_code, message=None) |
| 70 | + return r |
| 71 | + |
| 72 | + def _run_command_using_stderr(self, cmd: List[str]) -> str: |
| 73 | + """Run a command and return the stderr output.""" |
| 74 | + res = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) |
| 75 | + return res.stderr |
| 76 | + |
| 77 | + def _wait(self, timeout: Optional[timedelta] = None, |
| 78 | + target_states: Optional[Union[JobState, Sequence[JobState]]] = None) \ |
| 79 | + -> Any: |
| 80 | + |
| 81 | + # NQSV's qwait command is not support ACTIVE/QUEUED state, then use the orignal wait func. |
| 82 | + if self._enable_wait_status(target_states) is False: |
| 83 | + return self._ex._job_wait(timeout, target_states) |
| 84 | + |
| 85 | + if timeout: |
| 86 | + command = [_QWAIT_COMMAND, '-w', 'exited', '-t', str(timeout.total_seconds()), |
| 87 | + str(self._job.native_id)] |
| 88 | + else: |
| 89 | + command = [_QWAIT_COMMAND, '-w', 'exited', str(self._job.native_id)] |
| 90 | + |
| 91 | + out = self._run_command_using_stderr(command) |
| 92 | + return self._parse_wait_output(out) |
| 93 | + |
| 94 | + |
| 95 | +class NQSVExecutorConfig(BatchSchedulerExecutorConfig): |
| 96 | + """Configuration for the NQSV executor.""" |
| 97 | + |
| 98 | + pass |
| 99 | + |
| 100 | + |
| 101 | +class NQSVJobExecutor(BatchSchedulerExecutor): |
| 102 | + """ |
| 103 | + An executor for the NEC NQSV batch scheduler. |
| 104 | + This executor uses NQSV to submit jobs. It is |
| 105 | + assumed that NQSV is installed and available in |
| 106 | + the system path. NQSV is a batch job scheduler |
| 107 | + developed by NEC Corporation. |
| 108 | + """ |
| 109 | + |
| 110 | + _STATE_MAP = { |
| 111 | + 'QUE': JobState.QUEUED, |
| 112 | + 'RUN': JobState.ACTIVE, |
| 113 | + 'WAT': JobState.QUEUED, |
| 114 | + 'HLD': JobState.QUEUED, |
| 115 | + 'SUS': JobState.QUEUED, |
| 116 | + 'ARI': JobState.QUEUED, |
| 117 | + 'TRS': JobState.QUEUED, |
| 118 | + 'EXT': JobState.COMPLETED, |
| 119 | + 'PRR': JobState.QUEUED, |
| 120 | + 'POR': JobState.COMPLETED, |
| 121 | + 'MIG': JobState.QUEUED, |
| 122 | + 'STG': JobState.QUEUED, |
| 123 | + } |
| 124 | + |
| 125 | + def __init__(self, url: Optional[str] = None, config: Optional[NQSVExecutorConfig] = None): |
| 126 | + """Initialize the NQSV executor.""" |
| 127 | + if config is None: |
| 128 | + config = NQSVExecutorConfig() |
| 129 | + super().__init__(url=url, config=config) |
| 130 | + path = Path(__file__).parent / 'nqsv/nqsv.mustache' |
| 131 | + self.generator = TemplatedScriptGenerator(config, path) |
| 132 | + self.submit_frag = False |
| 133 | + self.cancel_frag = False |
| 134 | + self.use_wait_command = False |
| 135 | + self._wait_threads: List[_NQSJobWaitingThread] = [] |
| 136 | + |
| 137 | + # Override submit function. |
| 138 | + def submit(self, job: Job) -> None: |
| 139 | + """Submit a job to the NQSV scheduler.""" |
| 140 | + super().submit(job) |
| 141 | + if self.use_wait_command: |
| 142 | + thread = _NQSJobWaitingThread(job, self) |
| 143 | + thread.start() |
| 144 | + self._wait_threads.append(thread) |
| 145 | + return None |
| 146 | + |
| 147 | + def generate_submit_script(self, |
| 148 | + job: Job, context: Dict[str, object], submit_file: IO[str]) -> None: |
| 149 | + """Generate a submit script for the NQSV scheduler.""" |
| 150 | + self.generator.generate_submit_script(job, context, submit_file) |
| 151 | + |
| 152 | + def get_submit_command(self, job: Job, submit_file_path: Path) -> List[str]: |
| 153 | + """Get the command to submit a job to the NQSV scheduler.""" |
| 154 | + return [_QSUB_COMMAND, str(submit_file_path.absolute())] |
| 155 | + |
| 156 | + def job_id_from_submit_output(self, out: str) -> str: |
| 157 | + """Extract the job ID from the output of the submit command.""" |
| 158 | + self.submit_frag = True |
| 159 | + s = out.strip().split()[1] |
| 160 | + out = "" |
| 161 | + for char in s: |
| 162 | + if char.isdigit(): |
| 163 | + out += char |
| 164 | + return out |
| 165 | + |
| 166 | + def get_cancel_command(self, native_id: str) -> List[str]: |
| 167 | + """Get the command to cancel a job in the NQSV scheduler.""" |
| 168 | + self.cancel_frag = True |
| 169 | + return [_QDEL_COMMAND, native_id] |
| 170 | + |
| 171 | + def process_cancel_command_output(self, exit_code: int, out: str) -> None: |
| 172 | + """See :meth:`~.BatchSchedulerExecutor.process_cancel_command_output`.""" |
| 173 | + raise SubmitException('Failed job cancel job: %s' % out) |
| 174 | + |
| 175 | + def get_status_command(self, native_ids: Collection[str]) -> List[str]: |
| 176 | + """Get the command to check the status of a job in the NQSV scheduler.""" |
| 177 | + return [_QSTAT_COMMAND, '-F', 'rid,stt', '-n', '-l'] + list(native_ids) |
| 178 | + |
| 179 | + def parse_status_output(self, exit_code: int, out: str) -> Dict[str, JobStatus]: |
| 180 | + """Parse the output of the status command.""" |
| 181 | + check_status_exit_code('qstat', exit_code, out) |
| 182 | + r = {} |
| 183 | + lines = iter(out.split('\n')) |
| 184 | + for line in lines: |
| 185 | + if not line: |
| 186 | + continue |
| 187 | + |
| 188 | + cols = line.split() |
| 189 | + |
| 190 | + if (len(cols) == 8 and self.cancel_frag): |
| 191 | + s = cols[2] |
| 192 | + native_id = "" |
| 193 | + for char in s: |
| 194 | + if char.isdigit(): |
| 195 | + native_id += char |
| 196 | + state = JobState.CANCELED |
| 197 | + r[native_id] = JobStatus(state=state, message=None) |
| 198 | + |
| 199 | + elif (len(cols) == 8): |
| 200 | + s = cols[1] |
| 201 | + native_id = "" |
| 202 | + for char in s: |
| 203 | + if char.isdigit(): |
| 204 | + native_id += char |
| 205 | + state = JobState.COMPLETED |
| 206 | + r[native_id] = JobStatus(state=state, message=None) |
| 207 | + |
| 208 | + else: |
| 209 | + assert len(cols) == 2 |
| 210 | + match = re.search(r'\b(\d+)\b', cols[0]) |
| 211 | + native_id = cast(str, match.group(1) if match else None) |
| 212 | + native_state = cols[1] |
| 213 | + state = self._get_state(native_state) |
| 214 | + msg = None |
| 215 | + r[native_id] = JobStatus(state=state, message=msg) |
| 216 | + |
| 217 | + return r |
| 218 | + |
| 219 | + def _get_state(self, state: str) -> JobState: |
| 220 | + """Convert the state string to a JobState enum.""" |
| 221 | + assert state in NQSVJobExecutor._STATE_MAP |
| 222 | + return NQSVJobExecutor._STATE_MAP[state] |
| 223 | + |
| 224 | + def get_list_command(self) -> List[str]: |
| 225 | + """Get the command to list jobs in the NQSV scheduler.""" |
| 226 | + return [_QSTAT_COMMAND, '-F', 'rid', '-n', '-l'] |
| 227 | + |
| 228 | + def parse_list_output(self, out: str) -> List[str]: |
| 229 | + """Parse the output of the list command.""" |
| 230 | + r = [] |
| 231 | + lines = iter(out.split('\n')) |
| 232 | + for line in lines: |
| 233 | + c = line.split('.') |
| 234 | + r.append(c[0]) |
| 235 | + return r |
0 commit comments