Skip to content

Commit 93be6b9

Browse files
committed
Merge branch 'release/2.1.3'
2 parents 91abb56 + 8b2c162 commit 93be6b9

File tree

11 files changed

+115
-48
lines changed

11 files changed

+115
-48
lines changed

ibllib/ephys/ephysqc.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ def run(self, update: bool = False, overwrite: bool = True, stream: bool = None,
110110
if self.data.ap_meta:
111111
rms_file = self.probe_path.joinpath("_iblqc_ephysChannels.apRMS.npy")
112112
if rms_file.exists() and not overwrite:
113-
_logger.warning(f'File {rms_file} already exists and overwrite=False. Skipping RMS compute.')
113+
_logger.warning(f'RMS map already exists for .ap data in {self.probe_path}, skipping. '
114+
f'Use overwrite option.')
114115
median_rms = np.load(rms_file)
115116
else:
116117
rl = self.data.ap_meta.fileTimeSecs

ibllib/io/extractors/base.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,18 @@ def _get_task_types_json_config():
167167
return task_types
168168

169169

170+
def get_task_protocol(session_path):
171+
try:
172+
settings = load_settings(get_session_path(session_path))
173+
except json.decoder.JSONDecodeError:
174+
_logger.error(f"Can't read settings for {session_path}")
175+
return
176+
if settings:
177+
return settings.get('PYBPOD_PROTOCOL', None)
178+
else:
179+
return
180+
181+
170182
def get_task_extractor_type(task_name):
171183
"""
172184
Returns the task type string from the full pybpod task name:
@@ -176,13 +188,8 @@ def get_task_extractor_type(task_name):
176188
:return: one of ['biased', 'habituation', 'training', 'ephys', 'mock_ephys', 'sync_ephys']
177189
"""
178190
if isinstance(task_name, Path):
179-
try:
180-
settings = load_settings(get_session_path(task_name))
181-
except json.decoder.JSONDecodeError:
182-
return
183-
if settings:
184-
task_name = settings.get('PYBPOD_PROTOCOL', None)
185-
else:
191+
task_name = get_task_protocol(task_name)
192+
if task_name is None:
186193
return
187194
task_types = _get_task_types_json_config()
188195
task_type = next((task_types[tt] for tt in task_types if tt in task_name), None)
@@ -225,7 +232,7 @@ def _get_pipeline_from_task_type(stype):
225232
:param stype: session_type or task extractor type
226233
:return:
227234
"""
228-
if 'ephys' in stype:
235+
if stype in ['ephys_biased_opto', 'ephys', 'ephys_training', 'mock_ephys', 'sync_ephys']:
229236
return 'ephys'
230237
elif stype in ['habituation', 'training', 'biased', 'biased_opto']:
231238
return 'training'

ibllib/io/extractors/ephys_fpga.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,20 @@ def _get_all_probes_sync(session_path, bin_exists=True):
544544
return ephys_files
545545

546546

547+
def get_wheel_positions(sync, chmap):
548+
"""
549+
Gets the wheel position from synchronisation pulses
550+
:param sync:
551+
:param chmap:
552+
:return:wheel: dictionary with keys 'timestamps' and 'position'
553+
moves: dictionary with keys 'intervals' and 'peakAmplitude'
554+
"""
555+
ts, pos = extract_wheel_sync(sync=sync, chmap=chmap)
556+
moves = extract_wheel_moves(ts, pos)
557+
wheel = {'timestamps': ts, 'position': pos}
558+
return wheel, moves
559+
560+
547561
def get_main_probe_sync(session_path, bin_exists=False):
548562
"""
549563
From 3A or 3B multiprobe session, returns the main probe (3A) or nidq sync pulses
@@ -561,7 +575,6 @@ def get_main_probe_sync(session_path, bin_exists=False):
561575
elif version == '3B':
562576
# the sync master is the nidq breakout box
563577
sync_box_ind = np.argmax([1 if ef.get('nidq') else 0 for ef in ephys_files])
564-
565578
sync = ephys_files[sync_box_ind].sync
566579
sync_chmap = ephys_files[sync_box_ind].sync_map
567580
return sync, sync_chmap
@@ -684,16 +697,16 @@ def _extract(self, sync=None, chmap=None, **kwargs):
684697
out.update({k: self.bpod2fpga(bpod_trials[k][ibpod]) for k in bpod_rsync_fields})
685698
out.update({k: fpga_trials[k][ifpga] for k in sorted(fpga_trials.keys())})
686699
# extract the wheel data
700+
wheel, moves = get_wheel_positions(sync=sync, chmap=chmap)
687701
from ibllib.io.extractors.training_wheel import extract_first_movement_times
688-
ts, pos = extract_wheel_sync(sync=sync, chmap=chmap)
689-
moves = extract_wheel_moves(ts, pos)
690702
settings = raw_data_loaders.load_settings(session_path=self.session_path)
691703
min_qt = settings.get('QUIESCENT_PERIOD', None)
692704
first_move_onsets, *_ = extract_first_movement_times(moves, out, min_qt=min_qt)
693705
out.update({'firstMovement_times': first_move_onsets})
694706

695707
assert tuple(filter(lambda x: 'wheel' not in x, self.var_names)) == tuple(out.keys())
696-
return [out[k] for k in out] + [ts, pos, moves['intervals'], moves['peakAmplitude']]
708+
return [out[k] for k in out] + [wheel['timestamps'], wheel['position'],
709+
moves['intervals'], moves['peakAmplitude']]
697710

698711

699712
def extract_all(session_path, save=True, bin_exists=False):

ibllib/io/extractors/extractor_types.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
{
2-
"ksocha_ephysOptoStimulation": "ephys_passive_opto",
1+
{"ksocha_ephysOptoStimulation": "ephys_passive_opto",
32
"ksocha_ephysOptoChoiceWorld": "ephys_biased_opto",
43
"passiveChoiceWorld": "ephys_replay",
54
"opto_ephysChoiceWorld": "ephys_biased_opto",

ibllib/io/extractors/opto_trials.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ class LaserBool(BaseBpodTrialsExtractor):
1111
"""
1212
Extracts the laser probabilities from the bpod jsonable
1313
"""
14-
save_names = ('_ibl_trials.laser_stimulation.npy', '_ibl_trials.laser_probability.npy')
15-
var_names = ('laser_stimulation', 'laser_probability')
14+
save_names = ('_ibl_trials.laserStimulation.npy', '_ibl_trials.laserProbability.npy')
15+
var_names = ('laserStimulation', 'laserProbability')
1616

1717
def _extract(self, **kwargs):
1818
_logger.info('Extracting laser datasets')
@@ -41,11 +41,11 @@ def _extract(self, **kwargs):
4141

4242
if np.all(np.isnan(lprob)):
4343
# this prevents the file from being saved when no data
44-
self.save_names = ('_ibl_trials.laser_stimulation.npy', None)
44+
self.save_names = ('_ibl_trials.laserStimulation.npy', None)
4545
_logger.warning('No laser probability found in bpod data')
4646
if np.all(np.isnan(lstim)):
4747
# this prevents the file from being saved when no data
48-
self.save_names = (None, '_ibl_trials.laser_probability.npy')
48+
self.save_names = (None, '_ibl_trials.laserProbability.npy')
4949
_logger.warning('No laser stimulation found in bpod data')
5050
return lstim, lprob
5151

ibllib/pipes/ephys_preprocessing.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ class RawEphysQC(tasks.Task):
5858
io_charge = 30 # this jobs reads raw ap files
5959
priority = 10 # a lot of jobs depend on this one
6060
level = 0 # this job doesn't depend on anything
61-
input_files = signatures.RAWEPHYSQC
61+
signature = {'input_files': signatures.RAWEPHYSQC, 'output_files': ()}
6262

6363
def _run(self, overwrite=False):
6464
eid = self.one.path2eid(self.session_path)
@@ -69,19 +69,26 @@ def _run(self, overwrite=False):
6969
pids = [p['id'] for p in create_alyx_probe_insertions(self.session_path, one=self.one)]
7070
qc_files = []
7171
for pid in pids:
72-
eqc = ephysqc.EphysQC(pid, session_path=self.session_path, one=self.one)
73-
qc_files.extend(eqc.run(update=True, overwrite=overwrite))
72+
try:
73+
eqc = ephysqc.EphysQC(pid, session_path=self.session_path, one=self.one)
74+
qc_files.extend(eqc.run(update=True, overwrite=overwrite))
75+
except AssertionError:
76+
self.status = -1
77+
continue
7478
return qc_files
7579

7680

7781
class EphysAudio(tasks.Task):
7882
"""
79-
Computes raw electrophysiology QC
83+
Compresses the microphone wav file in a lossless flac file
8084
"""
8185

8286
cpu = 2
8387
priority = 10 # a lot of jobs depend on this one
8488
level = 0 # this job doesn't depend on anything
89+
signature = {'input_files': ('_iblrig_micData.raw.wav', 'raw_behavior_data', True),
90+
'output_files': ('_iblrig_micData.raw.flac', 'raw_behavior_data', True),
91+
}
8592

8693
def _run(self, overwrite=False):
8794
command = "ffmpeg -i {file_in} -y -nostdin -c:a flac -nostats {file_out}"
@@ -106,7 +113,7 @@ class SpikeSorting(tasks.Task):
106113
)
107114
SPIKE_SORTER_NAME = 'pykilosort'
108115
PYKILOSORT_REPO = Path.home().joinpath('Documents/PYTHON/SPIKE_SORTING/pykilosort')
109-
input_files = signatures.SPIKESORTING
116+
signature = {'input_files': signatures.SPIKESORTING, 'output_files': ()}
110117

111118
@staticmethod
112119
def _sample2v(ap_file):
@@ -285,7 +292,7 @@ def _run(self, **kwargs):
285292
class EphysTrials(tasks.Task):
286293
priority = 90
287294
level = 1
288-
input_files = signatures.EPHYSTRIALS
295+
signature = {'input_files': signatures.EPHYSTRIALS, 'output_files': ()}
289296

290297
def _behaviour_criterion(self):
291298
"""
@@ -454,7 +461,7 @@ class EphysPassive(tasks.Task):
454461
cpu = 1
455462
io_charge = 90
456463
level = 1
457-
input_files = signatures.EPHYSPASSIVE
464+
signature = {'input_files': signatures.EPHYSPASSIVE, 'output_files': ()}
458465

459466
def _run(self):
460467
"""returns a list of pathlib.Paths. """

ibllib/pipes/local_server.py

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,28 @@
1010

1111
from one.api import ONE
1212

13-
from ibllib.io.extractors.base import get_session_extractor_type, get_pipeline
14-
from ibllib.pipes import ephys_preprocessing, training_preprocessing, tasks
13+
from ibllib.io.extractors.base import get_pipeline, get_task_protocol, get_session_extractor_type
14+
from ibllib.pipes import tasks, training_preprocessing, ephys_preprocessing
1515
from ibllib.time import date2isostr
1616
import ibllib.oneibl.registration as registration
1717

1818
_logger = logging.getLogger('ibllib')
19+
LARGE_TASKS = ['EphysVideoCompress', 'TrainingVideoCompress', 'EphysDLC', 'TrainingDLC', 'SpikeSorting']
20+
21+
22+
def _get_pipeline_class(session_path, one):
23+
pipeline = get_pipeline(session_path)
24+
if pipeline == 'training':
25+
PipelineClass = training_preprocessing.TrainingExtractionPipeline
26+
elif pipeline == 'ephys':
27+
PipelineClass = ephys_preprocessing.EphysExtractionPipeline
28+
else:
29+
# try and look if there is a custom extractor in the personal projects extraction class
30+
import projects.base
31+
task_type = get_session_extractor_type(session_path)
32+
PipelineClass = projects.base.get_pipeline(task_type)
33+
_logger.info(f"Using {PipelineClass} pipeline for {session_path}")
34+
return PipelineClass(session_path=session_path, one=one)
1935

2036

2137
def _get_lab(one):
@@ -101,16 +117,10 @@ def job_creator(root_path, one=None, dry=False, rerun=False, max_md5_size=None):
101117
session_path, one=one, max_md5_size=max_md5_size)
102118
if dsets is not None:
103119
all_datasets.extend(dsets)
104-
pipeline = get_pipeline(session_path)
105-
if pipeline == 'training':
106-
pipe = training_preprocessing.TrainingExtractionPipeline(session_path, one=one)
107-
# only start extracting ephys on a raw_session.flag
108-
elif pipeline == 'ephys' and flag_file.name == 'raw_session.flag':
109-
pipe = ephys_preprocessing.EphysExtractionPipeline(session_path, one=one)
110-
else:
111-
_logger.info(f'Session type {get_session_extractor_type(session_path)}'
112-
f'as no matching pipeline pattern {session_path}')
113-
continue
120+
pipe = _get_pipeline_class(session_path, one)
121+
if pipe is None:
122+
task_protocol = get_task_protocol(session_path)
123+
_logger.info(f'Session task protocol {task_protocol} has no matching pipeline pattern {session_path}')
114124
if rerun:
115125
rerun__status__in = '__all__'
116126
else:
@@ -125,11 +135,12 @@ def job_creator(root_path, one=None, dry=False, rerun=False, max_md5_size=None):
125135
return all_datasets
126136

127137

128-
def job_runner(subjects_path, lab=None, dry=False, one=None, count=5):
138+
def job_runner(subjects_path, mode='all', lab=None, dry=False, one=None, count=5):
129139
"""
130140
Function to be used as a process to run the jobs as they are created on the database
131141
This will query waiting jobs from the specified Lab
132142
:param subjects_path: on servers: /mnt/s0/Data/Subjects. Contains sessions
143+
:param mode: Whether to run all jobs, or only small or large (video compression, DLC, spike sorting) jobs
133144
:param lab: lab name as per Alyx
134145
:param dry:
135146
:param count:
@@ -141,8 +152,18 @@ def job_runner(subjects_path, lab=None, dry=False, one=None, count=5):
141152
lab = _get_lab(one)
142153
if lab is None:
143154
return # if the lab is none, this will return empty tasks each time
144-
tasks = one.alyx.rest('tasks', 'list', status='Waiting',
145-
django=f'session__lab__name__in,{lab}')
155+
# Filter for tasks
156+
if mode == 'all':
157+
tasks = one.alyx.rest('tasks', 'list', status='Waiting',
158+
django=f'session__lab__name__in,{lab}', no_cache=True)
159+
elif mode == 'small':
160+
tasks_all = one.alyx.rest('tasks', 'list', status='Waiting',
161+
django=f'session__lab__name__in,{lab}', no_cache=True)
162+
tasks = [t for t in tasks_all if t['name'] not in LARGE_TASKS]
163+
elif mode == 'large':
164+
tasks = one.alyx.rest('tasks', 'list', status='Waiting',
165+
django=f'session__lab__name__in,{lab},name__in,{LARGE_TASKS}', no_cache=True)
166+
146167
tasks_runner(subjects_path, tasks, one=one, count=count, time_out=3600, dry=dry)
147168

148169

ibllib/pipes/tasks.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class Task(abc.ABC):
3838
time_out_secs = None
3939
version = version.ibllib()
4040
log = ''
41-
input_files = None
41+
signature = {'input_files': (), 'output_files': ()} # tuple (filename, collection, required_flag)
4242

4343
def __init__(self, session_path, parents=None, taskid=None, one=None,
4444
machine=None, clobber=True, aws=None, location='server'):
@@ -254,12 +254,9 @@ def _getData(self):
254254
:return:
255255
"""
256256
assert self.one
257-
258-
# This will be improved by Olivier new filters
259-
session_datasets = self.one.list_datasets(self.one.path2eid(self.session_path),
260-
details=True)
257+
session_datasets = self.one.list_datasets(self.one.path2eid(self.session_path), details=True)
261258
df = pd.DataFrame(columns=self.one._cache.datasets.columns)
262-
for file in self.input_files:
259+
for file in self.signature['input_files']:
263260
df = df.append(filter_datasets(session_datasets, filename=file[0], collection=file[1],
264261
wildcards=True, assert_unique=False))
265262
return df
@@ -278,6 +275,24 @@ def _cleanUp_SDSC(self):
278275
assert SDSC_PATCH_PATH.parts[0:4] == self.session_path.parts[0:4]
279276
shutil.rmtree(self.session_path)
280277

278+
def assert_expected_outputs(self):
279+
"""
280+
After a run, asserts that all signature files are present at least once in the output files
281+
Mainly useful for integration tests
282+
:return:
283+
"""
284+
assert self.status == 0
285+
everthing_is_fine = True
286+
for expected_file in self.signature['output_files']:
287+
actual_files = list(self.session_path.rglob(str(Path(expected_file[1]).joinpath(expected_file[0]))))
288+
if len(actual_files) == 0:
289+
everthing_is_fine = False
290+
_logger.error(f"Signature file expected {expected_file} not found in the output")
291+
if not everthing_is_fine:
292+
for out in self.outputs:
293+
_logger.error(f"{out}")
294+
raise FileNotFoundError("Missing outputs after task completion")
295+
281296

282297
class Pipeline(abc.ABC):
283298
"""

ibllib/tests/test_pipes.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ def test_task_to_pipeline(self):
3232
("mock_ephys", "ephys"),
3333
("sync_ephys", "ephys"),
3434
("ephys", "ephys"),
35+
("ephys_passive_opto", "ephys_passive_opto")
3536
]
3637
for typ, exp in pipe_out:
3738
assert ibllib.io.extractors.base._get_pipeline_from_task_type(typ) == exp

release_notes.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
### Release Notes 2.1.2 2021-10-14
1010
- Fix issue with RawEphysQC that was not looking in local Subjects folder for data
1111
- Fix ensure_required_data in DlcQc
12+
### Release Notes 2.1.3 2021-10-19
13+
- Split the jobs.py run function in two: one running large tasks (video compression, DLC, spike sorting), the other running the rest
14+
- Ensure RawEphysQC continues with the remaining probes when QC fails for one probe
1215

1316
## Release Notes 2.0
1417
### Release Notes 2.0.1 2021-08-07

0 commit comments

Comments
 (0)