Skip to content

Commit 442dd4a

Browse files
authored
Merge pull request #407 from int-brain-lab/task_statuses
Task statuses
2 parents 32d4732 + 39ddcb1 commit 442dd4a

File tree

3 files changed

+88
-40
lines changed

3 files changed

+88
-40
lines changed

ibllib/pipes/tasks.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,10 @@ def run(self, **kwargs):
7272
- logging to variable
7373
- writing a lock file if the GPU is used
7474
- labels the status property of the object. The status value is labeled as:
75-
0: Complete
75+
0: Complete
7676
-1: Errored
7777
-2: Didn't run as a lock was encountered
78+
-3: Incomplete
7879
"""
7980
# if the taskid or one properties are not available, run locally only, without alyx
8081
use_alyx = self.one is not None and self.taskid is not None
@@ -110,7 +111,11 @@ def run(self, **kwargs):
110111
if not self._creates_lock():
111112
self.status = -2
112113
_logger.info(f"Job {self.__class__} exited as a lock was found")
113-
return
114+
new_log = log_capture_string.getvalue()
115+
self.log = new_log if self.clobber else self.log + new_log
116+
log_capture_string.close()
117+
_logger.removeHandler(ch)
118+
return self.status
114119
self.outputs = self._run(**kwargs)
115120
_logger.info(f"Job {self.__class__} complete")
116121
except BaseException:
@@ -215,6 +220,7 @@ def setUp(self, **kwargs):
215220
def tearDown(self):
216221
"""
217222
Function after runs()
223+
Does not run if a lock is encountered by the task (status -2)
218224
"""
219225
if self.gpu >= 1:
220226
self._lock_file_path().unlink()
@@ -459,7 +465,7 @@ def rerun_failed(self, **kwargs):
459465
return self.run(status__in=['Waiting', 'Held', 'Started', 'Errored', 'Empty'], **kwargs)
460466

461467
def rerun(self, **kwargs):
462-
return self.run(status__in=['Waiting', 'Held', 'Started', 'Errored', 'Empty', 'Complete'],
468+
return self.run(status__in=['Waiting', 'Held', 'Started', 'Errored', 'Empty', 'Complete', 'Incomplete'],
463469
**kwargs)
464470

465471
@property
@@ -527,6 +533,12 @@ def run_alyx_task(tdict=None, session_path=None, one=None, job_deck=None,
527533
# overwrite status to errored
528534
if status == -1:
529535
patch_data['status'] = 'Errored'
536+
# Status -2 means a lock was encountered during run, should be rerun
537+
if status == -2:
538+
patch_data['status'] = 'Waiting'
539+
# Status -3 should be returned if a task is Incomplete
540+
if status == -3:
541+
patch_data['status'] = 'Incomplete'
530542
# update task status on Alyx
531543
t = one.alyx.rest('tasks', 'partial_update', id=tdict['id'], data=patch_data)
532544
task.cleanUp()

ibllib/tests/test_tasks.py

Lines changed: 69 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
'Task01_void': 'Empty',
2525
'Task02_error': 'Errored',
2626
'Task10': 'Complete',
27-
'Task11': 'Held'
27+
'Task11': 'Held',
28+
'TaskIncomplete': 'Incomplete',
29+
'TaskGpuLock': 'Waiting'
2830
}
2931

3032
desired_datasets = ['spikes.times.npy', 'spikes.amps.npy', 'spikes.clusters.npy']
@@ -37,7 +39,9 @@
3739
'Task01_void': 2,
3840
'Task02_error': 2,
3941
'Task10': 1,
40-
'Task11': None
42+
'Task11': None,
43+
'TaskIncomplete': 1,
44+
'TaskGpuLock': 2
4145
}
4246

4347

@@ -89,6 +93,27 @@ def _run(self, overwrite=False):
8993
return out_files
9094

9195

96+
# Job that encounters a GPU lock and is set to Waiting
97+
class TaskGpuLock(ibllib.pipes.tasks.Task):
98+
gpu = 1
99+
100+
# Override setUp to create a lock file before the task runs (the test removes it afterwards)
101+
def setUp(self):
102+
self.make_lock_file()
103+
self.data_handler = self.get_data_handler()
104+
return True
105+
106+
def _run(self, overwrite=False):
107+
pass
108+
109+
110+
# Job that sets its own status to -3 and is set to Incomplete
111+
class TaskIncomplete(ibllib.pipes.tasks.Task):
112+
113+
def _run(self, overwrite=False):
114+
self.status = -3
115+
116+
92117
class SomePipeline(ibllib.pipes.tasks.Pipeline):
93118

94119
def __init__(self, session_path=None, **kwargs):
@@ -99,46 +124,14 @@ def __init__(self, session_path=None, **kwargs):
99124
tasks['Task00'] = Task00(self.session_path)
100125
tasks['Task01_void'] = Task01_void(self.session_path)
101126
tasks['Task02_error'] = Task02_error(self.session_path)
127+
tasks['TaskGpuLock'] = TaskGpuLock(self.session_path)
128+
tasks['TaskIncomplete'] = TaskIncomplete(self.session_path)
102129
tasks['Task10'] = Task10(self.session_path, parents=[tasks['Task00']])
103130
tasks['Task11'] = Task11(self.session_path, parents=[tasks['Task02_error'],
104131
tasks['Task00']])
105132
self.tasks = tasks
106133

107134

108-
# job to output a single file (pathlib.Path)
109-
class GpuTask(ibllib.pipes.tasks.Task):
110-
gpu = 1
111-
112-
def _run(self, overwrite=False):
113-
out_files = self.session_path.joinpath('alf', 'gpu.times.npy')
114-
out_files.touch()
115-
return out_files
116-
117-
118-
class TestLocks(unittest.TestCase):
119-
120-
def test_gpu_lock_and_local_data_handler(self) -> None:
121-
with tempfile.TemporaryDirectory() as td:
122-
session_path = Path(td).joinpath('algernon', '2021/02/12', '001')
123-
session_path.joinpath('alf').mkdir(parents=True)
124-
task = GpuTask(session_path, one=None, location='local')
125-
assert task.is_locked() is False
126-
task.run()
127-
assert task.status == 0
128-
assert task.is_locked() is False
129-
# then make a lock file and make sure it fails and is still locked afterwards
130-
task._make_lock_file()
131-
task.run()
132-
assert task.status == - 2
133-
assert task.is_locked()
134-
# test the time out feature
135-
task.time_out_secs = - 1
136-
task._make_lock_file()
137-
assert not task.is_locked()
138-
task.run()
139-
assert task.status == 0
140-
141-
142135
class TestPipelineAlyx(unittest.TestCase):
143136

144137
def setUp(self) -> None:
@@ -213,7 +206,7 @@ def test_pipeline_alyx(self):
213206
self.assertTrue(all(check_logs))
214207
self.assertTrue(all(check_rerun))
215208

216-
# Rerun without clobber and check that logs are overwritten
209+
# Rerun without clobber and check that logs are not overwritten
217210
task_deck, dsets = pipeline.rerun_failed(machine='testmachine', clobber=False)
218211
check_logs = [t['log'].count(desired_logs) == desired_logs_rerun[t['name']] if t['log']
219212
else t['log'] == desired_logs_rerun[t['name']] for t in task_deck]
@@ -222,6 +215,45 @@ def test_pipeline_alyx(self):
222215
self.assertTrue(all(check_logs))
223216
self.assertTrue(all(check_rerun))
224217

218+
# Remove the lock file
219+
Path.home().joinpath('.one', 'gpu.lock').unlink()
220+
221+
222+
class GpuTask(ibllib.pipes.tasks.Task):
223+
gpu = 1
224+
225+
def _run(self, overwrite=False):
226+
out_files = self.session_path.joinpath('alf', 'gpu.times.npy')
227+
out_files.touch()
228+
return out_files
229+
230+
231+
class TestLocks(unittest.TestCase):
232+
233+
def test_gpu_lock_and_local_data_handler(self) -> None:
234+
# Remove any existing locks first
235+
if Path.home().joinpath('.one', 'gpu.lock').exists():
236+
Path.home().joinpath('.one', 'gpu.lock').unlink()
237+
with tempfile.TemporaryDirectory() as td:
238+
session_path = Path(td).joinpath('algernon', '2021/02/12', '001')
239+
session_path.joinpath('alf').mkdir(parents=True)
240+
task = GpuTask(session_path, one=None, location='local')
241+
assert task.is_locked() is False
242+
task.run()
243+
assert task.status == 0
244+
assert task.is_locked() is False
245+
# then make a lock file and make sure it fails and is still locked afterwards
246+
task._make_lock_file()
247+
task.run()
248+
assert task.status == - 2
249+
assert task.is_locked()
250+
# test the time out feature
251+
task.time_out_secs = - 1
252+
task._make_lock_file()
253+
assert not task.is_locked()
254+
task.run()
255+
assert task.status == 0
256+
225257

226258
if __name__ == "__main__":
227259
unittest.main(exit=False, verbosity=2)

release_notes.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
### Develop
2+
- Setting tasks to Waiting if they encountered a lock (status -2)
3+
- Setting tasks to Incomplete if they return status -3
4+
15
## Release Notes 2.3
26
### Release Notes 2.3.1 2021-11-08
37
- Trial wheel extraction: use alternative sync method when the first one fails

0 commit comments

Comments
 (0)