Skip to content

Commit d5387d9

Browse files
committed
lock in GPU tasks - local data handler
1 parent 95a0553 commit d5387d9

File tree

4 files changed

+117
-21
lines changed

4 files changed

+117
-21
lines changed

ibllib/oneibl/data_handlers.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import abc
88
from time import time
99

10-
from one.api import ONE
1110
from one.util import filter_datasets
1211
from one.alf.files import add_uuid_string
1312
from iblutil.io.parquet import np2str
@@ -27,10 +26,10 @@ def __init__(self, session_path, signature, one=None):
2726
:param one: ONE instance
2827
"""
2928
self.session_path = session_path
30-
self.one = one or ONE()
3129
self.signature = signature
30+
self.one = one
3231

33-
def setup(self):
32+
def setUp(self):
3433
"""
3534
Function to optionally overload to download required data to run task
3635
:return:
@@ -42,7 +41,8 @@ def getData(self):
4241
Finds the datasets required for task based on input signatures
4342
:return:
4443
"""
45-
44+
if self.one is None:
45+
return
4646
session_datasets = self.one.list_datasets(self.one.path2eid(self.session_path), details=True)
4747
df = pd.DataFrame(columns=self.one._cache.datasets.columns)
4848
for file in self.signature['input_files']:
@@ -72,6 +72,17 @@ def cleanUp(self):
7272
pass
7373

7474

75+
class LocalDataHandler(DataHandler):
    def __init__(self, session_path, signatures, one=None):
        """
        Data handler for running tasks locally, with no architecture or db connection.

        :param session_path: path to session
        :param signatures: input and output file signatures
        :param one: ONE instance
        """
        super().__init__(session_path, signatures, one=one)
84+
85+
7586
class ServerDataHandler(DataHandler):
7687
def __init__(self, session_path, signatures, one=None):
7788
"""

ibllib/pipes/tasks.py

Lines changed: 67 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,29 +6,30 @@
66
import time
77
from _collections import OrderedDict
88
import traceback
9+
import json
910

1011
from graphviz import Digraph
1112

1213
from ibllib.misc import version
13-
import one.params
1414
from ibllib.oneibl import data_handlers
15-
15+
import one.params
16+
from one.api import ONE
1617

1718
_logger = logging.getLogger('ibllib')
1819

1920

2021
class Task(abc.ABC):
21-
log = ""
22-
cpu = 1
23-
gpu = 0
22+
log = ""  # placeholder to keep the log of the task for registration
23+
cpu = 1 # CPU resource
24+
gpu = 0 # GPU resources: as of now, either 0 or 1
2425
io_charge = 5 # integer percentage
2526
priority = 30 # integer percentage, 100 means highest priority
2627
ram = 4 # RAM needed to run (Go)
2728
one = None # one instance (optional)
28-
level = 0
29-
outputs = None
29+
level = 0 # level in the pipeline hierarchy: level 0 means there is no parent task
30+
outputs = None # place holder for a list of Path containing output files
3031
time_elapsed_secs = None
31-
time_out_secs = None
32+
time_out_secs = 3600 * 2 # time-out after which a task is considered dead
3233
version = version.ibllib()
3334
signature = {'input_files': [], 'output_files': []} # list of tuples (filename, collection, required_flag)
3435
force = False # whether or not to re-download missing input files on local server if not present
@@ -69,6 +70,11 @@ def run(self, **kwargs):
6970
wraps the _run() method with
7071
- error management
7172
- logging to variable
73+
- writing a lock file if the GPU is used
74+
- labels the status property of the object. The status value is labeled as:
75+
0: Complete
76+
-1: Errored
77+
-2: Didn't run as a lock was encountered
7278
"""
7379
# if taskid of one properties are not available, local run only without alyx
7480
use_alyx = self.one is not None and self.taskid is not None
@@ -91,17 +97,20 @@ def run(self, **kwargs):
9197
# setup
9298
setup = self.setUp(**kwargs)
9399
_logger.info(f"Setup value is: {setup}")
100+
self.status = 0
94101
if not setup:
95102
# case where outputs are present but don't have input files locally to rerun task
96103
# label task as complete
97-
self.status = 0
98104
_, self.outputs = self.assert_expected_outputs()
99-
100105
else:
101106
# run task
102-
self.status = 0
103107
start_time = time.time()
104108
try:
109+
if self.gpu >= 1:
110+
if not self._creates_lock():
111+
self.status = -2
112+
_logger.info(f"Job {self.__class__} exited as a lock was found")
113+
return
105114
self.outputs = self._run(**kwargs)
106115
_logger.info(f"Job {self.__class__} complete")
107116
except BaseException:
@@ -169,7 +178,6 @@ def setUp(self, **kwargs):
169178
:param kwargs:
170179
:return:
171180
"""
172-
173181
if self.location == 'server':
174182
self.get_signatures(**kwargs)
175183

@@ -196,7 +204,6 @@ def setUp(self, **kwargs):
196204
# TODO in future should raise error if even after downloading don't have the correct files
197205
self.assert_expected_inputs(raise_error=False)
198206
return True
199-
200207
else:
201208
self.data_handler = self.get_data_handler()
202209
self.data_handler.setUp()
@@ -206,9 +213,10 @@ def setUp(self, **kwargs):
206213

207214
def tearDown(self):
208215
"""
209-
Function to optionally overload to check results
216+
Function run after _run() completes; releases the GPU lock if the task used a GPU
210217
"""
211-
pass
218+
if self.gpu >= 1:
219+
self._lock_file_path().unlink()
212220

213221
def cleanUp(self):
214222
"""
@@ -270,7 +278,9 @@ def get_data_handler(self, location=None):
270278
:return:
271279
"""
272280
location = location or self.location
273-
281+
if location == 'local':
282+
return data_handlers.LocalDataHandler(self.session_path, self.signature, one=self.one)
283+
self.one = self.one or ONE()
274284
if location == 'server':
275285
dhandler = data_handlers.ServerDataHandler(self.session_path, self.signature, one=self.one)
276286
elif location == 'serverglobus':
@@ -281,9 +291,49 @@ def get_data_handler(self, location=None):
281291
dhandler = data_handlers.RemoteAwsDataHandler(self.session_path, self.signature, one=self.one)
282292
elif location == 'SDSC':
283293
dhandler = data_handlers.SDSCDataHandler(self, self.session_path, self.signature, one=self.one)
284-
285294
return dhandler
286295

296+
@staticmethod
def make_lock_file(taskname="", time_out_secs=7200):
    """
    Creates the GPU lock file (~/.one/gpu.lock) recording the acquisition time,
    the task name and the time-out after which the lock is considered stale.

    :param taskname: name of the task acquiring the lock
    :param time_out_secs: lock expiry delay in seconds (default 7200, i.e. 2 hours)
    :return: dict with the contents written to the lock file
    """
    d = {'start': time.time(), 'name': taskname, 'time_out_secs': time_out_secs}
    with open(Task._lock_file_path(), 'w+') as fid:
        json.dump(d, fid)
    return d
303+
304+
@staticmethod
def _lock_file_path():
    """Return the path to the GPU lock file, ~/.one/gpu.lock, creating ~/.one if needed."""
    one_dir = Path.home().joinpath('.one')
    one_dir.mkdir(exist_ok=True)
    return one_dir.joinpath('gpu.lock')
310+
311+
def _make_lock_file(self):
    """Creates a lock file for this task, stamped with the current time and the task time-out."""
    return Task.make_lock_file(self.name, self.time_out_secs)
314+
315+
def is_locked(self):
    """
    Return True if a valid (non-expired) GPU lock file exists, False otherwise.

    A stale lock — one older than its recorded time_out_secs — is deleted and
    treated as absent.
    """
    lock_file = self._lock_file_path()
    if not lock_file.exists():
        return False
    with open(lock_file) as fid:
        lock_info = json.load(fid)
    expired = (time.time() - lock_info['start']) > lock_info['time_out_secs']
    if expired:
        lock_file.unlink()
        return False
    return True
329+
330+
def _creates_lock(self):
    """Attempt to acquire the GPU lock; return True if acquired, False if already held."""
    if self.is_locked():
        return False
    self._make_lock_file()
    return True
336+
287337

288338
class Pipeline(abc.ABC):
289339
"""

ibllib/tests/test_tasks.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,40 @@ def __init__(self, session_path=None, **kwargs):
105105
self.tasks = tasks
106106

107107

108+
# job to output a single file (pathlib.Path)
class GpuTask(ibllib.pipes.tasks.Task):
    """Minimal GPU-flagged task used to exercise the lock mechanism."""
    gpu = 1

    def _run(self, overwrite=False):
        output_file = self.session_path.joinpath('alf', 'gpu.times.npy')
        output_file.touch()
        return output_file
116+
117+
118+
class TestLocks(unittest.TestCase):
    """Tests the GPU lock life-cycle of a Task run with the local data handler."""

    def test_gpu_lock_and_local_data_handler(self) -> None:
        with tempfile.TemporaryDirectory() as td:
            session_path = Path(td).joinpath('algernon', '2021/02/12', '001')
            session_path.joinpath('alf').mkdir(parents=True)
            # location='local' means no ONE instance / db connection is created
            task = GpuTask(session_path, one=None, location='local')
            assert task.is_locked() is False
            # a normal run acquires the lock, completes, and releases it
            task.run()
            assert task.status == 0
            assert task.is_locked() is False
            # then make a lock file and make sure it fails and is still locked afterwards
            task._make_lock_file()
            task.run()
            assert task.status == - 2
            assert task.is_locked()
            # test the time out feature
            task.time_out_secs = - 1
            task._make_lock_file()
            # an already-expired lock is treated as absent, so the run succeeds
            assert not task.is_locked()
            task.run()
            assert task.status == 0
140+
141+
108142
class TestPipelineAlyx(unittest.TestCase):
109143

110144
def setUp(self) -> None:

release_notes.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
- Spike sorting and EphysVideoSyncQc download data on local servers if not available
55
- brainbox.io.one load_spike_sorting_fast: bugfix returns acronyms
66
- creates sequence files for spikesorting
7+
- GPU tasks have a lock - local data handler doesn't instantiate one
78

89
## Release Notes 2.2
910
### Release Notes 2.2.1 2021-11-02

0 commit comments

Comments
 (0)