Skip to content

Commit b64cb0c

Browse files
authored
feat: sync transitfeeds data with the mdb (#1451)
1 parent 765b49d commit b64cb0c

File tree

7 files changed

+815
-76
lines changed

7 files changed

+815
-76
lines changed

functions-python/tasks_executor/requirements.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,6 @@ google-cloud-storage
2828
# Configuration
2929
python-dotenv==1.0.0
3030
pycountry
31+
32+
# Other utilities
33+
pandas

functions-python/tasks_executor/src/main.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import functions_framework
2121

2222
from shared.helpers.logger import init_logger
23+
from tasks.data_import.transitfeeds.sync_transitfeeds import sync_transitfeeds_handler
2324
from tasks.dataset_files.rebuild_missing_dataset_files import (
2425
rebuild_missing_dataset_files_handler,
2526
)
@@ -38,7 +39,7 @@
3839
from tasks.geojson.update_geojson_files_precision import (
3940
update_geojson_files_precision_handler,
4041
)
41-
from tasks.data_import.import_jbda_feeds import import_jbda_handler
42+
from tasks.data_import.jbda.import_jbda_feeds import import_jbda_handler
4243

4344
from tasks.licenses.populate_license_rules import (
4445
populate_license_rules_handler,
@@ -98,6 +99,10 @@
9899
"description": "Populates licenses and license-rules in the database from a predefined JSON source.",
99100
"handler": populate_licenses_handler,
100101
},
102+
"sync_transitfeeds_data": {
103+
"description": "Syncs data from TransitFeeds to the database.",
104+
"handler": sync_transitfeeds_handler,
105+
},
101106
}
102107

103108

functions-python/tasks_executor/src/tasks/data_import/data_import_utils.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
import logging
2+
import uuid
3+
from datetime import datetime
4+
from typing import Tuple, Type, TypeVar
5+
6+
from sqlalchemy import select
7+
from sqlalchemy.orm import Session
8+
9+
from shared.database_gen.sqlacodegen_models import (
10+
Feed,
11+
Officialstatushistory,
12+
Entitytype,
13+
)
14+
15+
logger = logging.getLogger(__name__)
16+
T = TypeVar("T", bound="Feed")
17+
18+
19+
def _get_or_create_entity_type(session: Session, entity_type_name: str) -> Entitytype:
    """Return the Entitytype named ``entity_type_name``, inserting it when absent.

    A newly created row is added to ``session`` and flushed (not committed)
    before it is returned.
    """
    logger.debug("Looking up Entitytype name=%s", entity_type_name)
    lookup = select(Entitytype).where(Entitytype.name == entity_type_name)
    existing = session.scalar(lookup)
    if not existing:
        # Nothing stored under this name yet: create it and flush within the
        # caller's session.
        created = Entitytype(name=entity_type_name)
        session.add(created)
        session.flush()
        logger.info("Created Entitytype name=%s", entity_type_name)
        return created

    logger.debug("Found existing Entitytype name=%s", entity_type_name)
    return existing
31+
32+
33+
def get_feed(
    session: Session,
    stable_id: str,
    model: Type[T] = Feed,
) -> T | None:
    """Fetch the ``model`` row whose ``stable_id`` matches, or ``None`` if there is none.

    ``model`` defaults to ``Feed``; pass another mapped class exposing
    ``stable_id`` to query that class instead.
    """
    logger.debug("Lookup feed stable_id=%s", stable_id)
    query = select(model).where(model.stable_id == stable_id)
    match = session.scalar(query)
    if not match:
        logger.debug("No Feed found with stable_id=%s", stable_id)
    else:
        logger.debug("Found existing feed stable_id=%s id=%s", stable_id, match.id)
    return match
46+
47+
48+
def _get_or_create_feed(
    session: Session,
    model: Type[T],
    stable_id: str,
    data_type: str,
    is_official: bool = True,
    official_notes: str = "Imported from JBDA as official feed.",
    reviewer_email: str = "[email protected]",
) -> Tuple[T, bool]:
    """Get or create a Feed subclass (Gtfsfeed, Gtfsrealtimefeed) by stable_id.

    Args:
        session: Session used for the lookup and, when needed, the insert.
        model: Mapped class to query and instantiate.
        stable_id: Stable identifier used as the lookup key.
        data_type: Value stored in ``data_type`` on a newly created feed.
        is_official: Value stored in ``official`` on a newly created feed; when
            true, an initial ``Officialstatushistory`` entry is attached.
        official_notes: Notes recorded on that initial history entry.
        reviewer_email: Reviewer recorded on that initial history entry.

    Returns:
        ``(feed, created)`` where ``created`` is ``False`` when an existing row
        was found and ``True`` when a new one was added and flushed. The new
        row is flushed, not committed.
    """
    model_name = getattr(model, "__name__", str(model))
    logger.debug("Lookup feed model=%s stable_id=%s", model_name, stable_id)
    feed = session.scalar(select(model).where(model.stable_id == stable_id))
    if feed:
        logger.info(
            "Found existing %s stable_id=%s id=%s",
            model_name,
            stable_id,
            feed.id,
        )
        return feed, False

    new_id = str(uuid.uuid4())
    # Capture the clock once so the feed's official_updated_at and its first
    # history entry carry exactly the same timestamp.
    created_at = datetime.now()
    feed = model(
        id=new_id,
        data_type=data_type,
        stable_id=stable_id,
        official=is_official,
        official_updated_at=created_at,
    )
    if is_official:
        feed.officialstatushistories = [
            Officialstatushistory(
                is_official=True,
                reviewer_email=reviewer_email,
                timestamp=created_at,
                notes=official_notes,
            )
        ]
    session.add(feed)
    session.flush()
    logger.info(
        "Created %s stable_id=%s id=%s data_type=%s",
        model_name,
        stable_id,
        new_id,
        data_type,
    )
    return feed, True

functions-python/tasks_executor/src/tasks/data_import/import_jbda_feeds.py renamed to functions-python/tasks_executor/src/tasks/data_import/jbda/import_jbda_feeds.py

Lines changed: 7 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -20,27 +20,28 @@
2020
import os
2121
import uuid
2222
from datetime import datetime
23-
from typing import Optional, Tuple, Dict, Any, List, Final, Type, TypeVar
23+
from typing import Optional, Tuple, Dict, Any, List, Final, TypeVar
2424

25-
import requests
2625
import pycountry
26+
import requests
2727
from sqlalchemy import select, and_
28-
from sqlalchemy.orm import Session
2928
from sqlalchemy.exc import IntegrityError
29+
from sqlalchemy.orm import Session
3030

3131
from shared.common.locations_utils import create_or_get_location
3232
from shared.database.database import with_db_session
3333
from shared.database_gen.sqlacodegen_models import (
3434
Feed,
3535
Gtfsfeed,
3636
Gtfsrealtimefeed,
37-
Entitytype,
3837
Feedrelatedlink,
3938
Externalid,
40-
Officialstatushistory,
4139
)
42-
4340
from shared.helpers.pub_sub import trigger_dataset_download
41+
from tasks.data_import.data_import_utils import (
42+
_get_or_create_entity_type,
43+
_get_or_create_feed,
44+
)
4445

4546
T = TypeVar("T", bound="Feed")
4647

@@ -99,20 +100,6 @@ def import_jbda_handler(payload: dict | None = None) -> dict:
99100
return result
100101

101102

102-
def _get_or_create_entity_type(session: Session, entity_type_name: str) -> Entitytype:
103-
"""Get or create an Entitytype by name."""
104-
logger.debug("Looking up Entitytype name=%s", entity_type_name)
105-
et = session.scalar(select(Entitytype).where(Entitytype.name == entity_type_name))
106-
if et:
107-
logger.debug("Found existing Entitytype name=%s", entity_type_name)
108-
return et
109-
et = Entitytype(name=entity_type_name)
110-
session.add(et)
111-
session.flush()
112-
logger.info("Created Entitytype name=%s", entity_type_name)
113-
return et
114-
115-
116103
def get_gtfs_file_url(
117104
detail_body: Dict[str, Any], rid: str = "current"
118105
) -> Optional[str]:
@@ -144,53 +131,6 @@ def get_gtfs_file_url(
144131
return None
145132

146133

147-
def _get_or_create_feed(
148-
session: Session, model: Type[T], stable_id: str, data_type: str
149-
) -> Tuple[T, bool]:
150-
"""Generic helper to get or create a Feed subclass (Gtfsfeed, Gtfsrealtimefeed) by stable_id."""
151-
logger.debug(
152-
"Lookup feed model=%s stable_id=%s",
153-
getattr(model, "__name__", str(model)),
154-
stable_id,
155-
)
156-
feed = session.scalar(select(model).where(model.stable_id == stable_id))
157-
if feed:
158-
logger.info(
159-
"Found existing %s stable_id=%s id=%s",
160-
getattr(model, "__name__", str(model)),
161-
stable_id,
162-
feed.id,
163-
)
164-
return feed, False
165-
166-
new_id = str(uuid.uuid4())
167-
feed = model(
168-
id=new_id,
169-
data_type=data_type,
170-
stable_id=stable_id,
171-
official=True,
172-
official_updated_at=datetime.now(),
173-
)
174-
feed.officialstatushistories = [
175-
Officialstatushistory(
176-
is_official=True,
177-
reviewer_email="[email protected]",
178-
timestamp=datetime.now(),
179-
notes="Imported from JBDA as official feed.",
180-
)
181-
]
182-
session.add(feed)
183-
session.flush()
184-
logger.info(
185-
"Created %s stable_id=%s id=%s data_type=%s",
186-
getattr(model, "__name__", str(model)),
187-
stable_id,
188-
new_id,
189-
data_type,
190-
)
191-
return feed, True
192-
193-
194134
def _update_common_feed_fields(
195135
feed: Feed, list_item: dict, detail: dict, producer_url: str
196136
) -> None:

0 commit comments

Comments
 (0)