 from shared.database.database import with_db_session, refresh_materialized_view
 import logging

-from shared.helpers.logger import Logger
+from shared.helpers.logger import init_logger, get_logger
 from shared.helpers.utils import download_and_get_hash
 from sqlalchemy.orm import Session

+init_logger()
+

 @dataclass
 class DatasetFile:
@@ -63,6 +65,7 @@ def __init__(
         api_key_parameter_name,
         public_hosted_datasets_url,
     ):
+        self.logger = get_logger(DatasetProcessor.__name__, feed_stable_id)
         self.producer_url = producer_url
         self.bucket_name = bucket_name
         self.latest_hash = latest_hash
@@ -73,7 +76,7 @@ def __init__(
         self.api_key_parameter_name = api_key_parameter_name
         self.date = datetime.now().strftime("%Y%m%d%H%M")
         if self.authentication_type != 0:
-            logging.info(f"Getting feed credentials for feed {self.feed_stable_id}")
+            self.logger.info(f"Getting feed credentials for feed {self.feed_stable_id}")
             self.feed_credentials = self.get_feed_credentials(self.feed_stable_id)
             if self.feed_credentials is None:
                 raise Exception(
@@ -95,7 +98,9 @@ def get_feed_credentials(feed_stable_id) -> str | None:
             feeds_credentials = json.loads(os.getenv("FEEDS_CREDENTIALS", "{}"))
             return feeds_credentials.get(feed_stable_id, None)
         except Exception as e:
-            logging.error(f"Error getting feed credentials: {e}")
+            get_logger(DatasetProcessor.__name__, feed_stable_id).error(
+                f"Error getting feed credentials: {e}"
+            )
             return None

     @staticmethod
@@ -141,23 +146,25 @@ def upload_dataset(self) -> DatasetFile or None:
         :return: the file hash and the hosted url as a tuple, or None if no upload is required
         """
         try:
-            logging.info(f"[{self.feed_stable_id}] - Accessing URL {self.producer_url}")
+            self.logger.info("Accessing URL %s", self.producer_url)
             temp_file_path = self.generate_temp_filename()
             file_sha256_hash, is_zip = self.download_content(temp_file_path)
             if not is_zip:
-                logging.error(
+                self.logger.error(
                     f"[{self.feed_stable_id}] The downloaded file from {self.producer_url} is not a valid ZIP file."
                 )
                 return None

-            logging.info(f"[{self.feed_stable_id}] File hash is {file_sha256_hash}.")
+            self.logger.info(
+                f"[{self.feed_stable_id}] File hash is {file_sha256_hash}."
+            )

             if self.latest_hash != file_sha256_hash:
-                logging.info(
+                self.logger.info(
                     f"[{self.feed_stable_id}] Dataset has changed (hash {self.latest_hash}"
                     f"-> {file_sha256_hash}). Uploading new version."
                 )
-                logging.info(
+                self.logger.info(
                     f"Creating file {self.feed_stable_id}/latest.zip in bucket {self.bucket_name}"
                 )
                 self.upload_file_to_storage(
@@ -170,7 +177,7 @@ def upload_dataset(self) -> DatasetFile or None:
                 dataset_full_path = (
                     f"{self.feed_stable_id}/{dataset_stable_id}/{dataset_stable_id}.zip"
                 )
-                logging.info(
+                self.logger.info(
                     f"Creating file: {dataset_full_path}"
                     f" in bucket {self.bucket_name}"
                 )
@@ -185,7 +192,7 @@ def upload_dataset(self) -> DatasetFile or None:
                     hosted_url=f"{self.public_hosted_datasets_url}/{dataset_full_path}",
                 )

-            logging.info(
+            self.logger.info(
                 f"[{self.feed_stable_id}] Dataset hash has not changed (hash {self.latest_hash}"
                 f"-> {file_sha256_hash}). Not uploading it."
             )
@@ -216,11 +223,11 @@ def create_dataset(self, dataset_file: DatasetFile, db_session: Session):
                 .one_or_none()
             )
             if not latest_dataset:
-                logging.info(
+                self.logger.info(
                     f"[{self.feed_stable_id}] No latest dataset found for feed."
                 )

-            logging.info(
+            self.logger.info(
                 f"[{self.feed_stable_id}] Creating new dataset for feed with stable id {dataset_file.stable_id}."
             )
             new_dataset = Gtfsdataset(
@@ -239,10 +246,10 @@ def create_dataset(self, dataset_file: DatasetFile, db_session: Session):
             db_session.add(latest_dataset)
             db_session.add(new_dataset)
             db_session.commit()
-            logging.info(f"[{self.feed_stable_id}] Dataset created successfully.")
+            self.logger.info(f"[{self.feed_stable_id}] Dataset created successfully.")

             refresh_materialized_view(db_session, t_feedsearch.name)
-            logging.info(
+            self.logger.info(
                 f"[{self.feed_stable_id}] Materialized view refresh event triggered successfully."
             )
         except Exception as e:
@@ -256,7 +263,7 @@ def process(self) -> DatasetFile or None:
         dataset_file = self.upload_dataset()

         if dataset_file is None:
-            logging.info(f"[{self.feed_stable_id}] No database update required.")
+            self.logger.info(f"[{self.feed_stable_id}] No database update required.")
             return None
         self.create_dataset(dataset_file)
         return dataset_file
@@ -268,8 +275,8 @@ def record_trace(
268275 """
269276 Record the trace in the datastore
270277 """
271- logging .info (
272- f"[ { stable_id } ] Recording trace in execution: [{ execution_id } ] with status: [{ status } ]"
278+ get_logger ( "record_trace" , stable_id if stable_id else "UNKNOWN" ) .info (
279+ f"Recording trace in execution: [{ execution_id } ] with status: [{ status } ]"
273280 )
274281 trace = DatasetTrace (
275282 trace_id = None ,
@@ -306,7 +313,6 @@ def process_dataset(cloud_event: CloudEvent):
         }
     }
     """
-    Logger.init_logger()
     logging.info("Function Started")
     stable_id = "UNKNOWN"
     bucket_name = os.getenv("DATASETS_BUCKET_NAME")
@@ -336,27 +342,27 @@ def process_dataset(cloud_event: CloudEvent):
         # Extract data from message
         data = base64.b64decode(cloud_event.data["message"]["data"]).decode()
         json_payload = json.loads(data)
-        logging.info(
-            f"[{json_payload['feed_stable_id']}] JSON Payload: {json.dumps(json_payload)}"
-        )
         stable_id = json_payload["feed_stable_id"]
+        logger = get_logger("process_dataset", stable_id)
+        logger.info(f"JSON Payload: {json.dumps(json_payload)}")
+
         execution_id = json_payload["execution_id"]
         trace_service = DatasetTraceService()
         trace = trace_service.get_by_execution_and_stable_ids(execution_id, stable_id)
-        logging.info(f"[{stable_id}] Dataset trace: {trace}")
+        logger.info(f"Dataset trace: {trace}")
         executions = len(trace) if trace else 0
-        logging.info(
-            f"[{stable_id}] Dataset executed times={executions}/{maximum_executions} "
+        logger.info(
+            f"Dataset executed times={executions}/{maximum_executions} "
             f"in execution=[{execution_id}] "
         )

         if executions > 0:
             if executions >= maximum_executions:
                 error_message = (
-                    f"[{stable_id}] Function already executed maximum times "
+                    f"Function already executed maximum times "
                     f"in execution: [{execution_id}]"
                 )
-                logging.error(error_message)
+                logger.error(error_message)
                 return error_message

         processor = DatasetProcessor(
@@ -372,11 +378,14 @@ def process_dataset(cloud_event: CloudEvent):
         )
         dataset_file = processor.process()
     except Exception as e:
-        logging.error(e)
-        error_message = f"[{stable_id}] Error execution: [{execution_id}] error: [{e}]"
-        logging.error(error_message)
-        logging.error(f"Function completed with error:{error_message}")
+        # This makes sure the logger is initialized
+        logger = get_logger("process_dataset", stable_id if stable_id else "UNKNOWN")
+        logger.error(e)
+        error_message = f"Error execution: [{execution_id}] error: [{e}]"
+        logger.error(error_message)
+        logger.error(f"Function completed with error: {error_message}")
     finally:
+        logger = get_logger("process_dataset", stable_id if stable_id else "UNKNOWN")
         if stable_id and execution_id:
             status = (
                 Status.PUBLISHED if dataset_file is not None else Status.NOT_PUBLISHED
@@ -392,12 +401,12 @@ def process_dataset(cloud_event: CloudEvent):
                 trace_service,
             )
         else:
-            logging.error(
+            logger.error(
                 f"Function completed with errors, missing stable={stable_id} or execution_id={execution_id}"
             )
             return f"Function completed with errors, missing stable={stable_id} or execution_id={execution_id}"
-        logging.info(
-            f"[{stable_id}] Function %s in execution: [{execution_id}]",
+        logger.info(
+            f"Function %s in execution: [{execution_id}]",
             "successfully completed" if not error_message else "Failed",
         )
         return "Completed." if error_message is None else error_message
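
A note on the helpers this diff leans on: shared.helpers.logger is internal to the repo and its implementation is not shown here. As a rough, purely illustrative sketch, with every name below inferred from the call sites above rather than taken from the actual module, init_logger() could be a one-time root-logger setup and get_logger(name, stable_id) could return a standard logging.LoggerAdapter that stamps the feed's stable id onto every record, which would explain why the rewritten call sites can drop the hand-written "[stable_id]" prefixes:

# Hypothetical sketch of shared/helpers/logger.py; inferred from the call
# sites in this diff, not taken from the real module.
import logging


def init_logger() -> None:
    # One-time, process-wide setup, run at import time of the consuming module.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )


class _StableIdAdapter(logging.LoggerAdapter):
    # Prefix every record with the feed stable id bound at creation time.
    def process(self, msg, kwargs):
        return f"[{self.extra['stable_id']}] {msg}", kwargs


def get_logger(name: str, stable_id: str) -> logging.LoggerAdapter:
    # Bind stable_id once so call sites can log plain messages.
    return _StableIdAdapter(logging.getLogger(name), {"stable_id": stable_id})

If the real helper works anything like this, the adapter keeps per-feed context out of every call site while still funneling records through the standard logging pipeline, which matches the pattern of the call-site changes in this diff.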