Skip to content

Commit cdaec70

Browse files
feat: update feed status on validation report (#1122)
* update feed status on validation report
1 parent 4e4ea8a commit cdaec70

File tree

5 files changed

+122
-82
lines changed

5 files changed

+122
-82
lines changed
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import logging
2+
from datetime import datetime, timezone
3+
from sqlalchemy import text
4+
from shared.database_gen.sqlacodegen_models import Gtfsdataset, Feed, t_feedsearch
5+
from shared.database.database import refresh_materialized_view
6+
from typing import TYPE_CHECKING
7+
8+
if TYPE_CHECKING:
9+
from sqlalchemy.orm import Session
10+
11+
12+
# query to update the status of the feeds based on the service date range of the latest dataset
def update_feed_statuses_query(session: "Session", stable_feed_ids: list[str]):
    """Update feed statuses from the service date range of each feed's latest dataset.

    A feed is set to ``inactive`` when the range ended before today (UTC),
    ``future`` when the range starts after today, and ``active`` when today
    falls inside the range. Feeds whose status is ``deprecated`` or
    ``development`` are never modified, and datasets missing either end of
    the service date range are ignored.

    On success the changes are committed and the ``t_feedsearch``
    materialized view is refreshed. The session is closed before returning
    or raising from the commit phase.

    Args:
        session: SQLAlchemy session used to run the updates.
        stable_feed_ids: Stable ids of the feeds to restrict the update to.
            An empty (or ``None``) value means all feeds are considered.

    Returns:
        A dict mapping each status (``"inactive"``, ``"future"``,
        ``"active"``) to the value returned by the corresponding bulk
        ``update`` call, i.e. the number of feeds whose status changed.

    Raises:
        Exception: If running the updates fails, or if committing the
            changes / refreshing the materialized view fails. The original
            error is chained as ``__cause__``.
    """
    today_utc = datetime.now(timezone.utc).date()

    latest_dataset_subq = (
        session.query(
            Gtfsdataset.feed_id,
            Gtfsdataset.service_date_range_start,
            Gtfsdataset.service_date_range_end,
        )
        .filter(
            Gtfsdataset.latest.is_(True),
            Gtfsdataset.service_date_range_start.isnot(None),
            Gtfsdataset.service_date_range_end.isnot(None),
        )
        .subquery()
    )

    # (date-range condition, status to assign) pairs, applied in this order.
    status_conditions = [
        (
            latest_dataset_subq.c.service_date_range_end < today_utc,
            "inactive",
        ),
        (
            latest_dataset_subq.c.service_date_range_start > today_utc,
            "future",
        ),
        (
            (latest_dataset_subq.c.service_date_range_start <= today_utc)
            & (latest_dataset_subq.c.service_date_range_end >= today_utc),
            "active",
        ),
    ]

    def get_filters(status: str, service_date_conditions):
        """Build the filter list selecting feeds that must move to ``status``."""
        filters = [
            Feed.id == latest_dataset_subq.c.feed_id,
            Feed.status != text("'deprecated'::status"),
            Feed.status != text("'development'::status"),
            # We filter out feeds that already have the status so that the
            # update count reflects the number of feeds that actually
            # changed status.
            Feed.status != text("'%s'::status" % status),
            service_date_conditions,
        ]

        # Only restrict by stable id when the caller asked for specific feeds.
        if stable_feed_ids:
            filters.insert(0, Feed.stable_id.in_(stable_feed_ids))

        return filters

    try:
        diff_counts: dict[str, int] = {}

        for service_date_conditions, status in status_conditions:
            diff_counts[status] = (
                session.query(Feed)
                .filter(*get_filters(status, service_date_conditions))
                .update({Feed.status: status}, synchronize_session=False)
            )
    except Exception as e:
        logging.error(f"Error updating feed statuses: {e}")
        raise Exception(f"Error updating feed statuses: {e}") from e

    try:
        session.commit()
        refresh_materialized_view(session, t_feedsearch.name)
        logging.info("Feed Database changes for status committed.")
        return diff_counts
    except Exception as e:
        # Interpolate the error into the message: passing it as a stray
        # positional argument without a placeholder breaks log formatting.
        logging.error(f"Error committing changes: {e}")
        session.rollback()
        raise Exception(f"Error committing feed status changes: {e}") from e
    finally:
        # Close on every path, including when rollback itself fails.
        session.close()

functions-python/process_validation_report/src/main.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
)
3535
from shared.helpers.logger import Logger
3636
from shared.helpers.transform import get_nested_value
37+
from shared.helpers.feed_status import update_feed_statuses_query
3738

3839
logging.basicConfig(level=logging.INFO)
3940

@@ -269,8 +270,10 @@ def create_validation_report_entities(
269270
db_session.add(entity)
270271
logging.info(f"Committing {len(entities)} entities to the database.")
271272
db_session.commit()
272-
273273
logging.info("Entities committed successfully.")
274+
275+
update_feed_statuses_query(db_session, [feed_stable_id])
276+
274277
return f"Created {len(entities)} entities.", 200
275278
except Exception as error:
276279
logging.error(f"Error creating validation report entities: {error}")

functions-python/update_feed_status/src/main.py

Lines changed: 3 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -1,95 +1,20 @@
11
import logging
22
import functions_framework
3-
from datetime import datetime, timezone
43
from shared.helpers.logger import Logger
5-
from typing import TYPE_CHECKING
6-
from sqlalchemy import text
7-
from shared.database_gen.sqlacodegen_models import Gtfsdataset, Feed, t_feedsearch
8-
from shared.database.database import refresh_materialized_view, with_db_session
9-
10-
if TYPE_CHECKING:
11-
from sqlalchemy.orm import Session
4+
from shared.helpers.feed_status import update_feed_statuses_query
5+
from shared.database.database import with_db_session
126

137
logging.basicConfig(level=logging.INFO)
148

159

16-
# query to update the status of the feeds based on the service date range of the latest dataset
17-
def update_feed_statuses_query(session: "Session"):
18-
today_utc = datetime.now(timezone.utc).date()
19-
20-
latest_dataset_subq = (
21-
session.query(
22-
Gtfsdataset.feed_id,
23-
Gtfsdataset.service_date_range_start,
24-
Gtfsdataset.service_date_range_end,
25-
)
26-
.filter(
27-
Gtfsdataset.latest.is_(True),
28-
Gtfsdataset.service_date_range_start.isnot(None),
29-
Gtfsdataset.service_date_range_end.isnot(None),
30-
)
31-
.subquery()
32-
)
33-
34-
status_conditions = [
35-
(
36-
latest_dataset_subq.c.service_date_range_end < today_utc,
37-
"inactive",
38-
),
39-
(
40-
latest_dataset_subq.c.service_date_range_start > today_utc,
41-
"future",
42-
),
43-
(
44-
(latest_dataset_subq.c.service_date_range_start <= today_utc)
45-
& (latest_dataset_subq.c.service_date_range_end >= today_utc),
46-
"active",
47-
),
48-
]
49-
50-
try:
51-
diff_counts: dict[str, int] = {}
52-
53-
for service_date_conditions, status in status_conditions:
54-
diff_counts[status] = (
55-
session.query(Feed)
56-
.filter(
57-
Feed.id == latest_dataset_subq.c.feed_id,
58-
Feed.status != text("'deprecated'::status"),
59-
Feed.status != text("'development'::status"),
60-
# We filter out feeds that already have the status so that the
61-
# update count reflects the number of feeds that actually
62-
# changed status.
63-
Feed.status != text("'%s'::status" % status),
64-
service_date_conditions,
65-
)
66-
.update({Feed.status: status}, synchronize_session=False)
67-
)
68-
except Exception as e:
69-
logging.error(f"Error updating feed statuses: {e}")
70-
raise Exception(f"Error updating feed statuses: {e}")
71-
72-
try:
73-
session.commit()
74-
refresh_materialized_view(session, t_feedsearch.name)
75-
logging.info("Feed Database changes committed.")
76-
session.close()
77-
return diff_counts
78-
except Exception as e:
79-
logging.error("Error committing changes:", e)
80-
session.rollback()
81-
session.close()
82-
raise Exception(f"Error creating dataset: {e}")
83-
84-
8510
@with_db_session
8611
@functions_framework.http
8712
def update_feed_status(_, db_session):
8813
"""Updates the Feed status based on the latets dataset service date range."""
8914
Logger.init_logger()
9015
try:
9116
logging.info("Database session started.")
92-
diff_counts = update_feed_statuses_query(db_session)
17+
diff_counts = update_feed_statuses_query(db_session, [])
9318
return diff_counts, 200
9419

9520
except Exception as error:

functions-python/update_feed_status/tests/conftest.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ def populate_database(db_session):
5454
}
5555
for status, (a, b) in id_range_by_status.items():
5656
for _id in map(str, range(a, b + 1)):
57-
db_session.add(Feed(id=str(_id), status=status))
57+
db_session.add(
58+
Feed(id=str(_id), status=status, stable_id="mdb-" + str(_id))
59+
)
5860

5961
# -> inactive
6062
for _id in [

functions-python/update_feed_status/tests/test_update_feed_status_main.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from unittest.mock import patch, MagicMock
22

3+
from conftest import clean_testing_db, populate_database
34
from shared.database.database import with_db_session
45
from test_shared.test_utils.database_utils import default_db_url
56
from main import (
@@ -39,7 +40,7 @@ def fetch_feeds(session: Session) -> Iterator[PartialFeed]:
3940
@with_db_session(db_url=default_db_url)
4041
def test_update_feed_status(db_session: Session) -> None:
4142
feeds_before: dict[str, PartialFeed] = {f.id: f for f in fetch_feeds(db_session)}
42-
result = dict(update_feed_statuses_query(db_session))
43+
result = dict(update_feed_statuses_query(db_session, []))
4344
assert result == {
4445
"inactive": 3,
4546
"active": 2,
@@ -62,6 +63,29 @@ def test_update_feed_status(db_session: Session) -> None:
6263
)
6364

6465

66+
@with_db_session(db_url=default_db_url)
def test_update_feed_status_with_ids(db_session: Session) -> None:
    """Restricting the update to one stable id must only change that feed."""
    # Reset the fixture data so the counts asserted below are deterministic.
    clean_testing_db()
    populate_database()

    feeds_before: dict[str, PartialFeed] = {}
    for feed in fetch_feeds(db_session):
        feeds_before[feed.id] = feed

    result = dict(update_feed_statuses_query(db_session, ["mdb-8"]))
    assert result == {
        "inactive": 1,
        "active": 0,
        "future": 0,
    }

    feeds_after: dict[str, PartialFeed] = {}
    for feed in fetch_feeds(db_session):
        feeds_after[feed.id] = feed

    # Only the feed behind stable id "mdb-8" is expected to change status.
    expected_status_changes = {"8": "inactive"}
    for feed_id, feed_before in feeds_before.items():
        expected_status = expected_status_changes.get(feed_id, feed_before.status)
        assert feeds_after[feed_id].status == expected_status
87+
88+
6589
def test_update_feed_status_failed_query():
6690
mock_session = MagicMock()
6791

@@ -79,7 +103,7 @@ def test_update_feed_status_failed_query():
79103
mock_update_query.update.side_effect = Exception("Mocked exception")
80104

81105
try:
82-
update_feed_statuses_query(mock_session)
106+
update_feed_statuses_query(mock_session, [])
83107
except Exception as e:
84108
assert str(e) == "Error updating feed statuses: Mocked exception"
85109

0 commit comments

Comments
 (0)