Commit 159bc76

Merge pull request #1294 from MobilityData/1172-refresh-feedsearch-view-asynchronically
feat: 1172 refresh feedsearch view asynchronically
2 parents bf22847 + eaeaf1c commit 159bc76

22 files changed: +336 -75 lines changed
api/src/shared/common/gcp_utils.py

Lines changed: 100 additions & 0 deletions
@@ -0,0 +1,100 @@
+import logging
+import os
+from google.cloud import tasks_v2
+from google.protobuf.timestamp_pb2 import Timestamp
+
+
+def create_refresh_materialized_view_task():
+    """
+    Asynchronously refresh a materialized view.
+    Ensures deduplication by generating a unique task name.
+
+    Returns:
+        dict: Response message and status code.
+    """
+    from google.protobuf import timestamp_pb2
+    from datetime import datetime, timedelta
+
+    try:
+        logging.info("Creating materialized view refresh task.")
+        now = datetime.now()
+
+        # BOUNCE WINDOW: next :00 or :30
+        minute = now.minute
+        if minute < 30:
+            bucket_time = now.replace(minute=30, second=0, microsecond=0)
+        else:
+            bucket_time = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
+
+        timestamp_str = bucket_time.strftime("%Y-%m-%d-%H-%M")
+        task_name = f"refresh-materialized-view-{timestamp_str}"
+
+        # Convert to protobuf timestamp
+        proto_time = timestamp_pb2.Timestamp()
+        proto_time.FromDatetime(bucket_time)
+
+        # Cloud Tasks setup
+        client = tasks_v2.CloudTasksClient()
+        project = os.getenv("PROJECT_ID")
+        location = os.getenv("LOCATION")
+        queue = os.getenv("MATERIALIZED_VIEW_QUEUE")
+        url = (
+            f"https://{os.getenv('GCP_REGION')}-"
+            f"{os.getenv('PROJECT_ID')}.cloudfunctions.net/"
+            f"tasks-executor-{os.getenv('ENVIRONMENT_NAME')}"
+        )
+
+        task_name = client.task_path(project, location, queue, task_name)
+
+        # Enqueue the task
+        try:
+            create_http_task_with_name(
+                client=client,
+                body=b"",
+                url=url,
+                project_id=project,
+                gcp_region=location,
+                queue_name=queue,
+                task_name=task_name,
+                task_time=proto_time,
+                http_method=tasks_v2.HttpMethod.GET,
+            )
+            logging.info(f"Scheduled refresh materialized view task for {timestamp_str}")
+            return {"message": f"Refresh task for {timestamp_str} scheduled."}, 200
+        except Exception as e:
+            if "ALREADY_EXISTS" in str(e):
+                logging.info(f"Task already exists for {timestamp_str}, skipping.")
+
+    except Exception as error:
+        error_msg = f"Error enqueuing task: {error}"
+        logging.error(error_msg)
+        return {"error": error_msg}, 500
+
+
+def create_http_task_with_name(
+    client: "tasks_v2.CloudTasksClient",
+    body: bytes,
+    url: str,
+    project_id: str,
+    gcp_region: str,
+    queue_name: str,
+    task_name: str,
+    task_time: Timestamp,
+    http_method: "tasks_v2.HttpMethod",
+):
+    """Creates a GCP Cloud Task."""
+
+    token = tasks_v2.OidcToken(service_account_email=os.getenv("SERVICE_ACCOUNT_EMAIL"))
+
+    task = tasks_v2.Task(
+        name=task_name,
+        schedule_time=task_time,
+        http_request=tasks_v2.HttpRequest(
+            url=url,
+            http_method=http_method,
+            oidc_token=token,
+            body=body,
+            headers={"Content-Type": "application/json"},
+        ),
+    )
+    client.create_task(parent=client.queue_path(project_id, gcp_region, queue_name), task=task)
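
The deduplication in `create_refresh_materialized_view_task` relies on every caller inside the same half-hour window computing the identical task name, so a second enqueue fails with `ALREADY_EXISTS` and is simply skipped. The following standalone sketch is not part of the commit; it only re-applies the rounding logic above to show how calls made minutes apart collapse onto a single Cloud Task:

```python
from datetime import datetime, timedelta


def bucketed_task_name(now: datetime) -> str:
    # Round up to the next :00 or :30 boundary, mirroring the logic in gcp_utils.py.
    if now.minute < 30:
        bucket_time = now.replace(minute=30, second=0, microsecond=0)
    else:
        bucket_time = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    return f"refresh-materialized-view-{bucket_time.strftime('%Y-%m-%d-%H-%M')}"


if __name__ == "__main__":
    # Two calls ten minutes apart inside the same window produce the same task name,
    # so only the first enqueue succeeds; the third call falls into the next window.
    print(bucketed_task_name(datetime(2025, 7, 21, 14, 5)))   # refresh-materialized-view-2025-07-21-14-30
    print(bucketed_task_name(datetime(2025, 7, 21, 14, 15)))  # refresh-materialized-view-2025-07-21-14-30
    print(bucketed_task_name(datetime(2025, 7, 21, 14, 45)))  # refresh-materialized-view-2025-07-21-15-00
```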

api/src/shared/database/database.py

Lines changed: 1 addition & 0 deletions
@@ -21,6 +21,7 @@
 from sqlalchemy.orm import sessionmaker
 import logging
 
+
 from shared.common.logging_utils import get_env_logging_level
 
 
functions-python/backfill_dataset_service_date_range/requirements.txt

Lines changed: 1 addition & 0 deletions
@@ -17,6 +17,7 @@ geoalchemy2==0.14.7
 # Google specific packages for this function
 cloudevents~=1.10.1
 google-cloud-storage
+google-cloud-tasks
 
 # Configuration
 python-dotenv==1.0.0

functions-python/backfill_dataset_service_date_range/src/main.py

Lines changed: 4 additions & 4 deletions
@@ -2,9 +2,10 @@
 import os
 import functions_framework
 
+from shared.common.gcp_utils import create_refresh_materialized_view_task
 from shared.helpers.logger import init_logger
 
-from shared.database.database import with_db_session, refresh_materialized_view
+from shared.database.database import with_db_session
 
 from sqlalchemy.orm import joinedload, Session
 from sqlalchemy import or_, func
@@ -17,13 +18,12 @@
 from shared.database_gen.sqlacodegen_models import (
     Gtfsdataset,
     Validationreport,
-    t_feedsearch,
 )
 
-import requests
 import json
 
 from google.cloud import storage
+import requests
 
 env = os.getenv("ENV", "dev").lower()
 bucket_name = f"mobilitydata-datasets-{env}"
@@ -146,7 +146,7 @@ def backfill_datasets(session: "Session"):
     try:
         changes_count = 0
         session.commit()
-        refresh_materialized_view(session, t_feedsearch.name)
+        create_refresh_materialized_view_task()
         logging.info(f"{changes_count} elements committed.")
     except Exception as e:
         logging.error("Error committing changes:", e)
functions-python/batch_process_dataset/README.md

Lines changed: 41 additions & 18 deletions

@@ -1,4 +1,5 @@
 # Bath Process Dataset
+
 Subscribed to the topic set in the `batch-datasets` function, `batch-process-dataset` is triggered for each message published. It handles the processing of each feed individually, ensuring data consistency and integrity. The function performs the following operations:
 
 1. **Download Data**: It retrieves the feed data from the provided URL.
@@ -8,34 +9,56 @@ Subscribed to the topic set in the `batch-datasets` function, `batch-process-dat
 
 The URL format for accessing these datasets is standardized as `<bucket-url>/<feed_stable_id>/<dataset_id>.zip`, ensuring a consistent and predictable path for data retrieval.
 
-
 # Message format
+
 The function expects a Pub/Sub message with the following format:
+
 ```json
-{
-    "message": {
-        "data":
-            {
-                "execution_id": "execution_id",
-                "producer_url": "producer_url",
-                "feed_stable_id": "feed_stable_id",
-                "feed_id": "feed_id",
-                "dataset_id": "dataset_id",
-                "dataset_hash": "dataset_hash",
-                "authentication_type": "authentication_type",
-                "authentication_info_url": "authentication_info_url",
-                "api_key_parameter_name": "api_key_parameter_name"
-            }
-    }
+{
+  "message": {
+    "data": {
+      "execution_id": "execution_id",
+      "producer_url": "producer_url",
+      "feed_stable_id": "feed_stable_id",
+      "feed_id": "feed_id",
+      "dataset_id": "dataset_id",
+      "dataset_hash": "dataset_hash",
+      "authentication_type": "authentication_type",
+      "authentication_info_url": "authentication_info_url",
+      "api_key_parameter_name": "api_key_parameter_name"
+    }
+  }
 }
+```
+
+# Example
+
+```json
+{
+  "message": {
+    "data": {
+      "execution_id": "JLU_20250721A",
+      "producer_url": "http://api.511.org/transit/datafeeds?operator_id=CE",
+      "feed_stable_id": "mdb-2684",
+      "feed_id": "2f5d7b4e-bb9b-49ae-a011-b61d7d9b53ff",
+      "dataset_id": null,
+      "dataset_hash": null,
+      "authentication_type": "1",
+      "authentication_info_url": "https://511.org/open-data/token",
+      "api_key_parameter_name": "api_key"
+    }
+  }
+}
 ```
 
 # Function configuration
+
 The function is configured using the following environment variables:
+
 - `DATASETS_BUCKET_NAME`: The name of the bucket where the datasets are stored.
 - `FEEDS_DATABASE_URL`: The URL of the feeds database.
 - `MAXIMUM_EXECUTIONS`: [Optional] The maximum number of executions per datasets. This controls the number of times a dataset can be processed per execution id. By default, is 1.
 
-
 # Local development
-The local development of this function follows the same steps as the other functions. Please refer to the [README.md](../README.md) file for more information.
+
+The local development of this function follows the same steps as the other functions. Please refer to the [README.md](../README.md) file for more information.

functions-python/batch_process_dataset/requirements.txt

Lines changed: 1 addition & 0 deletions
@@ -21,6 +21,7 @@ google-api-core
 google-cloud-firestore
 google-cloud-datastore
 google-cloud-bigquery
+google-cloud-tasks
 cloudevents~=1.10.1
 
 # Configuration

functions-python/batch_process_dataset/src/main.py

Lines changed: 18 additions & 17 deletions
@@ -29,9 +29,11 @@
 from google.cloud import storage
 from sqlalchemy import func
 
-from shared.database_gen.sqlacodegen_models import Gtfsdataset, t_feedsearch, Gtfsfile
+from shared.common.gcp_utils import create_refresh_materialized_view_task
+from shared.database_gen.sqlacodegen_models import Gtfsdataset, Gtfsfile
+
 from shared.dataset_service.main import DatasetTraceService, DatasetTrace, Status
-from shared.database.database import with_db_session, refresh_materialized_view
+from shared.database.database import with_db_session
 import logging
 
 from shared.helpers.logger import init_logger, get_logger
@@ -233,9 +235,11 @@ def upload_dataset(self, public=True) -> DatasetFile or None:
             file_sha256_hash=file_sha256_hash,
             hosted_url=f"{self.public_hosted_datasets_url}/{dataset_full_path}",
             extracted_files=extracted_files,
-            zipped_size=os.path.getsize(temp_file_path)
-            if os.path.exists(temp_file_path)
-            else None,
+            zipped_size=(
+                os.path.getsize(temp_file_path)
+                if os.path.exists(temp_file_path)
+                else None
+            ),
         )
 
         self.logger.info(
@@ -298,15 +302,15 @@ def create_dataset(self, dataset_file: DatasetFile, db_session: Session):
             hash=dataset_file.file_sha256_hash,
             downloaded_at=func.now(),
             hosted_url=dataset_file.hosted_url,
-            gtfsfiles=dataset_file.extracted_files
-            if dataset_file.extracted_files
-            else [],
+            gtfsfiles=(
+                dataset_file.extracted_files if dataset_file.extracted_files else []
+            ),
             zipped_size_bytes=dataset_file.zipped_size,
-            unzipped_size_bytes=sum(
-                [ex.file_size_bytes for ex in dataset_file.extracted_files]
-            )
-            if dataset_file.extracted_files
-            else None,
+            unzipped_size_bytes=(
+                sum([ex.file_size_bytes for ex in dataset_file.extracted_files])
+                if dataset_file.extracted_files
+                else None
+            ),
         )
         if latest_dataset:
             latest_dataset.latest = False
@@ -315,10 +319,7 @@ def create_dataset(self, dataset_file: DatasetFile, db_session: Session):
             db_session.commit()
             self.logger.info(f"[{self.feed_stable_id}] Dataset created successfully.")
 
-            refresh_materialized_view(db_session, t_feedsearch.name)
-            self.logger.info(
-                f"[{self.feed_stable_id}] Materialized view refresh event triggered successfully."
-            )
+            create_refresh_materialized_view_task()
         except Exception as e:
            raise Exception(f"Error creating dataset: {e}")
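
The lines removed in the last hunk are the heart of the change: dataset creation no longer refreshes the feedsearch materialized view synchronously inside the request, it only enqueues the shared Cloud Task. A rough before/after sketch, assuming the removed `refresh_materialized_view` helper essentially executed a `REFRESH MATERIALIZED VIEW CONCURRENTLY` statement (its implementation lives in `shared/database/database.py` and is not shown in this diff):

```python
from sqlalchemy import text

from shared.common.gcp_utils import create_refresh_materialized_view_task


# Before (assumed shape of the removed helper, not shown in this diff): the request
# blocks while Postgres rebuilds the view, once per dataset created.
def refresh_feedsearch_sync(session, view_name: str = "feedsearch") -> None:
    session.execute(text(f"REFRESH MATERIALIZED VIEW CONCURRENTLY {view_name}"))


# After: the caller only schedules a task and returns; the refresh itself runs later
# in the tasks-executor Cloud Function, at most once per 30-minute window.
def refresh_feedsearch_async() -> None:
    create_refresh_materialized_view_task()
```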

functions-python/helpers/feed_status.py

Lines changed: 3 additions & 7 deletions
@@ -1,12 +1,12 @@
 import logging
 from datetime import datetime, timezone
 from sqlalchemy import text
-from shared.database_gen.sqlacodegen_models import Gtfsdataset, Feed, t_feedsearch
-from shared.database.database import refresh_materialized_view
+from shared.database_gen.sqlacodegen_models import Gtfsdataset, Feed
 from typing import TYPE_CHECKING
 
 if TYPE_CHECKING:
     from sqlalchemy.orm import Session
+from shared.common.gcp_utils import create_refresh_materialized_view_task
 
 
 # query to update the status of the feeds based on the service date range of the latest dataset
@@ -74,14 +74,10 @@ def get_filters(status: str):
        raise Exception(f"Error updating feed statuses: {e}")
 
    try:
-        session.commit()
-        refresh_materialized_view(session, t_feedsearch.name)
+        create_refresh_materialized_view_task()
        logging.info("Feed Database changes for status committed.")
        logging.info("Status Changes: %s", diff_counts)
-        session.close()
        return diff_counts
    except Exception as e:
        logging.error("Error committing changes:", e)
-        session.rollback()
-        session.close()
        raise Exception(f"Error creating dataset: {e}")

functions-python/helpers/utils.py

Lines changed: 15 additions & 14 deletions
@@ -151,20 +151,21 @@ def create_http_task(
     gcp_region: str,
     queue_name: str,
 ) -> None:
-    """Creates a GCP Cloud Task."""
+    from shared.common.gcp_utils import create_http_task_with_name
     from google.cloud import tasks_v2
+    from google.protobuf import timestamp_pb2
 
-    task = tasks_v2.Task(
-        http_request=tasks_v2.HttpRequest(
-            url=url,
-            http_method=tasks_v2.HttpMethod.POST,
-            oidc_token=tasks_v2.OidcToken(
-                service_account_email=os.getenv("SERVICE_ACCOUNT_EMAIL")
-            ),
-            body=body,
-            headers={"Content-Type": "application/json"},
-        )
-    )
-    client.create_task(
-        parent=client.queue_path(project_id, gcp_region, queue_name), task=task
+    proto_time = timestamp_pb2.Timestamp()
+    proto_time.GetCurrentTime()
+
+    create_http_task_with_name(
+        client=client,
+        body=body,
+        url=url,
+        project_id=project_id,
+        gcp_region=gcp_region,
+        queue_name=queue_name,
+        task_name="task_name",
+        task_time=proto_time,
+        http_method=tasks_v2.HttpMethod.POST,
     )
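
With this change, `create_http_task` becomes a thin wrapper that forwards to the new shared helper with the current time and a fixed placeholder name. A hypothetical caller that wants an explicit schedule and a unique, fully qualified task name could call `create_http_task_with_name` directly; the target URL, task name, and environment variables below are illustrative and not taken from the commit:

```python
import os
from datetime import datetime, timedelta, timezone

from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2

from shared.common.gcp_utils import create_http_task_with_name

client = tasks_v2.CloudTasksClient()

# Schedule the task five minutes from now.
schedule = timestamp_pb2.Timestamp()
schedule.FromDatetime(datetime.now(timezone.utc) + timedelta(minutes=5))

project = os.getenv("PROJECT_ID")
location = os.getenv("LOCATION")
queue = os.getenv("MATERIALIZED_VIEW_QUEUE")  # any existing Cloud Tasks queue works here

create_http_task_with_name(
    client=client,
    body=b'{"example": true}',
    url="https://example.com/hook",  # illustrative target, not from the commit
    project_id=project,
    gcp_region=location,
    queue_name=queue,
    # A fully qualified, unique task name; reusing it within the same queue raises
    # ALREADY_EXISTS, which is exactly what the refresh task relies on for dedup.
    task_name=client.task_path(project, location, queue, "example-task-2025-07-21-14-30"),
    task_time=schedule,
    http_method=tasks_v2.HttpMethod.POST,
)
```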

functions-python/process_validation_report/requirements.txt

Lines changed: 1 addition & 0 deletions
@@ -16,6 +16,7 @@ geoalchemy2==0.14.7
 
 # Google specific packages for this function
 cloudevents~=1.10.1
+google-cloud-tasks
 
 # Configuration
 python-dotenv==1.0.0
