
Commit a88e933

Author: Vasileios Karakasis

Merge pull request #1959 from giordano/mg/sge-scheduler

[feat] Add SGE scheduler backend

2 parents 4642104 + 2bcc6f6, commit a88e933

File tree

6 files changed: +215 −5 lines changed


docs/config_reference.rst

Lines changed: 46 additions & 0 deletions

@@ -212,12 +212,58 @@ System Partition Configuration
 - ``local``: Jobs will be launched locally without using any job scheduler.
 - ``pbs``: Jobs will be launched using the `PBS Pro <https://en.wikipedia.org/wiki/Portable_Batch_System>`__ scheduler.
 - ``torque``: Jobs will be launched using the `Torque <https://en.wikipedia.org/wiki/TORQUE>`__ scheduler.
+- ``sge``: Jobs will be launched using the `Sun Grid Engine <https://arc.liv.ac.uk/SGE/htmlman/manuals.html>`__ scheduler.
 - ``slurm``: Jobs will be launched using the `Slurm <https://www.schedmd.com/>`__ scheduler.
   This backend requires job accounting to be enabled in the target system.
   If not, you should consider using the ``squeue`` backend below.
 - ``squeue``: Jobs will be launched using the `Slurm <https://www.schedmd.com/>`__ scheduler.
   This backend does not rely on job accounting to retrieve job statuses, but ReFrame does its best to query the job state as reliably as possible.

+.. versionadded:: 3.7.2
+   Support for the SGE scheduler is added.
+
+.. note::
+
+   The way that multi-node jobs are submitted using the SGE scheduler can be very site-specific.
+   For this reason, the ``sge`` scheduler backend does not try to interpret any related arguments, e.g., ``num_tasks``, ``num_tasks_per_node`` etc.
+   Users must specify how these resources are to be requested by setting the :js:attr:`resources` partition configuration parameter and then request them from inside a test using the :py:attr:`~reframe.core.pipeline.RegressionTest.extra_resources` test attribute.
+   Here is an example configuration for a system partition named ``foo`` that defines different ways of submitting MPI-only, OpenMP-only and MPI+OpenMP jobs:
+
+   .. code-block:: python
+
+      {
+          'name': 'foo',
+          'scheduler': 'sge',
+          'resources': [
+              {
+                  'name': 'smp',
+                  'options': ['-pe smp {num_slots}']
+              },
+              {
+                  'name': 'mpi',
+                  'options': ['-pe mpi {num_slots}']
+              },
+              {
+                  'name': 'mpismp',
+                  'options': ['-pe mpismp {num_slots}']
+              }
+          ]
+      }
+
+   Each test can then request the different types of slots as follows:
+
+   .. code-block:: python
+
+      self.extra_resources = {
+          'smp': {'num_slots': self.num_cpus_per_task},
+          'mpi': {'num_slots': self.num_tasks},
+          'mpismp': {'num_slots': self.num_tasks*self.num_cpus_per_task}
+      }
+
+   Notice that defining :py:attr:`~reframe.core.pipeline.RegressionTest.extra_resources` does not make the test non-portable to systems with different schedulers;
+   in that case, :py:attr:`extra_resources` will simply be ignored and the scheduler backend will interpret the different test fields in the appropriate way.
+
 .. js:attribute:: .systems[].partitions[].launcher

    :required: Yes
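To make the placeholder expansion concrete, here is a minimal sketch of the option that the 'mpismp' resource above would produce. The test values are hypothetical, and the expansion is shown as plain string formatting to illustrate the mechanism, not as ReFrame's exact internal code path:

# Hypothetical test values
num_tasks = 16
num_cpus_per_task = 4

# The '{num_slots}' placeholder in the resource options is filled in
# from the test's extra_resources request
option = '-pe mpismp {num_slots}'.format(
    num_slots=num_tasks * num_cpus_per_task
)
print(option)   # -pe mpismp 64

The backend then emits this option as a '#$ -pe mpismp 64' directive in the generated job script (see emit_preamble in reframe/core/schedulers/sge.py below).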

reframe/core/backends.py

Lines changed: 2 additions & 1 deletion

@@ -19,7 +19,8 @@
 _scheduler_backend_modules = [
     'reframe.core.schedulers.local',
     'reframe.core.schedulers.slurm',
-    'reframe.core.schedulers.pbs'
+    'reframe.core.schedulers.pbs',
+    'reframe.core.schedulers.sge'
 ]
 _schedulers = {}

reframe/core/schedulers/registry.py

Lines changed: 1 addition & 0 deletions

@@ -38,4 +38,5 @@ def getscheduler(name):
 import reframe.core.schedulers.local   # noqa: F401, F403
 import reframe.core.schedulers.slurm   # noqa: F401, F403
 import reframe.core.schedulers.pbs     # noqa: F401, F403
+import reframe.core.schedulers.sge     # noqa: F401, F403
 import reframe.core.schedulers.torque  # noqa: F401, F403
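With this import in place, the new backend resolves by name like any other scheduler. A minimal sketch, assuming getscheduler returns the scheduler class (as the unit-test fixture in unittests/test_schedulers.py below suggests):

from reframe.core.schedulers.registry import getscheduler

# Look up the newly registered SGE backend by name. Instantiating it
# requires an initialized ReFrame runtime, since __init__ reads the
# 'job_submit_timeout' configuration option.
sge_cls = getscheduler('sge')
print(sge_cls.__name__)   # SgeJobScheduler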

reframe/core/schedulers/sge.py

Lines changed: 146 additions & 0 deletions

@@ -0,0 +1,146 @@
+# Copyright 2016-2021 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
+# ReFrame Project Developers. See the top-level LICENSE file for details.
+#
+# SPDX-License-Identifier: BSD-3-Clause
+
+#
+# SGE backend
+#
+# - Initial version submitted by Mosè Giordano, UCL (based on the PBS backend)
+#
+
+import functools
+import re
+import time
+import xml.etree.ElementTree as ET
+
+import reframe.core.runtime as rt
+import reframe.utility.osext as osext
+from reframe.core.backends import register_scheduler
+from reframe.core.exceptions import JobSchedulerError
+from reframe.core.schedulers.pbs import PbsJobScheduler
+from reframe.utility import seconds_to_hms
+
+_run_strict = functools.partial(osext.run_command, check=True)
+
+
+@register_scheduler('sge')
+class SgeJobScheduler(PbsJobScheduler):
+    def __init__(self):
+        self._prefix = '#$'
+        self._submit_timeout = rt.runtime().get_option(
+            f'schedulers/@{self.registered_name}/job_submit_timeout'
+        )
+
+    def emit_preamble(self, job):
+        preamble = [
+            self._format_option(f'-N "{job.name}"'),
+            self._format_option(f'-o {job.stdout}'),
+            self._format_option(f'-e {job.stderr}'),
+            self._format_option(f'-wd {job.workdir}')
+        ]
+
+        if job.time_limit is not None:
+            h, m, s = seconds_to_hms(job.time_limit)
+            preamble.append(
+                self._format_option('-l h_rt=%d:%d:%d' % (h, m, s))
+            )
+
+        # Emit the rest of the options
+        options = job.options + job.cli_options
+        for opt in options:
+            if opt.startswith('#'):
+                preamble.append(opt)
+            else:
+                preamble.append(self._format_option(opt))
+
+        return preamble
+
+    def submit(self, job):
+        # The `-o` and `-e` options are only recognized on the command line
+        # by the PBS, SGE and Slurm wrappers.
+        cmd = f'qsub -o {job.stdout} -e {job.stderr} {job.script_filename}'
+        completed = _run_strict(cmd, timeout=self._submit_timeout)
+        jobid_match = re.search(r'^Your job (?P<jobid>\S+)', completed.stdout)
+        if not jobid_match:
+            raise JobSchedulerError('could not retrieve the job id '
+                                    'of the submitted job')
+
+        job._jobid = jobid_match.group('jobid')
+        job._submit_time = time.time()
+
+    def poll(self, *jobs):
+        if jobs:
+            # Filter out non-jobs
+            jobs = [job for job in jobs if job is not None]
+
+        if not jobs:
+            return
+
+        user = osext.osuser()
+        completed = osext.run_command(f'qstat -xml -u {user}')
+        if completed.returncode != 0:
+            raise JobSchedulerError(
+                f'qstat failed with exit code {completed.returncode} '
+                f'(standard error follows):\n{completed.stderr}'
+            )
+
+        # Index the jobs to poll on their jobid
+        jobs_to_poll = {job.jobid: job for job in jobs}
+
+        # Parse the XML
+        root = ET.fromstring(completed.stdout)
+
+        # We iterate over the returned XML and update the status of the jobs
+        # relevant to ReFrame; the naming convention of the variables matches
+        # that of SGE's XML output.
+
+        known_jobs = set()  # jobs known to the SGE scheduler
+        for queue_info in root:
+            # Iterate over the queue sections of the XML output
+            if queue_info is None:
+                raise JobSchedulerError('could not retrieve queue information')
+
+            for job_list in queue_info:
+                if job_list.find('JB_owner').text != user:
+                    # Not a job of this user
+                    continue
+
+                jobid = job_list.find('JB_job_number').text
+                if jobid not in jobs_to_poll:
+                    # Not a ReFrame job
+                    continue
+
+                state = job_list.find('state').text
+                job = jobs_to_poll[jobid]
+                known_jobs.add(job)
+
+                # For the list of known states see `man 5 sge_status`
+                # (https://arc.liv.ac.uk/SGE/htmlman/htmlman5/sge_status.html)
+                if state in ['r', 'hr', 't', 'Rr', 'Rt']:
+                    job._state = 'RUNNING'
+                elif state in ['qw', 'Rq', 'hqw', 'hRwq']:
+                    job._state = 'PENDING'
+                elif state in ['s', 'ts', 'S', 'tS', 'T', 'tT', 'Rs',
+                               'Rts', 'RS', 'RtS', 'RT', 'RtT']:
+                    job._state = 'SUSPENDED'
+                elif state in ['Eqw', 'Ehqw', 'EhRqw']:
+                    job._state = 'ERROR'
+                elif state in ['dr', 'dt', 'dRr', 'dRt', 'ds',
+                               'dS', 'dT', 'dRs', 'dRS', 'dRT']:
+                    job._state = 'DELETING'
+                elif state == 'z':
+                    job._state = 'COMPLETED'
+
+        # Mark any "unknown" job as completed
+        unknown_jobs = set(jobs) - known_jobs
+        for job in unknown_jobs:
+            self.log(f'Job {job.jobid} not known to scheduler, '
+                     f'assuming job completed')
+            job._state = 'COMPLETED'
+
+    def finished(self, job):
+        if job.exception:
+            raise job.exception
+
+        return job.state == 'COMPLETED'
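The two parsing steps above are easier to follow with sample scheduler output, so here is a minimal, self-contained sketch. The exact qsub message wording and the qstat XML layout vary across SGE versions and sites; the samples below are illustrative only, shaped to match what submit() and poll() expect:

import re
import xml.etree.ElementTree as ET

# submit(): qsub typically acknowledges a submission with a line starting
# with 'Your job <id>'; the exact wording after the id may vary by site.
ack = 'Your job 1042 ("testjob") has been submitted'
jobid = re.search(r'^Your job (?P<jobid>\S+)', ack).group('jobid')
print(jobid)  # 1042

# poll(): a trimmed-down stand-in for `qstat -xml -u <user>` output.
# The root element holds one section per queue state, each containing one
# <job_list> element per job -- matching the nested loops in poll().
sample = '''\
<job_info>
  <queue_info>
    <job_list state="running">
      <JB_job_number>1042</JB_job_number>
      <JB_owner>alice</JB_owner>
      <state>r</state>
    </job_list>
  </queue_info>
  <job_info>
    <job_list state="pending">
      <JB_job_number>1043</JB_job_number>
      <JB_owner>alice</JB_owner>
      <state>qw</state>
    </job_list>
  </job_info>
</job_info>
'''
root = ET.fromstring(sample)
for queue_info in root:
    for job_list in queue_info:
        print(job_list.find('JB_job_number').text,
              job_list.find('state').text)
# 1042 r   -> mapped to RUNNING
# 1043 qw  -> mapped to PENDING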

reframe/schemas/config.json

Lines changed: 2 additions & 2 deletions

@@ -248,7 +248,7 @@
             "type": "string",
             "enum": [
                 "local", "pbs", "slurm",
-                "squeue", "torque"
+                "sge", "squeue", "torque"
             ]
         },
         "launcher": {

@@ -370,7 +370,7 @@
         "properties": {
             "name": {
                 "type": "string",
-                "enum": ["local", "pbs", "slurm", "squeue", "torque"]
+                "enum": ["local", "pbs", "sge", "slurm", "squeue", "torque"]
             },
             "ignore_reqnodenotavail": {"type": "boolean"},
             "resubmit_on_errors": {

unittests/test_schedulers.py

Lines changed: 18 additions & 2 deletions

@@ -26,7 +26,7 @@ def launcher():
     return getlauncher('local')


-@pytest.fixture(params=['slurm', 'squeue', 'local', 'pbs', 'torque'])
+@pytest.fixture(params=['sge', 'slurm', 'squeue', 'local', 'pbs', 'torque'])
 def scheduler(request):
     return getscheduler(request.param)

@@ -132,6 +132,22 @@ def assert_job_script_sanity(job):
             'echo postrun'] == matches


+def _expected_sge_directives(job):
+    num_nodes = job.num_tasks // job.num_tasks_per_node
+    num_cpus_per_node = job.num_cpus_per_task * job.num_tasks_per_node
+    return set([
+        '#$ -N "testjob"',
+        '#$ -l h_rt=0:5:0',
+        f'#$ -o {job.stdout}',
+        f'#$ -e {job.stderr}',
+        f'#$ -wd {job.workdir}',
+        '#$ --gres=gpu:4',
+        '#$ --account=spam',
+        '#DW jobdw capacity=100GB',
+        '#DW stage_in source=/foo'
+    ])
+
+
 def _expected_slurm_directives(job):
     return set([
         '#SBATCH --job-name="testjob"',

@@ -205,7 +221,7 @@ def test_prepare(fake_job):

     prepare_job(fake_job)
     with open(fake_job.script_filename) as fp:
-        found_directives = set(re.findall(r'^\#\w+ .*', fp.read(),
+        found_directives = set(re.findall(r'^\#\S+ .*', fp.read(),
                                           re.MULTILINE))

     expected_directives = globals()[f'_expected_{sched_name}_directives']
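The regex change in test_prepare is what lets the SGE directives be picked up at all: \w matches only word characters, so '^\#\w+' matches '#SBATCH' or '#DW' but not '#$'. A quick sketch:

import re

script = '#$ -N "testjob"\n#SBATCH --account=spam\n#DW jobdw capacity=100GB'

print(re.findall(r'^\#\w+ .*', script, re.MULTILINE))
# ['#SBATCH --account=spam', '#DW jobdw capacity=100GB']  -- misses '#$ ...'

print(re.findall(r'^\#\S+ .*', script, re.MULTILINE))
# matches all three directives, including '#$ -N "testjob"'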
