Skip to content

Commit 9f7335f

Browse files
authored
Merge pull request #410 from int-brain-lab/updateHeld
Update held dependent tasks on complete
2 parents 86eeaf3 + e574041 commit 9f7335f

File tree

3 files changed

+33
-13
lines changed

3 files changed

+33
-13
lines changed

ibllib/pipes/tasks.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ def run(self, **kwargs):
118118
return self.status
119119
self.outputs = self._run(**kwargs)
120120
_logger.info(f"Job {self.__class__} complete")
121-
except BaseException:
121+
except Exception:
122122
_logger.error(traceback.format_exc())
123123
_logger.info(f"Job {self.__class__} errored")
124124
self.status = -1
@@ -410,7 +410,7 @@ def create_alyx_tasks(self, rerun__status__in=None):
410410
tasks_alyx = []
411411
# creates all the tasks by iterating through the ordered dict
412412
for k, t in self.tasks.items():
413-
# get the parents alyx ids to reference in the database
413+
# get the parents' alyx ids to reference in the database
414414
if len(t.parents):
415415
pnames = [p.name for p in t.parents]
416416
parents_ids = [ta['id'] for ta in tasks_alyx if ta['name'] in pnames]
@@ -492,10 +492,10 @@ def run_alyx_task(tdict=None, session_path=None, one=None, job_deck=None,
492492
:return:
493493
"""
494494
registered_dsets = []
495+
# here we need to check parents' status, get the job_deck if not available
496+
if not job_deck:
497+
job_deck = one.alyx.rest('tasks', 'list', session=tdict['session'], no_cache=True)
495498
if len(tdict['parents']):
496-
# here we need to check parents status, get the job_deck if not available
497-
if not job_deck:
498-
job_deck = one.alyx.rest('tasks', 'list', session=tdict['session'], no_cache=True)
499499
# check the dependencies
500500
parent_tasks = filter(lambda x: x['id'] in tdict['parents'], job_deck)
501501
parent_statuses = [j['status'] for j in parent_tasks]
@@ -526,7 +526,7 @@ def run_alyx_task(tdict=None, session_path=None, one=None, job_deck=None,
526526
else:
527527
try:
528528
registered_dsets = task.register_datasets(one=one, max_md5_size=max_md5_size)
529-
except BaseException:
529+
except Exception:
530530
_logger.error(traceback.format_exc())
531531
patch_data['status'] = 'Errored'
532532
patch_data['status'] = 'Complete'
@@ -541,5 +541,15 @@ def run_alyx_task(tdict=None, session_path=None, one=None, job_deck=None,
541541
patch_data['status'] = 'Incomplete'
542542
# update task status on Alyx
543543
t = one.alyx.rest('tasks', 'partial_update', id=tdict['id'], data=patch_data)
544+
# check for dependent held tasks
545+
# NB: Assumes dependent tasks are all part of the same session!
546+
next(x for x in job_deck if x['id'] == t['id'])['status'] = t['status'] # Update status in job deck
547+
dependent_tasks = filter(lambda x: t['id'] in x['parents'] and x['status'] == 'Held', job_deck)
548+
for d in dependent_tasks:
549+
assert d['id'] != t['id'], 'task its own parent'
550+
# if all their parent tasks now complete, set to waiting
551+
parent_status = [next(x['status'] for x in job_deck if x['id'] == y) for y in d['parents']]
552+
if all(x == 'Complete' for x in parent_status):
553+
one.alyx.rest('tasks', 'partial_update', id=d['id'], data={'status': 'Waiting'})
544554
task.cleanUp()
545555
return t, registered_dsets

ibllib/tests/test_tasks.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@
3737
desired_logs_rerun = {
3838
'Task00': 1,
3939
'Task01_void': 2,
40-
'Task02_error': 2,
40+
'Task02_error': 1,
4141
'Task10': 1,
42-
'Task11': None,
42+
'Task11': 1,
4343
'TaskIncomplete': 1,
4444
'TaskGpuLock': 2
4545
}
@@ -63,12 +63,17 @@ def _run(self, overwrite=False):
6363
return out_files
6464

6565

66-
# job that raises an error
66+
# job that raises an error on first run
6767
class Task02_error(ibllib.pipes.tasks.Task):
68-
level = 0
68+
run_count = 0
6969

7070
def _run(self, overwrite=False):
71-
raise Exception("Something dumb happened")
71+
Task02_error.run_count += 1
72+
if Task02_error.run_count == 1:
73+
raise Exception('Something dumb happened')
74+
out_files = self.session_path.joinpath('alf', 'spikes.templates.npy')
75+
out_files.touch()
76+
return out_files
7277

7378

7479
# job that outputs a list of files
@@ -127,6 +132,7 @@ def __init__(self, session_path=None, **kwargs):
127132
tasks['TaskGpuLock'] = TaskGpuLock(self.session_path)
128133
tasks['TaskIncomplete'] = TaskIncomplete(self.session_path)
129134
tasks['Task10'] = Task10(self.session_path, parents=[tasks['Task00']])
135+
# When both its parents Complete, this task should be set to Waiting and should finally complete
130136
tasks['Task11'] = Task11(self.session_path, parents=[tasks['Task02_error'],
131137
tasks['Task00']])
132138
self.tasks = tasks
@@ -197,8 +203,11 @@ def test_pipeline_alyx(self):
197203

198204
# test the rerun option
199205
task_deck, dsets = pipeline.rerun_failed(machine='testmachine')
200-
check_statuses = [desired_statuses[t['name']] == t['status'] for t in task_deck]
201-
self.assertTrue(all(check_statuses))
206+
task_02 = next(t for t in task_deck if t['name'] == 'Task02_error')
207+
self.assertEqual('Complete', task_02['status'])
208+
dep_task = next(x for x in task_deck if task_02['id'] in x['parents'])
209+
assert dep_task['name'] == 'Task11'
210+
self.assertEqual('Complete', dep_task['status'], 'Failed to set dependent task from "Held" to "Waiting"')
202211

203212
# check that logs were correctly overwritten
204213
check_logs = [t['log'].count(desired_logs) == 1 if t['log'] else True for t in task_deck]

release_notes.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
### Develop
22
- Setting tasks to Waiting if they encountered lock (status -2)
33
- Setting tasks to Incomplete if they return status -3
4+
- Completed tasks set held dependent tasks to waiting
45

56
## Release Notes 2.3
67
### Release Notes 2.3.1 2021-11-08

0 commit comments

Comments
 (0)