|
10 | 10 |
|
11 | 11 | from one.api import ONE |
12 | 12 |
|
13 | | -from ibllib.io.extractors.base import get_session_extractor_type, get_pipeline |
14 | | -from ibllib.pipes import ephys_preprocessing, training_preprocessing, tasks |
| 13 | +from ibllib.io.extractors.base import get_pipeline, get_task_protocol, get_session_extractor_type |
| 14 | +from ibllib.pipes import tasks, training_preprocessing, ephys_preprocessing |
15 | 15 | from ibllib.time import date2isostr |
16 | 16 | import ibllib.oneibl.registration as registration |
17 | 17 |
|
18 | 18 | _logger = logging.getLogger('ibllib') |
19 | 19 |
|
20 | 20 |
|
| 21 | +def _get_pipeline_class(session_path, one): |
| 22 | + pipeline = get_pipeline(session_path) |
| 23 | + if pipeline == 'training': |
| 24 | + PipelineClass = training_preprocessing.TrainingExtractionPipeline |
| 25 | + elif pipeline == 'ephys': |
| 26 | + PipelineClass = ephys_preprocessing.EphysExtractionPipeline |
| 27 | + else: |
| 28 | + # try and look if there is a custom extractor in the personal projects extraction class |
| 29 | + import projects.base |
| 30 | + task_type = get_session_extractor_type(session_path) |
| 31 | + PipelineClass = projects.base.get_pipeline(task_type) |
| 32 | + _logger.info(f"Using {PipelineClass} pipeline for {session_path}") |
| 33 | + return PipelineClass(session_path=session_path, one=one) |
| 34 | + |
| 35 | + |
21 | 36 | def _get_lab(one): |
22 | 37 | with open(Path.home().joinpath(".globusonline/lta/client-id.txt"), 'r') as fid: |
23 | 38 | globus_id = fid.read() |
@@ -101,16 +116,10 @@ def job_creator(root_path, one=None, dry=False, rerun=False, max_md5_size=None): |
101 | 116 | session_path, one=one, max_md5_size=max_md5_size) |
102 | 117 | if dsets is not None: |
103 | 118 | all_datasets.extend(dsets) |
104 | | - pipeline = get_pipeline(session_path) |
105 | | - if pipeline == 'training': |
106 | | - pipe = training_preprocessing.TrainingExtractionPipeline(session_path, one=one) |
107 | | - # only start extracting ephys on a raw_session.flag |
108 | | - elif pipeline == 'ephys' and flag_file.name == 'raw_session.flag': |
109 | | - pipe = ephys_preprocessing.EphysExtractionPipeline(session_path, one=one) |
110 | | - else: |
111 | | - _logger.info(f'Session type {get_session_extractor_type(session_path)}' |
112 | | - f'as no matching pipeline pattern {session_path}') |
113 | | - continue |
| 119 | + pipe = _get_pipeline_class(session_path, one) |
| 120 | + if pipe is None: |
| 121 | + task_protocol = get_task_protocol(session_path) |
| 122 | + _logger.info(f'Session task protocol {task_protocol} has no matching pipeline pattern {session_path}') |
114 | 123 | if rerun: |
115 | 124 | rerun__status__in = '__all__' |
116 | 125 | else: |
@@ -142,7 +151,7 @@ def job_runner(subjects_path, lab=None, dry=False, one=None, count=5): |
142 | 151 | if lab is None: |
143 | 152 | return # if the lab is none, this will return empty tasks each time |
144 | 153 | tasks = one.alyx.rest('tasks', 'list', status='Waiting', |
145 | | - django=f'session__lab__name__in,{lab}') |
| 154 | + django=f'session__lab__name__in,{lab}', no_cache=True) |
146 | 155 | tasks_runner(subjects_path, tasks, one=one, count=count, time_out=3600, dry=dry) |
147 | 156 |
|
148 | 157 |
|
|
0 commit comments