@@ -64,7 +64,7 @@ def ingest_dataframe(
6464 Returns:
6565 The ingestion result with source_id for status tracking
6666 """
67- LOGGER .debug (T ("coal.logs .adx.ingesting_dataframe" ).format (table_name = table_name , rows = len (dataframe )))
67+ LOGGER .debug (T ("coal.services .adx.ingesting_dataframe" ).format (table_name = table_name , rows = len (dataframe )))
6868
6969 drop_by_tags = [drop_by_tag ] if (drop_by_tag is not None ) else None
7070
@@ -83,7 +83,7 @@ def ingest_dataframe(
8383 _ingest_status [source_id ] = IngestionStatus .QUEUED
8484 _ingest_times [source_id ] = time .time ()
8585
86- LOGGER .debug (T ("coal.logs .adx.ingestion_queued" ).format (source_id = source_id ))
86+ LOGGER .debug (T ("coal.services .adx.ingestion_queued" ).format (source_id = source_id ))
8787
8888 return ingestion_result
8989
@@ -112,10 +112,10 @@ def send_to_adx(
112112 Returns:
113113 The ingestion result with source_id for status tracking
114114 """
115- LOGGER .debug (T ("coal.logs .adx.sending_to_adx" ).format (table_name = table_name , items = len (dict_list )))
115+ LOGGER .debug (T ("coal.services .adx.sending_to_adx" ).format (table_name = table_name , items = len (dict_list )))
116116
117117 if not dict_list :
118- LOGGER .warning (T ("coal.logs .adx.empty_dict_list" ))
118+ LOGGER .warning (T ("coal.services .adx.empty_dict_list" ))
119119 return None
120120
121121 if not ignore_table_creation :
@@ -125,7 +125,7 @@ def send_to_adx(
125125
126126 # Then try to create the table
127127 if not create_table (query_client , database , table_name , types ):
128- LOGGER .error (T ("coal.logs .adx.table_creation_failed" ).format (table_name = table_name ))
128+ LOGGER .error (T ("coal.services .adx.table_creation_failed" ).format (table_name = table_name ))
129129 return False
130130
131131 # Create a dataframe with the data to write and send them to ADX
@@ -169,7 +169,7 @@ def check_ingestion_status(
169169 if not remaining_ids :
170170 return
171171
172- LOGGER .debug (T ("coal.logs .adx.checking_status" ).format (count = len (remaining_ids )))
172+ LOGGER .debug (T ("coal.services .adx.checking_status" ).format (count = len (remaining_ids )))
173173
174174 # Get status queues
175175 qs = KustoIngestStatusQueues (client )
@@ -183,7 +183,7 @@ def get_messages(queues):
183183 successes = get_messages (qs .success ._get_queues ())
184184 failures = get_messages (qs .failure ._get_queues ())
185185
186- LOGGER .debug (T ("coal.logs .adx.status_messages" ).format (success = len (successes ), failure = len (failures )))
186+ LOGGER .debug (T ("coal.services .adx.status_messages" ).format (success = len (successes ), failure = len (failures )))
187187
188188 queued_ids = list (remaining_ids )
189189 # Process success and failure messages
@@ -199,7 +199,7 @@ def get_messages(queues):
199199 if dm .IngestionSourceId == str (source_id ):
200200 _ingest_status [source_id ] = status
201201
202- log_function (T ("coal.logs .adx.status_found" ).format (source_id = source_id , status = status .value ))
202+ log_function (T ("coal.services .adx.status_found" ).format (source_id = source_id , status = status .value ))
203203
204204 _q .delete_message (_m )
205205 remaining_ids .remove (source_id )
@@ -213,7 +213,7 @@ def get_messages(queues):
213213 for source_id in remaining_ids :
214214 if time .time () - _ingest_times [source_id ] > actual_timeout :
215215 _ingest_status [source_id ] = IngestionStatus .TIMEOUT
216- LOGGER .warning (T ("coal.logs .adx.ingestion_timeout" ).format (source_id = source_id ))
216+ LOGGER .warning (T ("coal.services .adx.ingestion_timeout" ).format (source_id = source_id ))
217217
218218 # Yield results for remaining IDs
219219 for source_id in queued_ids :
@@ -237,7 +237,7 @@ def monitor_ingestion(
237237 has_failures = False
238238 source_ids_copy = source_ids .copy ()
239239
240- LOGGER .info (T ("coal.logs .adx.waiting_ingestion" ))
240+ LOGGER .info (T ("coal.services .adx.waiting_ingestion" ))
241241
242242 with tqdm .tqdm (desc = "Ingestion status" , total = len (source_ids_copy )) as pbar :
243243 while any (
@@ -252,7 +252,7 @@ def monitor_ingestion(
252252 for ingestion_id , ingestion_status in results :
253253 if ingestion_status == IngestionStatus .FAILURE :
254254 LOGGER .error (
255- T ("coal.logs .adx.ingestion_failed" ).format (
255+ T ("coal.services .adx.ingestion_failed" ).format (
256256 ingestion_id = ingestion_id , table = table_ingestion_id_mapping .get (ingestion_id )
257257 )
258258 )
@@ -273,14 +273,14 @@ def monitor_ingestion(
273273 for ingestion_id , ingestion_status in results :
274274 if ingestion_status == IngestionStatus .FAILURE :
275275 LOGGER .error (
276- T ("coal.logs .adx.ingestion_failed" ).format (
276+ T ("coal.services .adx.ingestion_failed" ).format (
277277 ingestion_id = ingestion_id , table = table_ingestion_id_mapping .get (ingestion_id )
278278 )
279279 )
280280 has_failures = True
281281 pbar .update (len (source_ids_copy ))
282282
283- LOGGER .info (T ("coal.logs .adx.ingestion_completed" ))
283+ LOGGER .info (T ("coal.services .adx.ingestion_completed" ))
284284 return has_failures
285285
286286
@@ -298,7 +298,7 @@ def handle_failures(kusto_client: KustoClient, database: str, operation_tag: str
298298 bool: True if the process should abort, False otherwise
299299 """
300300 if has_failures :
301- LOGGER .warning (T ("coal.logs .adx.failures_detected" ).format (operation_tag = operation_tag ))
301+ LOGGER .warning (T ("coal.services .adx.failures_detected" ).format (operation_tag = operation_tag ))
302302 _drop_by_tag (kusto_client , database , operation_tag )
303303 return True
304304 return False
@@ -314,10 +314,10 @@ def clear_ingestion_status_queues(client: QueuedIngestClient, confirmation: bool
314314 confirmation: Must be True to proceed with clearing
315315 """
316316 if not confirmation :
317- LOGGER .warning (T ("coal.logs .adx.clear_queues_no_confirmation" ))
317+ LOGGER .warning (T ("coal.services .adx.clear_queues_no_confirmation" ))
318318 return
319319
320- LOGGER .warning (T ("coal.logs .adx.clearing_queues" ))
320+ LOGGER .warning (T ("coal.services .adx.clearing_queues" ))
321321 qs = KustoIngestStatusQueues (client )
322322
323323 while not qs .success .is_empty ():
@@ -326,4 +326,4 @@ def clear_ingestion_status_queues(client: QueuedIngestClient, confirmation: bool
326326 while not qs .failure .is_empty ():
327327 qs .failure .pop (32 )
328328
329- LOGGER .info (T ("coal.logs .adx.queues_cleared" ))
329+ LOGGER .info (T ("coal.services .adx.queues_cleared" ))
0 commit comments