Skip to content

Commit bb9b232

Browse files
authored
Merge pull request #460 from int-brain-lab/job_queues
Job queues
2 parents e5e9430 + d3fd30f commit bb9b232

File tree

2 files changed

+26
-21
lines changed

2 files changed

+26
-21
lines changed

ibllib/oneibl/data_handlers.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,15 @@ def getData(self, one=None):
4747

4848
one = one or self.one
4949
session_datasets = one.list_datasets(one.path2eid(self.session_path), details=True)
50-
df = pd.DataFrame(columns=one._cache.datasets.columns)
50+
dfs = []
5151
for file in self.signature['input_files']:
52-
df = df.append(filter_datasets(session_datasets, filename=file[0], collection=file[1],
53-
wildcards=True, assert_unique=False))
52+
dfs.append(filter_datasets(session_datasets, filename=file[0], collection=file[1],
53+
wildcards=True, assert_unique=False))
54+
df = pd.concat(dfs)
55+
56+
# In some cases the eid is stored in the index; if so, we drop this level
57+
if 'eid' in df.index.names:
58+
df = df.droplevel(level='eid')
5459
return df
5560

5661
def uploadData(self, outputs, version):
@@ -228,7 +233,7 @@ def setUp(self):
228233
:return:
229234
"""
230235
df = super().getData()
231-
self.one._download_datasets(df)
236+
self.one._check_filesystem(df)
232237

233238
def uploadData(self, outputs, version, **kwargs):
234239
"""

ibllib/pipes/local_server.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import ibllib.oneibl.registration as registration
1717

1818
_logger = logging.getLogger('ibllib')
19-
LARGE_TASKS = ['EphysVideoCompress', 'TrainingVideoCompress', 'SpikeSorting', 'EphysDLC'] # 'TrainingDLC',
19+
LARGE_TASKS = ['EphysVideoCompress', 'TrainingVideoCompress', 'SpikeSorting', 'EphysDLC']
2020

2121

2222
def _get_pipeline_class(session_path, one):
@@ -135,39 +135,39 @@ def job_creator(root_path, one=None, dry=False, rerun=False, max_md5_size=None):
135135
return all_datasets
136136

137137

138-
def job_runner(subjects_path, mode='all', lab=None, dry=False, one=None, count=5):
138+
def task_queue(mode='all', lab=None, one=None):
139139
"""
140-
Function to be used as a process to run the jobs as they are created on the database
141-
This will query waiting jobs from the specified Lab
142-
:param subjects_path: on servers: /mnt/s0/Data/Subjects. Contains sessions
143-
:param mode: Whether to run all jobs, or only small or large (video compression, DLC, spike sorting) jobs
144-
:param lab: lab name as per Alyx
145-
:param dry:
146-
:param count:
147-
:return:
140+
Query waiting jobs from the specified Lab
141+
:param mode: Whether to return all waiting tasks, or only small or large (specified in LARGE_TASKS) jobs
142+
:param lab: lab name as per Alyx, otherwise try to infer from local globus install
143+
:param one: ONE instance
144+
:return: list of waiting tasks sorted by priority, highest first
145+
148146
"""
149147
if one is None:
150148
one = ONE(cache_rest=None)
151149
if lab is None:
150+
_logger.info("Trying to infer lab from globus installation")
152151
lab = _get_lab(one)
153152
if lab is None:
153+
_logger.error("No lab provided or found")
154154
return # if the lab is none, this will return empty tasks each time
155155
# Filter for tasks
156156
if mode == 'all':
157-
tasks = one.alyx.rest('tasks', 'list', status='Waiting',
158-
django=f'session__lab__name__in,{lab}', no_cache=True)
157+
waiting_tasks = one.alyx.rest('tasks', 'list', status='Waiting',
158+
django=f'session__lab__name__in,{lab}', no_cache=True)
159159
elif mode == 'small':
160160
tasks_all = one.alyx.rest('tasks', 'list', status='Waiting',
161161
django=f'session__lab__name__in,{lab}', no_cache=True)
162-
tasks = [t for t in tasks_all if t['name'] not in LARGE_TASKS]
162+
waiting_tasks = [t for t in tasks_all if t['name'] not in LARGE_TASKS]
163163
elif mode == 'large':
164-
tasks = one.alyx.rest('tasks', 'list', status='Waiting',
165-
django=f'session__lab__name__in,{lab},name__in,{LARGE_TASKS}', no_cache=True)
164+
waiting_tasks = one.alyx.rest('tasks', 'list', status='Waiting',
165+
django=f'session__lab__name__in,{lab},name__in,{LARGE_TASKS}', no_cache=True)
166166

167167
# Order tasks by priority
168-
tasks = sorted(tasks, key=lambda d: d['priority'], reverse=True)
168+
sorted_tasks = sorted(waiting_tasks, key=lambda d: d['priority'], reverse=True)
169169

170-
tasks_runner(subjects_path, tasks, one=one, count=count, time_out=3600, dry=dry)
170+
return sorted_tasks
171171

172172

173173
def tasks_runner(subjects_path, tasks_dict, one=None, dry=False, count=5, time_out=None, **kwargs):

0 commit comments

Comments
 (0)