
Commit 08b4cda

chore: improve logs on GBFS processing functions (#1213)
1 parent 6b7eb9b commit 08b4cda

File tree

5 files changed: 87 additions, 90 deletions


functions-python/gbfs_validator/src/gbfs_data_processor.py

Lines changed: 33 additions & 24 deletions
@@ -1,6 +1,5 @@
 import json
 import language_tags
-import logging
 from datetime import datetime
 from http import HTTPMethod
 from typing import Dict, Any, Optional, List, Tuple
@@ -28,6 +27,7 @@
 )
 from sqlalchemy.orm import Session
 from shared.database.database import with_db_session
+from shared.helpers.logger import get_logger
 from shared.helpers.utils import create_http_task
 
 FEATURE_ENDPOINTS = [
@@ -51,6 +51,7 @@ def __init__(self, stable_id: str, feed_id: str):
         self.gbfs_versions: List[GBFSVersion] = []
         self.gbfs_endpoints: Dict[str, List[GBFSEndpoint]] = {}
         self.validation_reports: Dict[str, Dict[str, Any]] = {}
+        self.logger = get_logger(GBFSDataProcessor.__name__, stable_id)
 
     def process_gbfs_data(self, autodiscovery_url: str) -> None:
         """Process the GBFS data from the autodiscovery URL."""
@@ -76,8 +77,10 @@ def record_autodiscovery_request(
         self, autodiscovery_url: str, db_session: Session
     ) -> None:
         """Record the request to the autodiscovery URL."""
-        logging.info(f"Accessing auto-discovery URL: {autodiscovery_url}")
-        request_metadata = GBFSEndpoint.get_request_metadata(autodiscovery_url)
+        self.logger.info("Accessing auto-discovery URL: %s", autodiscovery_url)
+        request_metadata = GBFSEndpoint.get_request_metadata(
+            autodiscovery_url, self.logger
+        )
         gbfs_feed = (
             db_session.query(Gbfsfeed).filter(Gbfsfeed.id == self.feed_id).one_or_none()
         )
@@ -96,28 +99,28 @@ def record_autodiscovery_request(
         if request_metadata.get("response_size_bytes") is None:
             raise ValueError(f"Error fetching {autodiscovery_url}")
 
-    @staticmethod
     def extract_gbfs_endpoints(
+        self,
         gbfs_json_url: str,
     ) -> Tuple[Optional[List[GBFSEndpoint]], GBFSVersion]:
         """
         Extract GBFS endpoints from the GBFS JSON URL.
         @:returns: GBFS endpoints and the version of the GBFS feed.
         """
-        logging.info(f"Fetching GBFS data from {gbfs_json_url}")
+        self.logger.info("Fetching GBFS data from %s", gbfs_json_url)
         gbfs_json = fetch_gbfs_data(gbfs_json_url)
         feeds_matches = parse("$..feeds").find(gbfs_json)
         version_match = parse("$..version").find(gbfs_json)
         if not version_match:
-            logging.warning(
+            self.logger.warning(
                 "No version found in the GBFS data. Defaulting to version 1.0."
             )
             gbfs_version = GBFSVersion("1.0", gbfs_json_url)
         else:
             gbfs_version = GBFSVersion(version_match[0].value, gbfs_json_url)
         if not feeds_matches:
-            logging.error(
-                f"No feeds found in the GBFS data for version {gbfs_version.version}."
+            self.logger.error(
+                "No feeds found in the GBFS data for version %s.", gbfs_version.version
             )
             return None, gbfs_version
         endpoints = []
@@ -144,15 +147,15 @@ def extract_gbfs_endpoints(
                 for endpoint in endpoints
             }.values()
         )
-        logging.info(f"Found version {gbfs_version.version}.")
-        logging.info(
-            f"Found endpoints {', '.join([endpoint.name for endpoint in endpoints])}."
+        self.logger.info("Found version %s.", gbfs_version.version)
+        self.logger.info(
+            "Found endpoints %s.", ", ".join([endpoint.name for endpoint in endpoints])
         )
         return unique_endpoints, gbfs_version
 
     def extract_gbfs_versions(self, gbfs_json_url: str) -> Optional[List[GBFSVersion]]:
         """Extract GBFS versions from the autodiscovery URL"""
-        all_endpoints, version = GBFSDataProcessor.extract_gbfs_endpoints(gbfs_json_url)
+        all_endpoints, version = self.extract_gbfs_endpoints(gbfs_json_url)
         if not all_endpoints or not version:
             return None
         self.gbfs_endpoints[version.version] = all_endpoints
@@ -163,17 +166,20 @@ def extract_gbfs_versions(self, gbfs_json_url: str) -> Optional[List[GBFSVersion
         )
 
         if gbfs_versions_endpoint:
-            logging.info(f"Fetching GBFS versions from {gbfs_versions_endpoint.url}")
+            self.logger.info(
+                "Fetching GBFS versions from %s", gbfs_versions_endpoint.url
+            )
             gbfs_versions_json = fetch_gbfs_data(gbfs_versions_endpoint.url)
             versions_matches = parse("$..versions").find(gbfs_versions_json)
             if versions_matches:
                 gbfs_versions = GBFSVersion.from_dict(versions_matches[0].value)
-                logging.info(
-                    f"Found versions {', '.join([version.version for version in gbfs_versions])}"
+                self.logger.info(
+                    "Found versions %s",
+                    ", ".join([version.version for version in gbfs_versions]),
                 )
                 return gbfs_versions
             else:
-                logging.warning(
+                self.logger.warning(
                     "No versions found in the GBFS versions data. Defaulting to the autodiscovery URL version."
                 )
                 return [
@@ -192,14 +198,14 @@ def get_latest_version(self) -> Optional[str]:
             default=None,
         )
         if not max_version:
-            logging.error(
+            self.logger.error(
                 "No non-RC versions found. Trying to set the latest to a RC version."
             )
             max_version = max(
                 self.gbfs_versions, key=lambda version: version.version, default=None
            )
         if not max_version:
-            logging.error("No versions found.")
+            self.logger.error("No versions found.")
             return None
         return max_version.version
 
@@ -213,7 +219,7 @@ def update_database_entities(self, db_session: Session) -> None:
             .one_or_none()
         )
         if not gbfs_feed:
-            logging.error(f"GBFS feed with ID {self.feed_id} not found.")
+            self.logger.error("GBFS feed with ID %s not found.", self.feed_id)
             return
         gbfs_versions_orm = []
         latest_version = self.get_latest_version()
@@ -329,7 +335,8 @@ def validate_gbfs_feed_versions(self) -> None:
             response.raise_for_status()
             json_report_summary = response.json()
         except RequestException as e:
-            logging.error(f"Validation request failed for {version.url}: {e}")
+            self.logger.error("Validation request failed for %s", version.url)
+            self.logger.error(e)
             continue
 
         report_summary_blob = bucket.blob(
@@ -362,7 +369,7 @@ def create_validation_report_entities(
             "validatorVersion", None
         )
         if validator_version is None or validation_time is None:
-            logging.error("Validation version or time not found.")
+            self.logger.error("Validation version or time not found.")
             return None
 
         validation_report_id = (
@@ -400,13 +407,13 @@ def extract_endpoints_for_all_versions(self):
             if endpoints:
                 self.gbfs_endpoints[version.version] = endpoints
             else:
-                logging.error(f"No endpoints found for version {version.version}.")
+                self.logger.error("No endpoints found for version %s.", version.version)
 
     def trigger_location_extraction(self):
         """Trigger the location extraction process."""
         latest_version = self.get_latest_version()
         if not latest_version:
-            logging.error("No latest version found.")
+            self.logger.error("No latest version found.")
             return
         endpoints = self.gbfs_endpoints.get(latest_version, [])
         # Find the station_information_url endpoint
@@ -428,7 +435,9 @@ def trigger_location_extraction(self):
             None,
         )
         if not station_information_url and not vehicle_status_url:
-            logging.warning("No station_information_url or vehicle_status_url found.")
+            self.logger.warning(
+                "No station_information_url or vehicle_status_url found."
+            )
             return
         client = tasks_v2.CloudTasksClient()
         body = json.dumps(
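
Note: the processor now builds one logger per feed in __init__ via get_logger(GBFSDataProcessor.__name__, stable_id) and uses it everywhere the module previously called logging directly. The internals of shared.helpers.logger are not part of this diff; purely as a sketch, a helper with that signature could tag every record with the feed's stable ID using a logging.LoggerAdapter. The adapter-based approach and the _StableIdAdapter name below are assumptions, not the repository's actual implementation:

import logging


def get_logger(name: str, stable_id: str) -> logging.LoggerAdapter:
    """Hypothetical sketch of shared.helpers.logger.get_logger: return a
    logger whose records all carry the feed's stable ID."""

    class _StableIdAdapter(logging.LoggerAdapter):
        def process(self, msg, kwargs):
            # Prefix each message with the stable ID stored in `extra`.
            return "[%s] %s" % (self.extra["stable_id"], msg), kwargs

    return _StableIdAdapter(logging.getLogger(name), {"stable_id": stable_id})

With a helper shaped like this, self.logger.info("Found version %s.", gbfs_version.version) would emit a line like "[<stable_id>] Found version 2.3.", making every message from one processor run attributable to one feed.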

functions-python/gbfs_validator/src/gbfs_utils.py

Lines changed: 5 additions & 2 deletions
@@ -24,9 +24,12 @@ class GBFSEndpoint:
     language: Optional[str] = None
 
     @staticmethod
-    def get_request_metadata(url: str) -> Optional[Dict[str, Any]]:
+    def get_request_metadata(
+        url: str, logger: Optional[logging.Logger] = None
+    ) -> Optional[Dict[str, Any]]:
         """Fetch the endpoint and return latency, status code, and response size."""
         try:
+            logger = logger or logging.getLogger(__name__)
             response = requests.get(url)
             response.raise_for_status()
             return {
@@ -35,7 +38,7 @@ def get_request_metadata(url: str) -> Optional[Dict[str, Any]]:
                 "response_size_bytes": len(response.content),
             }
         except requests.exceptions.RequestException as error:
-            logging.error(f"Error fetching {url}: {error}")
+            logger.error("Error fetching %s. Error %s", url, error)
             return {
                 "latency": None,
                 "status_code": error.response.status_code if error.response else 400,

functions-python/gbfs_validator/src/main.py

Lines changed: 42 additions & 49 deletions
@@ -24,10 +24,10 @@
     MaxExecutionsReachedError,
 )
 from shared.database.database import with_db_session
-from shared.helpers.logger import Logger, StableIdFilter
+from shared.helpers.logger import init_logger, get_logger
 from shared.helpers.parser import jsonify_pubsub
 
-logging.basicConfig(level=logging.INFO)
+init_logger()
 
 
 def fetch_all_gbfs_feeds(db_session: Session) -> List[Gbfsfeed]:
@@ -38,15 +38,14 @@ def fetch_all_gbfs_feeds(db_session: Session) -> List[Gbfsfeed]:
         db_session.expunge_all()
         return gbfs_feeds
     except Exception as e:
-        logging.error(f"Error fetching all GBFS feeds: {e}")
+        logging.error("Error fetching all GBFS feeds: %s", e)
         raise e
 
 
 @functions_framework.cloud_event
 def gbfs_validator_pubsub(cloud_event: CloudEvent):
-    Logger.init_logger()
     data = cloud_event.data
-    logging.info(f"Function triggered with Pub/Sub event data: {data}")
+    logging.info("Function triggered with Pub/Sub event data: %s", data)
 
     # Get the pubsub message data
     message_json = jsonify_pubsub(data)
@@ -59,50 +58,45 @@ def gbfs_validator_pubsub(cloud_event: CloudEvent):
         url = message_json["url"]
         feed_id = message_json["feed_id"]
     except KeyError as e:
-        logging.error(f"Missing required field: {e}")
+        logging.error("Missing required field: %s", e)
         return f"Invalid Pub/Sub message data. Missing {e}."
 
-    # Add stable_id to logs
-    stable_id_filter = StableIdFilter(stable_id)
-    logging.getLogger().addFilter(stable_id_filter)
+    # get logger with stable_id
+    logger = get_logger(__name__, stable_id)
+    # Save trace and validate if the execution is allowed
+    trace_service = DatasetTraceService()
+    trace_id = str(uuid.uuid4())
+    trace = DatasetTrace(
+        trace_id=trace_id,
+        stable_id=stable_id,
+        execution_id=execution_id,
+        status=Status.PROCESSING,
+        timestamp=datetime.now(),
+        pipeline_stage=PipelineStage.GBFS_VALIDATION,
+    )
+
     try:
-        # Save trace and validate if the execution is allowed
-        trace_service = DatasetTraceService()
-        trace_id = str(uuid.uuid4())
-        trace = DatasetTrace(
-            trace_id=trace_id,
-            stable_id=stable_id,
-            execution_id=execution_id,
-            status=Status.PROCESSING,
-            timestamp=datetime.now(),
-            pipeline_stage=PipelineStage.GBFS_VALIDATION,
-        )
+        trace_service.validate_and_save(trace, int(os.getenv("MAXIMUM_EXECUTIONS", 1)))
+    except (ValueError, MaxExecutionsReachedError) as e:
+        error_message = str(e)
+        logger.error(error_message)
+        save_trace_with_error(trace, error_message, trace_service)
+        return error_message
+
+    # Process GBFS data
+    try:
+        processor = GBFSDataProcessor(stable_id, feed_id)
+        processor.process_gbfs_data(url)
+    except Exception as e:
+        error_message = f"Error processing GBFS data: {e}"
+        logger.error(error_message)
+        logger.error("Traceback: %s", traceback.format_exc())
+        save_trace_with_error(trace, error_message, trace_service)
+        return error_message
 
-        try:
-            trace_service.validate_and_save(
-                trace, int(os.getenv("MAXIMUM_EXECUTIONS", 1))
-            )
-        except (ValueError, MaxExecutionsReachedError) as e:
-            error_message = str(e)
-            logging.error(error_message)
-            save_trace_with_error(trace, error_message, trace_service)
-            return error_message
-
-        # Process GBFS data
-        try:
-            processor = GBFSDataProcessor(stable_id, feed_id)
-            processor.process_gbfs_data(url)
-        except Exception as e:
-            error_message = f"Error processing GBFS data: {e}"
-            logging.error(f"{error_message}\nTraceback:\n{traceback.format_exc()}")
-            save_trace_with_error(trace, error_message, trace_service)
-            return error_message
-
-        trace.status = Status.SUCCESS
-        trace_service.save(trace)
-        return "GBFS data processed and stored successfully."
-    finally:
-        logging.getLogger().removeFilter(stable_id_filter)
+    trace.status = Status.SUCCESS
+    trace_service.save(trace)
+    return "GBFS data processed and stored successfully."
 
 
 @with_db_session
@@ -113,7 +107,6 @@ def gbfs_validator_batch(_, db_session: Session):
     @param _: The request object.
     @return: The response of the function.
     """
-    Logger.init_logger()
     logging.info("Batch function triggered.")
     pubsub_topic_name = os.getenv("PUBSUB_TOPIC_NAME", None)
     if pubsub_topic_name is None:
@@ -137,7 +130,7 @@ def gbfs_validator_batch(_, db_session: Session):
             "url": gbfs_feed.auto_discovery_url,
         }
         feeds_data.append(feed_data)
-        logging.info(f"Feed {gbfs_feed.stable_id} added to the batch.")
+        logging.info("Feed %s added to the batch.", gbfs_feed.stable_id)
 
     # Publish to Pub/Sub topic
     try:
@@ -148,9 +141,9 @@ def gbfs_validator_batch(_, db_session: Session):
             message_data = json.dumps(feed_data).encode("utf-8")
             future = publisher.publish(topic_path, message_data)
             future.result()  # Ensure message was published
-            logging.info(f"Published feed {feed_data['stable_id']} to Pub/Sub.")
+            logging.info("Published feed %s to Pub/Sub.", feed_data["stable_id"])
         except Exception as e:
-            logging.error(f"Error publishing feeds to Pub/Sub: {e}")
+            logging.error("Error publishing feeds to Pub/Sub: %s", e)
             return "Error publishing feeds to Pub/Sub.", 500
 
     return (
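
Throughout this file the f-string log calls are swapped for %-style lazy formatting. The two differ in when interpolation happens: an f-string is built even if the record is filtered out by the log level, while logging defers % substitution until a handler actually emits the record; the constant message template also makes lines easier to group in log aggregation. For example:

import logging

stable_id = "example-feed"  # illustrative value

# Eager: the string is formatted before logging even checks the level.
logging.info(f"Published feed {stable_id} to Pub/Sub.")

# Lazy: arguments are interpolated only if the record is emitted.
logging.info("Published feed %s to Pub/Sub.", stable_id)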

functions-python/gbfs_validator/tests/test_gbfs_data_processor.py

Lines changed: 1 addition & 1 deletion
@@ -49,7 +49,7 @@ def mock_fetch_gbfs_data(url):
     return {}
 
 
-def mock_get_request_metadata(_):
+def mock_get_request_metadata(*args, **kwargs):
     return {
         "latency": 100,
         "status_code": 200,
