
Commit 148aa74

fix: failing metrics cloud functions (#942)
1 parent 9bbee47 commit 148aa74

File tree

16 files changed: +135 −141 lines changed


.gitignore

Lines changed: 3 additions & 0 deletions
@@ -74,3 +74,6 @@ __pycache__
 .coverage
 coverage_reports
 tf.plan
+
+# CSV generation output files
+functions-python/**/*.csv

functions-python/big_query_ingestion/src/gbfs/gbfs_big_query_ingest.py

Lines changed: 1 addition & 1 deletion
@@ -10,5 +10,5 @@ def __init__(self):
         super().__init__()
         current_dir = os.path.dirname(os.path.abspath(__file__))
         self.schema_path = os.path.join(
-            current_dir, "../helpers/bq_schema/gbfs_schema.json"
+            current_dir, "../shared/helpers/bq_schema/gbfs_schema.json"
         )

functions-python/big_query_ingestion/src/gtfs/gtfs_big_query_ingest.py

Lines changed: 1 addition & 1 deletion
@@ -10,5 +10,5 @@ def __init__(self):
         super().__init__()
         current_dir = os.path.dirname(os.path.abspath(__file__))
         self.schema_path = os.path.join(
-            current_dir, "../helpers/bq_schema/gtfs_schema.json"
+            current_dir, "../shared/helpers/bq_schema/gtfs_schema.json"
         )
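
Both ingesters resolve their BigQuery schema JSON relative to the module file; the fix simply updates the relative path to match the relocated shared/ layout. For context, a minimal sketch of how such a schema file can be turned into BigQuery schema objects (load_schema is a hypothetical helper; the repo's actual loader may differ):

import json

from google.cloud import bigquery


def load_schema(schema_path: str) -> list[bigquery.SchemaField]:
    # Parse the JSON schema file and build SchemaField objects from their
    # API representation ({"name": ..., "type": ..., "mode": ...}).
    with open(schema_path) as f:
        fields = json.load(f)
    return [bigquery.SchemaField.from_api_repr(field) for field in fields]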

functions-python/preprocessed_analytics/function_config.json

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@
   "description": "Preprocess analytics",
   "entry_point": "preprocess_analytics",
   "timeout": 540,
-  "memory": "2Gi",
+  "memory": "4Gi",
   "trigger_http": false,
   "include_folders": ["helpers"],
   "include_api_folders": ["database_gen"],

functions-python/preprocessed_analytics/src/processors/base_analytics_processor.py

Lines changed: 11 additions & 55 deletions
@@ -6,14 +6,14 @@
 import pandas as pd
 from google.cloud import storage
 from sqlalchemy.orm import Query
-
+from sqlalchemy.orm.session import Session
 from shared.database_gen.sqlacodegen_models import (
     Gbfsfeed,
     Gbfssnapshot,
     Gtfsfeed,
     Gtfsdataset,
 )
-from shared.helpers.database import Database
+from shared.helpers.database import with_db_session


 class NoFeedDataException(Exception):
@@ -23,7 +23,6 @@ class NoFeedDataException(Exception):
 class BaseAnalyticsProcessor:
     def __init__(self, run_date):
         self.run_date = run_date
-        self.session = Database().start_db_session(echo=False)
         self.processed_feeds = set()
         self.data = []
         self.feed_metrics_data = []
@@ -33,14 +32,11 @@ def __init__(self, run_date):
             os.getenv("ANALYTICS_BUCKET")
         )

-    def get_latest_data(self) -> Query:
+    def get_latest_data(self, db_session: Session) -> Query:
         raise NotImplementedError("Subclasses should implement this method.")

     def process_feed_data(
-        self,
-        feed: Gtfsfeed | Gbfsfeed,
-        dataset_or_snapshot: Gtfsdataset | Gbfssnapshot,
-        translations: Dict,
+        self, feed: Gtfsfeed | Gbfsfeed, dataset_or_snapshot: Gtfsdataset | Gbfssnapshot
     ) -> None:
         raise NotImplementedError("Subclasses should implement this method.")

@@ -135,65 +131,25 @@ def save_analytics(self) -> None:
         self.save()
         logging.info(f"Analytics saved to bucket as {file_name}")

-    def run(self) -> None:
-        for (
-            feed,
-            dataset_or_snapshot,
-            translation_fields,
-        ) in self._get_data_with_translations():
-            self.process_feed_data(feed, dataset_or_snapshot, translation_fields)
+    @with_db_session
+    def run(self, db_session: Session) -> None:
+        for feed, dataset_or_snapshot in self._get_data(db_session):
+            self.process_feed_data(feed, dataset_or_snapshot)

-        self.session.close()
         self.save_summary()
         self.save_analytics()
         self.update_analytics_files()
         logging.info(f"Finished running analytics for date: {self.run_date}")

-    def _get_data_with_translations(self):
-        query = self.get_latest_data()
+    def _get_data(self, db_session: Session):
+        query = self.get_latest_data(db_session)
         all_results = query.all()
         if len(all_results) == 0:
             raise NoFeedDataException("No feed data found")
         logging.info(f"Loaded {len(all_results)} feeds to process")
-        try:
-            location_translations = [
-                self._extract_translation_fields(result[2:]) for result in all_results
-            ]
-            logging.info("Location translations loaded")
-            location_translations_dict = {
-                translation["location_id"]: translation
-                for translation in location_translations
-                if translation["location_id"] is not None
-            }
-        except Exception as e:
-            logging.warning(
-                f"Error loading location translations: {e}\n Continuing without translations"
-            )
-            location_translations_dict = {}
         unique_feeds = {result[0].stable_id: result for result in all_results}
         logging.info(f"Nb of unique feeds loaded: {len(unique_feeds)}")
-        return [
-            (result[0], result[1], location_translations_dict)
-            for result in unique_feeds.values()
-        ]
-
-    @staticmethod
-    def _extract_translation_fields(translation_data):
-        keys = [
-            "location_id",
-            "country_code",
-            "country",
-            "subdivision_name",
-            "municipality",
-            "country_translation",
-            "subdivision_name_translation",
-            "municipality_translation",
-        ]
-        try:
-            return dict(zip(keys, translation_data))
-        except Exception as e:
-            logging.error(f"Error extracting translation fields: {e}")
-            return dict(zip(keys, [None] * len(keys)))
+        return [(result[0], result[1]) for result in unique_feeds.values()]

     def update_analytics_files(self) -> None:
         try:
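
This is the core of the fix: session lifecycle management moves out of __init__ (where a failed Database() connection broke construction) and into a with_db_session decorator that supplies a db_session argument for the duration of run(). The decorator's implementation lives in shared.helpers.database and is not shown in this diff; a minimal sketch of the pattern, assuming a SQLAlchemy engine built from the FEEDS_DATABASE_URL environment variable (an assumption that would also explain why the tests below now patch that variable):

import functools
import os

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker


def with_db_session(func):
    # Open a session, inject it as the `db_session` keyword argument,
    # and guarantee it is closed when the wrapped call finishes.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        engine = create_engine(os.environ["FEEDS_DATABASE_URL"])
        session = sessionmaker(bind=engine)()
        try:
            return func(*args, db_session=session, **kwargs)
        finally:
            session.close()

    return wrapper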

functions-python/preprocessed_analytics/src/processors/gbfs_analytics_processor.py

Lines changed: 5 additions & 4 deletions
@@ -1,6 +1,7 @@
 from typing import List

 import sqlalchemy
+from sqlalchemy.orm.session import Session
 from sqlalchemy.orm import joinedload
 from sqlalchemy.sql import func, and_

@@ -18,9 +19,9 @@ def __init__(self, run_date):
         super().__init__(run_date)
         self.versions_metrics_data = []

-    def get_latest_data(self) -> sqlalchemy.orm.Query:
+    def get_latest_data(self, db_session: Session) -> sqlalchemy.orm.Query:
         subquery = (
-            self.session.query(
+            db_session.query(
                 Gbfssnapshot.feed_id,
                 func.max(Gbfssnapshot.downloaded_at).label("max_downloaded_at"),
             )
@@ -30,7 +31,7 @@ def get_latest_data(self) -> sqlalchemy.orm.Query:
         )

         query = (
-            self.session.query(Gbfsfeed, Gbfssnapshot)
+            db_session.query(Gbfsfeed, Gbfssnapshot)
             .join(Gbfssnapshot, Gbfsfeed.id == Gbfssnapshot.feed_id)
             .join(
                 subquery,
@@ -50,7 +51,7 @@ def get_latest_data(self) -> sqlalchemy.orm.Query:
         )
         return query

-    def process_feed_data(self, feed: Gbfsfeed, snapshot: Gbfssnapshot, _) -> None:
+    def process_feed_data(self, feed: Gbfsfeed, snapshot: Gbfssnapshot) -> None:
         if feed.stable_id in self.processed_feeds:
             return
         self.processed_feeds.add(feed.stable_id)
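
get_latest_data is a standard greatest-row-per-group query: a subquery computes the latest downloaded_at per feed_id, and the outer query joins back on the (feed_id, downloaded_at) pair so only the newest snapshot per feed survives. The same skeleton, distilled with generic names (Feed and Snapshot here are hypothetical stand-ins for the mapped models):

from sqlalchemy.sql import and_, func


def latest_per_feed(db_session, Feed, Snapshot):
    # Subquery: newest snapshot timestamp for each feed.
    latest = (
        db_session.query(
            Snapshot.feed_id,
            func.max(Snapshot.downloaded_at).label("max_downloaded_at"),
        )
        .group_by(Snapshot.feed_id)
        .subquery()
    )
    # Outer query: join back on (feed_id, max timestamp) to keep one row per feed.
    return (
        db_session.query(Feed, Snapshot)
        .join(Snapshot, Feed.id == Snapshot.feed_id)
        .join(
            latest,
            and_(
                Snapshot.feed_id == latest.c.feed_id,
                Snapshot.downloaded_at == latest.c.max_downloaded_at,
            ),
        )
    )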

functions-python/preprocessed_analytics/src/processors/gtfs_analytics_processor.py

Lines changed: 7 additions & 17 deletions
@@ -1,19 +1,18 @@
-from typing import List, Dict
+from typing import List

 import sqlalchemy
 from sqlalchemy.orm import joinedload
+from sqlalchemy.orm.session import Session
 from sqlalchemy.sql import func, and_
+
 from shared.database_gen.sqlacodegen_models import (
     Gtfsdataset,
     Gtfsfeed,
     Validationreport,
     Notice,
     Feature,
     Feed,
-    t_location_with_translations_en,
-    Location,
 )
-from shared.helpers.locations import translate_feed_locations
 from .base_analytics_processor import BaseAnalyticsProcessor

@@ -22,9 +21,9 @@ def __init__(self, run_date):
         super().__init__(run_date)
         self.features_metrics_data = []

-    def get_latest_data(self) -> sqlalchemy.orm.Query:
+    def get_latest_data(self, db_session: Session) -> sqlalchemy.orm.Query:
         subquery = (
-            self.session.query(
+            db_session.query(
                 Gtfsdataset.feed_id,
                 func.max(Gtfsdataset.downloaded_at).label("max_downloaded_at"),
             )
@@ -34,7 +33,7 @@ def get_latest_data(self) -> sqlalchemy.orm.Query:
         )

         query = (
-            self.session.query(Gtfsfeed, Gtfsdataset, t_location_with_translations_en)
+            db_session.query(Gtfsfeed, Gtfsdataset)
             .join(Gtfsdataset, Gtfsfeed.id == Gtfsdataset.feed_id)
             .join(
                 subquery,
@@ -43,11 +42,6 @@ def get_latest_data(self) -> sqlalchemy.orm.Query:
                     Gtfsdataset.downloaded_at == subquery.c.max_downloaded_at,
                 ),
             )
-            .outerjoin(Location, Feed.locations)
-            .outerjoin(
-                t_location_with_translations_en,
-                Location.id == t_location_with_translations_en.c.location_id,
-            )
             .where(Gtfsfeed.status != "deprecated")
             .options(
                 joinedload(Gtfsfeed.locations),
@@ -72,9 +66,7 @@ def save_summary(self) -> None:
         }
         self._save_json(summary_file_name, summary_data)

-    def process_feed_data(
-        self, feed: Feed, dataset: Gtfsdataset, translations: Dict
-    ) -> None:
+    def process_feed_data(self, feed: Feed, dataset: Gtfsdataset) -> None:
         if feed.stable_id in self.processed_feeds:
             return
         self.processed_feeds.add(feed.stable_id)
@@ -83,8 +75,6 @@ def process_feed_data(
         if not validation_reports:
             return

-        translate_feed_locations(feed, translations)
-
         latest_validation_report = max(validation_reports, key=lambda x: x.validated_at)
         notices = latest_validation_report.notices
         errors = [notice for notice in notices if notice.severity == "ERROR"]
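
The tail of this hunk shows where the removed translation step used to sit: process_feed_data now goes straight from picking the newest validation report to bucketing its notices by severity. That selection logic, restated compactly (a sketch, not the repo's code; notice_counts is a hypothetical helper):

from collections import Counter


def notice_counts(validation_reports):
    # Newest report wins; tally its notices by severity (ERROR, WARNING, ...).
    latest = max(validation_reports, key=lambda report: report.validated_at)
    return Counter(notice.severity for notice in latest.notices)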

functions-python/preprocessed_analytics/tests/test_base_processor.py

Lines changed: 4 additions & 2 deletions
@@ -1,3 +1,4 @@
+import os
 import unittest
 from unittest.mock import patch, MagicMock
 from datetime import datetime
@@ -6,12 +7,12 @@
 from processors.base_analytics_processor import (
     BaseAnalyticsProcessor,
 )
+from test_shared.test_utils.database_utils import default_db_url


 class TestBaseAnalyticsProcessor(unittest.TestCase):
-    @patch("processors.base_analytics_processor.Database")
     @patch("processors.base_analytics_processor.storage.Client")
-    def setUp(self, mock_storage_client, _):
+    def setUp(self, mock_storage_client):
         self.mock_storage_client = mock_storage_client
         self.mock_bucket = MagicMock()
         self.mock_storage_client().bucket.return_value = self.mock_bucket
@@ -81,6 +82,7 @@ def test_update_analytics_files(self, mock_blob, mock_save_json):
         "processors.base_analytics_processor.BaseAnalyticsProcessor.update_analytics_files"
     )
     @patch("processors.base_analytics_processor.BaseAnalyticsProcessor.save_summary")
+    @patch.dict(os.environ, {"FEEDS_DATABASE_URL": default_db_url})
     def test_run(
         self,
         mock_save_summary,
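
Because the session is now created lazily inside run() via with_db_session, the tests no longer mock a Database class; instead they point FEEDS_DATABASE_URL at the test database for the duration of each test. patch.dict applies the override on entry and restores os.environ on exit. A self-contained illustration (the URL value here is a hypothetical stand-in for default_db_url):

import os
import unittest
from unittest.mock import patch


class EnvOverrideExample(unittest.TestCase):
    @patch.dict(os.environ, {"FEEDS_DATABASE_URL": "postgresql://localhost/test"})
    def test_env_is_overridden(self):
        # The variable is set only inside this test; patch.dict restores
        # the original environment when the test finishes.
        self.assertEqual(
            os.environ["FEEDS_DATABASE_URL"], "postgresql://localhost/test"
        )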

functions-python/preprocessed_analytics/tests/test_gbfs_processor.py

Lines changed: 7 additions & 5 deletions
@@ -1,15 +1,16 @@
+import os
 import unittest
 from unittest.mock import patch, MagicMock
 from datetime import datetime
 from processors.gbfs_analytics_processor import (
     GBFSAnalyticsProcessor,
 )
+from test_shared.test_utils.database_utils import default_db_url


 class TestGBFSAnalyticsProcessor(unittest.TestCase):
-    @patch("processors.base_analytics_processor.Database")
     @patch("processors.base_analytics_processor.storage.Client")
-    def setUp(self, mock_storage_client, _):
+    def setUp(self, mock_storage_client):
         self.mock_storage_client = mock_storage_client
         self.mock_bucket = MagicMock()
         self.mock_storage_client().bucket.return_value = self.mock_bucket
@@ -26,6 +27,7 @@ def setUp(self, mock_storage_client, _):
         "processors.gbfs_analytics_processor."
         "GBFSAnalyticsProcessor.update_analytics_files"
     )
+    @patch.dict(os.environ, {"FEEDS_DATABASE_URL": default_db_url})
     def test_run(
         self,
         mock_update_analytics_files,
@@ -60,8 +62,8 @@ def test_run(

         # Assert that process_feed_data was called twice (once for each feed-snapshot pair)
         self.assertEqual(mock_process_feed_data.call_count, 2)
-        mock_process_feed_data.assert_any_call(mock_feed1, mock_snapshot1, {})
-        mock_process_feed_data.assert_any_call(mock_feed2, mock_snapshot2, {})
+        mock_process_feed_data.assert_any_call(mock_feed1, mock_snapshot1)
+        mock_process_feed_data.assert_any_call(mock_feed2, mock_snapshot2)

         # Assert that save was called once
         mock_save.assert_called_once()
@@ -96,7 +98,7 @@ def test_process_feed_data(self):
         ]

         # Run process_feed_data
-        self.processor.process_feed_data(mock_feed, mock_snapshot, None)
+        self.processor.process_feed_data(mock_feed, mock_snapshot)

         # Assert the data was appended correctly
         self.assertEqual(len(self.processor.data), 1)

functions-python/preprocessed_analytics/tests/test_gtfs_processor.py

Lines changed: 7 additions & 5 deletions
@@ -1,15 +1,16 @@
+import os
 import unittest
 from unittest.mock import patch, MagicMock
 from datetime import datetime
 from processors.gtfs_analytics_processor import (
     GTFSAnalyticsProcessor,
 )
+from test_shared.test_utils.database_utils import default_db_url


 class TestGTFSAnalyticsProcessor(unittest.TestCase):
-    @patch("processors.base_analytics_processor.Database")
     @patch("processors.base_analytics_processor.storage.Client")
-    def setUp(self, mock_storage_client, _):
+    def setUp(self, mock_storage_client):
         self.mock_storage_client = mock_storage_client
         self.mock_bucket = MagicMock()
         self.mock_storage_client().bucket.return_value = self.mock_bucket
@@ -26,6 +27,7 @@ def setUp(self, mock_storage_client, _):
         "processors.gtfs_analytics_processor.GTFSAnalyticsProcessor"
         ".update_analytics_files"
     )
+    @patch.dict(os.environ, {"FEEDS_DATABASE_URL": default_db_url})
     def test_run(
         self,
         mock_update_analytics_files,
@@ -59,8 +61,8 @@ def test_run(

         # Assert that process_feed_data was called twice (once for each feed-dataset pair)
         self.assertEqual(mock_process_feed_data.call_count, 2)
-        mock_process_feed_data.assert_any_call(mock_feed1, mock_dataset1, {})
-        mock_process_feed_data.assert_any_call(mock_feed2, mock_dataset2, {})
+        mock_process_feed_data.assert_any_call(mock_feed1, mock_dataset1)
+        mock_process_feed_data.assert_any_call(mock_feed2, mock_dataset2)

         # Assert that save was called once
         mock_save.assert_called_once()
@@ -98,7 +100,7 @@ def test_process_feed_data(self):
         mock_dataset.validation_reports[0].features = [MagicMock(name="feature1")]

         # Run process_feed_data
-        self.processor.process_feed_data(mock_feed, mock_dataset, {})
+        self.processor.process_feed_data(mock_feed, mock_dataset)

         # Assert the data was appended correctly
         self.assertEqual(len(self.processor.data), 1)
