Skip to content

Commit dc7e2d8

Browse files
author
Vasileios Karakasis
authored
Merge pull request #1612 from ekouts/feat/align_pbs
[refactor] Align implementations of the PBS and the Torque scheduler regarding the use of `qstat`
2 parents 53de376 + 3c441d5 commit dc7e2d8

File tree

4 files changed

+111
-152
lines changed

4 files changed

+111
-152
lines changed

reframe/core/backends.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@
1919
_scheduler_backend_modules = [
2020
'reframe.core.schedulers.local',
2121
'reframe.core.schedulers.slurm',
22-
'reframe.core.schedulers.pbs',
23-
'reframe.core.schedulers.torque'
22+
'reframe.core.schedulers.pbs'
2423
]
2524
_schedulers = {}
2625

reframe/core/schedulers/pbs.py

Lines changed: 108 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import reframe.utility.osext as osext
2121
from reframe.core.backends import register_scheduler
2222
from reframe.core.config import settings
23-
from reframe.core.exceptions import JobSchedulerError
23+
from reframe.core.exceptions import (JobError, JobSchedulerError)
2424
from reframe.utility import seconds_to_hms
2525

2626

@@ -40,6 +40,18 @@
4040
_run_strict = functools.partial(osext.run_command, check=True)
4141

4242

43+
# Mapping of the single-letter job states reported in the `job_state` field
# of `qstat -f` to the state names used by this backend.
JOB_STATES = {
    'Q': 'QUEUED',
    'H': 'HELD',
    'R': 'RUNNING',
    'E': 'EXITING',
    'T': 'MOVED',
    'W': 'WAITING',
    'S': 'SUSPENDED',
    'C': 'COMPLETED',
    # PBS Professional reports finished jobs as 'F' instead of 'C'
    'F': 'COMPLETED',
}
53+
54+
4355
class _PbsJob(sched.Job):
4456
def __init__(self, *args, **kwargs):
4557
super().__init__(*args, **kwargs)
@@ -156,24 +168,104 @@ def finished(self, job):
156168

157169
return job.completed
158170

159-
def _poll_job(self, job):
160-
if job is None:
171+
def _update_nodelist(self, job, nodespec):
172+
if job.nodelist is not None:
161173
return
162174

163-
with osext.change_dir(job.workdir):
164-
output_ready = (os.path.exists(job.stdout) and
165-
os.path.exists(job.stderr))
175+
job._nodelist = [x.split('/')[0] for x in nodespec.split('+')]
176+
job._nodelist.sort()
166177

167-
done = job.cancelled or output_ready
168-
if done:
169-
t_now = time.time()
170-
if job.completion_time is None:
171-
job._completion_time = t_now
178+
def poll(self, *jobs):
    '''Update the status of ``jobs`` using a single ``qstat -f`` call.

    For each job this updates ``_state``, the node list (through
    ``_update_nodelist()``), ``_exitcode`` and ``_completed``. Jobs that
    stay in a pending state for longer than their ``max_pending_time``
    are cancelled and get a :class:`JobError` stored in ``_exception``.

    :arg jobs: The jobs to poll; ``None`` entries are ignored.
    :raises JobSchedulerError: if ``qstat`` exits with a code other than
        0 or 153.
    '''
    # Filter out non-jobs
    jobs = [job for job in jobs if job is not None]
    if not jobs:
        return

    completed = osext.run_command(
        f'qstat -f {" ".join(job.jobid for job in jobs)}'
    )

    # Depending on the configuration, completed jobs will remain on the job
    # list for a limited time, or be removed upon completion.
    # If qstat cannot find any of the job IDs, it will return 153.
    # Otherwise, it will return with return code 0 and print information
    # only for the jobs it could find.
    if completed.returncode == 153:
        self.log('Return code is 153: jobids not known by scheduler, '
                 'assuming all jobs completed')
        for job in jobs:
            job._state = 'COMPLETED'

            # Mark the job as completed here as well, exactly as it is
            # done below for a single job unknown to the scheduler;
            # otherwise no later poll could ever flag it as completed
            job._completed = True

        return

    if completed.returncode != 0:
        raise JobSchedulerError(
            f'qstat failed with exit code {completed.returncode} '
            f'(standard error follows):\n{completed.stderr}'
        )

    # Store information for each job separately; qstat separates the
    # records of the individual jobs with an empty line
    jobinfo = {}
    for job_raw_info in completed.stdout.split('\n\n'):
        jobid_match = re.search(
            r'^Job Id:\s*(?P<jobid>\S+)', job_raw_info, re.MULTILINE
        )
        if jobid_match:
            jobinfo[jobid_match.group('jobid')] = job_raw_info

    for job in jobs:
        if job.jobid not in jobinfo:
            self.log(f'Job {job.jobid} not known to scheduler, '
                     f'assuming job completed')
            job._state = 'COMPLETED'
            job._completed = True
            continue

        info = jobinfo[job.jobid]
        state_match = re.search(
            r'^\s*job_state = (?P<state>[A-Z])', info, re.MULTILINE
        )
        if not state_match:
            self.log(f'Job state not found (job info follows):\n{info}')
            continue

        state = state_match.group('state')
        if state not in JOB_STATES:
            # Do not let an unmapped state letter abort the polling of
            # the remaining jobs; keep the previously recorded state
            self.log(f'Unknown job state {state!r} for job {job.jobid} '
                     f'(job info follows):\n{info}')
            continue

        job._state = JOB_STATES[state]
        nodelist_match = re.search(
            r'exec_host = (?P<nodespec>[\S\t\n]+)',
            info, re.MULTILINE
        )
        if nodelist_match:
            # The value may be wrapped over several lines; join them
            nodespec = nodelist_match.group('nodespec')
            nodespec = re.sub(r'[\n\t]+', '', nodespec)
            self._update_nodelist(job, nodespec)

        if job.state == 'COMPLETED':
            # The exit status may be negative, so accept a leading minus
            exitcode_match = re.search(
                r'^\s*exit_status = (?P<code>-?\d+)',
                info, re.MULTILINE,
            )
            if exitcode_match:
                job._exitcode = int(exitcode_match.group('code'))

            # We report a job as finished only when its stdout/stderr are
            # written back to the working directory
            stdout = os.path.join(job.workdir, job.stdout)
            stderr = os.path.join(job.workdir, job.stderr)
            out_ready = os.path.exists(stdout) and os.path.exists(stderr)
            if job.cancelled or out_ready:
                job._completed = True
        elif (job.state in ['QUEUED', 'HELD', 'WAITING'] and
              job.max_pending_time):
            if time.time() - job.submit_time >= job.max_pending_time:
                self.cancel(job)
                job._exception = JobError('maximum pending time exceeded')
267+
268+
269+
@register_scheduler('torque')
class TorqueJobScheduler(PbsJobScheduler):
    '''Scheduler backend registered under the name ``torque``.

    It inherits everything from :class:`PbsJobScheduler` and only
    overrides the ``TASKS_OPT`` option template.
    '''

    # Template with the `num_nodes` and `num_cpus_per_node` placeholders
    TASKS_OPT = '-l nodes={num_nodes}:ppn={num_cpus_per_node}'

reframe/core/schedulers/torque.py

Lines changed: 0 additions & 132 deletions
This file was deleted.

unittests/test_schedulers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,7 @@ def test_guess_num_tasks(minimal_job, scheduler):
455455

456456

457457
def test_submit_max_pending_time(make_job, exec_ctx, scheduler):
458-
if scheduler.registered_name in ('local', 'pbs'):
458+
if scheduler.registered_name in ('local'):
459459
pytest.skip(f"max_pending_time not supported by the "
460460
f"'{scheduler.registered_name}' scheduler")
461461

@@ -467,7 +467,7 @@ def test_submit_max_pending_time(make_job, exec_ctx, scheduler):
467467
def state(self):
468468
if scheduler.registered_name in ('slurm', 'squeue'):
469469
return 'PENDING'
470-
elif scheduler.registered_name == 'torque':
470+
elif scheduler.registered_name in ('pbs', 'torque'):
471471
return 'QUEUED'
472472
else:
473473
# This should not happen

0 commit comments

Comments
 (0)