88
99import requests
1010import xmltodict
11- from pydantic import BaseModel
1211
1312import murfey .util .eer
1413from murfey .client .context import Context , ProcessingParameter
1716 MovieTracker ,
1817 MurfeyID ,
1918 MurfeyInstanceEnvironment ,
20- global_env_lock ,
2119)
2220from murfey .util import authorised_requests , capture_post , get_machine_config_client
2321from murfey .util .mdoc import get_block , get_global_data , get_num_blocks
@@ -65,15 +63,6 @@ def _construct_tilt_series_name(file_path: Path) -> str:
6563 return "_" .join (split_name [:- 5 ])
6664
6765
68- class ProcessFileIncomplete (BaseModel ):
69- dest : Path
70- source : Path
71- image_number : int
72- mc_uuid : int
73- tag : str
74- description : str = ""
75-
76-
7766class TomographyContext (Context ):
7867 user_params = [
7968 ProcessingParameter (
@@ -101,7 +90,6 @@ def __init__(self, acquisition_software: str, basepath: Path):
10190 self ._aligned_tilt_series : List [str ] = []
10291 self ._data_collection_stash : list = []
10392 self ._processing_job_stash : dict = {}
104- self ._preprocessing_triggers : dict = {}
10593 self ._lock : RLock = RLock ()
10694
10795 def _flush_data_collections (self ):
@@ -120,12 +108,6 @@ def _flush_data_collections(self):
120108 capture_post (dc_data [0 ], json = data )
121109 self ._data_collection_stash = []
122110
123- def _flush_processing_job (self , tag : str ):
124- if proc_data := self ._processing_job_stash .get (tag ):
125- for pd in proc_data :
126- requests .post (pd [0 ], json = pd [1 ])
127- self ._processing_job_stash .pop (tag )
128-
129111 def _flush_processing_jobs (self ):
130112 logger .info (
131113 f"Flushing { len (self ._processing_job_stash .keys ())} processing job API calls"
@@ -135,75 +117,6 @@ def _flush_processing_jobs(self):
135117 requests .post (pd [0 ], json = pd [1 ])
136118 self ._processing_job_stash = {}
137119
138- def _flush_preprocess (self , tag : str , app_id : int ):
139- if tag_tr := self ._preprocessing_triggers .get (tag ):
140- for tr in tag_tr :
141- process_file = self ._complete_process_file (tr [1 ], tr [2 ], app_id )
142- if process_file :
143- capture_post (tr [0 ], json = process_file )
144- self ._preprocessing_triggers .pop (tag )
145-
146- def _complete_process_file (
147- self ,
148- incomplete_process_file : ProcessFileIncomplete ,
149- environment : MurfeyInstanceEnvironment ,
150- app_id : int ,
151- ) -> dict :
152- try :
153- with global_env_lock :
154- tag = incomplete_process_file .tag
155-
156- eer_fractionation_file = None
157- if environment .data_collection_parameters .get ("num_eer_frames" ):
158- response = requests .post (
159- f"{ str (environment .url .geturl ())} /visits/{ environment .visit } /{ environment .murfey_session } /eer_fractionation_file" ,
160- json = {
161- "num_frames" : environment .data_collection_parameters [
162- "num_eer_frames"
163- ],
164- "fractionation" : environment .data_collection_parameters [
165- "eer_fractionation"
166- ],
167- "dose_per_frame" : environment .data_collection_parameters [
168- "dose_per_frame"
169- ],
170- "fractionation_file_name" : "eer_fractionation_tomo.txt" ,
171- },
172- )
173- eer_fractionation_file = response .json ()["eer_fractionation_file" ]
174-
175- new_dict = {
176- "path" : str (incomplete_process_file .dest ),
177- "description" : incomplete_process_file .description ,
178- "size" : incomplete_process_file .source .stat ().st_size ,
179- "timestamp" : incomplete_process_file .source .stat ().st_ctime ,
180- "processing_job" : environment .processing_job_ids [tag ][
181- "em-tomo-preprocess"
182- ],
183- "data_collection_id" : environment .data_collection_ids [tag ],
184- "image_number" : incomplete_process_file .image_number ,
185- "pixel_size" : environment .data_collection_parameters [
186- "pixel_size_on_image"
187- ],
188- "autoproc_program_id" : app_id ,
189- "mc_uuid" : incomplete_process_file .mc_uuid ,
190- "dose_per_frame" : environment .data_collection_parameters .get (
191- "dose_per_frame"
192- ),
193- "mc_binning" : environment .data_collection_parameters .get (
194- "motion_corr_binning" , 1
195- ),
196- "gain_ref" : environment .data_collection_parameters .get ("gain_ref" ),
197- "voltage" : environment .data_collection_parameters .get (
198- "voltage" , 300
199- ),
200- "eer_fractionation_file" : eer_fractionation_file ,
201- }
202- return new_dict
203- except KeyError :
204- logger .warning ("Key error encountered in _complete_process_file" )
205- return {}
206-
207120 def _file_transferred_to (
208121 self , environment : MurfeyInstanceEnvironment , source : Path , file_path : Path
209122 ):
@@ -441,14 +354,10 @@ def _add_tilt(
441354 preproc_data = {
442355 "path" : str (file_transferred_to ),
443356 "description" : "" ,
444- "data_collection_id" : environment .data_collection_ids .get (tilt_series ),
445357 "image_number" : environment .movies [file_transferred_to ].movie_number ,
446358 "pixel_size" : environment .data_collection_parameters .get (
447359 "pixel_size_on_image" , 0
448360 ),
449- "autoproc_program_id" : environment .autoproc_program_ids .get (
450- tilt_series , {}
451- ).get ("em-tomo-preprocess" ),
452361 "dose_per_frame" : environment .data_collection_parameters .get (
453362 "dose_per_frame" , 0
454363 ),
0 commit comments