diff --git a/api/.openapi-generator/FILES b/api/.openapi-generator/FILES
index 42513a9d8..008305ccf 100644
--- a/api/.openapi-generator/FILES
+++ b/api/.openapi-generator/FILES
@@ -18,6 +18,7 @@ src/feeds_gen/models/bounding_box.py
src/feeds_gen/models/external_id.py
src/feeds_gen/models/extra_models.py
src/feeds_gen/models/feed.py
+src/feeds_gen/models/feed_related_link.py
src/feeds_gen/models/gbfs_endpoint.py
src/feeds_gen/models/gbfs_feed.py
src/feeds_gen/models/gbfs_validation_report.py
diff --git a/api/src/shared/common/db_utils.py b/api/src/shared/common/db_utils.py
index 45c8e9b4b..8e1bf698b 100644
--- a/api/src/shared/common/db_utils.py
+++ b/api/src/shared/common/db_utils.py
@@ -379,6 +379,7 @@ def get_joinedload_options(include_extracted_location_entities: bool = False) ->
return joinedload_options + [
joinedload(Feed.locations),
joinedload(Feed.externalids),
+ joinedload(Feed.feedrelatedlinks),
joinedload(Feed.redirectingids).joinedload(Redirectingid.target),
joinedload(Feed.officialstatushistories),
]
diff --git a/api/src/shared/db_models/feed_impl.py b/api/src/shared/db_models/feed_impl.py
index dc4bf527b..477682741 100644
--- a/api/src/shared/db_models/feed_impl.py
+++ b/api/src/shared/db_models/feed_impl.py
@@ -1,6 +1,7 @@
from shared.db_models.basic_feed_impl import BaseFeedImpl
from feeds_gen.models.feed import Feed
from shared.database_gen.sqlacodegen_models import Feed as FeedOrm
+from shared.db_models.feed_related_link_impl import FeedRelatedLinkImpl
class FeedImpl(BaseFeedImpl, Feed):
@@ -23,5 +24,6 @@ def from_orm(cls, feed_orm: FeedOrm | None) -> Feed | None:
feed.official = feed_orm.official
feed.official_updated_at = feed_orm.official_updated_at
feed.feed_name = feed_orm.feed_name
+ feed.related_links = [FeedRelatedLinkImpl.from_orm(related_link) for related_link in feed_orm.feedrelatedlinks]
feed.note = feed_orm.note
return feed
diff --git a/api/src/shared/db_models/feed_related_link_impl.py b/api/src/shared/db_models/feed_related_link_impl.py
new file mode 100644
index 000000000..5927e4422
--- /dev/null
+++ b/api/src/shared/db_models/feed_related_link_impl.py
@@ -0,0 +1,23 @@
+from feeds_gen.models.feed_related_link import FeedRelatedLink
+from shared.database_gen.sqlacodegen_models import Feedrelatedlink
+
+
+class FeedRelatedLinkImpl(FeedRelatedLink):
+ """Implementation of the FeedRelatedLink model."""
+
+ class Config:
+ """Pydantic configuration.
+ Enables the `from_orm` method to create a model instance from a SQLAlchemy row object."""
+
+ from_attributes = True
+
+ @classmethod
+ def from_orm(cls, feed_related_link_orm: Feedrelatedlink) -> FeedRelatedLink | None:
+ if not feed_related_link_orm:
+ return None
+ return cls(
+ code=feed_related_link_orm.code,
+ url=feed_related_link_orm.url,
+ description=feed_related_link_orm.description,
+ created_at=feed_related_link_orm.created_at,
+ )
diff --git a/api/tests/unittest/models/test_basic_feed_impl.py b/api/tests/unittest/models/test_basic_feed_impl.py
index a6435182a..f9baeafe4 100644
--- a/api/tests/unittest/models/test_basic_feed_impl.py
+++ b/api/tests/unittest/models/test_basic_feed_impl.py
@@ -95,6 +95,7 @@
feed_name="feed_name",
note="note",
feed_contact_email="feed_contact_email",
+ related_links=[],
source_info=SourceInfo(
producer_url="producer_url",
authentication_type=1,
@@ -149,6 +150,7 @@ def test_from_orm_empty_fields(self):
external_ids=[],
redirects=[],
source_info=SourceInfo(),
+ related_links=[],
)
empty_result = FeedImpl.from_orm(empty_feed_orm)
assert empty_result == expected_empty_feed
diff --git a/api/tests/unittest/models/test_gbfs_feed_impl.py b/api/tests/unittest/models/test_gbfs_feed_impl.py
index 701f34acd..621de0513 100644
--- a/api/tests/unittest/models/test_gbfs_feed_impl.py
+++ b/api/tests/unittest/models/test_gbfs_feed_impl.py
@@ -49,6 +49,7 @@ def test_from_orm_all_fields(self):
expected = GbfsFeedImpl(
id="feed_stable_1",
system_id="sys1",
+ related_links=[],
data_type="gbfs",
created_at=datetime(2024, 1, 1, 10, 0, 0),
external_ids=[],
@@ -88,6 +89,7 @@ def test_from_orm_empty_fields(self):
redirects=[],
locations=[],
versions=[],
+ related_links=[],
bounding_box=None,
bounding_box_generated_at=None,
source_info=SourceInfo(
diff --git a/api/tests/unittest/models/test_gtfs_feed_impl.py b/api/tests/unittest/models/test_gtfs_feed_impl.py
index a5ac64582..336ace53c 100644
--- a/api/tests/unittest/models/test_gtfs_feed_impl.py
+++ b/api/tests/unittest/models/test_gtfs_feed_impl.py
@@ -134,6 +134,7 @@ def create_test_notice(notice_code: str, total_notices: int, severity: str):
feed_name="feed_name",
note="note",
feed_contact_email="feed_contact_email",
+ related_links=[],
source_info=SourceInfo(
producer_url="producer_url",
authentication_type=1,
@@ -226,6 +227,7 @@ def test_from_orm_empty_fields(self):
provider=None,
feed_name="",
note=None,
+ related_links=[],
feed_contact_email=None,
source_info=SourceInfo(
producer_url=None,
diff --git a/api/tests/unittest/models/test_gtfs_rt_feed_impl.py b/api/tests/unittest/models/test_gtfs_rt_feed_impl.py
index 6b2b5ae50..09a478e42 100644
--- a/api/tests/unittest/models/test_gtfs_rt_feed_impl.py
+++ b/api/tests/unittest/models/test_gtfs_rt_feed_impl.py
@@ -64,6 +64,7 @@
id="id",
data_type="gtfs_rt",
status="active",
+ related_links=[],
external_ids=[ExternalIdImpl(external_id="associated_id", source="source")],
provider="provider",
feed_name="feed_name",
diff --git a/api/tests/unittest/test_feeds.py b/api/tests/unittest/test_feeds.py
index 456b4797d..70cfb6dc3 100644
--- a/api/tests/unittest/test_feeds.py
+++ b/api/tests/unittest/test_feeds.py
@@ -72,6 +72,7 @@
"producer_url": "test_producer_url",
},
external_ids=[{"external_id": "test_associated_id", "source": "test_source"}],
+ related_links=[],
redirects=[{"comment": "Some comment", "target_id": "test_target_id"}],
).model_dump_json()
)
diff --git a/docs/DatabaseCatalogAPI.yaml b/docs/DatabaseCatalogAPI.yaml
index fd8eb86a7..6432ece35 100644
--- a/docs/DatabaseCatalogAPI.yaml
+++ b/docs/DatabaseCatalogAPI.yaml
@@ -454,7 +454,36 @@ components:
note:
description: A note to clarify complex use cases for consumers.
type: string
-
+ related_links:
+ description: >
+ A list of related links for the feed.
+ type: array
+ items:
+ $ref: '#/components/schemas/FeedRelatedLink'
+ FeedRelatedLink:
+ type: object
+ properties:
+ code:
+ description: >
+ A short code to identify the type of link.
+ type: string
+ example: next_1
+ description:
+ description: >
+ A description of the link.
+ type: string
+ example: The URL for a future feed version with an upcoming service period.
+ url:
+ description: >
+ The URL of the related link.
+ type: string
+ format: url
+ created_at:
+ description: >
+ The date and time the related link was created, in ISO 8601 date-time format.
+ type: string
+ example: 2023-07-10T22:06:00Z
+ format: date-time
GtfsFeed:
allOf:
- $ref: "#/components/schemas/Feed"
diff --git a/docs/OperationsAPI.yaml b/docs/OperationsAPI.yaml
index e9b698ee3..9c8606731 100644
--- a/docs/OperationsAPI.yaml
+++ b/docs/OperationsAPI.yaml
@@ -285,6 +285,41 @@ components:
note:
description: A note to clarify complex use cases for consumers.
type: string
+ related_links:
+ description: >
+ A list of related links for the feed.
+
+ type: array
+ items:
+ $ref: '#/components/schemas/FeedRelatedLink'
+ FeedRelatedLink:
+ type: object
+ properties:
+ code:
+ description: >
+ A short code to identify the type of link.
+
+ type: string
+ example: next_1
+ description:
+ description: >
+ A description of the link.
+
+ type: string
+ example: The URL for a future feed version with an upcoming service period.
+ url:
+ description: >
+ The URL of the related link.
+
+ type: string
+ format: url
+ created_at:
+ description: >
+ The date and time the related link was created, in ISO 8601 date-time format.
+
+ type: string
+ example: 2023-07-10T22:06:00Z
+ format: date-time
GtfsFeed:
allOf:
- $ref: "#/components/schemas/Feed"
@@ -990,7 +1025,7 @@ components:
stable_id:
description: Unique stable identifier used as a key for the feeds table.
type: string
- example: mdb-1210
+ example: mdb-1210
operational_status:
type: string
enum: [wip, published, unpublished]
@@ -1006,7 +1041,7 @@ components:
stable_id:
description: Unique stable identifier used as a key for the feeds table.
type: string
- example: mdb-1210
+ example: mdb-1210
operational_status:
type: string
enum: [wip, published, unpublished]
@@ -1058,6 +1093,7 @@ components:
description: >
The type of realtime entry:
+
* vp - vehicle positions
* tu - trip updates
* sa - service alerts
@@ -1135,6 +1171,7 @@ components:
Describes status of the Feed. Should be one of
+
* `active` Feed should be used in public trip planners.
* `deprecated` Feed is explicitly deprecated and should not be used in public trip planners.
* `inactive` Feed hasn't been recently updated and should be used at risk of providing outdated information.
@@ -1154,6 +1191,7 @@ components:
Describes data type of a feed. Should be one of
+
* `gtfs` GTFS feed.
* `gtfs_rt` GTFS-RT feed.
* `gbfs` GBFS feed.
diff --git a/functions-python/helpers/locations.py b/functions-python/helpers/locations.py
index e0286b70b..6f5e25b97 100644
--- a/functions-python/helpers/locations.py
+++ b/functions-python/helpers/locations.py
@@ -113,6 +113,7 @@ def create_or_get_location(
municipality=city_name,
)
session.add(location)
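+ # Flush so the new Location row is visible to subsequent queries in this session before commit.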
+ session.flush()
logging.debug(f"Created new location: {location_id}")
return location
diff --git a/functions-python/tasks_executor/requirements.txt b/functions-python/tasks_executor/requirements.txt
index 0dbf26655..ecdc13525 100644
--- a/functions-python/tasks_executor/requirements.txt
+++ b/functions-python/tasks_executor/requirements.txt
@@ -27,3 +27,4 @@ google-cloud-storage
# Configuration
python-dotenv==1.0.0
+pycountry
diff --git a/functions-python/tasks_executor/src/main.py b/functions-python/tasks_executor/src/main.py
index 56308c3cc..5608ca19e 100644
--- a/functions-python/tasks_executor/src/main.py
+++ b/functions-python/tasks_executor/src/main.py
@@ -38,6 +38,7 @@
from tasks.geojson.update_geojson_files_precision import (
update_geojson_files_precision_handler,
)
+from tasks.data_import.import_jbda_feeds import import_jbda_handler
init_logger()
LIST_COMMAND: Final[str] = "list"
@@ -77,6 +78,10 @@
"description": "Rebuilds missing visualization files for GTFS datasets.",
"handler": rebuild_missing_visualization_files_handler,
},
+ "jbda_import": {
+ "description": "Imports JBDA data into the system.",
+ "handler": import_jbda_handler,
+ },
}
diff --git a/functions-python/tasks_executor/src/tasks/data_import/data_import_utils.py b/functions-python/tasks_executor/src/tasks/data_import/data_import_utils.py
new file mode 100644
index 000000000..fbd495ab6
--- /dev/null
+++ b/functions-python/tasks_executor/src/tasks/data_import/data_import_utils.py
@@ -0,0 +1,43 @@
+import os
+from shared.database_gen.sqlacodegen_models import Feed
+import logging
+import json
+from google.cloud import pubsub_v1
+
+PROJECT_ID = os.getenv("PROJECT_ID")
+DATASET_BATCH_TOPIC = os.getenv("DATASET_PROCESSING_TOPIC_NAME")
+
+
+def trigger_dataset_download(
+ feed: Feed, execution_id: str, publisher: pubsub_v1.PublisherClient
+) -> None:
+ """Publishes the feed to the configured Pub/Sub topic."""
+ topic_path = publisher.topic_path(PROJECT_ID, DATASET_BATCH_TOPIC)
+ logging.debug("Publishing to Pub/Sub topic: %s", topic_path)
+
+ message_data = {
+ "execution_id": execution_id,
+ "producer_url": feed.producer_url,
+ "feed_stable_id": feed.stable_id,
+ "feed_id": feed.id,
+ "dataset_id": None,
+ "dataset_hash": None,
+ "authentication_type": feed.authentication_type,
+ "authentication_info_url": feed.authentication_info_url,
+ "api_key_parameter_name": feed.api_key_parameter_name,
+ }
+
+ try:
+ # Convert to JSON string
+ json_message = json.dumps(message_data)
+ future = publisher.publish(topic_path, data=json_message.encode("utf-8"))
+ future.add_done_callback(
+ lambda _: logging.info(
+ "Published feed %s to dataset batch topic", feed.stable_id
+ )
+ )
+ future.result()
+ logging.info("Message published for feed %s", feed.stable_id)
+ except Exception as e:
+ logging.error("Error publishing to dataset batch topic: %s", str(e))
+ raise
diff --git a/functions-python/tasks_executor/src/tasks/data_import/import_jbda_feeds.py b/functions-python/tasks_executor/src/tasks/data_import/import_jbda_feeds.py
new file mode 100644
index 000000000..50d4e41db
--- /dev/null
+++ b/functions-python/tasks_executor/src/tasks/data_import/import_jbda_feeds.py
@@ -0,0 +1,655 @@
+#!/usr/bin/env python3
+#
+# MobilityData 2025
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import annotations
+
+import logging
+import os
+import uuid
+from datetime import datetime
+from typing import Optional, Tuple, Dict, Any, List, Final, Type, TypeVar
+
+import requests
+import pycountry
+from sqlalchemy import select, and_
+from sqlalchemy.orm import Session
+from sqlalchemy.exc import IntegrityError
+
+from shared.database.database import with_db_session
+from shared.database_gen.sqlacodegen_models import (
+ Feed,
+ Gtfsfeed,
+ Gtfsrealtimefeed,
+ Entitytype,
+ Feedrelatedlink,
+ Externalid,
+ Officialstatushistory,
+)
+from shared.helpers.locations import create_or_get_location
+from tasks.data_import.data_import_utils import trigger_dataset_download
+from google.cloud import pubsub_v1
+
+T = TypeVar("T", bound="Feed")
+
+logger = logging.getLogger(__name__)
+
+JBDA_BASE: Final[str] = "https://api.gtfs-data.jp/v2"
+FEEDS_URL: Final[str] = f"{JBDA_BASE}/feeds"
+DETAIL_URL_TMPL: Final[str] = f"{JBDA_BASE}/organizations/{{org_id}}/feeds/{{feed_id}}"
+REQUEST_TIMEOUT_S: Final[int] = 60
+
+URLS_TO_ENTITY_TYPES_MAP: Final[dict[str, str]] = {
+ "trip_update_url": "tu",
+ "vehicle_position_url": "vp",
+ "alert_url": "sa",
+}
+
+RID_DESCRIPTIONS: Final[dict[str, str]] = {
+ "next_1": "The URL for a future feed version with an upcoming service period.",
+ "next_2": "The URL for a future feed version with a service period that will proceed after next_1.",
+ "prev_1": "The URL for the expired feed version. This URL was proceeded by the current feed.",
+ "prev_2": "The URL for a past feed version with an expired service period.",
+}
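+# Related-link rows are stored with a "jbda-" prefix on the rid (e.g. "jbda-next_1");
+# see _add_related_gtfs_url below.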
+
+
+def import_jbda_handler(payload: dict | None = None) -> dict:
+ """
+ Cloud Function entrypoint.
+ Payload: {"dry_run": bool} (default True)
+ """
+ payload = payload or {}
+ logger.info("import_jbda_handler called with payload=%s", payload)
+
+ dry_run_raw = payload.get("dry_run", True)
+ dry_run = (
+ dry_run_raw
+ if isinstance(dry_run_raw, bool)
+ else str(dry_run_raw).lower() == "true"
+ )
+ logger.info("Parsed dry_run=%s (raw=%s)", dry_run, dry_run_raw)
+
+ result = _import_jbda(dry_run=dry_run)
+ logger.info(
+ "import_jbda_handler summary: %s",
+ {
+ k: result.get(k)
+ for k in (
+ "message",
+ "created_gtfs",
+ "updated_gtfs",
+ "created_rt",
+ "linked_refs",
+ "total_processed_items",
+ )
+ },
+ )
+ return result
+
+
+def _get_or_create_entity_type(session: Session, entity_type_name: str) -> Entitytype:
+ """Get or create an Entitytype by name."""
+ logger.debug("Looking up Entitytype name=%s", entity_type_name)
+ et = session.scalar(select(Entitytype).where(Entitytype.name == entity_type_name))
+ if et:
+ logger.debug("Found existing Entitytype name=%s", entity_type_name)
+ return et
+ et = Entitytype(name=entity_type_name)
+ session.add(et)
+ session.flush()
+ logger.info("Created Entitytype name=%s", entity_type_name)
+ return et
+
+
+def get_gtfs_file_url(
+ detail_body: Dict[str, Any], rid: str = "current"
+) -> Optional[str]:
+ """
+ Build & validate the GTFS file download URL for a given rid (e.g., 'current', 'next_1', 'prev_1').
+ Uses a HEAD request (fast) and returns None for non-200 responses or request failures.
+ """
+ org_id = detail_body.get("organization_id")
+ feed_id = detail_body.get("feed_id")
+ if not org_id or not feed_id:
+ logger.warning(
+ "Cannot construct GTFS file URL; missing organization_id (%s) or feed_id (%s)",
+ org_id,
+ feed_id,
+ )
+ return None
+
+ expected_url = f"https://api.gtfs-data.jp/v2/organizations/{org_id}/feeds/{feed_id}/files/feed.zip?rid={rid}"
+ try:
+ resp = requests.head(expected_url, allow_redirects=True, timeout=15)
+ if resp.status_code == 200:
+ logger.debug("Verified GTFS file URL (rid=%s): %s", rid, expected_url)
+ return expected_url
+ logger.debug(
+ "GTFS file URL check failed for rid=%s (status=%s)", rid, resp.status_code
+ )
+ except requests.RequestException as e:
+ logger.warning("HEAD request failed for %s: %s", expected_url, e)
+ return None
+
+
+def _get_or_create_feed(
+ session: Session, model: Type[T], stable_id: str, data_type: str
+) -> Tuple[T, bool]:
+ """Generic helper to get or create a Feed subclass (Gtfsfeed, Gtfsrealtimefeed) by stable_id."""
+ logger.debug(
+ "Lookup feed model=%s stable_id=%s",
+ getattr(model, "__name__", str(model)),
+ stable_id,
+ )
+ feed = session.scalar(select(model).where(model.stable_id == stable_id))
+ if feed:
+ logger.info(
+ "Found existing %s stable_id=%s id=%s",
+ getattr(model, "__name__", str(model)),
+ stable_id,
+ feed.id,
+ )
+ return feed, False
+
+ new_id = str(uuid.uuid4())
+ feed = model(
+ id=new_id,
+ data_type=data_type,
+ stable_id=stable_id,
+ official=True,
+ official_updated_at=datetime.now(),
+ )
+ feed.officialstatushistories = [
+ Officialstatushistory(
+ is_official=True,
+ reviewer_email="emma@mobilitydata.org",
+ timestamp=datetime.now(),
+ notes="Imported from JBDA as official feed.",
+ )
+ ]
+ session.add(feed)
+ session.flush()
+ logger.info(
+ "Created %s stable_id=%s id=%s data_type=%s",
+ getattr(model, "__name__", str(model)),
+ stable_id,
+ new_id,
+ data_type,
+ )
+ return feed, True
+
+
+def _update_common_feed_fields(
+ feed: Feed, list_item: dict, detail: dict, producer_url: str
+) -> None:
+ """Update common fields of a Feed (Gtfsfeed or Gtfsrealtimefeed) from JBDA list item and detail."""
+ logger.debug(
+ "Updating common fields for feed id=%s stable_id=%s",
+ getattr(feed, "id", None),
+ getattr(feed, "stable_id", None),
+ )
+ feed.feed_name = detail.get("feed_name")
+ feed.provider = list_item.get("organization_name")
+ feed.producer_url = producer_url
+ feed.license_url = detail.get("feed_license_url")
+ feed.feed_contact_email = (
+ list_item.get("organization_email")
+ if list_item.get("organization_email") != "not_set"
+ else None
+ )
+ feed.status = "active"
+ feed.operational_status = "wip"
+ feed.note = list_item.get("feed_memo")
+
+ # Ensure a JBDA external id exists; only append if missing.
+ jbda_id = feed.stable_id.replace("jbda-", "")
+ has_jbda = any(
+ (ei.source == "jbda" and ei.associated_id == jbda_id)
+ for ei in getattr(feed, "externalids", [])
+ )
+ if not has_jbda:
+ feed.externalids.append(Externalid(associated_id=jbda_id, source="jbda"))
+ logger.debug("Appended missing JBDA Externalid for %s", feed.stable_id)
+
+ logger.debug(
+ "Updated fields: name=%s provider=%s producer_url_set=%s",
+ feed.feed_name,
+ feed.provider,
+ bool(producer_url),
+ )
+
+
+def _get_or_create_location(db_session: Session, pref_id_raw: Any):
+ """
+ Map JBDA pref_id (1..47) to JP subdivision name; create or get Location.
+ """
+ logger.debug("Resolving location for pref_id_raw=%s", pref_id_raw)
+ try:
+ i = int(str(pref_id_raw).strip())
+ code = f"JP-{i:02d}"
+ subdivision_name = pycountry.subdivisions.lookup(code).name
+ except Exception as e:
+ logger.warning(
+ "Failed to map pref_id_raw=%s to JP subdivision: %s", pref_id_raw, e
+ )
+ subdivision_name = None
+
+ loc = create_or_get_location(
+ db_session, country="Japan", state_province=subdivision_name, city_name=None
+ )
+ logger.info(
+ "Location resolved for pref_id=%s → %s", pref_id_raw, getattr(loc, "id", None)
+ )
+ return loc
+
+
+def _add_related_gtfs_url(
+ detail_body: Dict[str, Any],
+ db_session: Session,
+ rid: str,
+ description: str,
+ feed: Gtfsfeed,
+):
+ """
+ Add a related GTFS URL (prev/next) if it exists and isn't already present.
+ """
+ logger.debug("Adding related GTFS URL for feed_id=%s rid=%s", feed.id, rid)
+ url = get_gtfs_file_url(detail_body, rid=rid)
+ if not url:
+ logger.info("No URL available for rid=%s; skipping related link", rid)
+ return
+ db_rid = "jbda-" + rid
+ existing = db_session.scalar(
+ select(Feedrelatedlink).where(
+ and_(Feedrelatedlink.feed_id == feed.id, Feedrelatedlink.code == db_rid)
+ )
+ )
+ if existing:
+ logger.debug("Related link already exists for feed_id=%s rid=%s", feed.id, rid)
+ return
+
+ related_link = Feedrelatedlink(
+ url=url, description=description, code=db_rid, created_at=datetime.now()
+ )
+ feed.feedrelatedlinks.append(related_link)
+ db_session.flush()
+ logger.info("Added related link for feed_id=%s rid=%s url=%s", feed.id, rid, url)
+
+
+def _add_related_gtfs_urls(
+ detail_body: Dict[str, Any], db_session: Session, feed: Gtfsfeed
+):
+ """Batch add prev/next related URLs (idempotent)."""
+ logger.debug("Adding batch of related GTFS URLs for feed_id=%s", feed.id)
+ for rid, description in RID_DESCRIPTIONS.items():
+ _add_related_gtfs_url(detail_body, db_session, rid, description, feed)
+
+
+def _extract_api_rt_map(list_item: dict, detail: dict) -> dict[str, Optional[str]]:
+ """Map entity_type_name -> url from API payload (tu/vp/sa)."""
+ rt_info = detail.get("real_time") or list_item.get("real_time") or {}
+ out: dict[str, Optional[str]] = {}
+ for url_key, entity_type_name in URLS_TO_ENTITY_TYPES_MAP.items():
+ out[entity_type_name] = rt_info.get(url_key) or None
+ return out
+
+
+def _extract_db_rt_map(
+ db_session: Session, stable_id_base: str
+) -> dict[str, Optional[str]]:
+ """Map entity_type_name -> producer_url from DB for existing RT feeds."""
+ out: dict[str, Optional[str]] = {"tu": None, "vp": None, "sa": None}
+ for et in ("tu", "vp", "sa"):
+ sid = f"{stable_id_base}-{et}"
+ rt = db_session.scalar(
+ select(Gtfsrealtimefeed).where(Gtfsrealtimefeed.stable_id == sid)
+ )
+ out[et] = getattr(rt, "producer_url", None) if rt else None
+ return out
+
+
+def _fetch_feeds(session_http: requests.Session) -> List[dict]:
+ """Fetch the JBDA feeds list or raise on HTTP error."""
+ res = session_http.get(FEEDS_URL, timeout=REQUEST_TIMEOUT_S)
+ res.raise_for_status()
+ payload = res.json() or {}
+ return payload.get("body") or []
+
+
+def _fetch_detail(
+ session_http: requests.Session, org_id: str, feed_id: str
+) -> Optional[dict]:
+ """Fetch one feed's detail body, or raise on HTTP error."""
+ detail_url = DETAIL_URL_TMPL.format(org_id=org_id, feed_id=feed_id)
+ logger.debug("Detail URL: %s", detail_url)
+ dres = session_http.get(detail_url, timeout=REQUEST_TIMEOUT_S)
+ dres.raise_for_status()
+ return (
+ dres.json().get("body", {})
+ if dres.headers.get("Content-Type", "").startswith("application/json")
+ else {}
+ )
+
+
+def _upsert_rt_feeds(
+ db_session: Session,
+ stable_id: str,
+ list_item: dict,
+ detail_body: dict,
+ gtfs_feed: Gtfsfeed,
+ location,
+) -> Tuple[int, int]:
+ """
+ Upsert RT feeds for available URLs and link them to the schedule feed.
+ Returns: (created_rt_delta, linked_refs_delta)
+ """
+ created_rt = 0
+ linked_refs = 0
+
+ rt_info = detail_body.get("real_time") or list_item.get("real_time") or {}
+ for url_key, entity_type_name in URLS_TO_ENTITY_TYPES_MAP.items():
+ url = rt_info.get(url_key)
+ if not url:
+ logger.debug(
+ "No RT url for key=%s (feed_id=%s)", url_key, list_item.get("feed_id")
+ )
+ continue
+
+ et = _get_or_create_entity_type(db_session, entity_type_name)
+ rt_stable_id = f"{stable_id}-{entity_type_name}"
+ rt_feed, is_new_rt = _get_or_create_feed(
+ db_session, Gtfsrealtimefeed, rt_stable_id, "gtfs_rt"
+ )
+
+ rt_feed.entitytypes.clear()
+ rt_feed.entitytypes.append(et)
+
+ _update_common_feed_fields(rt_feed, list_item, detail_body, url)
+
+ rt_feed.gtfs_feeds.clear()
+ rt_feed.gtfs_feeds.append(gtfs_feed)
+
+ try:
+ if location and (not rt_feed.locations or len(rt_feed.locations) == 0):
+ rt_feed.locations.append(location)
+ except AttributeError:
+ logger.warning("RT feed model lacks 'locations' relationship; skipping")
+
+ if is_new_rt:
+ created_rt += 1
+ logger.info(
+ "Created RT feed stable_id=%s url_key=%s", rt_stable_id, url_key
+ )
+ linked_refs += 1
+
+ return created_rt, linked_refs
+
+
+def _process_feed(
+ db_session: Session,
+ session_http: requests.Session,
+ item: dict,
+) -> Tuple[dict, Optional[Feed]]:
+ """
+ Process a single feed list item end-to-end.
+ Returns:
+ (deltas_dict, feed_to_publish_or_none)
+ """
+ org_id = item.get("organization_id")
+ feed_id = item.get("feed_id")
+ if not org_id or not feed_id:
+ logger.warning("Missing organization_id/feed_id in list item; skipping")
+ return {
+ "created_gtfs": 0,
+ "updated_gtfs": 0,
+ "created_rt": 0,
+ "linked_refs": 0,
+ "processed": 0,
+ }, None
+
+ # Detail payload
+ try:
+ dbody = _fetch_detail(session_http, org_id, feed_id)
+ except Exception as e:
+ logger.exception(
+ "Exception during DETAIL request for %s/%s: %s", org_id, feed_id, e
+ )
+ return {
+ "created_gtfs": 0,
+ "updated_gtfs": 0,
+ "created_rt": 0,
+ "linked_refs": 0,
+ "processed": 0,
+ }, None
+
+ # Validate current GTFS url
+ producer_url = get_gtfs_file_url(dbody, rid="current")
+ if not producer_url:
+ logger.warning("No GTFS URL found for feed %s/%s; skipping", org_id, feed_id)
+ return {
+ "created_gtfs": 0,
+ "updated_gtfs": 0,
+ "created_rt": 0,
+ "linked_refs": 0,
+ "processed": 0,
+ }, None
+
+ # Upsert/lookup schedule feed
+ stable_id = f"jbda-{feed_id}"
+ gtfs_feed, is_new_gtfs = _get_or_create_feed(
+ db_session, Gtfsfeed, stable_id, "gtfs"
+ )
+
+ # Diff detection
+ api_sched_fp = _build_api_schedule_fingerprint(item, dbody, producer_url)
+ api_rt_map = _extract_api_rt_map(item, dbody)
+ if not is_new_gtfs:
+ db_sched_fp = _build_db_schedule_fingerprint(gtfs_feed)
+ db_rt_map = _extract_db_rt_map(db_session, stable_id)
+ if db_sched_fp == api_sched_fp and db_rt_map == api_rt_map:
+ logger.info("No change detected; skipping feed stable_id=%s", stable_id)
+ return {
+ "created_gtfs": 0,
+ "updated_gtfs": 0,
+ "created_rt": 0,
+ "linked_refs": 0,
+ "processed": 1,
+ }, None
+ diff = {
+ k: (db_sched_fp.get(k), api_sched_fp.get(k))
+ for k in api_sched_fp
+ if db_sched_fp.get(k) != api_sched_fp.get(k)
+ }
+ diff_rt = {
+ k: (db_rt_map.get(k), api_rt_map.get(k))
+ for k in api_rt_map
+ if db_rt_map.get(k) != api_rt_map.get(k)
+ }
+ logger.info("Diff %s sched=%s rt=%s", stable_id, diff, diff_rt)
+
+ # Apply schedule fields
+ _update_common_feed_fields(gtfs_feed, item, dbody, producer_url)
+
+ # Related links (idempotent)
+ _add_related_gtfs_urls(dbody, db_session, gtfs_feed)
+
+ # Location (append only if empty)
+ location = _get_or_create_location(db_session, item.get("feed_pref_id"))
+ if location and (not gtfs_feed.locations or len(gtfs_feed.locations) == 0):
+ gtfs_feed.locations.append(location)
+
+ created_gtfs = 1 if is_new_gtfs else 0
+ updated_gtfs = 0 if is_new_gtfs else 1
+
+ # RT upserts + links
+ created_rt, linked_refs = _upsert_rt_feeds(
+ db_session=db_session,
+ stable_id=stable_id,
+ list_item=item,
+ detail_body=dbody,
+ gtfs_feed=gtfs_feed,
+ location=location,
+ )
+
+ feed_to_publish = gtfs_feed if is_new_gtfs else None
+ return (
+ {
+ "created_gtfs": created_gtfs,
+ "updated_gtfs": updated_gtfs,
+ "created_rt": created_rt,
+ "linked_refs": linked_refs,
+ "processed": 1,
+ },
+ feed_to_publish,
+ )
+
+
+def _build_api_schedule_fingerprint(
+ list_item: dict, detail: dict, producer_url: str
+) -> dict:
+ """Collect only fields we actually persist on schedule feeds."""
+ return {
+ "feed_name": detail.get("feed_name"),
+ "provider": list_item.get("organization_name"),
+ "producer_url": producer_url,
+ "license_url": detail.get("feed_license_url"),
+ "feed_contact_email": (
+ list_item.get("organization_email")
+ if list_item.get("organization_email") != "not_set"
+ else None
+ ),
+ "note": list_item.get("feed_memo"),
+ }
+
+
+def _build_db_schedule_fingerprint(feed: Gtfsfeed) -> dict:
+ return {
+ "feed_name": getattr(feed, "feed_name", None),
+ "provider": getattr(feed, "provider", None),
+ "producer_url": getattr(feed, "producer_url", None),
+ "license_url": getattr(feed, "license_url", None),
+ "feed_contact_email": getattr(feed, "feed_contact_email", None),
+ "note": getattr(feed, "note", None),
+ }
+
+
+@with_db_session
+def _import_jbda(db_session: Session, dry_run: bool = True) -> dict:
+ """
+ Orchestrates the JBDA import: fetch list, iterate, process, batch-commit, then finalize.
+ """
+ logger.info("Starting JBDA import dry_run=%s", dry_run)
+ session_http = requests.Session()
+
+ # Fetch list
+ try:
+ feeds_list = _fetch_feeds(session_http)
+ except Exception as e:
+ logger.exception("Exception during FEEDS_URL request")
+ return {
+ "message": "Failed to fetch JBDA feeds.",
+ "error": str(e),
+ "params": {"dry_run": dry_run},
+ "created_gtfs": 0,
+ "updated_gtfs": 0,
+ "created_rt": 0,
+ "linked_refs": 0,
+ "total_processed_items": 0,
+ }
+
+ logger.info(
+ "Commit batch size (env COMMIT_BATCH_SIZE)=%s",
+ os.getenv("COMMIT_BATCH_SIZE", "20"),
+ )
+ commit_batch_size = int(os.getenv("COMMIT_BATCH_SIZE", "20"))
+
+ # Aggregates
+ created_gtfs = updated_gtfs = created_rt = linked_refs = total_processed = 0
+ feeds_to_publish: List[Feed] = []
+
+ for idx, item in enumerate(feeds_list, start=1):
+ try:
+ if item.get("feed_is_discontinued"):
+ logger.info(
+ "Skipping discontinued feed at index=%d feed_id=%s",
+ idx,
+ item.get("feed_id"),
+ )
+ continue
+
+ deltas, feed_to_publish = _process_feed(db_session, session_http, item)
+ created_gtfs += deltas["created_gtfs"]
+ updated_gtfs += deltas["updated_gtfs"]
+ created_rt += deltas["created_rt"]
+ linked_refs += deltas["linked_refs"]
+ total_processed += deltas["processed"]
+
+ if feed_to_publish and not dry_run:
+ feeds_to_publish.append(feed_to_publish)
+
+ if not dry_run and (total_processed % commit_batch_size == 0):
+ logger.info("Committing batch at total_processed=%d", total_processed)
+ try:
+ db_session.commit()
+ except IntegrityError:
+ db_session.rollback()
+ logger.exception(
+ "DB IntegrityError during batch commit at processed=%d",
+ total_processed,
+ )
+
+ except Exception as e:
+ logger.exception("Exception processing feed at index=%d: %s", idx, e)
+ continue
+
+ if not dry_run:
+ commit_changes(db_session, feeds_to_publish, total_processed)
+
+ message = (
+ "Dry run: no DB writes performed."
+ if dry_run
+ else "JBDA import executed successfully."
+ )
+ summary = {
+ "message": message,
+ "created_gtfs": created_gtfs,
+ "updated_gtfs": updated_gtfs,
+ "created_rt": created_rt,
+ "linked_refs": linked_refs,
+ "total_processed_items": total_processed,
+ "params": {"dry_run": dry_run},
+ }
+ logger.info("Import summary: %s", summary)
+ return summary
+
+
+def commit_changes(
+ db_session: Session, feeds_to_publish: list[Feed], total_processed: int
+):
+ """
+ Final commit + downstream triggers after main loop.
+ """
+ try:
+ logger.info(
+ "Final commit after processing all items (count=%d)", total_processed
+ )
+ db_session.commit()
+ execution_id = str(uuid.uuid4())
+ publisher = pubsub_v1.PublisherClient()
+ for feed in feeds_to_publish:
+ trigger_dataset_download(feed, execution_id, publisher)
+ except IntegrityError:
+ db_session.rollback()
+ logger.exception("Final commit failed with IntegrityError; rolled back")
diff --git a/functions-python/tasks_executor/tests/tasks/data_import/test_jbda_import.py b/functions-python/tasks_executor/tests/tasks/data_import/test_jbda_import.py
new file mode 100644
index 000000000..5d60020f1
--- /dev/null
+++ b/functions-python/tasks_executor/tests/tasks/data_import/test_jbda_import.py
@@ -0,0 +1,327 @@
+import os
+import json
+import unittest
+from typing import Any, Dict, List
+from unittest.mock import patch
+
+from sqlalchemy.orm import Session
+
+from test_shared.test_utils.database_utils import default_db_url
+from shared.database.database import with_db_session
+from shared.database_gen.sqlacodegen_models import (
+ Gtfsfeed,
+ Gtfsrealtimefeed,
+ Feedrelatedlink,
+)
+
+from tasks.data_import.import_jbda_feeds import (
+ import_jbda_handler,
+ get_gtfs_file_url,
+)
+
+
+class _FakeResponse:
+ def __init__(
+ self,
+ body: Dict[str, Any] | None = None,
+ status: int = 200,
+ headers: Dict[str, str] | None = None,
+ ):
+ self._body = body or {}
+ self.status_code = status
+ self.headers = headers or {"Content-Type": "application/json; charset=utf-8"}
+
+ def json(self):
+ return self._body
+
+ def raise_for_status(self):
+ if not (200 <= self.status_code < 300):
+ raise RuntimeError(f"HTTP {self.status_code}")
+
+
+class _FakeSessionOK:
+ """
+ Returns a feeds list with 3 items:
+ - feed1: valid (creates 1 GTFS + 2 RT and 1 related link for next_1)
+ - feed2: discontinued -> skipped
+ - feed3: missing valid HEAD for current -> skipped
+ """
+
+ FEEDS_URL = "https://api.gtfs-data.jp/v2/feeds"
+ DETAIL_TMPL = "https://api.gtfs-data.jp/v2/organizations/{org_id}/feeds/{feed_id}"
+
+ def get(self, url, timeout=60):
+ if url == self.FEEDS_URL:
+ return _FakeResponse(
+ {
+ "body": [
+ {
+ "organization_id": "org1",
+ "feed_id": "feed1",
+ "organization_name": "Org A",
+ "organization_email": "contact@orga.example",
+ "feed_pref_id": 1,
+ "feed_memo": "memo1",
+ },
+ {
+ "organization_id": "org2",
+ "feed_id": "feed2",
+ "is_discontinued": True,
+ "organization_name": "Org B",
+ "organization_email": "b@b.example",
+ "feed_pref_id": 2,
+ },
+ {
+ "organization_id": "org3",
+ "feed_id": "feed3",
+ "organization_name": "Org C",
+ "organization_email": "c@c.example",
+ "feed_pref_id": 3,
+ },
+ ]
+ }
+ )
+
+ if url == self.DETAIL_TMPL.format(org_id="org1", feed_id="feed1"):
+ return _FakeResponse(
+ {
+ "body": {
+ # include org/feed ids so get_gtfs_file_url can build URL
+ "organization_id": "org1",
+ "feed_id": "feed1",
+ "feed_name": "Feed One",
+ "feed_license_url": "https://license.example/1",
+ # gtfs_files not used by get_gtfs_file_url anymore, but harmless to keep
+ "gtfs_files": [
+ {
+ "rid": "current",
+ "gtfs_file_uid": "u1",
+ "gtfs_url": "https://gtfs.example/one.zip",
+ "stop_url": "https://gtfs.example/one-stops.txt",
+ },
+ {
+ "rid": "next_1",
+ "gtfs_file_uid": "u2",
+ "gtfs_url": "https://gtfs.example/one-next.zip",
+ },
+ ],
+ "real_time": {
+ "trip_update_url": "https://rt.example/one/tu.pb",
+ "vehicle_position_url": "https://rt.example/one/vp.pb",
+ # "alert_url": missing on purpose
+ },
+ }
+ }
+ )
+
+ if url == self.DETAIL_TMPL.format(org_id="org3", feed_id="feed3"):
+ return _FakeResponse(
+ {
+ "body": {
+ "organization_id": "org3",
+ "feed_id": "feed3",
+ "feed_name": "Feed Three",
+ "gtfs_files": [
+ {"rid": "current", "gtfs_file_uid": "u3"} # no urls
+ ],
+ "real_time": {},
+ }
+ }
+ )
+
+ return _FakeResponse({}, 404)
+
+
+class _FakeSessionError:
+ def get(self, url, timeout=60):
+ raise RuntimeError("network down")
+
+
+class _FakeFuture:
+ def __init__(self):
+ self._callbacks = []
+
+ def add_done_callback(self, cb):
+ try:
+ cb(self)
+ except Exception:
+ pass
+
+ def result(self, timeout=None):
+ return None
+
+
+class _FakePublisher:
+ def __init__(self):
+ self.published = [] # list of tuples (topic_path, data_bytes)
+
+ def topic_path(self, project_id, topic_name):
+ return f"projects/{project_id}/topics/{topic_name}"
+
+ def publish(self, topic_path, data: bytes):
+ self.published.append((topic_path, data))
+ return _FakeFuture()
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# Helper function tests
+# ─────────────────────────────────────────────────────────────────────────────
+
+
+class TestHelpers(unittest.TestCase):
+ def test_get_gtfs_file_url_head_success_and_missing(self):
+ # detail now needs org/feed ids
+ detail = {"organization_id": "orgX", "feed_id": "feedX"}
+
+ # Construct the URLs the function will probe
+ base = (
+ "https://api.gtfs-data.jp/v2/organizations/orgX/feeds/feedX/files/feed.zip"
+ )
+ url_current = f"{base}?rid=current"
+ url_next1 = f"{base}?rid=next_1"
+ url_next2 = f"{base}?rid=next_2"
+
+ def _head_side_effect(url, allow_redirects=True, timeout=15):
+ if url in (url_current, url_next1):
+ return _FakeResponse(status=200)
+ if url == url_next2:
+ return _FakeResponse(status=404)
+ return _FakeResponse(status=404)
+
+ with patch(
+ "tasks.data_import.import_jbda_feeds.requests.head",
+ side_effect=_head_side_effect,
+ ):
+ self.assertEqual(get_gtfs_file_url(detail, rid="current"), url_current)
+ self.assertEqual(get_gtfs_file_url(detail, rid="next_1"), url_next1)
+ self.assertIsNone(get_gtfs_file_url(detail, rid="next_2"))
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# Import tests
+# ─────────────────────────────────────────────────────────────────────────────
+
+
+class TestImportJBDA(unittest.TestCase):
+ @with_db_session(db_url=default_db_url)
+ def test_import_creates_gtfs_rt_and_related_links(self, db_session: Session):
+ fake_pub = _FakePublisher()
+
+ # The importer will call HEAD on these URLs for org1/feed1
+ base = (
+ "https://api.gtfs-data.jp/v2/organizations/org1/feeds/feed1/files/feed.zip"
+ )
+ url_current = f"{base}?rid=current"
+ url_next1 = f"{base}?rid=next_1"
+
+ def _head_side_effect(url, allow_redirects=True, timeout=15):
+ # succeed for current and next_1 (feed1)
+ if url in (url_current, url_next1):
+ return _FakeResponse(status=200)
+ # fail for anything else (e.g., feed3 current)
+ return _FakeResponse(status=404)
+
+ with patch(
+ "tasks.data_import.import_jbda_feeds.requests.Session",
+ return_value=_FakeSessionOK(),
+ ), patch(
+ "tasks.data_import.import_jbda_feeds.requests.head",
+ side_effect=_head_side_effect,
+ ), patch(
+ "tasks.data_import.import_jbda_feeds.REQUEST_TIMEOUT_S", 0.01
+ ), patch(
+ "tasks.data_import.import_jbda_feeds.pubsub_v1.PublisherClient",
+ return_value=fake_pub,
+ ), patch(
+ "tasks.data_import.data_import_utils.PROJECT_ID", "test-project"
+ ), patch(
+ "tasks.data_import.data_import_utils.DATASET_BATCH_TOPIC", "dataset-batch"
+ ), patch.dict(
+ os.environ, {"COMMIT_BATCH_SIZE": "1"}, clear=False
+ ):
+ result = import_jbda_handler({"dry_run": False})
+
+ # Summary checks (unchanged intent)
+ self.assertEqual(
+ {
+ "message": "JBDA import executed successfully.",
+ "created_gtfs": 1,
+ "updated_gtfs": 0,
+ "created_rt": 2,
+ "linked_refs": 2, # per RT link (tu + vp)
+ "total_processed_items": 1,
+ "params": {"dry_run": False},
+ },
+ result,
+ )
+
+ # DB checks for GTFS feed
+ sched = (
+ db_session.query(Gtfsfeed)
+ .filter(Gtfsfeed.stable_id == "jbda-feed1")
+ .first()
+ )
+ self.assertIsNotNone(sched)
+ self.assertEqual(sched.feed_name, "Feed One")
+ # producer_url now points to the verified JBDA URL (HEAD-checked)
+ self.assertEqual(sched.producer_url, url_current)
+
+ # Related links (only next_1 exists) – also uses JBDA URL
+ links: List[Feedrelatedlink] = list(sched.feedrelatedlinks)
+ codes = {link.code for link in links}
+ self.assertIn("jbda-next_1", codes)
+ next1 = next(link for link in links if link.code == "jbda-next_1")
+ self.assertEqual(next1.url, url_next1)
+
+ # RT feeds + entity types + back-links
+ tu = (
+ db_session.query(Gtfsrealtimefeed)
+ .filter(Gtfsrealtimefeed.stable_id == "jbda-feed1-tu")
+ .first()
+ )
+ vp = (
+ db_session.query(Gtfsrealtimefeed)
+ .filter(Gtfsrealtimefeed.stable_id == "jbda-feed1-vp")
+ .first()
+ )
+ self.assertIsNotNone(tu)
+ self.assertIsNotNone(vp)
+ self.assertEqual(len(tu.entitytypes), 1)
+ self.assertEqual(len(vp.entitytypes), 1)
+ self.assertEqual(tu.entitytypes[0].name, "tu")
+ self.assertEqual(vp.entitytypes[0].name, "vp")
+ self.assertEqual([f.id for f in tu.gtfs_feeds], [sched.id])
+ self.assertEqual([f.id for f in vp.gtfs_feeds], [sched.id])
+ self.assertEqual(tu.producer_url, "https://rt.example/one/tu.pb")
+ self.assertEqual(vp.producer_url, "https://rt.example/one/vp.pb")
+
+ # Pub/Sub was called exactly once (only 1 new GTFS feed)
+ self.assertEqual(len(fake_pub.published), 1)
+ topic_path, data_bytes = fake_pub.published[0]
+ self.assertEqual(topic_path, "projects/test-project/topics/dataset-batch")
+
+ payload = json.loads(data_bytes.decode("utf-8"))
+ self.assertEqual(payload["feed_stable_id"], "jbda-feed1")
+ self.assertEqual(payload["producer_url"], url_current)
+ self.assertIsNone(payload["dataset_id"])
+ self.assertIsNone(payload["dataset_hash"])
+
+ @with_db_session(db_url=default_db_url)
+ def test_import_http_failure_graceful(self, db_session: Session):
+ with patch(
+ "tasks.data_import.import_jbda_feeds.requests.Session",
+ return_value=_FakeSessionError(),
+ ), patch("tasks.data_import.import_jbda_feeds.REQUEST_TIMEOUT_S", 0.01):
+ out = import_jbda_handler({"dry_run": True})
+
+ self.assertEqual(out["message"], "Failed to fetch JBDA feeds.")
+ self.assertIn("error", out)
+ self.assertEqual(out["created_gtfs"], 0)
+ self.assertEqual(out["updated_gtfs"], 0)
+ self.assertEqual(out["created_rt"], 0)
+ self.assertEqual(out["linked_refs"], 0)
+ self.assertEqual(out["total_processed_items"], 0)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/infra/functions-python/main.tf b/infra/functions-python/main.tf
index fe2ba00a0..35a3b3b62 100644
--- a/infra/functions-python/main.tf
+++ b/infra/functions-python/main.tf
@@ -549,6 +549,30 @@ resource "google_cloud_scheduler_job" "gbfs_validator_batch_scheduler" {
attempt_deadline = "320s"
}
+# Schedule the JBDA import function to run monthly
+resource "google_cloud_scheduler_job" "jbda_import_schedule" {
+ name = "jbda-import-scheduler-${var.environment}"
+ description = "Schedule the jbda import function"
+ time_zone = "Etc/UTC"
+ schedule = var.jbda_scheduler_schedule
+ region = var.gcp_region
+ paused = var.environment == "prod" ? false : true
+ depends_on = [google_cloudfunctions2_function.tasks_executor, google_cloudfunctions2_function_iam_member.tasks_executor_invoker]
+ http_target {
+ http_method = "POST"
+ uri = google_cloudfunctions2_function.tasks_executor.url
+ oidc_token {
+ service_account_email = google_service_account.functions_service_account.email
+ }
+ headers = {
+ "Content-Type" = "application/json"
+ }
+ body = base64encode("{\"task\": \"jbda_import\", \"payload\": {\"dry_run\": false}}")
+ }
+ attempt_deadline = "320s"
+}
+
+
resource "google_cloud_scheduler_job" "transit_land_scraping_scheduler" {
name = "transitland-scraping-scheduler-${var.environment}"
description = "Schedule the transitland scraping function"
diff --git a/infra/functions-python/vars.tf b/infra/functions-python/vars.tf
index 9abff45ca..82ae6b153 100644
--- a/infra/functions-python/vars.tf
+++ b/infra/functions-python/vars.tf
@@ -65,10 +65,16 @@ variable "gbfs_scheduler_schedule" {
default = "0 0 * * *" # At 00:00 every day
}
+variable "jbda_scheduler_schedule" {
+ type = string
+ description = "Schedule for the JBDA scheduler job"
+ default = "0 0 3 * *" # At 00:00 on the 3rd day of every month
+}
+
variable "transitland_scraping_schedule" {
type = string
description = "Schedule Transitland scraping job"
- default = "0 0 3 * *" # every month on the 3rd day at 00:00
+ default = "0 15 3 * *" # Runs at 00:00 JST on the 3rd day of every month
}
variable "transitland_api_key" {
diff --git a/liquibase/changelog.xml b/liquibase/changelog.xml
index 490e10c46..00f3b3a2f 100644
--- a/liquibase/changelog.xml
+++ b/liquibase/changelog.xml
@@ -74,5 +74,7 @@
-</databaseChangeLog>
+ <include file="changes/feat_pt_154.sql" relativeToChangelogFile="true"/>
+
+</databaseChangeLog>
diff --git a/liquibase/changes/feat_pt_154.sql b/liquibase/changes/feat_pt_154.sql
new file mode 100644
index 000000000..f787194a3
--- /dev/null
+++ b/liquibase/changes/feat_pt_154.sql
@@ -0,0 +1,282 @@
+-- Schema adjustment and rebuild of the FeedSearch materialized view.
+--
+-- Purpose:
+-- - Update the `Feed.note` column type to TEXT for greater flexibility.
+-- - Drop and recreate the `FeedSearch` materialized view to align with the new data type.
+-- - Recreate the associated indexes to maintain full-text search performance and concurrent refresh capability.
+--
+-- Additionally:
+-- - A new table `FeedRelatedLink` is created if it does not already exist, to store URLs related
+--   to each feed (e.g., next/previous dataset links) with descriptive metadata.
+--
+-- Note: No logical or structural changes were made to the FeedSearch content; only the data type update
+-- and the re-creation required by the column change.
+
+
+DROP MATERIALIZED VIEW IF EXISTS feedsearch;
+
+ALTER TABLE feed
+ALTER COLUMN note TYPE TEXT;
+
+-- Recreate the FeedSearch materialized view
+CREATE MATERIALIZED VIEW FeedSearch AS
+SELECT
+ -- feed
+ Feed.stable_id AS feed_stable_id,
+ Feed.id AS feed_id,
+ Feed.data_type,
+ Feed.status,
+ Feed.feed_name,
+ Feed.note,
+ Feed.feed_contact_email,
+ -- source
+ Feed.producer_url,
+ Feed.authentication_info_url,
+ Feed.authentication_type,
+ Feed.api_key_parameter_name,
+ Feed.license_url,
+ Feed.provider,
+ Feed.operational_status,
+ -- official status
+ Feed.official AS official,
+ -- created_at
+ Feed.created_at AS created_at,
+ -- latest_dataset
+ Latest_dataset.id AS latest_dataset_id,
+ Latest_dataset.hosted_url AS latest_dataset_hosted_url,
+ Latest_dataset.downloaded_at AS latest_dataset_downloaded_at,
+ Latest_dataset.bounding_box AS latest_dataset_bounding_box,
+ Latest_dataset.hash AS latest_dataset_hash,
+ Latest_dataset.agency_timezone AS latest_dataset_agency_timezone,
+ Latest_dataset.service_date_range_start AS latest_dataset_service_date_range_start,
+ Latest_dataset.service_date_range_end AS latest_dataset_service_date_range_end,
+ -- Latest dataset features
+ LatestDatasetFeatures AS latest_dataset_features,
+ -- Latest dataset validation totals
+ COALESCE(LatestDatasetValidationReportJoin.total_error, 0) as latest_total_error,
+ COALESCE(LatestDatasetValidationReportJoin.total_warning, 0) as latest_total_warning,
+ COALESCE(LatestDatasetValidationReportJoin.total_info, 0) as latest_total_info,
+ COALESCE(LatestDatasetValidationReportJoin.unique_error_count, 0) as latest_unique_error_count,
+ COALESCE(LatestDatasetValidationReportJoin.unique_warning_count, 0) as latest_unique_warning_count,
+ COALESCE(LatestDatasetValidationReportJoin.unique_info_count, 0) as latest_unique_info_count,
+ -- external_ids
+ ExternalIdJoin.external_ids,
+ -- redirect_ids
+ RedirectingIdJoin.redirect_ids,
+ -- feed gtfs_rt references
+ FeedReferenceJoin.feed_reference_ids,
+ -- feed gtfs_rt entities
+ EntityTypeFeedJoin.entities,
+ -- locations
+ FeedLocationJoin.locations,
+ -- osm locations grouped
+ OsmLocationJoin.osm_locations,
+ -- gbfs versions
+ COALESCE(GbfsVersionsJoin.versions, '[]'::jsonb) AS versions,
+
+ -- full-text searchable document
+ setweight(to_tsvector('english', coalesce(unaccent(Feed.feed_name), '')), 'C') ||
+ setweight(to_tsvector('english', coalesce(unaccent(Feed.provider), '')), 'C') ||
+ setweight(to_tsvector('english', coalesce(unaccent((
+ SELECT string_agg(
+ coalesce(location->>'country_code', '') || ' ' ||
+ coalesce(location->>'country', '') || ' ' ||
+ coalesce(location->>'subdivision_name', '') || ' ' ||
+ coalesce(location->>'municipality', ''),
+ ' '
+ )
+ FROM json_array_elements(FeedLocationJoin.locations) AS location
+ )), '')), 'A') ||
+ setweight(to_tsvector('english', coalesce(unaccent(OsmLocationNamesJoin.osm_location_names), '')), 'A')
+ AS document
+FROM Feed
+
+-- Latest dataset
+LEFT JOIN gtfsfeed gtf ON gtf.id = Feed.id AND Feed.data_type = 'gtfs'
+LEFT JOIN gtfsdataset Latest_dataset ON Latest_dataset.id = gtf.latest_dataset_id
+
+-- Latest dataset features
+LEFT JOIN (
+ SELECT
+ GtfsDataset.id AS FeatureGtfsDatasetId,
+ array_agg(DISTINCT FeatureValidationReport.feature) AS LatestDatasetFeatures
+ FROM GtfsDataset
+ JOIN ValidationReportGtfsDataset
+ ON ValidationReportGtfsDataset.dataset_id = GtfsDataset.id
+ JOIN (
+ -- Pick latest ValidationReport per dataset based on validated_at
+ SELECT DISTINCT ON (ValidationReportGtfsDataset.dataset_id)
+ ValidationReportGtfsDataset.dataset_id,
+ ValidationReport.id AS latest_validation_report_id
+ FROM ValidationReportGtfsDataset
+ JOIN ValidationReport
+ ON ValidationReport.id = ValidationReportGtfsDataset.validation_report_id
+ ORDER BY
+ ValidationReportGtfsDataset.dataset_id,
+ ValidationReport.validated_at DESC
+ ) AS LatestReports
+ ON LatestReports.latest_validation_report_id = ValidationReportGtfsDataset.validation_report_id
+ JOIN FeatureValidationReport
+ ON FeatureValidationReport.validation_id = ValidationReportGtfsDataset.validation_report_id
+ GROUP BY FeatureGtfsDatasetId
+) AS LatestDatasetFeaturesJoin ON Latest_dataset.id = FeatureGtfsDatasetId
+
+-- Latest dataset validation report
+LEFT JOIN (
+ SELECT
+ GtfsDataset.id AS ValidationReportGtfsDatasetId,
+ ValidationReport.total_error,
+ ValidationReport.total_warning,
+ ValidationReport.total_info,
+ ValidationReport.unique_error_count,
+ ValidationReport.unique_warning_count,
+ ValidationReport.unique_info_count
+ FROM GtfsDataset
+ JOIN ValidationReportGtfsDataset
+ ON ValidationReportGtfsDataset.dataset_id = GtfsDataset.id
+ JOIN (
+ -- Pick latest ValidationReport per dataset based on validated_at
+ SELECT DISTINCT ON (ValidationReportGtfsDataset.dataset_id)
+ ValidationReportGtfsDataset.dataset_id,
+ ValidationReport.id AS latest_validation_report_id
+ FROM ValidationReportGtfsDataset
+ JOIN ValidationReport
+ ON ValidationReport.id = ValidationReportGtfsDataset.validation_report_id
+ ORDER BY
+ ValidationReportGtfsDataset.dataset_id,
+ ValidationReport.validated_at DESC
+ ) AS LatestReports
+ ON LatestReports.latest_validation_report_id = ValidationReportGtfsDataset.validation_report_id
+ JOIN ValidationReport
+ ON ValidationReport.id = ValidationReportGtfsDataset.validation_report_id
+) AS LatestDatasetValidationReportJoin ON Latest_dataset.id = ValidationReportGtfsDatasetId
+
+-- External ids
+LEFT JOIN (
+ SELECT
+ feed_id,
+ json_agg(json_build_object('external_id', associated_id, 'source', source)) AS external_ids
+ FROM externalid
+ GROUP BY feed_id
+) AS ExternalIdJoin ON ExternalIdJoin.feed_id = Feed.id
+
+-- feed reference ids
+LEFT JOIN (
+ SELECT
+ gtfs_rt_feed_id,
+ array_agg(FeedReferenceJoinInnerQuery.stable_id) AS feed_reference_ids
+ FROM FeedReference
+ LEFT JOIN Feed AS FeedReferenceJoinInnerQuery ON FeedReferenceJoinInnerQuery.id = FeedReference.gtfs_feed_id
+ GROUP BY gtfs_rt_feed_id
+) AS FeedReferenceJoin ON FeedReferenceJoin.gtfs_rt_feed_id = Feed.id AND Feed.data_type = 'gtfs_rt'
+
+-- Redirect ids
+LEFT JOIN (
+ SELECT
+ target_id,
+ json_agg(json_build_object('target_id', target_id, 'comment', redirect_comment)) AS redirect_ids
+ FROM RedirectingId
+ GROUP BY target_id
+) AS RedirectingIdJoin ON RedirectingIdJoin.target_id = Feed.id
+
+-- Feed locations
+LEFT JOIN (
+ SELECT
+ LocationFeed.feed_id,
+ json_agg(json_build_object('country', country, 'country_code', country_code, 'subdivision_name',
+ subdivision_name, 'municipality', municipality)) AS locations
+ FROM Location
+ LEFT JOIN LocationFeed ON LocationFeed.location_id = Location.id
+ GROUP BY LocationFeed.feed_id
+) AS FeedLocationJoin ON FeedLocationJoin.feed_id = Feed.id
+
+-- Entity types
+LEFT JOIN (
+ SELECT
+ feed_id,
+ array_agg(entity_name) AS entities
+ FROM EntityTypeFeed
+ GROUP BY feed_id
+) AS EntityTypeFeedJoin ON EntityTypeFeedJoin.feed_id = Feed.id AND Feed.data_type = 'gtfs_rt'
+
+-- OSM locations
+LEFT JOIN (
+ WITH locations_per_group AS (
+ SELECT
+ fog.feed_id,
+ olg.group_name,
+ jsonb_agg(
+ DISTINCT jsonb_build_object(
+ 'admin_level', gp.admin_level,
+ 'name', gp.name
+ )
+ ) AS locations
+ FROM FeedOsmLocationGroup fog
+ JOIN OsmLocationGroup olg ON olg.group_id = fog.group_id
+ JOIN OsmLocationGroupGeopolygon olgg ON olgg.group_id = olg.group_id
+ JOIN Geopolygon gp ON gp.osm_id = olgg.osm_id
+ GROUP BY fog.feed_id, olg.group_name
+ )
+ SELECT
+ feed_id,
+ jsonb_agg(
+ jsonb_build_object(
+ 'group_name', group_name,
+ 'locations', locations
+ )
+ )::json AS osm_locations
+ FROM locations_per_group
+ GROUP BY feed_id
+) AS OsmLocationJoin ON OsmLocationJoin.feed_id = Feed.id
+
+-- OSM location names
+LEFT JOIN (
+ SELECT
+ fog.feed_id,
+ string_agg(DISTINCT gp.name, ' ') AS osm_location_names
+ FROM FeedOsmLocationGroup fog
+ JOIN OsmLocationGroup olg ON olg.group_id = fog.group_id
+ JOIN OsmLocationGroupGeopolygon olgg ON olgg.group_id = olg.group_id
+ JOIN Geopolygon gp ON gp.osm_id = olgg.osm_id
+ WHERE gp.name IS NOT NULL
+ GROUP BY fog.feed_id
+) AS OsmLocationNamesJoin ON OsmLocationNamesJoin.feed_id = Feed.id
+
+-- GBFS versions
+LEFT JOIN (
+ SELECT
+ Feed.id AS feed_id,
+ to_jsonb(array_agg(DISTINCT GbfsVersion.version ORDER BY GbfsVersion.version)) AS versions
+ FROM Feed
+ JOIN GbfsFeed ON GbfsFeed.id = Feed.id
+ JOIN GbfsVersion ON GbfsVersion.feed_id = GbfsFeed.id
+ WHERE Feed.data_type = 'gbfs'
+ GROUP BY Feed.id
+) AS GbfsVersionsJoin ON GbfsVersionsJoin.feed_id = Feed.id;
+
+
+-- This index allows concurrent refresh on the materialized view avoiding table locks
+CREATE UNIQUE INDEX idx_unique_feed_id ON FeedSearch(feed_id);
+
+-- Indices for feedsearch view optimization
+CREATE INDEX feedsearch_document_idx ON FeedSearch USING GIN(document);
+CREATE INDEX feedsearch_feed_stable_id ON FeedSearch(feed_stable_id);
+CREATE INDEX feedsearch_data_type ON FeedSearch(data_type);
+CREATE INDEX feedsearch_status ON FeedSearch(status);
+
+
+-- Update search
+REFRESH MATERIALIZED VIEW CONCURRENTLY FeedSearch;
+
+-- create new related links table
+CREATE TABLE IF NOT EXISTS FeedRelatedLink (
+ feed_id varchar(255) NOT NULL REFERENCES Feed(id) ON DELETE CASCADE,
+ description TEXT NOT NULL,
+ code TEXT NOT NULL,
+ url TEXT NOT NULL,
+ created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (feed_id, code)
+);
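+
+-- Illustrative only (not part of this migration): related links for a feed can be read with
+-- a query such as:
+--   SELECT code, url, description, created_at
+--   FROM FeedRelatedLink
+--   WHERE feed_id = '<feed id>'
+--   ORDER BY code;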
+
+
diff --git a/web-app/public/locales/en/feeds.json b/web-app/public/locales/en/feeds.json
index 4a6f230dc..36d6d897e 100644
--- a/web-app/public/locales/en/feeds.json
+++ b/web-app/public/locales/en/feeds.json
@@ -222,5 +222,6 @@
"customStopRadiusAria": "Custom stop radius",
"radius": "Radius: {{px}}px"
}
- }
+ },
+ "relatedLinks": "Related Links"
}
diff --git a/web-app/src/app/screens/Feed/components/FeedSummary.tsx b/web-app/src/app/screens/Feed/components/FeedSummary.tsx
index 30546cb02..13692f247 100644
--- a/web-app/src/app/screens/Feed/components/FeedSummary.tsx
+++ b/web-app/src/app/screens/Feed/components/FeedSummary.tsx
@@ -366,7 +366,89 @@ export default function FeedSummary({
)}
)}
+ {feed?.related_links != null && feed.related_links?.length > 0 && (
+
+
+
+
+ {t('relatedLinks')}
+
+
+
+
+
+ {feed.related_links.map((relatedLink) => {
+ const url = relatedLink.url;
+ const code = relatedLink.code ?? '';
+ const description = relatedLink.description;
+
+ return (
+
+
+
+
+ {description != null && (
+
+
+
+
+
+ )}
+
+
+
+ {url != null && (
+
+ {url}
+
+ )}
+ {
+ if (url != null) {
+ setSnackbarOpen(true);
+ void navigator.clipboard
+ .writeText(url)
+ .then(() => {});
+ }
+ }}
+ />
+
+
+
+ );
+ })}
+
+
+
+ )}
{latestDataset?.validation_report?.features != undefined && (
diff --git a/web-app/src/app/services/feeds/types.ts b/web-app/src/app/services/feeds/types.ts
index 2bda6cf18..2ba79d800 100644
--- a/web-app/src/app/services/feeds/types.ts
+++ b/web-app/src/app/services/feeds/types.ts
@@ -188,6 +188,34 @@ export interface components {
feed_name?: string;
/** @description A note to clarify complex use cases for consumers. */
note?: string;
+ /** @description A list of related links for the feed. */
+ related_links?: Array<components['schemas']['FeedRelatedLink']>;
+ };
+ FeedRelatedLink: {
+ /**
+ * @description A short code to identify the type of link.
+ *
+ * @example next_1
+ */
+ code?: string;
+ /**
+ * @description A description of the link.
+ *
+ * @example The URL for a future feed version with an upcoming service period.
+ */
+ description?: string;
+ /**
+ * Format: url
+ * @description The URL of the related link.
+ */
+ url?: string;
+ /**
+ * Format: date-time
+ * @description The date and time the related link was created, in ISO 8601 date-time format.
+ *
+ * @example "2023-07-10T22:06:00.000Z"
+ */
+ created_at?: string;
};
GtfsFeed: components['schemas']['Feed'] & {
/**