
Commit d57eb83

enhance batch_+process_datasets logging
1 parent 9683e0a commit d57eb83

File tree: 3 files changed (+69 −70 lines changed)

functions-python/batch_datasets/src/main.py

Lines changed: 1 addition & 1 deletion

@@ -152,5 +152,5 @@ def batch_datasets(request, db_session: Session):
         )
     )
     message = f"Publish completed. Published {len(feeds)} feeds to {pubsub_topic_name}."
-    logging.info(message, extra={"message_extra": message})
+    logging.info(message)
     return message
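
The dropped extra kwarg only changes behaviour when a handler or formatter surfaces custom record attributes, as the previous Cloud Logging setup presumably did; under a plain logging.basicConfig the two calls emit identical lines. A minimal stdlib sketch (the feed count and topic name below are hypothetical):

import logging

logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s:%(message)s")

message = "Publish completed. Published 42 feeds to datasets-batch-topic."  # hypothetical values
logging.info(message, extra={"message_extra": message})  # old call: attribute is set, but no formatter reads it
logging.info(message)  # new call: identical output under this configuration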

functions-python/batch_process_dataset/src/main.py

Lines changed: 44 additions & 33 deletions

@@ -34,10 +34,12 @@
 from shared.database.database import with_db_session, refresh_materialized_view
 import logging
 
-from shared.helpers.logger import Logger
+from shared.helpers.logger import init_logger, get_logger
 from shared.helpers.utils import download_and_get_hash
 from sqlalchemy.orm import Session
 
+init_logger()
+
 
 @dataclass
 class DatasetFile:
@@ -63,6 +65,7 @@ def __init__(
         api_key_parameter_name,
         public_hosted_datasets_url,
     ):
+        self.logger = get_logger(DatasetProcessor.__name__, feed_stable_id)
         self.producer_url = producer_url
         self.bucket_name = bucket_name
         self.latest_hash = latest_hash
@@ -73,7 +76,7 @@ def __init__(
         self.api_key_parameter_name = api_key_parameter_name
         self.date = datetime.now().strftime("%Y%m%d%H%M")
         if self.authentication_type != 0:
-            logging.info(f"Getting feed credentials for feed {self.feed_stable_id}")
+            self.logger.info(f"Getting feed credentials for feed {self.feed_stable_id}")
             self.feed_credentials = self.get_feed_credentials(self.feed_stable_id)
             if self.feed_credentials is None:
                 raise Exception(
@@ -95,7 +98,9 @@ def get_feed_credentials(feed_stable_id) -> str | None:
             feeds_credentials = json.loads(os.getenv("FEEDS_CREDENTIALS", "{}"))
             return feeds_credentials.get(feed_stable_id, None)
         except Exception as e:
-            logging.error(f"Error getting feed credentials: {e}")
+            get_logger(DatasetProcessor.__name__, feed_stable_id).error(
+                f"Error getting feed credentials: {e}"
+            )
             return None
 
     @staticmethod
@@ -141,23 +146,27 @@ def upload_dataset(self) -> DatasetFile or None:
         :return: the file hash and the hosted url as a tuple or None if no upload is required
         """
         try:
-            logging.info(f"[{self.feed_stable_id}] - Accessing URL {self.producer_url}")
+            self.logger.info(
+                "Accessing URL %s", self.producer_url
+            )
             temp_file_path = self.generate_temp_filename()
             file_sha256_hash, is_zip = self.download_content(temp_file_path)
             if not is_zip:
-                logging.error(
+                self.logger.error(
                     f"[{self.feed_stable_id}] The downloaded file from {self.producer_url} is not a valid ZIP file."
                 )
                 return None
 
-            logging.info(f"[{self.feed_stable_id}] File hash is {file_sha256_hash}.")
+            self.logger.info(
+                f"[{self.feed_stable_id}] File hash is {file_sha256_hash}."
+            )
 
             if self.latest_hash != file_sha256_hash:
-                logging.info(
+                self.logger.info(
                     f"[{self.feed_stable_id}] Dataset has changed (hash {self.latest_hash}"
                     f"-> {file_sha256_hash}). Uploading new version."
                 )
-                logging.info(
+                self.logger.info(
                     f"Creating file {self.feed_stable_id}/latest.zip in bucket {self.bucket_name}"
                 )
                 self.upload_file_to_storage(
@@ -170,7 +179,7 @@ def upload_dataset(self) -> DatasetFile or None:
             dataset_full_path = (
                 f"{self.feed_stable_id}/{dataset_stable_id}/{dataset_stable_id}.zip"
             )
-            logging.info(
+            self.logger.info(
                 f"Creating file: {dataset_full_path}"
                 f" in bucket {self.bucket_name}"
             )
@@ -185,7 +194,7 @@ def upload_dataset(self) -> DatasetFile or None:
                 hosted_url=f"{self.public_hosted_datasets_url}/{dataset_full_path}",
             )
 
-        logging.info(
+        self.logger.info(
             f"[{self.feed_stable_id}] Datasets hash has not changed (hash {self.latest_hash} "
             f"-> {file_sha256_hash}). Not uploading it."
         )
@@ -216,11 +225,11 @@ def create_dataset(self, dataset_file: DatasetFile, db_session: Session):
                 .one_or_none()
             )
             if not latest_dataset:
-                logging.info(
+                self.logger.info(
                     f"[{self.feed_stable_id}] No latest dataset found for feed."
                 )
 
-            logging.info(
+            self.logger.info(
                 f"[{self.feed_stable_id}] Creating new dataset for feed with stable id {dataset_file.stable_id}."
             )
             new_dataset = Gtfsdataset(
@@ -239,10 +248,10 @@ def create_dataset(self, dataset_file: DatasetFile, db_session: Session):
             db_session.add(latest_dataset)
             db_session.add(new_dataset)
             db_session.commit()
-            logging.info(f"[{self.feed_stable_id}] Dataset created successfully.")
+            self.logger.info(f"[{self.feed_stable_id}] Dataset created successfully.")
 
             refresh_materialized_view(db_session, t_feedsearch.name)
-            logging.info(
+            self.logger.info(
                 f"[{self.feed_stable_id}] Materialized view refresh event triggered successfully."
             )
         except Exception as e:
@@ -256,7 +265,7 @@ def process(self) -> DatasetFile or None:
         dataset_file = self.upload_dataset()
 
         if dataset_file is None:
-            logging.info(f"[{self.feed_stable_id}] No database update required.")
+            self.logger.info(f"[{self.feed_stable_id}] No database update required.")
             return None
         self.create_dataset(dataset_file)
         return dataset_file
@@ -268,8 +277,8 @@ def record_trace(
     """
    Record the trace in the datastore
    """
-    logging.info(
-        f"[{stable_id}] Recording trace in execution: [{execution_id}] with status: [{status}]"
+    get_logger("record_trace", stable_id if stable_id else "UNKNOWN").info(
+        f"Recording trace in execution: [{execution_id}] with status: [{status}]"
    )
    trace = DatasetTrace(
        trace_id=None,
@@ -306,7 +315,6 @@ def process_dataset(cloud_event: CloudEvent):
         }
     }
     """
-    Logger.init_logger()
     logging.info("Function Started")
     stable_id = "UNKNOWN"
     bucket_name = os.getenv("DATASETS_BUCKET_NAME")
@@ -336,27 +344,27 @@ def process_dataset(cloud_event: CloudEvent):
         # Extract data from message
         data = base64.b64decode(cloud_event.data["message"]["data"]).decode()
         json_payload = json.loads(data)
-        logging.info(
-            f"[{json_payload['feed_stable_id']}] JSON Payload: {json.dumps(json_payload)}"
-        )
         stable_id = json_payload["feed_stable_id"]
+        logger = get_logger("process_dataset", stable_id)
+        logger.info(f"JSON Payload: {json.dumps(json_payload)}")
+
         execution_id = json_payload["execution_id"]
         trace_service = DatasetTraceService()
         trace = trace_service.get_by_execution_and_stable_ids(execution_id, stable_id)
-        logging.info(f"[{stable_id}] Dataset trace: {trace}")
+        logger.info(f"Dataset trace: {trace}")
         executions = len(trace) if trace else 0
-        logging.info(
-            f"[{stable_id}] Dataset executed times={executions}/{maximum_executions} "
+        logger.info(
+            f"Dataset executed times={executions}/{maximum_executions} "
             f"in execution=[{execution_id}] "
         )
 
         if executions > 0:
             if executions >= maximum_executions:
                 error_message = (
-                    f"[{stable_id}] Function already executed maximum times "
+                    f"Function already executed maximum times "
                     f"in execution: [{execution_id}]"
                 )
-                logging.error(error_message)
+                logger.error(error_message)
                 return error_message
 
         processor = DatasetProcessor(
@@ -372,11 +380,14 @@ def process_dataset(cloud_event: CloudEvent):
         )
         dataset_file = processor.process()
     except Exception as e:
-        logging.error(e)
-        error_message = f"[{stable_id}] Error execution: [{execution_id}] error: [{e}]"
-        logging.error(error_message)
-        logging.error(f"Function completed with error:{error_message}")
+        # This makes sure the logger is initialized
+        logger = get_logger("process_dataset", stable_id if stable_id else "UNKNOWN")
+        logger.error(e)
+        error_message = f"Error execution: [{execution_id}] error: [{e}]"
+        logger.error(error_message)
+        logger.error(f"Function completed with error:{error_message}")
     finally:
+        logger = get_logger("process_dataset", stable_id if stable_id else "UNKNOWN")
         if stable_id and execution_id:
             status = (
                 Status.PUBLISHED if dataset_file is not None else Status.NOT_PUBLISHED
@@ -392,12 +403,12 @@ def process_dataset(cloud_event: CloudEvent):
                 trace_service,
             )
         else:
-            logging.error(
+            logger.error(
                 f"Function completed with errors, missing stable={stable_id} or execution_id={execution_id}"
             )
             return f"Function completed with errors, missing stable={stable_id} or execution_id={execution_id}"
-        logging.info(
-            f"[{stable_id}] Function %s in execution: [{execution_id}]",
+        logger.info(
+            f"Function %s in execution: [{execution_id}]",
             "successfully completed" if not error_message else "Failed",
         )
         return "Completed." if error_message is None else error_message

functions-python/helpers/logger.py

Lines changed: 24 additions & 36 deletions

@@ -13,12 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import os
-
-import google.cloud.logging
-from google.cloud.logging_v2 import Client
+import threading
 import logging
 
+from shared.common.logging_utils import get_env_logging_level
+
 
 class StableIdFilter(logging.Filter):
     """Add a stable_id to the log record"""
@@ -33,37 +32,26 @@ def filter(self, record):
         return True
 
 
-class Logger:
-    """
-    Util class for logging information, errors or warnings.
-    This class uses the Google Cloud Logging API enhancing the logs with extra request information.
-    """
+_logger_initialized = False
+lock = threading.Lock()
 
-    def __init__(self, name):
-        self.init_logger()
-        self.logger = self.init_logger().logger(name)
 
-    @staticmethod
-    def init_logger() -> Client | None:
-        """
-        Initializes the logger
-        """
-        if os.getenv("DEBUG", "False") == "True":
-            return None
-        try:
-            client = google.cloud.logging.Client()
-            client.get_default_handler()
-            client.setup_logging()
-            return client
-        except Exception as error:
-            # This might happen when the GCP authorization credentials are not available.
-            # Example, when running the tests locally
-            logging.error(f"Error initializing the logger: {error}")
-            return None
-
-    def get_logger(self) -> Client:
-        """
-        Get the GCP logger instance
-        :return: the logger instance
-        """
-        return self.logger
+def init_logger():
+    """
+    Initializes the logger
+    """
+    with lock:
+        global _logger_initialized
+        if _logger_initialized:
+            return
+        logging.basicConfig(level=get_env_logging_level())
+        _logger_initialized = True
+
+
+def get_logger(name: str, stable_id: str = None):
+    logger = logging.getLogger(name)
+    if stable_id and not any(
+        isinstance(handler, StableIdFilter) for handler in logger.handlers
+    ):
+        logger.addFilter(StableIdFilter(stable_id))
+    return logger
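
One detail worth flagging in the new get_logger: addFilter stores filters in logger.filters, but the duplicate guard scans logger.handlers, so it never finds an existing StableIdFilter and repeated calls stack duplicate filters. Below is a standalone sketch of the intended pattern that checks logger.filters instead; since the filter() body of StableIdFilter sits outside the visible hunks, the message-prefixing behaviour shown is an assumption, not the repo's exact implementation.

import logging
import threading


class StableIdFilter(logging.Filter):
    """Add a stable_id to the log record (assumed here to prefix the message)."""

    def __init__(self, stable_id=None):
        super().__init__()
        self.stable_id = stable_id

    def filter(self, record):
        if self.stable_id:
            record.msg = f"[{self.stable_id}] {record.msg}"
        return True


_logger_initialized = False
_lock = threading.Lock()


def init_logger():
    """Run basicConfig exactly once, however many modules call this at import time."""
    global _logger_initialized
    with _lock:
        if _logger_initialized:
            return
        logging.basicConfig(level=logging.INFO)
        _logger_initialized = True


def get_logger(name: str, stable_id: str = None):
    logger = logging.getLogger(name)
    if stable_id and not any(
        isinstance(f, StableIdFilter) for f in logger.filters  # filters, not handlers
    ):
        logger.addFilter(StableIdFilter(stable_id))
    return logger


init_logger()
init_logger()  # no-op thanks to the guard
logger = get_logger("process_dataset", "mdb-123")  # hypothetical stable id
logger = get_logger("process_dataset", "mdb-123")  # second call adds no duplicate filter
logger.info("Dataset trace: %s", None)  # -> INFO:process_dataset:[mdb-123] Dataset trace: None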
