Skip to content

Commit d9fd5c2

Browse files
committed
Merge branch 'split_run_job' into develop
2 parents 354e904 + 9697b9e commit d9fd5c2

File tree

3 files changed

+23
-6
lines changed

3 files changed

+23
-6
lines changed

ibllib/ephys/ephysqc.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ def run(self, update: bool = False, overwrite: bool = True, stream: bool = None,
110110
if self.data.ap_meta:
111111
rms_file = self.probe_path.joinpath("_iblqc_ephysChannels.apRMS.npy")
112112
if rms_file.exists() and not overwrite:
113-
_logger.warning(f'File {rms_file} already exists and overwrite=False. Skipping RMS compute.')
113+
_logger.warning(f'RMS map already exists for .ap data in {self.probe_path}, skipping. '
114+
f'Use overwrite option.')
114115
median_rms = np.load(rms_file)
115116
else:
116117
rl = self.data.ap_meta.fileTimeSecs

ibllib/pipes/ephys_preprocessing.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,12 @@ def _run(self, overwrite=False):
6969
pids = [p['id'] for p in create_alyx_probe_insertions(self.session_path, one=self.one)]
7070
qc_files = []
7171
for pid in pids:
72-
eqc = ephysqc.EphysQC(pid, session_path=self.session_path, one=self.one)
73-
qc_files.extend(eqc.run(update=True, overwrite=overwrite))
72+
try:
73+
eqc = ephysqc.EphysQC(pid, session_path=self.session_path, one=self.one)
74+
qc_files.extend(eqc.run(update=True, overwrite=overwrite))
75+
except AssertionError:
76+
self.status = -1
77+
continue
7478
return qc_files
7579

7680

ibllib/pipes/local_server.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import ibllib.oneibl.registration as registration
1717

1818
_logger = logging.getLogger('ibllib')
19+
LARGE_TASKS = ['EphysVideoCompress', 'TrainingVideoCompress', 'EphysDLC', 'TrainingDLC', 'SpikeSorting']
1920

2021

2122
def _get_pipeline_class(session_path, one):
@@ -134,11 +135,12 @@ def job_creator(root_path, one=None, dry=False, rerun=False, max_md5_size=None):
134135
return all_datasets
135136

136137

137-
def job_runner(subjects_path, lab=None, dry=False, one=None, count=5):
138+
def job_runner(subjects_path, mode='all', lab=None, dry=False, one=None, count=5):
138139
"""
139140
Function to be used as a process to run the jobs as they are created on the database
140141
This will query waiting jobs from the specified Lab
141142
:param subjects_path: on servers: /mnt/s0/Data/Subjects. Contains sessions
143+
:param mode: Whether to run all jobs, or only small or large (video compression, DLC, spike sorting) jobs
142144
:param lab: lab name as per Alyx
143145
:param dry:
144146
:param count:
@@ -150,8 +152,18 @@ def job_runner(subjects_path, lab=None, dry=False, one=None, count=5):
150152
lab = _get_lab(one)
151153
if lab is None:
152154
return # if the lab is none, this will return empty tasks each time
153-
tasks = one.alyx.rest('tasks', 'list', status='Waiting',
154-
django=f'session__lab__name__in,{lab}', no_cache=True)
155+
# Filter for tasks
156+
if mode == 'all':
157+
tasks = one.alyx.rest('tasks', 'list', status='Waiting',
158+
django=f'session__lab__name__in,{lab}', no_cache=True)
159+
elif mode == 'small':
160+
tasks_all = one.alyx.rest('tasks', 'list', status='Waiting',
161+
django=f'session__lab__name__in,{lab}', no_cache=True)
162+
tasks = [t for t in tasks_all if t['name'] not in LARGE_TASKS]
163+
elif mode == 'large':
164+
tasks = one.alyx.rest('tasks', 'list', status='Waiting',
165+
django=f'session__lab__name__in,{lab},name__in,{LARGE_TASKS}', no_cache=True)
166+
155167
tasks_runner(subjects_path, tasks, one=one, count=count, time_out=3600, dry=dry)
156168

157169

0 commit comments

Comments (0)