Skip to content

Commit f6a2baf

Browse files
committed
fix: check file hash before creating pipeline tasks
1 parent e79ca07 commit f6a2baf

File tree

3 files changed

+324
-151
lines changed

3 files changed

+324
-151
lines changed

functions-python/batch_process_dataset/src/pipeline_tasks.py

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
import os
44

55
from google.cloud import tasks_v2
6+
from sqlalchemy.orm import Session
67

8+
from shared.database.database import with_db_session
79
from shared.database_gen.sqlacodegen_models import Gtfsdataset
810
from shared.helpers.utils import create_http_task
911

@@ -63,7 +65,39 @@ def create_http_pmtiles_builder_task(
6365
)
6466

6567

66-
def create_pipeline_tasks(dataset: Gtfsdataset) -> None:
68+
@with_db_session
69+
def has_file_changed(dataset: Gtfsdataset, file_name: str, db_session: Session) -> bool:
70+
"""
71+
Check if a file has changed in the dataset.
72+
"""
73+
previous_dataset = (
74+
db_session.query(Gtfsdataset)
75+
.filter(
76+
Gtfsdataset.feed_id == dataset.feed_id,
77+
Gtfsdataset.id != dataset.id,
78+
Gtfsdataset.latest.is_(False),
79+
)
80+
.order_by(Gtfsdataset.downloaded_at.desc())
81+
.first()
82+
)
83+
if not previous_dataset:
84+
return True
85+
existing_file = next(
86+
(file for file in previous_dataset.gtfsfiles if file.file_name == file_name),
87+
None,
88+
)
89+
if not existing_file:
90+
return True
91+
new_dataset_file = next(
92+
(file for file in dataset.gtfsfiles if file.file_name == file_name), None
93+
)
94+
if not new_dataset_file:
95+
return True
96+
return existing_file.hash != new_dataset_file.hash
97+
98+
99+
@with_db_session
100+
def create_pipeline_tasks(dataset: Gtfsdataset, db_session: Session) -> None:
67101
"""
68102
Create pipeline tasks for a dataset.
69103
"""
@@ -76,7 +110,7 @@ def create_pipeline_tasks(dataset: Gtfsdataset) -> None:
76110
stops_url = stops_file.hosted_url if stops_file else None
77111

78112
# Create reverse geolocation task
79-
if stops_url:
113+
if stops_url and has_file_changed(dataset, "stops.txt", db_session):
80114
create_http_reverse_geolocation_processor_task(
81115
stable_id, dataset_stable_id, stops_url
82116
)
@@ -85,7 +119,11 @@ def create_pipeline_tasks(dataset: Gtfsdataset) -> None:
85119
(file for file in gtfs_files if file.file_name == "routes.txt"), None
86120
)
87121
# Create PMTiles builder task
88-
if routes_file and 0 < routes_file.file_size_bytes < 1_000_000:
122+
if (
123+
routes_file
124+
and 0 < routes_file.file_size_bytes < 1_000_000
125+
and has_file_changed(dataset, "routes.txt", db_session)
126+
):
89127
create_http_pmtiles_builder_task(stable_id, dataset_stable_id)
90128
elif routes_file:
91129
logging.info(

0 commit comments

Comments
 (0)