Skip to content

Commit cf9bbb6

Browse files
committed
feat: mdb -> tdg redirect
1 parent 4adc818 commit cf9bbb6

File tree

4 files changed

+277
-0
lines changed

4 files changed

+277
-0
lines changed
Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
#!/usr/bin/env python3
2+
#
3+
# MobilityData 2025
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
from __future__ import annotations
18+
19+
import logging
20+
import os
21+
from typing import Dict, Optional
22+
23+
import pandas as pd
24+
from sqlalchemy.exc import IntegrityError
25+
from sqlalchemy.orm import Session
26+
27+
from shared.database.database import with_db_session
28+
from shared.database_gen.sqlacodegen_models import Feed, Redirectingid
29+
30+
logger = logging.getLogger(__name__)
31+
32+
TDG_REDIRECT_DATA_LINK = (
33+
"https://raw.githubusercontent.com/MobilityData/mobility-feed-api/"
34+
"refs/heads/main/functions-data/tdg_feed_redirect/redirect_mdb_to_tdg.csv"
35+
)
36+
37+
DEFAULT_COMMIT_BATCH_SIZE = 100
38+
39+
40+
def _update_feed_redirect(
41+
db_session: Session, mdb_stable_id: str, tdg_stable_id: str
42+
) -> Dict[str, int]:
43+
"""
44+
Ensure there is a Redirectingid from MDB feed to TDG feed.
45+
46+
Returns a dict with counters:
47+
{
48+
"redirects_created": 0|1,
49+
"redirects_existing": 0|1,
50+
"missing_mdb_feeds": 0|1,
51+
"missing_tdg_feeds": 0|1,
52+
}
53+
"""
54+
counters = {
55+
"redirects_created": 0,
56+
"redirects_existing": 0,
57+
"missing_mdb_feeds": 0,
58+
"missing_tdg_feeds": 0,
59+
}
60+
61+
mdb_feed: Feed | None = (
62+
db_session.query(Feed).filter(Feed.stable_id == mdb_stable_id).one_or_none()
63+
)
64+
if not mdb_feed:
65+
logger.warning(
66+
"MDB feed not found for stable_id=%s, skipping redirect", mdb_stable_id
67+
)
68+
counters["missing_mdb_feeds"] = 1
69+
return counters
70+
71+
tdg_feed: Feed | None = (
72+
db_session.query(Feed).filter(Feed.stable_id == tdg_stable_id).one_or_none()
73+
)
74+
if not tdg_feed:
75+
logger.warning(
76+
"TDG feed not found for stable_id=%s, skipping redirect", tdg_stable_id
77+
)
78+
counters["missing_tdg_feeds"] = 1
79+
return counters
80+
81+
# Both feeds exist: ensure redirect exists (source MDB → target TDG).
82+
redirect = (
83+
db_session.query(Redirectingid)
84+
.filter(
85+
Redirectingid.target_id == tdg_stable_id,
86+
Redirectingid.source_id == mdb_stable_id,
87+
)
88+
.one_or_none()
89+
)
90+
91+
if redirect:
92+
logger.info(
93+
"Redirect already exists: source=%s → target=%s",
94+
mdb_stable_id,
95+
tdg_stable_id,
96+
)
97+
counters["redirects_existing"] = 1
98+
return counters
99+
100+
logger.info(
101+
"Creating redirect: source=%s → target=%s",
102+
mdb_stable_id,
103+
tdg_stable_id,
104+
)
105+
redirect = Redirectingid(
106+
target_id=tdg_stable_id,
107+
source_id=mdb_stable_id,
108+
redirect_comment="Redirecting post TDG import",
109+
)
110+
db_session.add(redirect)
111+
counters["redirects_created"] = 1
112+
return counters
113+
114+
115+
def commit_changes(db_session: Session, created_since_commit: int) -> None:
116+
"""
117+
Commit DB changes for redirects.
118+
119+
Mirrors the TDG import pattern: commit, rollback on IntegrityError.
120+
"""
121+
try:
122+
logger.info(
123+
"Committing DB changes after creating %d redirect(s)", created_since_commit
124+
)
125+
db_session.commit()
126+
except IntegrityError:
127+
db_session.rollback()
128+
logger.exception(
129+
"Commit failed with IntegrityError; rolled back TDG redirects batch"
130+
)
131+
132+
133+
@with_db_session
134+
def _update_tdg_redirects(db_session: Session, dry_run: bool = True) -> dict:
135+
"""
136+
Orchestrate TDG redirect updates:
137+
- Load redirect CSV
138+
- For each row, ensure redirect from MDB → TDG
139+
- Support dry_run and batch commits (COMMIT_BATCH_SIZE)
140+
"""
141+
logger.info("Starting TDG redirects update dry_run=%s", dry_run)
142+
143+
try:
144+
df = pd.read_csv(TDG_REDIRECT_DATA_LINK)
145+
except Exception as e:
146+
logger.exception(
147+
"Failed to load TDG redirect CSV from %s", TDG_REDIRECT_DATA_LINK
148+
)
149+
return {
150+
"message": "Failed to load TDG redirect CSV.",
151+
"error": str(e),
152+
"params": {"dry_run": dry_run},
153+
"rows_processed": 0,
154+
"redirects_created": 0,
155+
"redirects_existing": 0,
156+
"missing_mdb_feeds": 0,
157+
"missing_tdg_feeds": 0,
158+
}
159+
160+
commit_batch_size = int(
161+
os.getenv("COMMIT_BATCH_SIZE", str(DEFAULT_COMMIT_BATCH_SIZE))
162+
)
163+
logger.info("Commit batch size (env COMMIT_BATCH_SIZE)=%s", commit_batch_size)
164+
165+
rows_processed = 0
166+
redirects_created = 0
167+
redirects_existing = 0
168+
missing_mdb_feeds = 0
169+
missing_tdg_feeds = 0
170+
171+
created_since_commit = 0
172+
173+
for idx, row in df.iterrows():
174+
mdb_stable_id = row.get("MDB ID")
175+
tdg_ids_raw = row.get("TDG ID")
176+
177+
if not isinstance(mdb_stable_id, str) or not isinstance(tdg_ids_raw, str):
178+
logger.warning(
179+
"Skipping row index=%s: invalid MDB/TDG IDs row=%s",
180+
idx,
181+
row.to_dict(),
182+
)
183+
continue
184+
185+
tdg_stable_ids = [
186+
f"tdg-{stable_id.strip()}"
187+
for stable_id in tdg_ids_raw.split(",")
188+
if str(stable_id).strip()
189+
]
190+
191+
for tdg_stable_id in tdg_stable_ids:
192+
rows_processed += 1
193+
logger.debug(
194+
"Processing redirect row: MDB=%s TDG=%s",
195+
mdb_stable_id,
196+
tdg_stable_id,
197+
)
198+
199+
counters = _update_feed_redirect(
200+
db_session=db_session,
201+
mdb_stable_id=mdb_stable_id,
202+
tdg_stable_id=tdg_stable_id,
203+
)
204+
205+
redirects_created += counters["redirects_created"]
206+
redirects_existing += counters["redirects_existing"]
207+
missing_mdb_feeds += counters["missing_mdb_feeds"]
208+
missing_tdg_feeds += counters["missing_tdg_feeds"]
209+
210+
created_since_commit += counters["redirects_created"]
211+
212+
if not dry_run and created_since_commit >= commit_batch_size:
213+
commit_changes(db_session, created_since_commit)
214+
created_since_commit = 0
215+
216+
if not dry_run and created_since_commit > 0:
217+
commit_changes(db_session, created_since_commit)
218+
219+
message = (
220+
"Dry run: no DB writes performed."
221+
if dry_run
222+
else "TDG redirects update executed successfully."
223+
)
224+
summary = {
225+
"message": message,
226+
"rows_processed": rows_processed,
227+
"redirects_created": redirects_created,
228+
"redirects_existing": redirects_existing,
229+
"missing_mdb_feeds": missing_mdb_feeds,
230+
"missing_tdg_feeds": missing_tdg_feeds,
231+
"params": {"dry_run": dry_run},
232+
}
233+
logger.info("TDG redirects update summary: %s", summary)
234+
return summary
235+
236+
237+
def update_tdg_redirects_handler(payload: Optional[dict] = None) -> dict:
238+
"""
239+
Cloud Function-style entrypoint.
240+
241+
Payload: {"dry_run": bool} (default True)
242+
"""
243+
payload = payload or {}
244+
logger.info("update_tdg_redirects_handler called with payload=%s", payload)
245+
246+
dry_run_raw = payload.get("dry_run", True)
247+
dry_run = (
248+
dry_run_raw
249+
if isinstance(dry_run_raw, bool)
250+
else str(dry_run_raw).lower() == "true"
251+
)
252+
logger.info("Parsed dry_run=%s (raw=%s)", dry_run, dry_run_raw)
253+
254+
result = _update_tdg_redirects(dry_run=dry_run)
255+
logger.info(
256+
"update_tdg_redirects_handler summary: %s",
257+
{
258+
k: result.get(k)
259+
for k in (
260+
"message",
261+
"rows_processed",
262+
"redirects_created",
263+
"redirects_existing",
264+
"missing_mdb_feeds",
265+
"missing_tdg_feeds",
266+
)
267+
},
268+
)
269+
return result

functions-python/tasks_executor/tests/tasks/data_import/test_redirect_mdb_feeds.py

Whitespace-only changes.

liquibase/changelog.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,4 +89,6 @@
8989
<!-- Enhance license queries on id and name columns. -->
9090
<!-- Individual transactions are needed to create these indexes, this is why XML file is used -->
9191
<include file="changes/feat_1432_indexes.xml" relativeToChangelogFile="true"/>
92+
<!-- Added constraint on operational status -->
93+
<include file="changes/fix_operation_status_constraint.sql" relativeToChangelogFile="true"/>
9294
</databaseChangeLog>
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
update feed
2+
set operational_status = 'published'
3+
where operational_status is null;
4+
5+
alter table feed
6+
alter column operational_status set not null;

0 commit comments

Comments
 (0)