Skip to content

Commit fe4ea3e

Browse files
authored
chore: improve logs TLD functions (#1212)
1 parent d8b3258 commit fe4ea3e

File tree

5 files changed

+50
-77
lines changed

5 files changed

+50
-77
lines changed

functions-python/feed_sync_dispatcher_transitland/function_config.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
"description": "Feed Sync Dispatcher for Transitland",
44
"entry_point": "feed_sync_dispatcher_transitland",
55
"timeout": 3600,
6-
"memory": "1Gi",
76
"trigger_http": true,
87
"include_folders": ["helpers"],
98
"include_api_folders": ["database_gen", "database", "common"],
@@ -17,5 +16,5 @@
1716
"max_instance_count": 1,
1817
"min_instance_count": 0,
1918
"available_cpu": 1,
20-
"available_memory": "512Mi"
19+
"available_memory": "1Gi"
2120
}

functions-python/feed_sync_dispatcher_transitland/src/main.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from shared.helpers.feed_sync.feed_sync_common import FeedSyncProcessor, FeedSyncPayload
3333
from shared.helpers.feed_sync.feed_sync_dispatcher import feed_sync_dispatcher
3434
from shared.helpers.feed_sync.models import TransitFeedSyncPayload
35-
from shared.helpers.logger import Logger
35+
from shared.helpers.logger import init_logger
3636
from shared.helpers.pub_sub import get_pubsub_client, get_execution_id
3737
from typing import Tuple, List
3838
from collections import defaultdict
@@ -47,6 +47,7 @@
4747

4848
# session instance to reuse connections
4949
session = requests.Session()
50+
init_logger()
5051

5152

5253
def process_feed_urls(feed: dict, urls_in_db: List[str]) -> Tuple[List[str], List[str]]:
@@ -388,7 +389,6 @@ def feed_sync_dispatcher_transitland(request):
388389
"""
389390
HTTP Function entry point queries the transitland API and publishes events to a Pub/Sub topic to be processed.
390391
"""
391-
Logger.init_logger()
392392
publisher = get_pubsub_client()
393393
topic_path = publisher.topic_path(PROJECT_ID, PUBSUB_TOPIC_NAME)
394394
transit_land_feed_sync_processor = TransitFeedSyncProcessor()

functions-python/feed_sync_process_transitland/src/feed_processor_utils.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,14 @@ def check_url_status(url: str) -> bool:
1919
"""Check if a URL is reachable."""
2020
try:
2121
response = requests.head(url, timeout=10)
22-
return response.status_code < 400 or response.status_code == 403
22+
result = response.status_code < 400 or response.status_code == 403
23+
if not result:
24+
logging.error(
25+
"Url [%s] replied with status code: [%s]", url, response.status_code
26+
)
27+
return result
2328
except requests.RequestException:
24-
logging.warning(f"Failed to reach URL: {url}")
29+
logging.warning("Failed to reach URL: %s", url)
2530
return False
2631

2732

@@ -92,10 +97,10 @@ def create_new_feed(session: Session, stable_id: str, payload: FeedPayload) -> F
9297
)
9398
if location:
9499
new_feed.locations = [location]
95-
logging.debug(f"Added location for feed {new_feed.id}")
100+
logging.debug("Added location for feed %s", new_feed.id)
96101

97102
# Persist the new feed
98103
session.add(new_feed)
99104
session.flush()
100-
logging.info(f"Created new feed with ID: {new_feed.id}")
105+
logging.info("Created new feed with ID: %s", new_feed.id)
101106
return new_feed

functions-python/feed_sync_process_transitland/src/main.py

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from sqlalchemy.orm import Session
2626

2727
from shared.database.database import with_db_session
28-
from shared.helpers.logger import Logger
28+
from shared.helpers.logger import init_logger
2929
from shared.database_gen.sqlacodegen_models import Feed
3030
from shared.helpers.feed_sync.models import TransitFeedSyncPayload as FeedPayload
3131
from feed_processor_utils import check_url_status, create_new_feed
@@ -35,6 +35,8 @@
3535
DATASET_BATCH_TOPIC = os.getenv("DATASET_BATCH_TOPIC_NAME")
3636
FEEDS_DATABASE_URL = os.getenv("FEEDS_DATABASE_URL")
3737

38+
init_logger()
39+
3840

3941
class FeedProcessor:
4042
def __init__(self, db_session: Session):
@@ -46,10 +48,12 @@ def process_feed(self, payload: FeedPayload) -> None:
4648
"""Process a feed based on its database state."""
4749
try:
4850
logging.info(
49-
f"Processing feed: external_id={payload.external_id}, feed_id={payload.feed_id}"
51+
"Processing feed: external_id=%s, feed_id=%s",
52+
payload.external_id,
53+
payload.feed_id,
5054
)
5155
if not check_url_status(payload.feed_url):
52-
logging.error(f"Feed URL not reachable: {payload.feed_url}. Skipping.")
56+
logging.error("Feed URL not reachable: %s. Skipping.", payload.feed_url)
5357
return
5458

5559
self.feed_stable_id = f"{payload.source}-{payload.stable_id}".lower()
@@ -70,9 +74,9 @@ def process_feed(self, payload: FeedPayload) -> None:
7074
def _process_new_feed_or_skip(self, payload: FeedPayload) -> Optional[Feed]:
7175
"""Process a new feed or skip if the URL already exists."""
7276
if self._check_feed_url_exists(payload.feed_url):
73-
logging.error(f"Feed URL already exists: {payload.feed_url}. Skipping.")
77+
logging.error("Feed URL already exists: %s. Skipping.", payload.feed_url)
7478
return
75-
logging.info(f"Creating new feed for external_id: {payload.external_id}")
79+
logging.info("Creating new feed for external_id: %s", payload.external_id)
7680
return create_new_feed(self.session, self.feed_stable_id, payload)
7781

7882
def _process_existing_feed_refs(
@@ -83,7 +87,9 @@ def _process_existing_feed_refs(
8387
f for f in current_feeds if f.producer_url == payload.feed_url
8488
]
8589
if matching_feeds:
86-
logging.info(f"Feed with URL already exists: {payload.feed_url}. Skipping.")
90+
logging.info(
91+
"Feed with URL already exists: %s. Skipping.", payload.feed_url
92+
)
8793
return
8894

8995
stable_id_matches = [
@@ -92,12 +98,12 @@ def _process_existing_feed_refs(
9298
reference_count = len(stable_id_matches)
9399
active_match = [f for f in stable_id_matches if f.status == "active"]
94100
if reference_count > 0:
95-
logging.info(f"Updating feed for stable_id: {self.feed_stable_id}")
101+
logging.info("Updating feed for stable_id: %s", self.feed_stable_id)
96102
self.feed_stable_id = f"{self.feed_stable_id}_{reference_count}".lower()
97103
new_feed = self._deprecate_old_feed(payload, active_match[0].id)
98104
else:
99105
logging.info(
100-
f"No matching stable_id. Creating new feed for {payload.external_id}."
106+
"No matching stable_id. Creating new feed for %s.", payload.external_id
101107
)
102108
new_feed = create_new_feed(self.session, self.feed_stable_id, payload)
103109
return new_feed
@@ -125,7 +131,7 @@ def _deprecate_old_feed(
125131
old_feed = self.session.get(Feed, old_feed_id)
126132
if old_feed:
127133
old_feed.status = "deprecated"
128-
logging.info(f"Deprecated old feed: {old_feed.id}")
134+
logging.info("Deprecated old feed: %s", old_feed.id)
129135
return create_new_feed(self.session, self.feed_stable_id, payload)
130136

131137
def _publish_to_batch_topic_if_needed(
@@ -164,13 +170,13 @@ def _publish_to_topic(self, feed: Feed, payload: FeedPayload) -> None:
164170
)
165171
future.add_done_callback(
166172
lambda _: logging.info(
167-
f"Published feed {feed.stable_id} to dataset batch topic"
173+
"Published feed %s to dataset batch topic", feed.stable_id
168174
)
169175
)
170176
future.result()
171-
logging.info(f"Message published for feed {feed.stable_id}")
177+
logging.info("Message published for feed %s", feed.stable_id)
172178
except Exception as e:
173-
logging.error(f"Error publishing to dataset batch topic: {str(e)}")
179+
logging.error("Error publishing to dataset batch topic: %s", str(e))
174180
raise
175181

176182
def _rollback_transaction(self, message: str) -> None:
@@ -181,13 +187,17 @@ def _rollback_transaction(self, message: str) -> None:
181187

182188
@with_db_session
183189
@functions_framework.cloud_event
184-
def process_feed_event(cloud_event, db_session: Session) -> None:
190+
def process_feed_event(cloud_event, db_session: Session) -> str:
185191
"""Cloud Function entry point for feed processing."""
186-
Logger.init_logger()
187192
try:
188193
message_data = base64.b64decode(cloud_event.data["message"]["data"]).decode()
189194
payload = FeedPayload(**json.loads(message_data))
190195
processor = FeedProcessor(db_session)
191196
processor.process_feed(payload)
197+
result = "Feed processing completed successfully."
198+
logging.info(result)
199+
return result
192200
except Exception as e:
193-
logging.error(f"Error processing feed event: {str(e)}")
201+
result = f"Error processing feed event: {str(e)}"
202+
logging.error(result)
203+
return result

functions-python/feed_sync_process_transitland/tests/test_feed_sync_process.py

Lines changed: 13 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import base64
22
import json
3-
import logging
43
from unittest import mock
54
from unittest.mock import patch, Mock, MagicMock
65

@@ -41,42 +40,6 @@ def mock_db_session():
4140
return Mock()
4241

4342

44-
class MockLogger:
45-
"""Mock logger for testing"""
46-
47-
@staticmethod
48-
def init_logger():
49-
return MagicMock()
50-
51-
def __init__(self, name):
52-
self.name = name
53-
self._logger = logging.getLogger(name)
54-
55-
def get_logger(self):
56-
mock_logger = MagicMock()
57-
# Add all required logging methods
58-
mock_logger.info = MagicMock()
59-
mock_logger.error = MagicMock()
60-
mock_logger.warning = MagicMock()
61-
mock_logger.debug = MagicMock()
62-
mock_logger.addFilter = MagicMock()
63-
return mock_logger
64-
65-
66-
@pytest.fixture(autouse=True)
67-
def mock_logging():
68-
"""Mock both local and GCP logging."""
69-
with patch("main.logging") as mock_log, patch("main.Logger", MockLogger):
70-
for logger in [mock_log]:
71-
logger.info = MagicMock()
72-
logger.error = MagicMock()
73-
logger.warning = MagicMock()
74-
logger.debug = MagicMock()
75-
logger.addFilter = MagicMock()
76-
77-
yield mock_log
78-
79-
8043
@pytest.fixture
8144
def feed_payload():
8245
"""Fixture for feed payload."""
@@ -150,7 +113,7 @@ def _create_payload_dict(feed_payload: FeedPayload) -> dict:
150113
"payload_type": feed_payload.payload_type,
151114
}
152115

153-
def test_get_current_feed_info(self, processor, feed_payload, mock_logging):
116+
def test_get_current_feed_info(self, processor, feed_payload):
154117
"""Test retrieving current feed information."""
155118
# Mock database query
156119
processor.session.query.return_value.filter.return_value.all.return_value = [
@@ -179,7 +142,7 @@ def test_get_current_feed_info(self, processor, feed_payload, mock_logging):
179142
)
180143
assert len(feeds) == 0
181144

182-
def test_check_feed_url_exists_comprehensive(self, processor, mock_logging):
145+
def test_check_feed_url_exists_comprehensive(self, processor):
183146
"""Test comprehensive feed URL existence checks."""
184147
test_url = "https://example.com/feed"
185148

@@ -191,7 +154,7 @@ def test_check_feed_url_exists_comprehensive(self, processor, mock_logging):
191154
result = processor._check_feed_url_exists(test_url)
192155
assert result is True
193156

194-
def test_database_error_handling(self, processor, feed_payload, mock_logging):
157+
def test_database_error_handling(self, processor, feed_payload):
195158
"""Test database error handling in different scenarios."""
196159

197160
# Test case 1: General database error during feed processing
@@ -201,9 +164,7 @@ def test_database_error_handling(self, processor, feed_payload, mock_logging):
201164

202165
processor._rollback_transaction.assert_called_once()
203166

204-
def test_publish_to_batch_topic_comprehensive(
205-
self, processor, feed_payload, mock_logging
206-
):
167+
def test_publish_to_batch_topic_comprehensive(self, processor, feed_payload):
207168
"""Test publishing to batch topic including success, error, and message format validation."""
208169

209170
# Test case 1: Successful publish with message format validation
@@ -227,7 +188,7 @@ def test_publish_to_batch_topic_comprehensive(
227188
assert "feed_stable_id" in json.loads(message_arg["data"])
228189
assert "tld-feed1" == json.loads(message_arg["data"])["feed_stable_id"]
229190

230-
def test_process_feed_event_validation(self, mock_logging):
191+
def test_process_feed_event_validation(self):
231192
"""Test feed event processing with various invalid payloads."""
232193

233194
# Test case 1: Empty payload
@@ -255,7 +216,7 @@ def test_process_feed_event_validation(self, mock_logging):
255216
process_feed_event(cloud_event)
256217

257218
def test_process_feed_event_pubsub_error(
258-
self, processor, feed_payload, mock_logging, mock_db_session
219+
self, processor, feed_payload, mock_db_session
259220
):
260221
"""Test feed event processing handles missing credentials error."""
261222
# Create cloud event with valid payload
@@ -274,7 +235,7 @@ def test_process_feed_event_pubsub_error(
274235

275236
process_feed_event(cloud_event, db_session=mock_session)
276237

277-
def test_process_feed_event_malformed_cloud_event(self, mock_logging):
238+
def test_process_feed_event_malformed_cloud_event(self):
278239
"""Test feed event processing with malformed cloud event."""
279240
# Test case 1: Missing message data
280241
cloud_event = Mock()
@@ -287,7 +248,7 @@ def test_process_feed_event_malformed_cloud_event(self, mock_logging):
287248

288249
process_feed_event(cloud_event)
289250

290-
def test_process_feed_event_invalid_json(self, mock_logging):
251+
def test_process_feed_event_invalid_json(self):
291252
"""Test handling of invalid JSON in cloud event"""
292253
# Create invalid base64 encoded JSON
293254
invalid_json = base64.b64encode(b'{"invalid": "json"').decode()
@@ -296,14 +257,14 @@ def test_process_feed_event_invalid_json(self, mock_logging):
296257
cloud_event.data = {"message": {"data": invalid_json}}
297258

298259
# Process the event
299-
process_feed_event(cloud_event)
260+
result = process_feed_event(cloud_event)
300261

301262
# Verify error handling
302-
mock_logging.error.assert_called()
263+
assert result.startswith("Error processing feed event")
303264

304265
@patch("main.create_new_feed")
305266
def test_process_new_feed_or_skip(
306-
self, create_new_feed_mock, processor, feed_payload, mock_logging
267+
self, create_new_feed_mock, processor, feed_payload
307268
):
308269
"""Test processing new feed or skipping existing feed."""
309270
processor._check_feed_url_exists = MagicMock()
@@ -313,9 +274,7 @@ def test_process_new_feed_or_skip(
313274
create_new_feed_mock.assert_called_once()
314275

315276
@patch("main.create_new_feed")
316-
def test_process_new_feed_skip(
317-
self, create_new_feed_mock, processor, feed_payload, mock_logging
318-
):
277+
def test_process_new_feed_skip(self, create_new_feed_mock, processor, feed_payload):
319278
"""Test processing new feed or skipping existing feed."""
320279
processor._check_feed_url_exists = MagicMock()
321280
# Test case 2: Existing feed
@@ -325,7 +284,7 @@ def test_process_new_feed_skip(
325284

326285
@patch("main.create_new_feed")
327286
def test_process_existing_feed_refs(
328-
self, create_new_feed_mock, processor, feed_payload, mock_logging
287+
self, create_new_feed_mock, processor, feed_payload
329288
):
330289
"""Test processing existing feed references."""
331290
# 1. Existing feed with same url

0 commit comments

Comments (0)