Skip to content

Commit 75f2f54

Browse files
authored
Merge branch 'master' into enhancement/valid_sysenv-logging
2 parents cb5fb5b + f51058c commit 75f2f54

File tree

13 files changed

+99
-112
lines changed

13 files changed

+99
-112
lines changed

reframe/core/schedulers/__init__.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -625,7 +625,7 @@ def wait(self):
625625
raise JobNotStartedError('cannot wait an unstarted job')
626626

627627
self.scheduler.wait(self)
628-
self._completion_time = self._completion_time or time.time()
628+
self.finished()
629629

630630
def cancel(self):
631631
if self.jobid is None:
@@ -640,6 +640,10 @@ def finished(self):
640640
done = self.scheduler.finished(self)
641641
if done:
642642
self._completion_time = self._completion_time or time.time()
643+
if self._exception:
644+
exc = self._exception
645+
self._exception = None
646+
raise exc
643647

644648
return done
645649

reframe/core/schedulers/flux.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,4 @@ def wait(self, job):
150150
time.sleep(next(intervals))
151151

152152
def finished(self, job):
153-
if job.exception:
154-
raise job.exception
155-
156153
return job.completed

reframe/core/schedulers/local.py

Lines changed: 44 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#
44
# SPDX-License-Identifier: BSD-3-Clause
55

6+
import contextlib
67
import errno
78
import os
89
import signal
@@ -67,6 +68,7 @@ def submit(self, job):
6768
stderr=f_stderr,
6869
start_new_session=True
6970
)
71+
self.log(f'spawned local process: {proc.pid}')
7072

7173
# Update job info
7274
job._jobid = proc.pid
@@ -92,10 +94,11 @@ def filternodes(self, job, nodes):
9294
return [sched.AlwaysIdleNode(socket.gethostname())]
9395

9496
def _kill_all(self, job):
95-
'''Send SIGKILL to all the processes of the spawned job.'''
97+
'''Send SIGKILL to all the processes of the spawned job and wait for
98+
any children to finish'''
9699
try:
100+
self.log(f'sending SIGKILL to process group {job._jobid}')
97101
os.killpg(job._jobid, signal.SIGKILL)
98-
job._signal = signal.SIGKILL
99102
except (ProcessLookupError, PermissionError):
100103
# The process group may already be dead or assigned to a different
101104
# group, so ignore this error
@@ -104,19 +107,19 @@ def _kill_all(self, job):
104107
# Close file handles
105108
job.f_stdout.close()
106109
job.f_stderr.close()
107-
job._state = 'FAILURE'
110+
with contextlib.suppress(ChildProcessError):
111+
os.waitpid(0, 0)
108112

109113
def _term_all(self, job):
110114
'''Send SIGTERM to all the processes of the spawned job.'''
111115
try:
116+
self.log(f'sending SIGTERM to process group {job._jobid}')
112117
os.killpg(job._jobid, signal.SIGTERM)
113-
job._signal = signal.SIGTERM
114118
except (ProcessLookupError, PermissionError):
115119
# Job has finished already, close file handles
116120
self.log(f'pid {job.jobid} already dead')
117121
job.f_stdout.close()
118122
job.f_stderr.close()
119-
job._state = 'FAILURE'
120123

121124
def cancel(self, job):
122125
'''Cancel job.
@@ -126,6 +129,7 @@ def cancel(self, job):
126129
127130
This function waits for the spawned process tree to finish.
128131
'''
132+
self.log(f'cancelling job {job._jobid}')
129133
self._term_all(job)
130134
job._cancel_time = time.time()
131135

@@ -150,10 +154,7 @@ def finished(self, job):
150154
the process has finished, you *must* call wait() to properly cleanup
151155
after it.
152156
'''
153-
if job.exception:
154-
raise job.exception
155-
156-
return job.state in ['SUCCESS', 'FAILURE', 'TIMEOUT']
157+
return job.exitcode is not None or job.signal is not None
157158

158159
def poll(self, *jobs):
159160
for job in jobs:
@@ -173,37 +174,45 @@ def _poll_job(self, job):
173174
else:
174175
raise e
175176

176-
if job.cancel_time:
177-
# Job has been cancelled; give it a grace period and kill it
178-
self.log(f'Job {job.jobid} has been cancelled; '
179-
f'giving it a grace period')
180-
t_rem = self.CANCEL_GRACE_PERIOD - (time.time() - job.cancel_time)
181-
if t_rem > 0:
182-
time.sleep(t_rem)
177+
if pid:
178+
# Job has finished
179+
self.log(f'spawned process {job._jobid} has finished')
183180

181+
# Forcefully kill the whole session once the parent process exits
184182
self._kill_all(job)
185-
return
186183

187-
if not pid:
188-
# Job has not finished; check if we have reached a timeout
189-
t_elapsed = time.time() - job.submit_time
190-
if job.time_limit and t_elapsed > job.time_limit:
191-
self._kill_all(job)
184+
# Call wait() in the underlying Popen object to avoid false
185+
# positive warnings
186+
job._proc.wait()
187+
188+
# Retrieve the status of the job and return
189+
if os.WIFEXITED(status):
190+
job._exitcode = os.WEXITSTATUS(status)
191+
if job._state == 'RUNNING':
192+
job._state = 'FAILURE' if job._exitcode != 0 else 'SUCCESS'
193+
elif os.WIFSIGNALED(status):
194+
if job._state == 'RUNNING':
195+
job._state = 'FAILURE'
196+
197+
job._signal = os.WTERMSIG(status)
198+
self.log(f'job killed by signal: {job._signal}')
199+
200+
self.log(f'job state: {job._state}')
201+
else:
202+
# Job has not finished; check for timeouts
203+
now = time.time()
204+
t_elapsed = now - job.submit_time
205+
if job.cancel_time:
206+
t_rem = self.CANCEL_GRACE_PERIOD - (now - job.cancel_time)
207+
self.log(f'job {job.jobid} has been cancelled; '
208+
f'giving it a grace period of {t_rem} seconds')
209+
if t_rem <= 0:
210+
self._kill_all(job)
211+
elif job.time_limit and t_elapsed > job.time_limit:
212+
self.log(f'job {job._jobid} timed out; cancelling it')
213+
self.cancel(job)
192214
job._state = 'TIMEOUT'
193215
job._exception = JobError(
194216
f'job timed out ({t_elapsed:.6f}s > {job.time_limit}s)',
195217
job.jobid
196218
)
197-
198-
return
199-
200-
# Job has finished; kill the whole session
201-
self._kill_all(job)
202-
203-
# Retrieve the status of the job and return
204-
if os.WIFEXITED(status):
205-
job._exitcode = os.WEXITSTATUS(status)
206-
job._state = 'FAILURE' if job.exitcode != 0 else 'SUCCESS'
207-
elif os.WIFSIGNALED(status):
208-
job._state = 'FAILURE'
209-
job._signal = os.WTERMSIG(status)

reframe/core/schedulers/lsf.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,4 @@ def poll(self, *jobs):
147147
job._completed = True
148148

149149
def finished(self, job):
150-
if job.exception:
151-
raise job.exception
152-
153150
return job.state == 'COMPLETED'

reframe/core/schedulers/pbs.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -190,9 +190,6 @@ def cancel(self, job):
190190
job._cancelled = True
191191

192192
def finished(self, job):
193-
if job.exception:
194-
raise job.exception
195-
196193
return job.completed
197194

198195
def _update_nodelist(self, job, nodespec):

reframe/core/schedulers/sge.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,4 @@ def poll(self, *jobs):
137137
job._state = 'COMPLETED'
138138

139139
def finished(self, job):
140-
if job.exception:
141-
raise job.exception
142-
143140
return job.state == 'COMPLETED'

reframe/core/schedulers/slurm.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -628,9 +628,6 @@ def cancel(self, job):
628628
job._is_cancelling = True
629629

630630
def finished(self, job):
631-
if job.exception:
632-
raise job.exception
633-
634631
return slurm_state_completed(job.state)
635632

636633

reframe/core/schedulers/ssh.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,9 +174,6 @@ def cancel(self, job):
174174
step.cancel()
175175

176176
def finished(self, job):
177-
if job.exception:
178-
raise job.exception
179-
180177
return job.state is not None
181178

182179
def poll(self, *jobs):

reframe/frontend/cli.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,7 +1028,7 @@ def restrict_logging():
10281028
sys.exit(0)
10291029

10301030
if options.list_stored_testcases:
1031-
namepatt = '|'.join(options.names)
1031+
namepatt = '|'.join(n.replace('%', ' %') for n in options.names)
10321032
with exit_gracefully_on_error('failed to retrieve test case data',
10331033
printer):
10341034
filt = options.filter_expr[-1] if options.filter_expr else None
@@ -1050,7 +1050,7 @@ def restrict_logging():
10501050
if options.describe_stored_testcases:
10511051
# Restore logging level
10521052
printer.setLevel(logging.INFO)
1053-
namepatt = '|'.join(options.names)
1053+
namepatt = '|'.join(n.replace('%', ' %') for n in options.names)
10541054
with exit_gracefully_on_error('failed to retrieve test case data',
10551055
printer):
10561056
filt = options.filter_expr[-1] if options.filter_expr else None
@@ -1068,7 +1068,7 @@ def restrict_logging():
10681068
sys.exit(0)
10691069

10701070
if options.performance_compare:
1071-
namepatt = '|'.join(options.names)
1071+
namepatt = '|'.join(n.replace('%', ' %') for n in options.names)
10721072
with exit_gracefully_on_error('failed to generate performance report',
10731073
printer):
10741074
filt = [None, None]

reframe/frontend/executors/__init__.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -215,9 +215,6 @@ def __init__(self, case, listeners=None, timeout=None):
215215
# if it is zero
216216
self.ref_count = case.num_dependents
217217

218-
# Test case has finished, but has not been waited for yet
219-
self.zombie = False
220-
221218
# Timestamps for the start and finish phases of the pipeline
222219
self._timestamps = {}
223220

@@ -445,7 +442,6 @@ def run(self):
445442
def run_complete(self):
446443
done = self._safe_call(self.check.run_complete)
447444
if done:
448-
self.zombie = True
449445
self._notify_listeners('on_task_exit')
450446

451447
return done
@@ -461,7 +457,6 @@ def compile_complete(self):
461457
@logging.time_function
462458
def run_wait(self):
463459
self._safe_call(self.check.run_wait)
464-
self.zombie = False
465460

466461
@logging.time_function
467462
def sanity(self):
@@ -501,6 +496,15 @@ def cleanup(self, *args, **kwargs):
501496
self._safe_call(self.check.cleanup, *args, **kwargs)
502497

503498
def fail(self, exc_info=None, callback='on_task_failure'):
499+
def _wait_job(job):
500+
if job:
501+
with contextlib.suppress(JobNotStartedError):
502+
job.wait()
503+
504+
# Make sure to properly wait/reap any spawned job in case of failures
505+
_wait_job(self.check.build_job)
506+
_wait_job(self.check.job)
507+
504508
self._failed_stage = self._current_stage
505509
self._exc_info = exc_info or sys.exc_info()
506510
self._notify_listeners(callback)
@@ -527,8 +531,9 @@ def abort(self, cause=None):
527531
exc.__cause__ = cause
528532
self._aborted = True
529533
try:
530-
if not self.zombie and self.check.job:
534+
if self.check.job:
531535
self.check.job.cancel()
536+
self.check.job.wait()
532537
except JobNotStartedError:
533538
self.fail((type(exc), exc, None), 'on_task_abort')
534539
except BaseException:

0 commit comments

Comments
 (0)