@@ -78,8 +78,6 @@ def __print_progress_a2vybg(progress: float) -> None:
7878
7979
8080def load_data_dict_a2vybg (record : Dict [str , Any ]) -> Dict [str , Any ]:
81- global vocab_a2vybg
82-
8381 if record ["bytes" ][:2 ] == "\\ x" :
8482 record ["bytes" ] = record ["bytes" ][2 :]
8583 else :
@@ -109,17 +107,15 @@ def parse_data_to_record_dict_a2vybg(
109107
110108
def send_cache_to_object_storage_a2vybg():
    """Upload the accumulated LLM attribute-calculation cache to object storage.

    Stores this run's cached records under the current LLM-config hash in the
    shared cache dict, then PUTs the whole cache as JSON to the upload link.
    Runs only when the calculated data type is an LLM response AND the upload
    link looks like a real URL (contains "http") — presumably local runs get a
    placeholder link; TODO confirm.
    """
    # NOTE(review): no `global llm_ac_cache_a2vybg` needed — the function only
    # mutates the dict in place (item assignment); it never rebinds the
    # module-level name.
    if data_type == "LLM_RESPONSE" and "http" in CACHE_FILE_UPLOAD_LINK_A2VYBG:
        llm_ac_cache_a2vybg[llm_config_hash_a2vybg] = cached_records_a2vybg
        # Best-effort upload: the HTTP response is not checked here —
        # TODO confirm whether a failed PUT should raise.
        requests.put(CACHE_FILE_UPLOAD_LINK_A2VYBG, json=llm_ac_cache_a2vybg)
119117def save_ac_value_a2vybg (record_id : str , attr_value : Any ) -> None :
120- global calculated_attribute_by_record_id_a2vybg , processed_records_a2vybg , progress_size_a2vybg , amount_a2vybg
121- global check_data_type_a2vybg , py_data_types_a2vybg , llm_ac_cache_a2vybg , llm_config_hash_a2vybg , cached_records_a2vybg
122- global CACHE_FILE_UPLOAD_LINK_A2VYBG
118+ global processed_records_a2vybg
123119
124120 if not check_data_type_a2vybg (attr_value ):
125121 raise ValueError (
@@ -147,14 +143,13 @@ def process_attribute_calculation_a2vybg(
147143
def check_abort_status_a2vybg() -> bool:
    """Report whether processing should abort.

    Kept as a module-level function outside the async loop so that every call
    re-reads the module-level flag and always observes its freshest value.
    """
    return should_abort_a2vybg
152147
153148
154149async def process_llm_record_batch_a2vybg (
155150 record_dict_batch : List [Dict [str , Any ]]
156151) -> None :
157- global DEFAULT_USER_PROMPT_A2VYBG , cached_records_a2vybg
152+ global should_abort_a2vybg
158153
159154 for record_dict in record_dict_batch :
160155 if check_abort_status_a2vybg ():
@@ -169,33 +164,32 @@ async def process_llm_record_batch_a2vybg(
169164
170165 save_ac_value_a2vybg (record_dict ["id" ], attr_value )
171166 except Exception as e :
172- global should_abort_a2vybg
173167 should_abort_a2vybg = True
174168 print (f"Error in record { record_dict ['data' ]['running_id' ]} : { str (e )} " )
175169 return
176170
177171
def make_batches(
    iterable: List[Any], size: int = 1
) -> Generator[List[Any], None, None]:
    """Yield consecutive slices of *iterable*, each at most *size* items long.

    The final batch contains whatever remains and may be shorter than *size*.
    Raises ValueError (from range) if *size* is zero.
    """
    total = len(iterable)
    # Python slicing clamps the end index, so no explicit min() is needed.
    for start in range(0, total, size):
        yield iterable[start:start + size]
178+
179+
async def process_async_llm_calls_a2vybg(
    record_dict_list: List[Dict[str, Any]]
) -> None:
    """Process all records' LLM calls concurrently in worker-sized batches.

    Splits *record_dict_list* into batches sized so the work spreads across
    NUM_WORKERS concurrent tasks, awaits them all, and — only after a fully
    successful run — uploads the result cache to object storage.

    Raises:
        ValueError: if any batch set the module-level abort flag.
    """
    # Guarantee at least one record per batch even when there are fewer
    # records than workers.
    per_batch = max(
        amount_a2vybg // int(attribute_calculators.NUM_WORKERS_A2VYBG), 1
    )
    batches = make_batches(record_dict_list, size=per_batch)
    await asyncio.gather(
        *(process_llm_record_batch_a2vybg(batch) for batch in batches)
    )
    if check_abort_status_a2vybg():
        raise ValueError("Encountered error during LLM processing.")
    # Upload happens after the abort check so a failed run never persists
    # a partial cache.
    send_cache_to_object_storage_a2vybg()
199193
200194
201195if __name__ == "__main__" :
0 commit comments