11import logging
22import argparse
3- from allensdk .internal .core .lims_pipeline_module import PipelineModule
4- import allensdk .core .json_utilities as ju
5- from allensdk .internal .brain_observatory import time_sync as ts
3+ import os
4+ import datetime
5+ import json
6+ from typing import NamedTuple , Optional
7+
68import numpy as np
79import h5py
8- import os
910
11+ import allensdk
12+ from allensdk .internal .core .lims_pipeline_module import PipelineModule
13+ from allensdk .internal .brain_observatory import time_sync as ts
14+ from allensdk .brain_observatory .argschema_utilities import \
15+ check_write_access_overwrite
1016
11- def write_output (output_file , ophys_times , stim_alignment , eye_alignment ,
12- behavior_alignment , ophys_delta , stim_delta , eye_delta ,
13- behavior_delta ):
14- with h5py .File (output_file , "w" ) as f :
15- f ['stimulus_alignment' ] = stim_alignment
16- f ['eye_tracking_alignment' ] = eye_alignment
17- f ['body_camera_alignment' ] = behavior_alignment
18- f ['twop_vsync_fall' ] = ophys_times
19- f ['ophys_delta' ] = ophys_delta
20- f ['stim_delta' ] = stim_delta
21- f ['eye_delta' ] = eye_delta
22- f ['behavior_delta' ] = behavior_delta
2317
18+ class TimeSyncOutputs (NamedTuple ):
19+ """ Schema for synchronization outputs
20+ """
2421
25- def main ():
26- parser = argparse .ArgumentParser ("Generate brain observatory alignment." )
27- parser .add_argument ('input_json' )
28- parser .add_argument ('--log-level' , default = logging .DEBUG )
29- mod = PipelineModule ("Generate brain observatory alignment." , parser )
22+ # unique identifier for the experiment being aligned
23+ experiment_id : int
3024
31- input_data = mod .input_data ()
32- experiment_id = input_data .pop ("ophys_experiment_id" )
33- sync_file = input_data .pop ("sync_file" )
34- output_file = input_data .pop ("output_file" )
25+ # calculated monitor delay (s)
26+ stimulus_delay : float
27+
28+ # For each data stream, the count of "extra" timestamps (compared to the
29+ # number of samples)
30+ ophys_delta : int
31+ stimulus_delta : int
32+ eye_delta : int
33+ behavior_delta : int
34+
35+ # aligned timestamps for each data stream (s)
36+ ophys_times : np .ndarray
37+ stimulus_times : np .ndarray
38+ eye_times : np .ndarray
39+ behavior_times : np .ndarray
40+
41+ # for non-ophys data streams, a mapping from samples to corresponding ophys
42+ # frames
43+ stimulus_alignment : np .ndarray
44+ eye_alignment : np .ndarray
45+ behavior_alignment : np .ndarray
46+
47+
48+ class TimeSyncWriter :
49+
50+ def __init__ (
51+ self ,
52+ output_h5_path : str ,
53+ output_json_path : Optional [str ] = None
54+ ):
55+ """ Writes synchronization outputs to h5 and (optionally) json.
56+
57+ Parameters
58+ ----------
59+ output_h5_path : "heavy" outputs (e.g aligned timestamps and
60+ ophy frame correspondances) will ONLY be stored here. Lightweight
61+ outputs (e.g. stimulus delay) will also be written here as scalars.
62+ output_json_path : if provided, lightweight outputs will be written
63+ here, along with provenance information, such as the date and
64+ allensdk version.
65+
66+ """
67+
68+ self .output_h5_path : str = output_h5_path
69+ self .output_json_path : Optional [str ] = output_json_path
70+
71+ def validate_paths (self ):
72+ """ Determines whether we can actually write to the specified paths,
73+ allowing for creation of intermediate directories. It is a good idea
74+ to run this beore doing any heavy calculations!
75+ """
76+
77+ check_write_access_overwrite (self .output_h5_path )
78+
79+ if self .output_json_path is not None :
80+ check_write_access_overwrite (self .output_json_path )
81+
82+ def write (self , outputs : TimeSyncOutputs ):
83+ """ Convenience for writing both an output h5 and (if applicable) an
84+ output json.
85+
86+ Parameters
87+ ----------
88+ outputs : the data to be written
89+
90+ """
91+
92+ self .write_output_h5 (outputs )
93+
94+ if self .output_json_path is not None :
95+ self .write_output_json (outputs )
96+
97+ def write_output_h5 (self , outputs ):
98+ """ Write (mainly) heaviweight data to an h5 file.
3599
36- aligner = ts .OphysTimeAligner (sync_file , ** input_data )
100+ Parameters
101+ ----------
102+ outputs : the data to be written
103+
104+ """
105+
106+ os .makedirs (os .path .dirname (self .output_h5_path ), exist_ok = True )
107+
108+ with h5py .File (self .output_h5_path , "w" ) as output_h5 :
109+ output_h5 ["stimulus_alignment" ] = outputs .stimulus_alignment
110+ output_h5 ["eye_tracking_alignment" ] = outputs .eye_alignment
111+ output_h5 ["body_camera_alignment" ] = outputs .behavior_alignment
112+ output_h5 ["twop_vsync_fall" ] = outputs .ophys_times
113+ output_h5 ["ophys_delta" ] = outputs .ophys_delta
114+ output_h5 ["stim_delta" ] = outputs .stimulus_delta
115+ output_h5 ["stim_delay" ] = outputs .stimulus_delay
116+ output_h5 ["eye_delta" ] = outputs .eye_delta
117+ output_h5 ["behavior_delta" ] = outputs .behavior_delta
118+
119+ def write_output_json (self , outputs ):
120+ """ Write lightweight data to a json
121+
122+ Parameters
123+ ----------
124+ outputs : the data to be written
125+
126+ """
127+ os .makedirs (os .path .dirname (self .output_json_path ), exist_ok = True )
128+
129+ with open (self .output_json_path , "w" ) as output_json :
130+ json .dump ({
131+ "allensdk_version" : allensdk .__version__ ,
132+ "date" : str (datetime .datetime .now ()),
133+ "experiment_id" : outputs .experiment_id ,
134+ "output_h5_path" : self .output_h5_path ,
135+ "ophys_delta" : outputs .ophys_delta ,
136+ "stim_delta" : outputs .stimulus_delta ,
137+ "stim_delay" : outputs .stimulus_delay ,
138+ "eye_delta" : outputs .eye_delta ,
139+ "behavior_delta" : outputs .behavior_delta
140+ }, output_json , indent = 2 )
141+
142+
143+ def check_stimulus_delay (obt_delay : float , min_delay : float , max_delay : float ):
144+ """ Raise an exception if the monitor delay is not within specified bounds
145+
146+ Parameters
147+ ----------
148+ obt_delay : obtained monitor delay (s)
149+ min_delay : lower threshold (s)
150+ max_delay : upper threshold (s)
151+
152+ """
153+
154+ if obt_delay < min_delay or obt_delay > max_delay :
155+ raise ValueError (
156+ f"calculated monitor delay was { obt_delay :.3f} s "
157+ f"(acceptable interval: [{ min_delay :.3f} s, "
158+ f"{ max_delay :.3f} s])"
159+ )
160+
161+
162+ def run_ophys_time_sync (
163+ aligner : ts .OphysTimeAligner ,
164+ experiment_id : int ,
165+ min_stimulus_delay : float ,
166+ max_stimulus_delay : float
167+ ) -> TimeSyncOutputs :
168+ """ Carry out synchronization of timestamps across the data streams of an
169+ ophys experiment.
170+
171+ Parameters
172+ ----------
173+ aligner : drives alignment. See OphysTimeAligner for details of the
174+ attributes and properties that must be implemented.
175+ experiment_id : unique identifier for the experiment being aligned
176+ min_stimulus_delay : reject alignment run (raise a ValueError) if the
177+ calculated monitor delay is below this value (s).
178+ max_stimulus_delay : reject alignment run (raise a ValueError) if the
179+ calculated monitor delay is above this value (s).
180+
181+ Returns
182+ -------
183+ A TimeSyncOutputs (see definintion for more information) of output
184+ parameters and arrays of aligned timestamps.
185+
186+ """
187+
188+ stim_times , stim_delta , stim_delay = aligner .corrected_stim_timestamps
189+ check_stimulus_delay (stim_delay , min_stimulus_delay , max_stimulus_delay )
37190
38191 ophys_times , ophys_delta = aligner .corrected_ophys_timestamps
39- stim_times , stim_delta = aligner .corrected_stim_timestamps
40192 eye_times , eye_delta = aligner .corrected_eye_video_timestamps
41193 beh_times , beh_delta = aligner .corrected_behavior_video_timestamps
42194
@@ -53,9 +205,64 @@ def main():
53205 behavior_alignment = ts .get_alignment_array (beh_times , ophys_times ,
54206 int_method = np .ceil )
55207
56- write_output (output_file , ophys_times , stim_alignment , eye_alignment ,
57- behavior_alignment , ophys_delta , stim_delta , eye_delta ,
58- beh_delta )
208+ return TimeSyncOutputs (
209+ experiment_id ,
210+ stim_delay ,
211+ ophys_delta ,
212+ stim_delta ,
213+ eye_delta ,
214+ beh_delta ,
215+ ophys_times ,
216+ stim_times ,
217+ eye_times ,
218+ beh_times ,
219+ stim_alignment ,
220+ eye_alignment ,
221+ behavior_alignment
222+ )
223+
224+
225+ def main ():
226+ parser = argparse .ArgumentParser ("Generate brain observatory alignment." )
227+ parser .add_argument ("input_json" , type = str ,
228+ help = "path to input json"
229+ )
230+ parser .add_argument ("output_json" , type = str , nargs = "?" ,
231+ help = "path to which output json will be written"
232+ )
233+ parser .add_argument ("--log-level" , default = logging .DEBUG )
234+ parser .add_argument ("--min-stimulus-delay" , type = float , default = 0.0 ,
235+ help = "reject results if monitor delay less than this value (s)"
236+ )
237+ parser .add_argument ("--max-stimulus-delay" , type = float , default = 0.07 ,
238+ help = "reject results if monitor delay greater than this value (s)"
239+ )
240+ mod = PipelineModule ("Generate brain observatory alignment." , parser )
241+
242+ input_data = mod .input_data ()
243+
244+ writer = TimeSyncWriter (input_data .get ("output_file" ), mod .args .output_json )
245+ writer .validate_paths ()
246+
247+ aligner = ts .OphysTimeAligner (
248+ input_data .get ("sync_file" ),
249+ scanner = input_data .get ("scanner" , None ),
250+ dff_file = input_data .get ("dff_file" , None ),
251+ stimulus_pkl = input_data .get ("stimulus_pkl" , None ),
252+ eye_video = input_data .get ("eye_video" , None ),
253+ behavior_video = input_data .get ("behavior_video" , None ),
254+ long_stim_threshold = input_data .get (
255+ "long_stim_threshold" , ts .LONG_STIM_THRESHOLD
256+ )
257+ )
258+
259+ outputs = run_ophys_time_sync (
260+ aligner ,
261+ input_data .get ("ophys_experiment_id" ),
262+ mod .args .min_stimulus_delay ,
263+ mod .args .max_stimulus_delay
264+ )
265+ writer .write (outputs )
59266
60267
61268if __name__ == "__main__" : main ()
0 commit comments