Skip to content

Commit 97b2859

Browse files
author
Vasileios Karakasis
committed
Merge branch 'master' into bugfix/treat-conflicts-module-mappings
2 parents 1dc9226 + 4aa8ac9 commit 97b2859

File tree

10 files changed

+248
-187
lines changed

10 files changed

+248
-187
lines changed

docs/config_reference.rst

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -695,12 +695,8 @@ All logging handlers share the following set of common attributes:
695695
If specific formatting is desired, the ``check_job_completion_time`` should be used instead.
696696
- ``%(check_name)s``: The name of the regression test on behalf of which ReFrame is currently executing.
697697
If ReFrame is not executing in the context of a regression test, ``reframe`` will be printed instead.
698-
- ``%(check_num_tasks)s``: The number of tasks assigned to the regression test.
699-
- ``%(check_outputdir)s``: The output directory associated with the currently executing test.
700698
- ``%(check_partition)s``: The system partition where this test is currently executing.
701-
- ``%(check_stagedir)s``: The stage directory associated with the currently executing test.
702699
- ``%(check_system)s``: The system where this test is currently executing.
703-
- ``%(check_tags)s``: The tags associated with this test.
704700
- ``%(check_perf_lower_thres)s``: The lower threshold of the performance difference from the reference value expressed as a fractional value.
705701
See the :attr:`reframe.core.pipeline.RegressionTest.reference` attribute of regression tests for more details.
706702
- ``%(check_perf_ref)s``: The reference performance value of a certain performance variable.
@@ -709,11 +705,20 @@ All logging handlers share the following set of common attributes:
709705
See the :attr:`reframe.core.pipeline.RegressionTest.reference` attribute of regression tests for more details.
710706
- ``%(check_perf_value)s``: The performance value obtained for a certain performance variable.
711707
- ``%(check_perf_var)s``: The name of the `performance variable <tutorial_basic.html#writing-a-performance-test>`__ being logged.
708+
- ``%(check_ATTR)s``: This will log the value of the attribute ``ATTR`` of the currently executing regression test.
709+
Mappings will be logged as ``k1=v1,k2=v2,...`` and all other iterables, except strings, will be logged as comma-separated lists.
710+
If ``ATTR`` is not an attribute of the test, or if its value is ``None``, ``%(check_ATTR)s`` will be logged as ``<undefined>``.
711+
This allows users to log arbitrary attributes of their tests.
712+
For the complete list of test attributes, please refer to :doc:`regression_test_api`.
712713
- ``%(osuser)s``: The name of the OS user running ReFrame.
713714
- ``%(osgroup)s``: The name of the OS group running ReFrame.
714715
- ``%(version)s``: The ReFrame version.
715716

716717

718+
.. versionadded:: 3.3
719+
The ability to log arbitrary test attributes was added.
720+
721+
717722
.. js:attribute:: .logging[].handlers[].datefmt
718723

719724
.. object:: .logging[].handlers_perflog[].datefmt

reframe/core/backends.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@
1919
_scheduler_backend_modules = [
2020
'reframe.core.schedulers.local',
2121
'reframe.core.schedulers.slurm',
22-
'reframe.core.schedulers.pbs',
23-
'reframe.core.schedulers.torque'
22+
'reframe.core.schedulers.pbs'
2423
]
2524
_schedulers = {}
2625

reframe/core/logging.py

Lines changed: 79 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import logging.handlers
1010
import numbers
1111
import os
12-
import pprint
1312
import re
1413
import shutil
1514
import sys
@@ -145,7 +144,7 @@ def emit(self, record):
145144
except OSError as e:
146145
raise LoggingError('logging failed') from e
147146

148-
self.baseFilename = os.path.join(dirname, record.check_name + '.log')
147+
self.baseFilename = os.path.join(dirname, record.check.name + '.log')
149148
self.stream = self._streams.get(self.baseFilename, None)
150149
super().emit(record)
151150
self._streams[self.baseFilename] = self.stream
@@ -165,7 +164,65 @@ def _format_time_rfc3339(timestamp, datefmt):
165164
return re.sub(r'(%)?\:z', tz_rfc3339, time.strftime(datefmt, timestamp))
166165

167166

168-
class RFC3339Formatter(logging.Formatter):
167+
def _xfmt(val):
    '''Convert ``val`` to a value suitable for a ``%(...)s`` log field.

    The conversion rules are the following:

    - ``None`` is converted to ``'<undefined>'``.
    - Deferred expressions are evaluated and the result of the evaluation
      is returned as is; if the evaluation raises, ``'<error>'`` is
      returned instead.
    - Strings are returned unchanged.
    - Mappings are converted to ``k1=v1,k2=v2``.
    - Any other iterable is converted to a comma-separated list of the
      string representations of its elements.
    - Any other value is returned unchanged.
    '''
    from reframe.core.deferrable import _DeferredExpression

    if val is None:
        return '<undefined>'

    if isinstance(val, _DeferredExpression):
        try:
            return val.evaluate()
        except BaseException:
            # NOTE(review): BaseException also swallows KeyboardInterrupt
            # and SystemExit; kept as in the original -- confirm that this
            # is intended.
            return '<error>'

    if isinstance(val, str):
        return val

    if isinstance(val, collections.abc.Mapping):
        return ','.join(f'{k}={v}' for k, v in val.items())

    if isinstance(val, collections.abc.Iterable):
        # The elements are not necessarily strings (e.g., a list of
        # integers) and str.join() raises a TypeError for non-string items,
        # so convert each element explicitly.
        return ','.join(str(v) for v in val)

    return val
189+
190+
191+
class CheckFieldFormatter(logging.Formatter):
    '''Log formatter that dynamically looks up format specifiers inside a
    regression test.

    Every ``%(check_ATTR)s`` specifier found in the format string is
    resolved, at formatting time, to the attribute ``ATTR`` of the test
    stored in the ``check`` attribute of the log record, unless the record
    already carries an attribute named ``check_ATTR``.

    If the record has no test associated with it, the specifiers that are
    not already set in the record get a default value: ``'reframe'`` for
    ``check_name`` and ``None`` for any other specifier.
    '''

    def __init__(self, fmt=None, datefmt=None, style='%'):
        super().__init__(fmt, datefmt, style)

        # NOTE: This will work only when style='%'
        #
        # `fmt` may be None (the default), in which case there are no
        # check-specific specifiers to look for.
        self.__extras = {
            spec: None
            for spec in re.findall(r'\%\((check_\S+?)\)s', fmt or '')
        }

        # Set the default value for 'check_name'
        if 'check_name' in self.__extras:
            self.__extras['check_name'] = 'reframe'

    def format(self, record):
        '''Fill in the check-specific fields of ``record`` and format it.'''

        # Records that were not created with a `check` extra are treated as
        # records without a test
        check = getattr(record, 'check', None)
        if check:
            # Fill in the check-specific record attributes
            for spec in self.__extras:
                if hasattr(record, spec):
                    # Attribute set elsewhere
                    continue

                attr = spec.split('_', maxsplit=1)[1]
                record.__dict__[spec] = _xfmt(getattr(check, attr, None))
        else:
            # Provide the defaults for the dynamic extras even if check is
            # not set, but never overwrite a value that was set elsewhere;
            # otherwise placeholders already stored in the record would be
            # replaced by None.
            for spec, default in self.__extras.items():
                if not hasattr(record, spec):
                    record.__dict__[spec] = default

        return super().format(record)
223+
224+
225+
class RFC3339Formatter(CheckFieldFormatter):
169226
def formatTime(self, record, datefmt=None):
170227
datefmt = datefmt or self.default_time_format
171228
if '%:z' not in datefmt:
@@ -176,7 +233,7 @@ def formatTime(self, record, datefmt=None):
176233

177234
def format(self, record):
178235
datefmt = self.datefmt or self.default_time_format
179-
if record.check_job_completion_time_unix is not None:
236+
if record.check_job_completion_time_unix != _xfmt(None):
180237
ct = self.converter(record.check_job_completion_time_unix)
181238
record.check_job_completion_time = _format_time_rfc3339(
182239
ct, datefmt
@@ -388,26 +445,24 @@ def __init__(self, logger=None, check=None):
388445
super().__init__(
389446
logger,
390447
{
391-
'check_name': 'reframe',
392-
'check_jobid': '-1',
393-
'check_job_completion_time': None,
394-
'check_job_completion_time_unix': None,
448+
# Here we only set the format specifiers that do not
449+
# correspond directly to check attributes
450+
'check': check,
451+
'check_jobid': _xfmt(None),
452+
'check_job_completion_time': _xfmt(None),
453+
'check_job_completion_time_unix': _xfmt(None),
395454
'check_info': 'reframe',
396-
'check_system': None,
397-
'check_partition': None,
398-
'check_environ': None,
399-
'check_outputdir': None,
400-
'check_stagedir': None,
401-
'check_num_tasks': None,
402-
'check_perf_var': None,
403-
'check_perf_value': None,
404-
'check_perf_ref': None,
405-
'check_perf_lower_thres': None,
406-
'check_perf_upper_thres': None,
407-
'check_perf_unit': None,
408-
'osuser': osext.osuser() or '<unknown>',
409-
'osgroup': osext.osgroup() or '<unknown>',
410-
'check_tags': None,
455+
'check_system': _xfmt(None),
456+
'check_partition': _xfmt(None),
457+
'check_environ': _xfmt(None),
458+
'check_perf_var': _xfmt(None),
459+
'check_perf_value': _xfmt(None),
460+
'check_perf_ref': _xfmt(None),
461+
'check_perf_lower_thres': _xfmt(None),
462+
'check_perf_upper_thres': _xfmt(None),
463+
'check_perf_unit': _xfmt(None),
464+
'osuser': _xfmt(osext.osuser()),
465+
'osgroup': _xfmt(osext.osgroup()),
411466
'version': osext.reframe_version(),
412467
}
413468
)
@@ -428,15 +483,11 @@ def std_stream_handlers(self):
428483

429484
def _update_check_extras(self):
430485
'''Return a dictionary with all the check-specific information.'''
486+
431487
if self.check is None:
432488
return
433489

434-
self.extra['check_name'] = self.check.name
435490
self.extra['check_info'] = self.check.info()
436-
self.extra['check_outputdir'] = self.check.outputdir
437-
self.extra['check_stagedir'] = self.check.stagedir
438-
self.extra['check_num_tasks'] = self.check.num_tasks
439-
self.extra['check_tags'] = ','.join(self.check.tags)
440491
if self.check.current_system:
441492
self.extra['check_system'] = self.check.current_system.name
442493

reframe/core/pipeline.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,17 @@ def disable_hook(cls, hook_name):
148148
'''
149149
cls._rfm_disabled_hooks.add(hook_name)
150150

151+
@classmethod
def pipeline_hooks(cls):
    '''Collect the pipeline hooks of this class and of all its bases.

    :returns: A new dictionary that maps every key found in the
        ``_rfm_pipeline_hooks`` dictionaries of the classes in the MRO of
        this class to a list with the corresponding hooks, in MRO order.
    '''
    ret = {}
    for c in cls.mro():
        # Look only in the namespace of the class itself: hasattr() would
        # also find a dictionary inherited from a base class and the hooks
        # of that base would be collected again when the base is visited.
        hooks = vars(c).get('_rfm_pipeline_hooks')
        if hooks is None:
            continue

        for kind, hook in hooks.items():
            ret.setdefault(kind, []).extend(hook)

    return ret
161+
151162
#: The name of the test.
152163
#:
153164
#: :type: string that can contain any character except ``/``

reframe/core/schedulers/pbs.py

Lines changed: 108 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import reframe.utility.osext as osext
2121
from reframe.core.backends import register_scheduler
2222
from reframe.core.config import settings
23-
from reframe.core.exceptions import JobSchedulerError
23+
from reframe.core.exceptions import (JobError, JobSchedulerError)
2424
from reframe.utility import seconds_to_hms
2525

2626

@@ -40,6 +40,18 @@
4040
_run_strict = functools.partial(osext.run_command, check=True)
4141

4242

43+
# Translation of the single-letter ``job_state`` codes, as parsed from the
# output of ``qstat -f``, to the job state names used by this backend
JOB_STATES = {
    'Q': 'QUEUED',
    'H': 'HELD',
    'R': 'RUNNING',
    'E': 'EXITING',
    'T': 'MOVED',
    'W': 'WAITING',
    'S': 'SUSPENDED',
    'C': 'COMPLETED',
}
53+
54+
4355
class _PbsJob(sched.Job):
4456
def __init__(self, *args, **kwargs):
4557
super().__init__(*args, **kwargs)
@@ -156,24 +168,104 @@ def finished(self, job):
156168

157169
return job.completed
158170

159-
def _poll_job(self, job):
160-
if job is None:
171+
def _update_nodelist(self, job, nodespec):
    '''Set the node list of ``job`` from an ``exec_host`` specification.

    :arg job: The job to update; nothing is done if its ``nodelist`` is
        already set.
    :arg nodespec: A string of ``+``-separated entries; the part of each
        entry before the first ``/`` is taken as the host name.
    '''
    if job.nodelist is not None:
        return

    # The same host may appear in more than one entry of the
    # specification; report each host only once, in sorted order.
    job._nodelist = sorted({x.split('/')[0] for x in nodespec.split('+')})
166177

167-
done = job.cancelled or output_ready
168-
if done:
169-
t_now = time.time()
170-
if job.completion_time is None:
171-
job._completion_time = t_now
178+
def poll(self, *jobs):
    '''Update the status of ``jobs`` with a single ``qstat -f`` query.

    For every job the state, the node list and, for completed jobs, the
    exit code are updated from the ``qstat`` output. Jobs that stay in a
    pending state for at least their ``max_pending_time`` are cancelled.

    :arg jobs: The jobs to poll; :class:`None` entries are ignored.
    :raises JobSchedulerError: If ``qstat`` exits with a code other than
        0 or 153.
    '''
    if jobs:
        # Filter out non-jobs
        jobs = [job for job in jobs if job is not None]

    if not jobs:
        return

    completed = osext.run_command(
        f'qstat -f {" ".join(job.jobid for job in jobs)}'
    )

    # Depending on the configuration, completed jobs will remain on the job
    # list for a limited time, or be removed upon completion.
    # If qstat cannot find any of the job IDs, it will return 153.
    # Otherwise, it will return with return code 0 and print information
    # only for the jobs it could find.
    if completed.returncode == 153:
        self.log('Return code is 153: jobids not known by scheduler, '
                 'assuming all jobs completed')
        for job in jobs:
            job._state = 'COMPLETED'

            # Mark the job as completed exactly as it is done below for a
            # single job that is unknown to the scheduler; otherwise the
            # job would never be flagged as completed through this path.
            job._completed = True

        return

    if completed.returncode != 0:
        raise JobSchedulerError(
            f'qstat failed with exit code {completed.returncode} '
            f'(standard error follows):\n{completed.stderr}'
        )

    # Store information for each job separately
    jobinfo = {}
    for job_raw_info in completed.stdout.split('\n\n'):
        jobid_match = re.search(
            r'^Job Id:\s*(?P<jobid>\S+)', job_raw_info, re.MULTILINE
        )
        if jobid_match:
            jobid = jobid_match.group('jobid')
            jobinfo[jobid] = job_raw_info

    for job in jobs:
        if job.jobid not in jobinfo:
            self.log(f'Job {job.jobid} not known to scheduler, '
                     f'assuming job completed')
            job._state = 'COMPLETED'
            job._completed = True
            continue

        info = jobinfo[job.jobid]
        state_match = re.search(
            r'^\s*job_state = (?P<state>[A-Z])', info, re.MULTILINE
        )
        if not state_match:
            self.log(f'Job state not found (job info follows):\n{info}')
            continue

        state = state_match.group('state')

        # NOTE(review): a state letter that is missing from JOB_STATES
        # raises a KeyError here.
        job._state = JOB_STATES[state]
        nodelist_match = re.search(
            r'exec_host = (?P<nodespec>[\S\t\n]+)',
            info, re.MULTILINE
        )
        if nodelist_match:
            # The specification may be wrapped over several lines; join
            # them before parsing
            nodespec = nodelist_match.group('nodespec')
            nodespec = re.sub(r'[\n\t]*', '', nodespec)
            self._update_nodelist(job, nodespec)

        if job.state == 'COMPLETED':
            # The exit status may be negative, so accept an optional sign
            exitcode_match = re.search(
                r'^\s*exit_status = (?P<code>-?\d+)',
                info, re.MULTILINE,
            )
            if exitcode_match:
                job._exitcode = int(exitcode_match.group('code'))

            # We report a job as finished only when its stdout/stderr are
            # written back to the working directory
            stdout = os.path.join(job.workdir, job.stdout)
            stderr = os.path.join(job.workdir, job.stderr)
            out_ready = os.path.exists(stdout) and os.path.exists(stderr)
            done = job.cancelled or out_ready
            if done:
                job._completed = True
        elif (job.state in ['QUEUED', 'HELD', 'WAITING'] and
              job.max_pending_time):
            if (time.time() - job.submit_time >= job.max_pending_time):
                self.cancel(job)
                job._exception = JobError('maximum pending time exceeded')
267+
268+
269+
@register_scheduler('torque')
class TorqueJobScheduler(PbsJobScheduler):
    '''Scheduler backend registered under the name ``torque``.

    It reuses the implementation of :class:`PbsJobScheduler` and only
    overrides the option template used for requesting nodes and
    processors per node.
    '''

    TASKS_OPT = '-l nodes={num_nodes}:ppn={num_cpus_per_node}'

0 commit comments

Comments
 (0)