
Commit 89f3a0d

updating triggers
1 parent f6a2baf commit 89f3a0d

File tree

3 files changed: +62 -31 lines changed

functions-python/batch_process_dataset/src/pipeline_tasks.py

Lines changed: 48 additions & 18 deletions
@@ -1,6 +1,7 @@
 import json
 import logging
 import os
+from typing import Iterable, List
 
 from google.cloud import tasks_v2
 from sqlalchemy.orm import Session
@@ -66,41 +67,57 @@ def create_http_pmtiles_builder_task(
 
 
 @with_db_session
-def has_file_changed(dataset: Gtfsdataset, file_name: str, db_session: Session) -> bool:
+def get_changed_files(
+    dataset: Gtfsdataset,
+    db_session: Session,
+) -> List[str]:
     """
-    Check if a file has changed in the dataset.
+    Return the subset of the dataset's files whose content hash changed compared
+    to the previous dataset for the same feed.
+    - If there is no previous dataset → any file that exists in the new dataset is considered "changed".
+    - If the file existed before and now is missing → NOT considered changed.
+    - If the file did not exist before but exists now → considered changed.
+    - If hashes differ → considered changed.
     """
     previous_dataset = (
         db_session.query(Gtfsdataset)
         .filter(
             Gtfsdataset.feed_id == dataset.feed_id,
             Gtfsdataset.id != dataset.id,
-            Gtfsdataset.latest.is_(False),
         )
         .order_by(Gtfsdataset.downloaded_at.desc())
         .first()
     )
+
+    new_files = list(dataset.gtfsfiles)
+
+    # No previous dataset -> everything that exists now is "changed"
     if not previous_dataset:
-        return True
-    existing_file = next(
-        (file for file in previous_dataset.gtfsfiles if file.file_name == file_name),
-        None,
-    )
-    if not existing_file:
-        return True
-    new_dataset_file = next(
-        (file for file in dataset.gtfsfiles if file.file_name == file_name), None
-    )
-    if not new_dataset_file:
-        return True
-    return existing_file.hash != new_dataset_file.hash
+        return [f.file_name for f in new_files]
+
+    prev_map = {
+        f.file_name: getattr(f, "hash", None) for f in previous_dataset.gtfsfiles
+    }
+
+    changed_files = []
+    for f in new_files:
+        new_hash = getattr(f, "hash", None)
+        old_hash = prev_map.get(f.file_name)
+
+        if old_hash is None or old_hash != new_hash:
+            changed_files.append(f)
+            logging.info(f"Changed file {f.file_name} from {old_hash} to {new_hash}")
+
+    return [f.file_name for f in changed_files]
 
 
 @with_db_session
 def create_pipeline_tasks(dataset: Gtfsdataset, db_session: Session) -> None:
     """
     Create pipeline tasks for a dataset.
     """
+    changed_files = get_changed_files(dataset, db_session=db_session)
+
     stable_id = dataset.feed.stable_id
     dataset_stable_id = dataset.stable_id
     gtfs_files = dataset.gtfsfiles
@@ -110,7 +127,7 @@ def create_pipeline_tasks(dataset: Gtfsdataset, db_session: Session) -> None:
     stops_url = stops_file.hosted_url if stops_file else None
 
     # Create reverse geolocation task
-    if stops_url and has_file_changed(dataset, "stops.txt", db_session):
+    if stops_url and "stops.txt" in changed_files:
         create_http_reverse_geolocation_processor_task(
             stable_id, dataset_stable_id, stops_url
         )
@@ -119,10 +136,23 @@ def create_pipeline_tasks(dataset: Gtfsdataset, db_session: Session) -> None:
         (file for file in gtfs_files if file.file_name == "routes.txt"), None
     )
     # Create PMTiles builder task
+    required_files = {"stops.txt", "routes.txt", "trips.txt", "stop_times.txt"}
+    if not required_files.issubset(set(f.file_name for f in gtfs_files)):
+        logging.info(
+            f"Skipping PMTiles task for dataset {dataset_stable_id} due to missing required files. Required files: "
+            f"{required_files}, available files: {[f.file_name for f in gtfs_files]}"
+        )
+    expected_file_change: Iterable[str] = {
+        "stops.txt",
+        "trips.txt",
+        "routes.txt",
+        "stop_times.txt",
+        "shapes.txt",
+    }
     if (
         routes_file
         and 0 < routes_file.file_size_bytes < 1_000_000
-        and has_file_changed(dataset, "routes.txt", db_session)
+        and not set(changed_files).isdisjoint(expected_file_change)
     ):
         create_http_pmtiles_builder_task(stable_id, dataset_stable_id)
     elif routes_file:
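
To make the new trigger semantics concrete, the following is a minimal, self-contained sketch of the logic above. `SimpleFile`, `changed_file_names`, and the hash values are hypothetical stand-ins for the project's `gtfsfiles` rows, used for illustration only:

from typing import List, Optional


class SimpleFile:
    """Hypothetical stand-in for a gtfsfiles row: a file name plus a content hash."""

    def __init__(self, file_name: str, hash: Optional[str] = None):
        self.file_name = file_name
        self.hash = hash


def changed_file_names(
    new_files: List[SimpleFile], prev_files: Optional[List[SimpleFile]]
) -> List[str]:
    # No previous dataset: everything that exists now counts as "changed".
    if prev_files is None:
        return [f.file_name for f in new_files]
    prev_map = {f.file_name: f.hash for f in prev_files}
    # A file is "changed" when it is new or its hash differs; files that
    # disappeared are never reported because only new_files is iterated.
    return [
        f.file_name
        for f in new_files
        if prev_map.get(f.file_name) is None or prev_map[f.file_name] != f.hash
    ]


prev = [SimpleFile("stops.txt", "aaa"), SimpleFile("routes.txt", "bbb")]
new = [SimpleFile("stops.txt", "aaa"), SimpleFile("shapes.txt", "ccc")]
changed = changed_file_names(new, prev)
assert changed == ["shapes.txt"]  # routes.txt went missing -> not "changed"

# The PMTiles builder task fires when any tile-relevant file changed:
expected_file_change = {"stops.txt", "trips.txt", "routes.txt", "stop_times.txt", "shapes.txt"}
assert not set(changed).isdisjoint(expected_file_change)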

functions-python/batch_process_dataset/tests/test_batch_process_dataset_main.py

Lines changed: 2 additions & 0 deletions
@@ -451,6 +451,7 @@ def test_process_dataset_missing_stable_id(self, mock_dataset_trace):
         )
 
     @patch.dict(os.environ, {"DATASETS_BUCKET_NAME": "test-bucket"})
+    @patch("main.create_pipeline_tasks")
     @patch("main.DatasetProcessor.create_dataset_entities")
     @patch("main.DatasetProcessor.upload_files_to_storage")
     @patch("main.DatasetProcessor.unzip_files")
@@ -461,6 +462,7 @@ def test_process_from_bucket_latest_happy_path(
         mock_unzip_files,
         mock_upload_files_to_storage,
         mock_create_dataset_entities,
+        _,
     ):
         # Arrange
         mock_blob = MagicMock()
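
One detail worth noting about the new `_` parameter: `unittest.mock` applies stacked `@patch` decorators bottom-up, so the mock for the new topmost patch, `@patch("main.create_pipeline_tasks")`, arrives as the last positional argument, which the test deliberately ignores (`@patch.dict` injects no argument). A small stdlib-only illustration of that ordering:

from unittest.mock import patch

# Stacked @patch decorators inject mocks bottom-up: the decorator closest to
# the function supplies the first mock argument, the topmost one the last.
@patch("os.getcwd")  # topmost patch -> last mock argument
@patch("os.getpid")  # bottom patch -> first mock argument
def demo(mock_getpid, mock_getcwd):
    import os

    os.getpid()  # calls the mock injected for os.getpid
    mock_getpid.assert_called_once()
    mock_getcwd.assert_not_called()


demo()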

functions-python/batch_process_dataset/tests/test_pipeline_tasks.py

Lines changed: 12 additions & 13 deletions
@@ -6,7 +6,7 @@
 from pipeline_tasks import (
     create_http_reverse_geolocation_processor_task,
     create_http_pmtiles_builder_task,
-    has_file_changed,
+    get_changed_files,
     create_pipeline_tasks,
 )
 
@@ -152,9 +152,8 @@ def test_no_previous_dataset_returns_true(self):
         )
         mock_session = self._make_mock_session_chain(previous_dataset=None)
 
-        # Use __wrapped__ to bypass @with_db_session
-        result = has_file_changed(dataset, "stops.txt", db_session=mock_session)
-        self.assertTrue(result)
+        result = get_changed_files(dataset, db_session=mock_session)
+        self.assertTrue("stops.txt" in result)
 
     def test_previous_without_target_file_returns_true(self):
         prev = SimpleDataset(
@@ -173,10 +172,10 @@ def test_previous_without_target_file_returns_true(self):
         )
         mock_session = self._make_mock_session_chain(previous_dataset=prev)
 
-        result = has_file_changed(dataset, "stops.txt", db_session=mock_session)
-        self.assertTrue(result)
+        result = get_changed_files(dataset, db_session=mock_session)
+        self.assertTrue("stops.txt" in result)
 
-    def test_new_dataset_missing_target_file_returns_true(self):
+    def test_new_dataset_missing_target_file_returns_false(self):
         prev = SimpleDataset(
             feed_id=1,
             dataset_id=9,
@@ -193,8 +192,8 @@ def test_new_dataset_missing_target_file_returns_true(self):
         )
         mock_session = self._make_mock_session_chain(previous_dataset=prev)
 
-        result = has_file_changed(dataset, "stops.txt", db_session=mock_session)
-        self.assertTrue(result)
+        result = get_changed_files(dataset, db_session=mock_session)
+        self.assertFalse("stops.txt" in result)
 
     def test_hash_diff_returns_true(self):
         prev = SimpleDataset(
@@ -213,8 +212,8 @@ def test_hash_diff_returns_true(self):
         )
         mock_session = self._make_mock_session_chain(previous_dataset=prev)
 
-        result = has_file_changed(dataset, "stops.txt", db_session=mock_session)
-        self.assertTrue(result)
+        result = get_changed_files(dataset, db_session=mock_session)
+        self.assertTrue("stops.txt" in result)
 
     def test_hash_same_returns_false(self):
         prev = SimpleDataset(
@@ -233,8 +232,8 @@ def test_hash_same_returns_false(self):
         )
         mock_session = self._make_mock_session_chain(previous_dataset=prev)
 
-        result = has_file_changed(dataset, "stops.txt", db_session=mock_session)
-        self.assertFalse(result)
+        result = get_changed_files(dataset, db_session=mock_session)
+        self.assertFalse("stops.txt" in result)
 
 class TestCreatePipelineTasks(unittest.TestCase):
     @patch.dict(
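
These tests now pass `db_session=mock_session` straight into `get_changed_files`, and the old `# Use __wrapped__ to bypass @with_db_session` workaround is gone. That reads as if `with_db_session` reuses an explicitly supplied session rather than opening its own; below is a minimal sketch of a decorator with that behavior (an assumption for illustration, not the project's actual helper):

import functools
from typing import Any, Callable


def with_db_session(func: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical sketch: reuse an explicitly passed session, else create one."""

    @functools.wraps(func)
    def wrapper(*args: Any, db_session: Any = None, **kwargs: Any) -> Any:
        if db_session is not None:
            # A caller (e.g. a test handing in a mock) provided a session: use it.
            return func(*args, db_session=db_session, **kwargs)
        # Otherwise create a session; object() stands in for a real SQLAlchemy Session.
        return func(*args, db_session=object(), **kwargs)

    return wrapper


@with_db_session
def example_task(name: str, db_session: Any = None) -> str:
    return f"{name} ran with {db_session!r}"


print(example_task("demo", db_session="mock-session"))  # the mock is used as-is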
