
Commit f971c5a

feat: batch fill pmtiles files (#1336)
1 parent ac4adda commit f971c5a

12 files changed (+799 -109 lines)


functions-python/batch_process_dataset/src/main.py

Lines changed: 40 additions & 31 deletions
@@ -262,29 +262,22 @@ def upload_dataset(self, public=True) -> DatasetFile or None:
         return None
 
     @with_db_session
-    def process_from_bucket_latest(
-        self, db_session, public=True
-    ) -> DatasetFile or None:
+    def process_from_bucket(self, db_session, public=True) -> Optional[DatasetFile]:
         """
-        Uploads a dataset to a GCP bucket as <feed_stable_id>/latest.zip and
-        <feed_stable_id>/<feed_stable_id>-<upload_datetime>.zip
-        if the dataset hash is different from the latest dataset stored
-        :return: the file hash and the hosted url as a tuple or None if no upload is required
+        Process an existing dataset from the GCP bucket updates the related database entities
+        :return: The DatasetFile object created
         """
         temp_file_path = None
         try:
-            self.logger.info("Accessing URL %s", self.producer_url)
             temp_file_path = self.generate_temp_filename()
-            blob_file_path = f"{self.feed_stable_id}/latest.zip"
+            blob_file_path = f"{self.feed_stable_id}/{self.dataset_stable_id}/{self.dataset_stable_id}.zip"
+            self.logger.info(f"Processing dataset from bucket: {blob_file_path}")
             download_from_gcs(
                 os.getenv("DATASETS_BUCKET_NAME"), blob_file_path, temp_file_path
             )
 
             extracted_files_path = self.unzip_files(temp_file_path)
-            dataset_full_path = f"{self.feed_stable_id}/{self.dataset_stable_id}/{self.dataset_stable_id}.zip"
-            self.logger.info(
-                f"Creating file {dataset_full_path} in bucket {self.bucket_name}"
-            )
+
             _, extracted_files = self.upload_files_to_storage(
                 temp_file_path,
                 self.dataset_stable_id,
@@ -296,7 +289,7 @@ def process_from_bucket_latest(
             dataset_file = DatasetFile(
                 stable_id=self.dataset_stable_id,
                 file_sha256_hash=self.latest_hash,
-                hosted_url=f"{self.public_hosted_datasets_url}/{dataset_full_path}",
+                hosted_url=f"{self.public_hosted_datasets_url}/{blob_file_path}",
                 extracted_files=extracted_files,
                 zipped_size=(
                     os.path.getsize(temp_file_path)
@@ -307,11 +300,21 @@ def process_from_bucket_latest(
             dataset = self.create_dataset_entities(
                 dataset_file, skip_dataset_creation=True, db_session=db_session
             )
-            create_pipeline_tasks(dataset)
+            if dataset and dataset.latest:
+                self.logger.info(
+                    f"Creating pipeline tasks for latest dataset {dataset.stable_id}"
+                )
+                create_pipeline_tasks(dataset)
+            elif dataset:
+                self.logger.info(
+                    f"Dataset {dataset.stable_id} is not the latest, skipping pipeline tasks creation."
+                )
+            else:
+                raise ValueError("Dataset update failed, dataset is None.")
+            return dataset_file
         finally:
             if temp_file_path and os.path.exists(temp_file_path):
                 os.remove(temp_file_path)
-        return None
 
     def unzip_files(self, temp_file_path):
         extracted_files_path = os.path.join(temp_file_path.split(".")[0], "extracted")
@@ -356,11 +359,11 @@ def create_dataset_entities(
                     f"[{self.feed_stable_id}] No latest dataset found for feed."
                 )
 
-            self.logger.info(
-                f"[{self.feed_stable_id}] Creating new dataset for feed with stable id {dataset_file.stable_id}."
-            )
             dataset = None
             if not skip_dataset_creation:
+                self.logger.info(
+                    f"[{self.feed_stable_id}] Creating new dataset for feed with stable id {dataset_file.stable_id}."
+                )
                 dataset = Gtfsdataset(
                     id=str(uuid.uuid4()),
                     feed_id=self.feed_id,
@@ -377,22 +380,20 @@ def create_dataset_entities(
                         else []
                     ),
                     zipped_size_bytes=dataset_file.zipped_size,
-                    unzipped_size_bytes=(
-                        sum([ex.file_size_bytes for ex in dataset_file.extracted_files])
-                        if dataset_file.extracted_files
-                        else None
-                    ),
+                    unzipped_size_bytes=self._get_unzipped_size(dataset_file),
                 )
                 db_session.add(dataset)
             elif skip_dataset_creation and latest_dataset:
+                self.logger.info(
+                    f"[{self.feed_stable_id}] Updating latest dataset for feed with stable id "
+                    f"{latest_dataset.stable_id}."
+                )
                 latest_dataset.gtfsfiles = (
                     dataset_file.extracted_files if dataset_file.extracted_files else []
                 )
                 latest_dataset.zipped_size_bytes = dataset_file.zipped_size
-                latest_dataset.unzipped_size_bytes = (
-                    sum([ex.file_size_bytes for ex in dataset_file.extracted_files])
-                    if dataset_file.extracted_files
-                    else None
+                latest_dataset.unzipped_size_bytes = self._get_unzipped_size(
+                    dataset_file
                 )
 
             if latest_dataset and not skip_dataset_creation:
@@ -406,11 +407,19 @@ def create_dataset_entities(
         except Exception as e:
             raise Exception(f"Error creating dataset: {e}")
 
+    @staticmethod
+    def _get_unzipped_size(dataset_file):
+        return (
+            sum([ex.file_size_bytes for ex in dataset_file.extracted_files])
+            if dataset_file.extracted_files
+            else None
+        )
+
     @with_db_session
-    def process_from_producer_url(self, db_session) -> DatasetFile or None:
+    def process_from_producer_url(self, db_session) -> Optional[DatasetFile]:
         """
         Process the dataset and store new version in GCP bucket if any changes are detected
-        :return: the file hash and the hosted url as a tuple or None if no upload is required
+        :return: the DatasetFile object created
         """
         dataset_file = self.upload_dataset()
 
@@ -531,7 +540,7 @@ def process_dataset(cloud_event: CloudEvent):
             json_payload.get("dataset_stable_id"),
         )
         if json_payload.get("use_bucket_latest", False):
-            dataset_file = processor.process_from_bucket_latest()
+            dataset_file = processor.process_from_bucket()
         else:
             dataset_file = processor.process_from_producer_url()
     except Exception as e:
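
For context on the path change above: process_from_bucket no longer reads <feed_stable_id>/latest.zip; it downloads the dataset's own archive from the datasets bucket and records that same path as the hosted URL. A minimal sketch of the path convention, using illustrative IDs taken from the updated test further below:

    # Illustrative sketch only; the real values come from the processor instance,
    # and the bucket name is read from the DATASETS_BUCKET_NAME environment variable.
    feed_stable_id = "feed_stable_id"
    dataset_stable_id = "dataset-stable-id-123"

    blob_file_path = f"{feed_stable_id}/{dataset_stable_id}/{dataset_stable_id}.zip"
    assert blob_file_path == "feed_stable_id/dataset-stable-id-123/dataset-stable-id-123.zip"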

functions-python/batch_process_dataset/src/pipeline_tasks.py

Lines changed: 1 addition & 27 deletions
@@ -8,7 +8,7 @@
 
 from shared.database.database import with_db_session
 from shared.database_gen.sqlacodegen_models import Gtfsdataset
-from shared.helpers.utils import create_http_task
+from shared.helpers.utils import create_http_task, create_http_pmtiles_builder_task
 
 
 def create_http_reverse_geolocation_processor_task(
@@ -41,32 +41,6 @@ def create_http_reverse_geolocation_processor_task(
     )
 
 
-def create_http_pmtiles_builder_task(
-    stable_id: str,
-    dataset_stable_id: str,
-) -> None:
-    """
-    Create a task to generate PMTiles for a dataset.
-    """
-    client = tasks_v2.CloudTasksClient()
-    body = json.dumps(
-        {"feed_stable_id": stable_id, "dataset_stable_id": dataset_stable_id}
-    ).encode()
-    queue_name = os.getenv("PMTILES_BUILDER_QUEUE")
-    project_id = os.getenv("PROJECT_ID")
-    gcp_region = os.getenv("GCP_REGION")
-    gcp_env = os.getenv("ENVIRONMENT")
-
-    create_http_task(
-        client,
-        body,
-        f"https://{gcp_region}-{project_id}.cloudfunctions.net/pmtiles-builder-{gcp_env}",
-        project_id,
-        gcp_region,
-        queue_name,
-    )
-
-
 @with_db_session
 def get_changed_files(
     dataset: Gtfsdataset,

functions-python/batch_process_dataset/tests/test_batch_process_dataset_main.py

Lines changed: 5 additions & 3 deletions
@@ -488,16 +488,18 @@ def test_process_from_bucket_latest_happy_path(
         )
 
         # Act
-        result = processor.process_from_bucket_latest(public=True)
+        result = processor.process_from_bucket(public=True)
 
         # Assert: function returns None in current implementation
-        self.assertIsNone(result)
+        self.assertIsNone(result.zipped_size)
 
         # Assert: downloads from the bucket latest.zip for this feed
         mock_download_from_gcs.assert_called_once()
         args, kwargs = mock_download_from_gcs.call_args
         self.assertEqual(args[0], "test-bucket")  # bucket name
-        self.assertEqual(args[1], "feed_stable_id/latest.zip")  # blob path
+        self.assertEqual(
+            args[1], "feed_stable_id/dataset-stable-id-123/dataset-stable-id-123.zip"
+        )  # blob path
         self.assertIsNotNone(
             args[2]
         )  # temp file path (random), so just ensure it exists

functions-python/batch_process_dataset/tests/test_pipeline_tasks.py

Lines changed: 0 additions & 43 deletions
@@ -5,7 +5,6 @@
 
 from pipeline_tasks import (
     create_http_reverse_geolocation_processor_task,
-    create_http_pmtiles_builder_task,
     get_changed_files,
     create_pipeline_tasks,
 )
@@ -86,48 +85,6 @@ def test_create_http_reverse_geolocation_processor_task(
         self.assertEqual(args[4], "northamerica-northeast1")
         self.assertEqual(args[5], "rev-geo-queue")
 
-    @patch.dict(
-        os.environ,
-        {
-            "PMTILES_BUILDER_QUEUE": "pmtiles-queue",
-            "PROJECT_ID": "my-project",
-            "GCP_REGION": "northamerica-northeast1",
-            "ENVIRONMENT": "dev",
-        },
-        clear=False,
-    )
-    @patch("pipeline_tasks.create_http_task")
-    @patch("pipeline_tasks.tasks_v2.CloudTasksClient")
-    def test_create_http_pmtiles_builder_task(
-        self, mock_client_cls, mock_create_http_task
-    ):
-        client_instance = MagicMock()
-        mock_client_cls.return_value = client_instance
-
-        stable_id = "feed-456"
-        dataset_stable_id = "dataset-def"
-
-        create_http_pmtiles_builder_task(
-            stable_id=stable_id, dataset_stable_id=dataset_stable_id
-        )
-
-        mock_client_cls.assert_called_once()
-        self.assertEqual(mock_create_http_task.call_count, 1)
-        args, _ = mock_create_http_task.call_args
-
-        payload = json.loads(args[1].decode("utf-8"))
-        self.assertEqual(
-            payload,
-            {"feed_stable_id": stable_id, "dataset_stable_id": dataset_stable_id},
-        )
-        self.assertEqual(
-            args[2],
-            "https://northamerica-northeast1-my-project.cloudfunctions.net/pmtiles-builder-dev",
-        )
-        self.assertEqual(args[3], "my-project")
-        self.assertEqual(args[4], "northamerica-northeast1")
-        self.assertEqual(args[5], "pmtiles-queue")
-
 
 class TestHasFileChanged(unittest.TestCase):
     def _make_mock_session_chain(self, previous_dataset):

functions-python/helpers/tests/test_helpers.py

Lines changed: 41 additions & 0 deletions
@@ -212,3 +212,44 @@ def test_create_http_task(self):
         url = "test"
         create_http_task(client, body, url, "test", "test", "test")
         client.create_task.assert_called_once()
+
+    @patch.dict(
+        os.environ,
+        {
+            "PMTILES_BUILDER_QUEUE": "pmtiles-queue",
+            "PROJECT_ID": "my-project",
+            "GCP_REGION": "northamerica-northeast1",
+            "ENVIRONMENT": "dev",
+        },
+        clear=False,
+    )
+    @patch("utils.create_http_task")
+    @patch("google.cloud.tasks_v2.CloudTasksClient")
+    def test_create_http_pmtiles_builder_task(
+        self, mock_client_cls, mock_create_http_task
+    ):
+        from utils import create_http_pmtiles_builder_task
+        import json
+
+        client_instance = MagicMock()
+        mock_client_cls.return_value = client_instance
+        stable_id = "feed-456"
+        dataset_stable_id = "dataset-def"
+        create_http_pmtiles_builder_task(
+            stable_id=stable_id, dataset_stable_id=dataset_stable_id
+        )
+        mock_client_cls.assert_called_once()
+        self.assertEqual(mock_create_http_task.call_count, 1)
+        args, _ = mock_create_http_task.call_args
+        payload = json.loads(args[1].decode("utf-8"))
+        self.assertEqual(
+            payload,
+            {"feed_stable_id": stable_id, "dataset_stable_id": dataset_stable_id},
+        )
+        self.assertEqual(
+            args[2],
+            "https://northamerica-northeast1-my-project.cloudfunctions.net/pmtiles-builder-dev",
+        )
+        self.assertEqual(args[3], "my-project")
+        self.assertEqual(args[4], "northamerica-northeast1")
+        self.assertEqual(args[5], "pmtiles-queue")

functions-python/helpers/utils.py

Lines changed: 29 additions & 0 deletions
@@ -221,6 +221,35 @@ def create_http_task(
     )
 
 
+def create_http_pmtiles_builder_task(
+    stable_id: str,
+    dataset_stable_id: str,
+) -> None:
+    """
+    Create a task to generate PMTiles for a dataset.
+    """
+    from google.cloud import tasks_v2
+    import json
+
+    client = tasks_v2.CloudTasksClient()
+    body = json.dumps(
+        {"feed_stable_id": stable_id, "dataset_stable_id": dataset_stable_id}
+    ).encode()
+    queue_name = os.getenv("PMTILES_BUILDER_QUEUE")
+    project_id = os.getenv("PROJECT_ID")
+    gcp_region = os.getenv("GCP_REGION")
+    gcp_env = os.getenv("ENVIRONMENT")
+
+    create_http_task(
+        client,
+        body,
+        f"https://{gcp_region}-{project_id}.cloudfunctions.net/pmtiles-builder-{gcp_env}",
+        project_id,
+        gcp_region,
+        queue_name,
+    )
+
+
 def get_execution_id(json_payload: dict, stable_id: Optional[str]) -> str:
     """
     Extracts the execution_id from the JSON payload.
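
A brief usage sketch of the relocated helper, assuming the environment variables it reads (PMTILES_BUILDER_QUEUE, PROJECT_ID, GCP_REGION, ENVIRONMENT) are set and Cloud Tasks credentials are available; the example IDs are the ones used in the tests above:

    from shared.helpers.utils import create_http_pmtiles_builder_task

    # Enqueues an HTTP task whose JSON body is
    # {"feed_stable_id": "feed-456", "dataset_stable_id": "dataset-def"},
    # targeting https://{GCP_REGION}-{PROJECT_ID}.cloudfunctions.net/pmtiles-builder-{ENVIRONMENT}.
    create_http_pmtiles_builder_task(
        stable_id="feed-456",
        dataset_stable_id="dataset-def",
    )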

functions-python/tasks_executor/src/main.py

Lines changed: 13 additions & 5 deletions
@@ -18,18 +18,22 @@
 
 import flask
 import functions_framework
+
 from shared.helpers.logger import init_logger
-from tasks.refresh_feedsearch_view.refresh_materialized_view import (
-    refresh_materialized_view_handler,
-)
 from tasks.dataset_files.rebuild_missing_dataset_files import (
     rebuild_missing_dataset_files_handler,
 )
+from tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes import (
+    rebuild_missing_bounding_boxes_handler,
+)
+from tasks.refresh_feedsearch_view.refresh_materialized_view import (
+    refresh_materialized_view_handler,
+)
 from tasks.validation_reports.rebuild_missing_validation_reports import (
     rebuild_missing_validation_reports_handler,
 )
-from tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes import (
-    rebuild_missing_bounding_boxes_handler,
+from tasks.visualization_files.rebuild_missing_visualization_files import (
+    rebuild_missing_visualization_files_handler,
 )
 
 init_logger()
@@ -62,6 +66,10 @@
         "description": "Rebuilds missing dataset files for GTFS datasets.",
         "handler": rebuild_missing_dataset_files_handler,
     },
+    "rebuild_missing_visualization_files": {
+        "description": "Rebuilds missing visualization files for GTFS datasets.",
+        "handler": rebuild_missing_visualization_files_handler,
+    },
 }
 
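
The new entry plugs into the tasks executor's registry of named tasks. A minimal lookup sketch over a registry shaped like the dict extended above; the entry structure ({"description", "handler"}) comes from this diff, while how the real executor parses the incoming request and invokes the handler is not shown here, so the lambda stand-in and its call signature are assumptions:

    # Registry entry shaped like the one added above; the lambda stands in for
    # rebuild_missing_visualization_files_handler, whose real signature is not
    # part of this diff.
    registry = {
        "rebuild_missing_visualization_files": {
            "description": "Rebuilds missing visualization files for GTFS datasets.",
            "handler": lambda payload: None,
        },
    }

    entry = registry["rebuild_missing_visualization_files"]
    print(entry["description"])  # -> Rebuilds missing visualization files for GTFS datasets.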
