Skip to content

Commit 78c8bd4

Browse files
committed
feat: deprecate stale TDG feeds after import, link feeds to License records, and add a monthly Cloud Scheduler job for the TDG import
1 parent fe95f48 commit 78c8bd4

File tree

5 files changed

+126
-31
lines changed

5 files changed

+126
-31
lines changed

functions-python/tasks_executor/src/tasks/data_import/data_import_utils.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import logging
22
import uuid
33
from datetime import datetime
4-
from typing import Tuple, Type, TypeVar
4+
from typing import Tuple, Type, TypeVar, Optional
55

66
from sqlalchemy import select
77
from sqlalchemy.orm import Session
@@ -10,13 +10,14 @@
1010
Feed,
1111
Officialstatushistory,
1212
Entitytype,
13+
License,
1314
)
1415

1516
logger = logging.getLogger(__name__)
1617
T = TypeVar("T", bound="Feed")
1718

1819

19-
def _get_or_create_entity_type(session: Session, entity_type_name: str) -> Entitytype:
20+
def get_or_create_entity_type(session: Session, entity_type_name: str) -> Entitytype:
2021
"""Get or create an Entitytype by name."""
2122
logger.debug("Looking up Entitytype name=%s", entity_type_name)
2223
et = session.scalar(select(Entitytype).where(Entitytype.name == entity_type_name))
@@ -45,7 +46,21 @@ def get_feed(
4546
return feed
4647

4748

48-
def _get_or_create_feed(
49+
def get_license(session: Session, license_id: str) -> Optional[License]:
50+
"""Get a License by ID."""
51+
logger.debug("Lookup License id=%s", license_id)
52+
if not license_id:
53+
logger.debug("No License ID provided")
54+
return None
55+
license = session.get(License, license_id)
56+
if license:
57+
logger.debug("Found existing License id=%s", license_id)
58+
return license
59+
logger.debug("No License found with id=%s", license_id)
60+
return None
61+
62+
63+
def get_or_create_feed(
4964
session: Session,
5065
model: Type[T],
5166
stable_id: str,

functions-python/tasks_executor/src/tasks/data_import/jbda/import_jbda_feeds.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@
3939
)
4040
from shared.helpers.pub_sub import trigger_dataset_download
4141
from tasks.data_import.data_import_utils import (
42-
_get_or_create_entity_type,
43-
_get_or_create_feed,
42+
get_or_create_entity_type,
43+
get_or_create_feed,
4444
)
4545

4646
T = TypeVar("T", bound="Feed")
@@ -307,9 +307,9 @@ def _upsert_rt_feeds(
307307
)
308308
continue
309309

310-
et = _get_or_create_entity_type(db_session, entity_type_name)
310+
et = get_or_create_entity_type(db_session, entity_type_name)
311311
rt_stable_id = f"{stable_id}-{entity_type_name}"
312-
rt_feed, is_new_rt = _get_or_create_feed(
312+
rt_feed, is_new_rt = get_or_create_feed(
313313
db_session, Gtfsrealtimefeed, rt_stable_id, "gtfs_rt"
314314
)
315315

@@ -388,9 +388,7 @@ def _process_feed(
388388

389389
# Upsert/lookup schedule feed
390390
stable_id = f"jbda-{org_id}-{feed_id}"
391-
gtfs_feed, is_new_gtfs = _get_or_create_feed(
392-
db_session, Gtfsfeed, stable_id, "gtfs"
393-
)
391+
gtfs_feed, is_new_gtfs = get_or_create_feed(db_session, Gtfsfeed, stable_id, "gtfs")
394392

395393
# Diff detection
396394
api_sched_fp = _build_api_schedule_fingerprint(item, dbody, producer_url)

functions-python/tasks_executor/src/tasks/data_import/transitfeeds/sync_transitfeeds.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@
3434
Gtfsdataset,
3535
)
3636
from tasks.data_import.data_import_utils import (
37-
_get_or_create_feed,
38-
_get_or_create_entity_type,
37+
get_or_create_feed,
38+
get_or_create_entity_type,
3939
get_feed,
4040
)
4141

@@ -80,7 +80,7 @@ def _process_feeds(
8080
feed_stable_id,
8181
)
8282

83-
feed, is_new = _get_or_create_feed(
83+
feed, is_new = get_or_create_feed(
8484
db_session, model_cls, feed_stable_id, feed_kind, is_official=False
8585
)
8686
# All TransitFeeds imports are marked deprecated
@@ -274,7 +274,7 @@ def _rt_on_is_new(session: Session, feed, row: pd.Series) -> None:
274274
)
275275
if entity_types:
276276
feed.entitytypes = [
277-
_get_or_create_entity_type(session, et) for et in entity_types
277+
get_or_create_entity_type(session, et) for et in entity_types
278278
]
279279
logger.info(
280280
"[GTFS_RT] Set %d entity types for %s",

functions-python/tasks_executor/src/tasks/data_import/trasportdatagouv/import_tdg_feeds.py

Lines changed: 76 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,9 @@
3737
)
3838
from shared.helpers.pub_sub import trigger_dataset_download
3939
from tasks.data_import.data_import_utils import (
40-
_get_or_create_feed,
41-
_get_or_create_entity_type,
40+
get_or_create_feed,
41+
get_or_create_entity_type,
42+
get_license,
4243
)
4344

4445
logger = logging.getLogger(__name__)
@@ -51,10 +52,21 @@
5152
GTFS_RT_FORMAT = "gtfs-rt"
5253

5354
LICENSE_URL_MAP = {
54-
"odc-odbl": "https://opendatacommons.org/licenses/odbl/1.0/",
55-
"mobility-licence": "https://wiki.lafabriquedesmobilites.fr/wiki/Licence_Mobilit%C3%A9s",
56-
"fr-lo": "https://spdx.org/licenses/etalab-2.0.html",
57-
"lov2": "https://spdx.org/licenses/etalab-2.0.html",
55+
"odc-odbl": {
56+
"url": "https://opendatacommons.org/licenses/odbl/1.0/",
57+
"id": "ODbL-1.0",
58+
},
59+
"mobility-licence": {
60+
"url": "https://wiki.lafabriquedesmobilites.fr/wiki/Licence_Mobilit%C3%A9s",
61+
},
62+
"fr-lo": {
63+
"url": "https://www.data.gouv.fr/pages/legal/licences/etalab-2.0",
64+
"id": "etalab-2.0",
65+
},
66+
"lov2": {
67+
"url": "https://www.data.gouv.fr/pages/legal/licences/etalab-2.0",
68+
"id": "etalab-2.0",
69+
},
5870
}
5971

6072
ENTITY_TYPES_MAP = {
@@ -75,7 +87,7 @@ def _get_license_url(license_id: Optional[str]) -> Optional[str]:
7587
"""
7688
if not license_id:
7789
return None
78-
return LICENSE_URL_MAP.get(license_id.lower())
90+
return LICENSE_URL_MAP.get(license_id.lower(), {}).get("url")
7991

8092

8193
def _probe_head_format(
@@ -241,6 +253,7 @@ def _update_common_tdg_fields(
241253
resource: dict,
242254
producer_url: str,
243255
locations: List[Location],
256+
db_session: Session,
244257
) -> None:
245258
"""
246259
Update common fields for both schedule GTFS and RT from TDG dataset + resource.
@@ -254,7 +267,11 @@ def _update_common_tdg_fields(
254267
feed.operational_status = "wip"
255268

256269
feed.license_url = _get_license_url(dataset.get("licence"))
257-
270+
feed_license = get_license(
271+
db_session, LICENSE_URL_MAP.get(dataset.get("licence"), {}).get("id")
272+
)
273+
if feed_license:
274+
feed.license = feed_license
258275
# Use locations only if not already set
259276
if locations and (not feed.locations or len(feed.locations) == 0):
260277
feed.locations = locations
@@ -338,6 +355,7 @@ def _process_tdg_dataset(
338355
db_session: Session,
339356
session_http: requests.Session,
340357
dataset: dict,
358+
processed_stable_ids: Optional[set] = None,
341359
) -> Tuple[dict, List[Feed]]:
342360
"""
343361
Process one TDG dataset:
@@ -391,8 +409,13 @@ def _process_tdg_dataset(
391409
# ---- STATIC GTFS ----
392410
if res_format == GTFS_FORMAT:
393411
stable_id = f"tdg-{res_id}"
394-
gtfs_feed, is_new = _get_or_create_feed(
395-
db_session, Gtfsfeed, stable_id, "gtfs"
412+
processed_stable_ids.add(stable_id)
413+
gtfs_feed, is_new = get_or_create_feed(
414+
db_session,
415+
Gtfsfeed,
416+
stable_id,
417+
"gtfs",
418+
official_notes="Imported from Transport.data.gouv.fr as official feed.",
396419
)
397420

398421
if not is_new:
@@ -406,7 +429,9 @@ def _process_tdg_dataset(
406429
stable_id,
407430
)
408431
processed += 1
409-
static_feeds_by_dataset_id[dataset_id] = gtfs_feed
432+
if dataset_id not in static_feeds_by_dataset_id:
433+
static_feeds_by_dataset_id[dataset_id] = []
434+
static_feeds_by_dataset_id[dataset_id].append(gtfs_feed)
410435
continue
411436

412437
# Requirement: if GTFS url returns CSV, skip it (listing, not feed).
@@ -430,7 +455,9 @@ def _process_tdg_dataset(
430455
continue
431456

432457
# Apply changes
433-
_update_common_tdg_fields(gtfs_feed, dataset, resource, res_url, locations)
458+
_update_common_tdg_fields(
459+
gtfs_feed, dataset, resource, res_url, locations, db_session
460+
)
434461
_ensure_tdg_external_id(gtfs_feed, res_id)
435462

436463
if dataset_id not in static_feeds_by_dataset_id:
@@ -457,7 +484,8 @@ def _process_tdg_dataset(
457484
)
458485

459486
rt_stable_id = f"tdg-{res_id}"
460-
rt_feed, is_new_rt = _get_or_create_feed(
487+
processed_stable_ids.add(rt_stable_id)
488+
rt_feed, is_new_rt = get_or_create_feed(
461489
db_session, Gtfsrealtimefeed, rt_stable_id, "gtfs_rt"
462490
)
463491

@@ -481,7 +509,9 @@ def _process_tdg_dataset(
481509
continue
482510

483511
# Apply changes
484-
_update_common_tdg_fields(rt_feed, dataset, resource, res_url, locations)
512+
_update_common_tdg_fields(
513+
rt_feed, dataset, resource, res_url, locations, db_session
514+
)
485515
_ensure_tdg_external_id(rt_feed, res_id)
486516

487517
# Link RT → schedule
@@ -490,7 +520,7 @@ def _process_tdg_dataset(
490520
# Add entity types
491521
entity_types = _get_entity_types_from_resource(resource)
492522
rt_feed.entitytypes = [
493-
_get_or_create_entity_type(db_session, et) for et in entity_types
523+
get_or_create_entity_type(db_session, et) for et in entity_types
494524
]
495525

496526
if is_new_rt:
@@ -528,6 +558,28 @@ def _process_tdg_dataset(
528558
return deltas, feeds_to_publish
529559

530560

561+
def _deprecate_stale_feeds(db_session, processed_stable_ids):
562+
"""
563+
Deprecate TDG feeds not seen in this import run.
564+
"""
565+
logger.info("Deprecating stale TDG feeds not in processed_stable_ids")
566+
tdg_feeds = (
567+
db_session.query(Feed)
568+
.filter(Feed.stable_id.like("tdg-%"))
569+
.filter(~Feed.stable_id.in_(processed_stable_ids))
570+
.all()
571+
)
572+
logger.info("Found %d tdg_feeds stale stable_ids", len(tdg_feeds))
573+
deprecated_count = 0
574+
for feed in tdg_feeds:
575+
if feed.status != "deprecated":
576+
feed.status = "deprecated"
577+
deprecated_count += 1
578+
logger.info("Deprecated stale TDG feed stable_id=%s", feed.stable_id)
579+
580+
logger.info("Total deprecated stale TDG feeds: %d", deprecated_count)
581+
582+
531583
# ---------------------------------------------------------------------------
532584
# Orchestrator & handler
533585
# ---------------------------------------------------------------------------
@@ -567,10 +619,15 @@ def _import_tdg(db_session: Session, dry_run: bool = True) -> dict:
567619

568620
created_gtfs = updated_gtfs = created_rt = total_processed = 0
569621
feeds_to_publish: List[Feed] = []
570-
622+
processed_stable_ids = set()
571623
for idx, dataset in enumerate(datasets, start=1):
572624
try:
573-
deltas, new_feeds = _process_tdg_dataset(db_session, session_http, dataset)
625+
deltas, new_feeds = _process_tdg_dataset(
626+
db_session,
627+
session_http,
628+
dataset,
629+
processed_stable_ids=processed_stable_ids,
630+
)
574631

575632
created_gtfs += deltas["created_gtfs"]
576633
updated_gtfs += deltas["updated_gtfs"]
@@ -594,6 +651,8 @@ def _import_tdg(db_session: Session, dry_run: bool = True) -> dict:
594651
continue
595652

596653
if not dry_run:
654+
# Deprecate TDG feeds not seen in this import
655+
_deprecate_stale_feeds(db_session, processed_stable_ids)
597656
# Last commit for remaining feeds
598657
commit_changes(db_session, feeds_to_publish, total_processed)
599658

infra/functions-python/main.tf

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,29 @@ resource "google_cloud_scheduler_job" "jbda_import_schedule" {
572572
attempt_deadline = "320s"
573573
}
574574

575+
# Schedule the TDG import function to run monthly
576+
resource "google_cloud_scheduler_job" "tdg_import_schedule" {
577+
name = "tdg-import-scheduler-${var.environment}"
578+
description = "Schedule the tdg import function"
579+
time_zone = "Europe/Paris"
580+
schedule = "0 0 3 * *"
581+
region = var.gcp_region
582+
paused = var.environment == "prod" ? false : true
583+
depends_on = [google_cloudfunctions2_function.tasks_executor, google_cloudfunctions2_function_iam_member.tasks_executor_invoker]
584+
http_target {
585+
http_method = "POST"
586+
uri = google_cloudfunctions2_function.tasks_executor.url
587+
oidc_token {
588+
service_account_email = google_service_account.functions_service_account.email
589+
}
590+
headers = {
591+
"Content-Type" = "application/json"
592+
}
593+
body = base64encode("{\"task\": \"tdg_import\", \"payload\": {\"dry_run\": false}}")
594+
}
595+
attempt_deadline = "320s"
596+
}
597+
575598

576599
resource "google_cloud_scheduler_job" "transit_land_scraping_scheduler" {
577600
name = "transitland-scraping-scheduler-${var.environment}"

0 commit comments

Comments
 (0)