Skip to content

Commit 7e3835d

Browse files
author
Vasileios Karakasis
authored
Merge pull request #1109 from ekouts/bugfix/inf_loop_async
[bugfix] Fix dependency bug in async policy
2 parents 56a1689 + ec7d7b3 commit 7e3835d

File tree

3 files changed

+97
-40
lines changed

3 files changed

+97
-40
lines changed

reframe/frontend/executors/policies.py

Lines changed: 36 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,9 @@ def __init__(self):
168168
self.task_listeners.append(self)
169169

170170
def _remove_from_running(self, task):
171-
getlogger().debug('removing task: %s' % task.check.info())
171+
getlogger().debug(
172+
'removing task from running list: %s' % task.check.info()
173+
)
172174
try:
173175
self._running_tasks.remove(task)
174176
except ValueError:
@@ -210,6 +212,30 @@ def on_task_exit(self, task):
210212
self._remove_from_running(task)
211213
self._completed_tasks.append(task)
212214

215+
def _setup_task(self, task):
216+
if self.deps_succeeded(task):
217+
try:
218+
task.setup(task.testcase.partition,
219+
task.testcase.environ,
220+
sched_flex_alloc_nodes=self.sched_flex_alloc_nodes,
221+
sched_account=self.sched_account,
222+
sched_partition=self.sched_partition,
223+
sched_reservation=self.sched_reservation,
224+
sched_nodelist=self.sched_nodelist,
225+
sched_exclude_nodelist=self.sched_exclude_nodelist,
226+
sched_options=self.sched_options)
227+
except TaskExit:
228+
return False
229+
else:
230+
return True
231+
elif self.deps_failed(task):
232+
exc = TaskDependencyError('dependencies failed')
233+
task.fail((type(exc), exc, None))
234+
return False
235+
else:
236+
# Not all dependencies have finished yet
237+
return False
238+
213239
def runcase(self, case):
214240
super().runcase(case)
215241
check, partition, environ = case
@@ -228,29 +254,17 @@ def runcase(self, case):
228254
)
229255
try:
230256
partname = partition.fullname
231-
if self.deps_failed(task):
232-
exc = TaskDependencyError('dependencies failed')
233-
task.fail((type(exc), exc, None))
234-
return
257+
if not self._setup_task(task):
258+
if not task.failed:
259+
self.printer.status(
260+
'DEP', '%s on %s using %s' %
261+
(check.name, partname, environ.name),
262+
just='right'
263+
)
264+
self._waiting_tasks.append(task)
235265

236-
if not self.deps_succeeded(task):
237-
self.printer.status(
238-
'DEP', '%s on %s using %s' %
239-
(check.name, partname, environ.name),
240-
just='right'
241-
)
242-
self._waiting_tasks.append(task)
243266
return
244267

245-
task.setup(partition, environ,
246-
sched_flex_alloc_nodes=self.sched_flex_alloc_nodes,
247-
sched_account=self.sched_account,
248-
sched_partition=self.sched_partition,
249-
sched_reservation=self.sched_reservation,
250-
sched_nodelist=self.sched_nodelist,
251-
sched_exclude_nodelist=self.sched_exclude_nodelist,
252-
sched_options=self.sched_options)
253-
254268
if self._running_tasks_counts[partname] >= partition.max_jobs:
255269
# Make sure that we still exceeded the job limit
256270
getlogger().debug('reached job limit (%s) for partition %s' %
@@ -296,20 +310,7 @@ def _poll_tasks(self):
296310
def _setup_all(self):
297311
still_waiting = []
298312
for task in self._waiting_tasks:
299-
if self.deps_failed(task):
300-
exc = TaskDependencyError('dependencies failed')
301-
task.fail((type(exc), exc, None))
302-
elif self.deps_succeeded(task):
303-
task.setup(task.testcase.partition,
304-
task.testcase.environ,
305-
sched_flex_alloc_nodes=self.sched_flex_alloc_nodes,
306-
sched_account=self.sched_account,
307-
sched_partition=self.sched_partition,
308-
sched_reservation=self.sched_reservation,
309-
sched_nodelist=self.sched_nodelist,
310-
sched_exclude_nodelist=self.sched_exclude_nodelist,
311-
sched_options=self.sched_options)
312-
else:
313+
if not self._setup_task(task) and not task.failed:
313314
still_waiting.append(task)
314315

315316
self._waiting_tasks[:] = still_waiting

unittests/resources/checks_unlisted/deps_complex.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
# |
1212
# +-->t4<--+
1313
# | |
14-
# t5<------t1
14+
# t5<------t1<--t8<--t9
1515
# ^ ^
1616
# | |
1717
# +---t6---+
@@ -150,3 +150,37 @@ def __init__(self):
150150
def prepend_output(self, T2):
151151
with open(os.path.join(T2().stagedir, 'out.txt')) as fp:
152152
self._count += int(fp.read())
153+
154+
155+
@rfm.simple_test
156+
class T8(BaseTest):
157+
def __init__(self):
158+
super().__init__()
159+
self.depends_on('T1')
160+
self.sanity_patterns = sn.assert_eq(self.count, 22)
161+
162+
@rfm.require_deps
163+
def prepend_output(self, T1):
164+
with open(os.path.join(T1().stagedir, 'out.txt')) as fp:
165+
self._count += int(fp.read())
166+
167+
@rfm.run_after('setup')
168+
def fail(self):
169+
# Make this test fail on purpose
170+
raise Exception
171+
172+
173+
@rfm.simple_test
174+
class T9(BaseTest):
175+
# This tests fails because of T8. It is added to make sure that
176+
# all tests are accounted for in the summary.
177+
178+
def __init__(self):
179+
super().__init__()
180+
self.depends_on('T8')
181+
self.sanity_patterns = sn.assert_eq(self.count, 31)
182+
183+
@rfm.require_deps
184+
def prepend_output(self, T8):
185+
with open(os.path.join(T8().stagedir, 'out.txt')) as fp:
186+
self._count += int(fp.read())

unittests/test_policies.py

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ def runall(self, checks, sort=False, *args, **kwargs):
5858

5959
self.runner.runall(cases)
6060

61+
def assertRunall(self):
62+
# Make sure that all cases finished or failed
63+
for t in self.runner.stats.tasks():
64+
assert t.succeeded or t.failed
65+
6166
def _num_failures_stage(self, stage):
6267
stats = self.runner.stats
6368
return len([t for t in stats.failures() if t.failed_stage == stage])
@@ -77,6 +82,7 @@ def test_runall(self):
7782

7883
stats = self.runner.stats
7984
self.assertEqual(7, stats.num_cases())
85+
self.assertRunall()
8086
self.assertEqual(4, len(stats.failures()))
8187
self.assertEqual(2, self._num_failures_stage('setup'))
8288
self.assertEqual(1, self._num_failures_stage('sanity'))
@@ -87,6 +93,7 @@ def test_runall_skip_system_check(self):
8793

8894
stats = self.runner.stats
8995
self.assertEqual(8, stats.num_cases())
96+
self.assertRunall()
9097
self.assertEqual(4, len(stats.failures()))
9198
self.assertEqual(2, self._num_failures_stage('setup'))
9299
self.assertEqual(1, self._num_failures_stage('sanity'))
@@ -97,6 +104,7 @@ def test_runall_skip_prgenv_check(self):
97104

98105
stats = self.runner.stats
99106
self.assertEqual(8, stats.num_cases())
107+
self.assertRunall()
100108
self.assertEqual(4, len(stats.failures()))
101109
self.assertEqual(2, self._num_failures_stage('setup'))
102110
self.assertEqual(1, self._num_failures_stage('sanity'))
@@ -108,6 +116,7 @@ def test_runall_skip_sanity_check(self):
108116

109117
stats = self.runner.stats
110118
self.assertEqual(7, stats.num_cases())
119+
self.assertRunall()
111120
self.assertEqual(3, len(stats.failures()))
112121
self.assertEqual(2, self._num_failures_stage('setup'))
113122
self.assertEqual(0, self._num_failures_stage('sanity'))
@@ -119,6 +128,7 @@ def test_runall_skip_performance_check(self):
119128

120129
stats = self.runner.stats
121130
self.assertEqual(7, stats.num_cases())
131+
self.assertRunall()
122132
self.assertEqual(3, len(stats.failures()))
123133
self.assertEqual(2, self._num_failures_stage('setup'))
124134
self.assertEqual(1, self._num_failures_stage('sanity'))
@@ -130,6 +140,7 @@ def test_strict_performance_check(self):
130140

131141
stats = self.runner.stats
132142
self.assertEqual(7, stats.num_cases())
143+
self.assertRunall()
133144
self.assertEqual(5, len(stats.failures()))
134145
self.assertEqual(2, self._num_failures_stage('setup'))
135146
self.assertEqual(1, self._num_failures_stage('sanity'))
@@ -138,6 +149,7 @@ def test_strict_performance_check(self):
138149
def test_force_local_execution(self):
139150
self.runner.policy.force_local = True
140151
self.runall([HelloTest()])
152+
self.assertRunall()
141153
stats = self.runner.stats
142154
for t in stats.tasks():
143155
self.assertTrue(t.check.local)
@@ -165,6 +177,7 @@ def test_retries_bad_check(self):
165177

166178
# Ensure that the test was retried #max_retries times and failed.
167179
self.assertEqual(2, self.runner.stats.num_cases())
180+
self.assertRunall()
168181
self.assertEqual(max_retries, rt.runtime().current_run)
169182
self.assertEqual(2, len(self.runner.stats.failures()))
170183

@@ -179,6 +192,7 @@ def test_retries_good_check(self):
179192

180193
# Ensure that the test passed without retries.
181194
self.assertEqual(1, self.runner.stats.num_cases())
195+
self.assertRunall()
182196
self.assertEqual(0, rt.runtime().current_run)
183197
self.assertEqual(0, len(self.runner.stats.failures()))
184198

@@ -196,6 +210,7 @@ def test_pass_in_retries(self):
196210

197211
# Ensure that the test passed after retries in run #run_to_pass.
198212
self.assertEqual(1, self.runner.stats.num_cases())
213+
self.assertRunall()
199214
self.assertEqual(1, len(self.runner.stats.failures(run=0)))
200215
self.assertEqual(run_to_pass, rt.runtime().current_run)
201216
self.assertEqual(0, len(self.runner.stats.failures()))
@@ -210,13 +225,14 @@ def test_dependencies(self):
210225
self.checks = self.loader.load_all()
211226
self.runall(self.checks, sort=True)
212227

228+
self.assertRunall()
213229
stats = self.runner.stats
214-
assert stats.num_cases(0) == 8
215-
assert len(stats.failures()) == 2
230+
assert stats.num_cases(0) == 10
231+
assert len(stats.failures()) == 4
216232
for tf in stats.failures():
217233
check = tf.testcase.check
218-
exc_type, exc_value, _ = tf.exc_info
219-
if check.name == 'T7':
234+
_, exc_value, _ = tf.exc_info
235+
if check.name == 'T7' or check.name == 'T9':
220236
assert isinstance(exc_value, TaskDependencyError)
221237

222238
# Check that cleanup is executed properly for successful tests as well
@@ -309,6 +325,7 @@ def test_concurrency_unlimited(self):
309325

310326
# Ensure that all tests were run and without failures.
311327
self.assertEqual(len(checks), self.runner.stats.num_cases())
328+
self.assertRunall()
312329
self.assertEqual(0, len(self.runner.stats.failures()))
313330

314331
# Ensure that maximum concurrency was reached as fast as possible
@@ -334,6 +351,7 @@ def test_concurrency_limited(self):
334351

335352
# Ensure that all tests were run and without failures.
336353
self.assertEqual(len(checks), self.runner.stats.num_cases())
354+
self.assertRunall()
337355
self.assertEqual(0, len(self.runner.stats.failures()))
338356

339357
# Ensure that maximum concurrency was reached as fast as possible
@@ -372,6 +390,7 @@ def test_concurrency_none(self):
372390

373391
# Ensure that all tests were run and without failures.
374392
self.assertEqual(len(checks), self.runner.stats.num_cases())
393+
self.assertRunall()
375394
self.assertEqual(0, len(self.runner.stats.failures()))
376395

377396
# Ensure that a single task was running all the time
@@ -391,6 +410,7 @@ def _run_checks(self, checks, max_jobs):
391410
self.assertRaises(KeyboardInterrupt, self.runall, checks)
392411

393412
self.assertEqual(4, self.runner.stats.num_cases())
413+
self.assertRunall()
394414
self.assertEqual(4, len(self.runner.stats.failures()))
395415
self.assert_all_dead()
396416

@@ -427,6 +447,7 @@ def test_poll_fails_main_loop(self):
427447
self.runall(checks)
428448
stats = self.runner.stats
429449
self.assertEqual(num_tasks, stats.num_cases())
450+
self.assertRunall()
430451
self.assertEqual(num_tasks, len(stats.failures()))
431452

432453
def test_poll_fails_busy_loop(self):
@@ -437,6 +458,7 @@ def test_poll_fails_busy_loop(self):
437458
self.runall(checks)
438459
stats = self.runner.stats
439460
self.assertEqual(num_tasks, stats.num_cases())
461+
self.assertRunall()
440462
self.assertEqual(num_tasks, len(stats.failures()))
441463

442464

0 commit comments

Comments
 (0)