Skip to content

Commit db805c2

Browse files
author
Vasileios Karakasis
authored
Merge pull request #1099 from rsarm/feat/queuing-timeout
[feat] Allow tests to timeout if their associated job is pending for too long
2 parents a32e677 + 99188bf commit db805c2

File tree

6 files changed

+81
-9
lines changed

6 files changed

+81
-9
lines changed

reframe/core/pipeline.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -429,8 +429,8 @@ class RegressionTest(metaclass=RegressionTestMeta):
429429
#:
430430
#: :type: integral or :class:`None`
431431
#: :default: :class:`None`
432-
num_tasks_per_core = fields.TypedField('num_tasks_per_core',
433-
int, type(None))
432+
num_tasks_per_core = fields.TypedField('num_tasks_per_core',
433+
int, type(None))
434434

435435
#: Number of tasks per socket required by this test.
436436
#:
@@ -450,6 +450,18 @@ class RegressionTest(metaclass=RegressionTestMeta):
450450
use_multithreading = fields.TypedField('use_multithreading',
451451
bool, type(None))
452452

453+
#: The maximum time a job can be pending before starting running.
454+
#:
455+
#: Time duration is specified as of the :attr:`time_limit` attribute.
456+
#:
457+
#: :type: :class:`str` or :class:`datetime.timedelta``
458+
#: :default: :class:`None
459+
#:
460+
#: .. note::
461+
#: .. versionchanged:: 3.0
462+
#:
463+
max_pending_time = fields.TimerField('max_pending_time', type(None))
464+
453465
#: Specify whether this test needs exclusive access to nodes.
454466
#:
455467
#: :type: boolean
@@ -714,6 +726,7 @@ def _rfm_init(self, name=None, prefix=None):
714726
self.num_tasks_per_socket = None
715727
self.use_multithreading = None
716728
self.exclusive_access = False
729+
self.max_pending_time = None
717730

718731
# True only if check is to be run locally
719732
self.local = False
@@ -996,6 +1009,7 @@ def _setup_job(self, **job_opts):
9961009
launcher_type(),
9971010
name='rfm_%s_job' % self.name,
9981011
workdir=self._stagedir,
1012+
max_pending_time=self.max_pending_time,
9991013
sched_access=self._current_partition.access,
10001014
sched_exclusive_access=self.exclusive_access,
10011015
**job_opts)
@@ -1286,7 +1300,7 @@ def check_performance(self):
12861300

12871301
with os_ext.change_dir(self._stagedir):
12881302
# Check if default reference perf values are provided and
1289-
# store all the variables tested in the performance check
1303+
# store all the variables tested in the performance check
12901304
has_default = False
12911305
variables = set()
12921306
for key, ref in self.reference.items():

reframe/core/schedulers/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ def __init__(self,
169169
script_filename=None,
170170
stdout=None,
171171
stderr=None,
172+
max_pending_time=None,
172173
sched_flex_alloc_nodes=None,
173174
sched_access=[],
174175
sched_account=None,
@@ -201,6 +202,7 @@ def __init__(self,
201202
self._script_filename = script_filename or '%s.sh' % name
202203
self._stdout = stdout or '%s.out' % name
203204
self._stderr = stderr or '%s.err' % name
205+
self._max_pending_time = max_pending_time
204206
self._completion_time = None
205207

206208
# Backend scheduler related information
@@ -228,6 +230,10 @@ def name(self):
228230
def workdir(self):
229231
return self._workdir
230232

233+
@property
234+
def max_pending_time(self):
235+
return self._max_pending_time
236+
231237
@property
232238
def script_filename(self):
233239
return self._script_filename

reframe/core/schedulers/pbs.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,8 @@ def submit(self, job):
122122
if info:
123123
self._pbs_server = info[0]
124124

125+
self._submit_time = datetime.now()
126+
125127
def wait(self, job):
126128
intervals = itertools.cycle(settings().job_poll_intervals)
127129
while not self.finished(job):

reframe/core/schedulers/slurm.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ def submit(self, job):
195195
'could not retrieve the job id of the submitted job')
196196

197197
job.jobid = int(jobid_match.group('jobid'))
198+
self._submit_time = datetime.now()
198199

199200
def allnodes(self):
200201
try:
@@ -418,7 +419,14 @@ def wait(self, job):
418419

419420
intervals = itertools.cycle(settings().job_poll_intervals)
420421
self._update_state(job)
422+
421423
while not slurm_state_completed(job.state):
424+
if job.max_pending_time and slurm_state_pending(job.state):
425+
if datetime.now() - self._submit_time >= job.max_pending_time:
426+
self.cancel(job)
427+
raise JobError('maximum pending time exceeded',
428+
jobid=job.jobid)
429+
422430
time.sleep(next(intervals))
423431
self._update_state(job)
424432

@@ -443,6 +451,12 @@ def finished(self, job):
443451
getlogger().debug('ignoring error during polling: %s' % e)
444452
return False
445453
else:
454+
if job.max_pending_time and slurm_state_pending(job.state):
455+
if datetime.now() - self._submit_time >= job.max_pending_time:
456+
self.cancel(job)
457+
raise JobError('maximum pending time exceeded',
458+
jobid=job.jobid)
459+
446460
return slurm_state_completed(job.state)
447461

448462
def is_array(self, job):
@@ -473,10 +487,6 @@ def __init__(self):
473487
def completion_time(self, job):
474488
return None
475489

476-
def submit(self, job):
477-
super().submit(job)
478-
self._submit_time = datetime.now()
479-
480490
def _update_state(self, job):
481491
time_from_submit = datetime.now() - self._submit_time
482492
rem_wait = self._squeue_delay - time_from_submit.total_seconds()

reframe/core/schedulers/torque.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#
1010

1111
import re
12+
from datetime import datetime
1213

1314
import reframe.utility.os_ext as os_ext
1415
from reframe.core.config import settings
@@ -83,4 +84,12 @@ def finished(self, job):
8384
getlogger().debug('ignoring error during polling: %s' % e)
8485
return False
8586
else:
87+
if job.max_pending_time and job.state in ['QUEUED',
88+
'HELD',
89+
'WAITING']:
90+
if datetime.now() - self._submit_time >= job.max_pending_time:
91+
self.cancel(job)
92+
raise JobError('maximum pending time exceeded',
93+
jobid=job.jobid)
94+
8695
return job.state == 'COMPLETED'

unittests/test_schedulers.py

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import tempfile
1212
import time
1313
import unittest
14-
from datetime import datetime
14+
from datetime import datetime, timedelta
1515

1616
import reframe.core.runtime as rt
1717
import reframe.utility.os_ext as os_ext
@@ -197,6 +197,21 @@ def test_guess_num_tasks(self):
197197
with pytest.raises(NotImplementedError):
198198
self.testjob.guess_num_tasks()
199199

200+
# Monkey patch `self._update_state` to simulate that the job is
201+
# pending on the queue for enough time so it can be canceled due
202+
# to exceeding the maximum pending time
203+
@fixtures.switch_to_user_runtime
204+
def test_submit_max_pending_time(self):
205+
self.setup_user()
206+
self.parallel_cmd = 'sleep 30'
207+
self.prepare()
208+
self.testjob.scheduler._update_state = self._update_state
209+
self.testjob._max_pending_time = timedelta(milliseconds=50)
210+
self.testjob.submit()
211+
with pytest.raises(JobError,
212+
match='maximum pending time exceeded'):
213+
self.testjob.wait()
214+
200215

201216
class TestLocalJob(_TestJob, unittest.TestCase):
202217
def assertProcessDied(self, pid):
@@ -321,6 +336,10 @@ def test_guess_num_tasks(self):
321336
self.testjob.wait()
322337
assert self.testjob.num_tasks == 1
323338

339+
def test_submit_max_pending_time(self):
340+
pytest.skip('the maximum pending time has no effect on the '
341+
'local scheduler')
342+
324343

325344
class TestSlurmJob(_TestJob, unittest.TestCase):
326345
@property
@@ -338,6 +357,9 @@ def sched_configured(self):
338357
def setup_user(self, msg=None):
339358
super().setup_user(msg='SLURM (with sacct) not configured')
340359

360+
def _update_state(self, job):
361+
job.state = 'PENDING'
362+
341363
def test_prepare(self):
342364
self.setup_job()
343365
super().test_prepare()
@@ -529,9 +551,12 @@ def test_prepare_no_cpus(self):
529551
assert self.expected_directives == found_directives
530552

531553
def test_submit_timelimit(self):
532-
# Skip this test for PBS, since we the minimum time limit is 1min
554+
# Skip this test for PBS, since the minimum time limit is 1min
533555
pytest.skip("PBS minimum time limit is 60s")
534556

557+
def test_submit_max_pending_time(self):
558+
pytest.skip('not implemented for the pbs scheduler')
559+
535560

536561
class TestTorqueJob(TestPbsJob):
537562
@property
@@ -561,6 +586,12 @@ def test_submit_timelimit(self):
561586
# Skip this test for PBS, since we the minimum time limit is 1min
562587
pytest.skip("Torque minimum time limit is 60s")
563588

589+
def _update_state(self, job):
590+
job.state = 'QUEUED'
591+
592+
def test_submit_max_pending_time(self):
593+
_TestJob.test_submit_max_pending_time(self)
594+
564595

565596
class TestSlurmFlexibleNodeAllocation(unittest.TestCase):
566597
def create_dummy_nodes(obj):

0 commit comments

Comments
 (0)