Skip to content

Commit e2a14da

Browse files
committed
added pipeline tasks to batch processing
1 parent be75a13 commit e2a14da

File tree

4 files changed

+87
-4
lines changed

4 files changed

+87
-4
lines changed

functions-python/batch_process_dataset/src/main.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
get_hash_from_file,
4242
download_from_gcs,
4343
)
44+
from pipeline_tasks import create_pipeline_tasks
4445

4546
init_logger()
4647

@@ -300,7 +301,8 @@ def process_from_bucket_latest(self, public=True) -> DatasetFile or None:
300301
else None
301302
),
302303
)
303-
self.create_dataset_entities(dataset_file, skip_dataset_creation=True)
304+
dataset = self.create_dataset_entities(dataset_file, skip_dataset_creation=True)
305+
create_pipeline_tasks(dataset)
304306
finally:
305307
if temp_file_path and os.path.exists(temp_file_path):
306308
os.remove(temp_file_path)
@@ -352,6 +354,7 @@ def create_dataset_entities(
352354
self.logger.info(
353355
f"[{self.feed_stable_id}] Creating new dataset for feed with stable id {dataset_file.stable_id}."
354356
)
357+
dataset = None
355358
if not skip_dataset_creation:
356359
dataset = Gtfsdataset(
357360
id=str(uuid.uuid4()),
@@ -394,6 +397,7 @@ def create_dataset_entities(
394397
self.logger.info(f"[{self.feed_stable_id}] Dataset created successfully.")
395398

396399
create_refresh_materialized_view_task()
400+
return latest_dataset if skip_dataset_creation else dataset
397401
except Exception as e:
398402
raise Exception(f"Error creating dataset: {e}")
399403

@@ -407,7 +411,8 @@ def process_from_producer_url(self) -> DatasetFile or None:
407411
if dataset_file is None:
408412
self.logger.info(f"[{self.feed_stable_id}] No database update required.")
409413
return None
410-
self.create_dataset_entities(dataset_file)
414+
dataset = self.create_dataset_entities(dataset_file)
415+
create_pipeline_tasks(dataset)
411416
return dataset_file
412417

413418

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
import json
2+
import os
3+
4+
from google.cloud import tasks_v2
5+
6+
from shared.database_gen.sqlacodegen_models import Gtfsdataset
7+
from shared.helpers.utils import create_http_task
8+
9+
10+
def create_http_reverse_geolocation_processor_task(
    stable_id: str,
    dataset_stable_id: str,
    stops_url: str,
) -> None:
    """
    Enqueue an HTTP Cloud Task that triggers the reverse-geolocation
    processor Cloud Function for the given dataset's stops file.
    """
    # Deployment coordinates come from the runtime environment.
    project_id = os.getenv("PROJECT_ID")
    gcp_region = os.getenv("GCP_REGION")
    queue_name = os.getenv("REVERSE_GEOLOCATION_QUEUE_NAME")

    payload = {
        "stable_id": stable_id,
        "stops_url": stops_url,
        "dataset_id": dataset_stable_id,
    }
    target_url = (
        f"https://{gcp_region}-{project_id}"
        ".cloudfunctions.net/reverse-geolocation-processor"
    )

    create_http_task(
        tasks_v2.CloudTasksClient(),
        json.dumps(payload).encode(),
        target_url,
        project_id,
        gcp_region,
        queue_name,
    )
34+
35+
def create_http_pmtiles_builder_task(
    stable_id: str,
    dataset_stable_id: str,
) -> None:
    """
    Enqueue an HTTP Cloud Task that triggers the PMTiles builder
    Cloud Function for the given dataset.
    """
    # Deployment coordinates come from the runtime environment.
    project_id = os.getenv("PROJECT_ID")
    gcp_region = os.getenv("GCP_REGION")
    queue_name = os.getenv("PMTILES_BUILDER_QUEUE_NAME")

    payload = {
        "feed_stable_id": stable_id,
        "dataset_stable_id": dataset_stable_id,
    }
    target_url = (
        f"https://{gcp_region}-{project_id}"
        ".cloudfunctions.net/pmtiles_builder"
    )

    create_http_task(
        tasks_v2.CloudTasksClient(),
        json.dumps(payload).encode(),
        target_url,
        project_id,
        gcp_region,
        queue_name,
    )
58+
59+
60+
def create_pipeline_tasks(
    dataset: "Gtfsdataset",
) -> None:
    """
    Create the post-processing pipeline tasks for a freshly persisted dataset.

    Schedules:
      * a reverse-geolocation task — only when the dataset includes a
        ``stops.txt`` file with a hosted URL, and
      * a PMTiles builder task — always.

    Args:
        dataset: The persisted GTFS dataset entity. May be ``None`` when
            dataset creation was skipped upstream; in that case no tasks
            are scheduled.
    """
    # create_dataset_entities() can return None (e.g. skip_dataset_creation
    # with no pre-existing latest dataset) — guard instead of raising
    # AttributeError on dataset.feed below.
    if dataset is None:
        return

    stable_id = dataset.feed.stable_id
    dataset_stable_id = dataset.stable_id

    # Locate the hosted stops.txt, if any; a None/empty file list is
    # treated the same as "no stops file".
    gtfs_files = dataset.gtfsfiles or []
    stops_file = next(
        (file for file in gtfs_files if file.file_name == "stops.txt"), None
    )
    stops_url = stops_file.hosted_url if stops_file else None

    # Reverse geolocation needs stop coordinates, so it is conditional.
    if stops_url:
        create_http_reverse_geolocation_processor_task(
            stable_id, dataset_stable_id, stops_url
        )

    # PMTiles are built from the dataset itself and always scheduled.
    create_http_pmtiles_builder_task(stable_id, dataset_stable_id)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# TODO

functions-python/tasks_executor/requirements.txt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,3 @@ google-cloud-storage
2626

2727
# Configuration
2828
python-dotenv==1.0.0
29-
tippecanoe
30-

0 commit comments

Comments
 (0)