Skip to content

Commit dc1b82e

Browse files
authored
Merge pull request #522 from Tohoku-University-Takizawa-Lab/nqsv
Add executor for NEC’s job scheduler (NQSV)
2 parents f8b54ff + 62e3307 commit dc1b82e

File tree

3 files changed

+324
-0
lines changed

3 files changed

+324
-0
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from packaging.version import Version
2+
from psij.descriptor import Descriptor
3+
4+
__PSI_J_EXECUTORS__ = [Descriptor(name='nqsv', version=Version('0.0.1'),
5+
cls='psij.executors.batch.nqsv.NQSVJobExecutor')]

src/psij/executors/batch/nqsv.py

Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
from pathlib import Path
2+
from psij import Job, JobState, JobStatus, SubmitException
3+
from typing import IO, Optional, List, Dict, Collection, Union, Sequence, Any, cast
4+
from psij.executors.batch.script_generator import TemplatedScriptGenerator
5+
from psij.executors.batch.batch_scheduler_executor import BatchSchedulerExecutor
6+
from psij.executors.batch.batch_scheduler_executor import BatchSchedulerExecutorConfig
7+
from psij.executors.batch.batch_scheduler_executor import check_status_exit_code
8+
9+
import re
10+
import subprocess
11+
from threading import Thread
12+
from datetime import timedelta
13+
14+
_NQSV_DIR = '/opt/nec/nqsv/bin/'
15+
_QDEL_COMMAND = _NQSV_DIR + 'qdel'
16+
_QSUB_COMMAND = _NQSV_DIR + 'qsub'
17+
_QSTAT_COMMAND = _NQSV_DIR + 'qstat'
18+
_QWAIT_COMMAND = _NQSV_DIR + 'qwait'
19+
20+
LARGE_TIMEOUT = timedelta(days=3650)
21+
22+
23+
class _NQSJobWaitingThread(Thread):
24+
"""A thread that waits for a job to finish and updates its status."""
25+
26+
def __init__(self, job: Job, ex: Any) -> None:
27+
super().__init__()
28+
self._job = job
29+
self._ex = ex
30+
31+
def run(self) -> None:
32+
"""Wait for the job to finish and update its status."""
33+
st = self._wait()
34+
self._ex._set_job_status(self._job, st)
35+
36+
def _enable_wait_status(self,
37+
target_states: Optional[Union[JobState, Sequence[JobState]]] = None) \
38+
-> bool:
39+
"""Check if the target states are valid for waiting."""
40+
if target_states is None:
41+
return True
42+
if isinstance(target_states, JobState):
43+
target_states = [target_states]
44+
# NQSV's qwait command is not support ACTIVE/QUEUED state
45+
for state1 in target_states:
46+
if state1 is JobState.ACTIVE or state1 is JobState.QUEUED:
47+
return False
48+
return True
49+
50+
def _parse_wait_output(self, out: str) -> JobStatus:
51+
"""Parse the output of the qwait command."""
52+
state = JobState.FAILED
53+
exit_code = None
54+
if 'exited' in out:
55+
s = out.split(' ')
56+
if int(s[1]) == 0:
57+
state = JobState.COMPLETED
58+
else:
59+
state = JobState.FAILED
60+
exit_code = int(s[1])
61+
elif 'deleted' in out:
62+
state = JobState.CANCELED
63+
elif 'error' in out or 'time out' in out or 'qwait error' in out:
64+
state = JobState.FAILED
65+
# killed by signal or rerun or system failure or resource limit exceeded
66+
else:
67+
# The job has already finished...
68+
state = JobState.COMPLETED
69+
r = JobStatus(state=state, exit_code=exit_code, message=None)
70+
return r
71+
72+
def _run_command_using_stderr(self, cmd: List[str]) -> str:
73+
"""Run a command and return the stderr output."""
74+
res = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
75+
return res.stderr
76+
77+
def _wait(self, timeout: Optional[timedelta] = None,
78+
target_states: Optional[Union[JobState, Sequence[JobState]]] = None) \
79+
-> Any:
80+
81+
# NQSV's qwait command is not support ACTIVE/QUEUED state, then use the orignal wait func.
82+
if self._enable_wait_status(target_states) is False:
83+
return self._ex._job_wait(timeout, target_states)
84+
85+
if timeout:
86+
command = [_QWAIT_COMMAND, '-w', 'exited', '-t', str(timeout.total_seconds()),
87+
str(self._job.native_id)]
88+
else:
89+
command = [_QWAIT_COMMAND, '-w', 'exited', str(self._job.native_id)]
90+
91+
out = self._run_command_using_stderr(command)
92+
return self._parse_wait_output(out)
93+
94+
95+
class NQSVExecutorConfig(BatchSchedulerExecutorConfig):
96+
"""Configuration for the NQSV executor."""
97+
98+
pass
99+
100+
101+
class NQSVJobExecutor(BatchSchedulerExecutor):
102+
"""
103+
An executor for the NEC NQSV batch scheduler.
104+
This executor uses NQSV to submit jobs. It is
105+
assumed that NQSV is installed and available in
106+
the system path. NQSV is a batch job scheduler
107+
developed by NEC Corporation.
108+
"""
109+
110+
_STATE_MAP = {
111+
'QUE': JobState.QUEUED,
112+
'RUN': JobState.ACTIVE,
113+
'WAT': JobState.QUEUED,
114+
'HLD': JobState.QUEUED,
115+
'SUS': JobState.QUEUED,
116+
'ARI': JobState.QUEUED,
117+
'TRS': JobState.QUEUED,
118+
'EXT': JobState.COMPLETED,
119+
'PRR': JobState.QUEUED,
120+
'POR': JobState.COMPLETED,
121+
'MIG': JobState.QUEUED,
122+
'STG': JobState.QUEUED,
123+
}
124+
125+
def __init__(self, url: Optional[str] = None, config: Optional[NQSVExecutorConfig] = None):
126+
"""Initialize the NQSV executor."""
127+
if config is None:
128+
config = NQSVExecutorConfig()
129+
super().__init__(url=url, config=config)
130+
path = Path(__file__).parent / 'nqsv/nqsv.mustache'
131+
self.generator = TemplatedScriptGenerator(config, path)
132+
self.submit_frag = False
133+
self.cancel_frag = False
134+
self.use_wait_command = False
135+
self._wait_threads: List[_NQSJobWaitingThread] = []
136+
137+
# Override submit function.
138+
def submit(self, job: Job) -> None:
139+
"""Submit a job to the NQSV scheduler."""
140+
super().submit(job)
141+
if self.use_wait_command:
142+
thread = _NQSJobWaitingThread(job, self)
143+
thread.start()
144+
self._wait_threads.append(thread)
145+
return None
146+
147+
def generate_submit_script(self,
148+
job: Job, context: Dict[str, object], submit_file: IO[str]) -> None:
149+
"""Generate a submit script for the NQSV scheduler."""
150+
self.generator.generate_submit_script(job, context, submit_file)
151+
152+
def get_submit_command(self, job: Job, submit_file_path: Path) -> List[str]:
153+
"""Get the command to submit a job to the NQSV scheduler."""
154+
return [_QSUB_COMMAND, str(submit_file_path.absolute())]
155+
156+
def job_id_from_submit_output(self, out: str) -> str:
157+
"""Extract the job ID from the output of the submit command."""
158+
self.submit_frag = True
159+
s = out.strip().split()[1]
160+
out = ""
161+
for char in s:
162+
if char.isdigit():
163+
out += char
164+
return out
165+
166+
def get_cancel_command(self, native_id: str) -> List[str]:
167+
"""Get the command to cancel a job in the NQSV scheduler."""
168+
self.cancel_frag = True
169+
return [_QDEL_COMMAND, native_id]
170+
171+
def process_cancel_command_output(self, exit_code: int, out: str) -> None:
172+
"""See :meth:`~.BatchSchedulerExecutor.process_cancel_command_output`."""
173+
raise SubmitException('Failed job cancel job: %s' % out)
174+
175+
def get_status_command(self, native_ids: Collection[str]) -> List[str]:
176+
"""Get the command to check the status of a job in the NQSV scheduler."""
177+
return [_QSTAT_COMMAND, '-F', 'rid,stt', '-n', '-l'] + list(native_ids)
178+
179+
def parse_status_output(self, exit_code: int, out: str) -> Dict[str, JobStatus]:
180+
"""Parse the output of the status command."""
181+
check_status_exit_code('qstat', exit_code, out)
182+
r = {}
183+
lines = iter(out.split('\n'))
184+
for line in lines:
185+
if not line:
186+
continue
187+
188+
cols = line.split()
189+
190+
if (len(cols) == 8 and self.cancel_frag):
191+
s = cols[2]
192+
native_id = ""
193+
for char in s:
194+
if char.isdigit():
195+
native_id += char
196+
state = JobState.CANCELED
197+
r[native_id] = JobStatus(state=state, message=None)
198+
199+
elif (len(cols) == 8):
200+
s = cols[1]
201+
native_id = ""
202+
for char in s:
203+
if char.isdigit():
204+
native_id += char
205+
state = JobState.COMPLETED
206+
r[native_id] = JobStatus(state=state, message=None)
207+
208+
else:
209+
assert len(cols) == 2
210+
match = re.search(r'\b(\d+)\b', cols[0])
211+
native_id = cast(str, match.group(1) if match else None)
212+
native_state = cols[1]
213+
state = self._get_state(native_state)
214+
msg = None
215+
r[native_id] = JobStatus(state=state, message=msg)
216+
217+
return r
218+
219+
def _get_state(self, state: str) -> JobState:
220+
"""Convert the state string to a JobState enum."""
221+
assert state in NQSVJobExecutor._STATE_MAP
222+
return NQSVJobExecutor._STATE_MAP[state]
223+
224+
def get_list_command(self) -> List[str]:
225+
"""Get the command to list jobs in the NQSV scheduler."""
226+
return [_QSTAT_COMMAND, '-F', 'rid', '-n', '-l']
227+
228+
def parse_list_output(self, out: str) -> List[str]:
229+
"""Parse the output of the list command."""
230+
r = []
231+
lines = iter(out.split('\n'))
232+
for line in lines:
233+
c = line.split('.')
234+
r.append(c[0])
235+
return r
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
#!/bin/bash
2+
3+
{{#job.spec.name}}
4+
#PBS -N {{.}}
5+
{{/job.spec.name}}
6+
7+
{{#job.spec.inherit_environment}}
8+
#PBS -V
9+
{{/job.spec.inherit_environment}}
10+
{{#env}}
11+
#PBS -v {{name}}="{{value}}"
12+
{{/env}}
13+
14+
{{#job.spec.resources}}
15+
{{#exclusive_node_use}}
16+
#PBS --exclusive
17+
{{/exclusive_node_use}}
18+
#PBS --cpunum-lhost={{computed_processes_per_node}} -b {{computed_node_count}}
19+
{{#gpu_cores_per_process}}
20+
#PBS --gpunum-lhost=${{.}}
21+
{{/gpu_cores_per_process}}
22+
{{/job.spec.resources}}
23+
24+
{{#job.spec.attributes}}
25+
{{#duration}}
26+
#PBS -l elapstim_req={{.}}
27+
{{/duration}}
28+
{{#queue_name}}
29+
#PBS -q {{.}}
30+
{{/queue_name}}
31+
{{#project_name}}
32+
#PBS -A {{.}}
33+
{{/project_name}}
34+
{{#reservation_id}}
35+
#PBS -y {{.}}
36+
{{/reservation_id}}
37+
{{/job.spec.attributes}}
38+
39+
#custom_attributes
40+
{{#custom_attributes}}
41+
{{#nqsv}}
42+
#PBS -{{key}} {{value}}
43+
{{/nqsv}}
44+
{{/custom_attributes}}
45+
46+
{{!we replace the follow environment variable to cpus when the job is submitted.}}
47+
#PBS -e /dev/null
48+
#PBS -o /dev/null
49+
50+
{{#job.spec.directory}}
51+
cd "{{.}}"
52+
{{/job.spec.directory}}
53+
54+
J=`echo $PBS_JOBID | awk -F ':' '{print $1}'`
55+
ID=`echo $PBS_JOBID | awk -F ':' '{print $2}' | awk -F '.' '{print $1}'`
56+
if [ "$J" = "0" ]; then
57+
exec &>> "{{psij.script_dir}}/$ID.out"
58+
fi
59+
60+
# create node file for PSIJ
61+
{{#job.spec.resources}}
62+
_PSIJ_PPN={{computed_processes_per_node}}
63+
{{/job.spec.resources}}
64+
65+
PSIJ_NODEFILE="{{psij.script_dir}}/$ID.nodefile"
66+
while read line
67+
do
68+
for i in `seq 1 $_PSIJ_PPN`;
69+
do
70+
echo $line >> $PSIJ_NODEFILE
71+
done
72+
done < $PBS_NODEFILE
73+
export PSIJ_NODEFILE
74+
75+
{{#psij.launch_command}}{{.}} {{/psij.launch_command}}
76+
77+
E=$?
78+
79+
{{!we redirect to a file tied to the native ID so that we can reach the file with attach().}}
80+
if [ "$J" = "0" ]; then
81+
echo "$E" > "{{psij.script_dir}}/$ID.ec"
82+
fi
83+
84+
exit $E

0 commit comments

Comments
 (0)