Skip to content

Commit dcf5d95

Browse files
committed
updated code to use create_refresh_materialized_view()
1 parent 68ea8c2 commit dcf5d95

File tree

5 files changed

+62
-86
lines changed

5 files changed

+62
-86
lines changed
Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# Batch Process Dataset
2+
23
Subscribed to the topic set in the `batch-datasets` function, `batch-process-dataset` is triggered for each message published. It handles the processing of each feed individually, ensuring data consistency and integrity. The function performs the following operations:
34

45
1. **Download Data**: It retrieves the feed data from the provided URL.
@@ -8,34 +9,56 @@ Subscribed to the topic set in the `batch-datasets` function, `batch-process-dat
89

910
The URL format for accessing these datasets is standardized as `<bucket-url>/<feed_stable_id>/<dataset_id>.zip`, ensuring a consistent and predictable path for data retrieval.
1011

11-
1212
# Message format
13+
1314
The function expects a Pub/Sub message with the following format:
15+
1416
```json
15-
{
16-
"message": {
17-
"data":
18-
{
19-
"execution_id": "execution_id",
20-
"producer_url": "producer_url",
21-
"feed_stable_id": "feed_stable_id",
22-
"feed_id": "feed_id",
23-
"dataset_id": "dataset_id",
24-
"dataset_hash": "dataset_hash",
25-
"authentication_type": "authentication_type",
26-
"authentication_info_url": "authentication_info_url",
27-
"api_key_parameter_name": "api_key_parameter_name"
28-
}
29-
}
17+
{
18+
"message": {
19+
"data": {
20+
"execution_id": "execution_id",
21+
"producer_url": "producer_url",
22+
"feed_stable_id": "feed_stable_id",
23+
"feed_id": "feed_id",
24+
"dataset_id": "dataset_id",
25+
"dataset_hash": "dataset_hash",
26+
"authentication_type": "authentication_type",
27+
"authentication_info_url": "authentication_info_url",
28+
"api_key_parameter_name": "api_key_parameter_name"
3029
}
30+
}
31+
}
32+
```
33+
34+
# Example
35+
36+
```json
37+
{
38+
"message": {
39+
"data": {
40+
"execution_id": "JLU_20250721A",
41+
"producer_url": "http://api.511.org/transit/datafeeds?operator_id=CE",
42+
"feed_stable_id": "mdb-2684",
43+
"feed_id": "2f5d7b4e-bb9b-49ae-a011-b61d7d9b53ff",
44+
"dataset_id": null,
45+
"dataset_hash": null,
46+
"authentication_type": "1",
47+
"authentication_info_url": "https://511.org/open-data/token",
48+
"api_key_parameter_name": "api_key"
49+
}
50+
}
51+
}
3152
```
3253

3354
# Function configuration
55+
3456
The function is configured using the following environment variables:
57+
3558
- `DATASETS_BUCKET_NAME`: The name of the bucket where the datasets are stored.
3659
- `FEEDS_DATABASE_URL`: The URL of the feeds database.
3760
- `MAXIMUM_EXECUTIONS`: [Optional] The maximum number of executions per dataset. This controls the number of times a dataset can be processed per execution id. By default, it is 1.
3861

39-
4062
# Local development
41-
The local development of this function follows the same steps as the other functions. Please refer to the [README.md](../README.md) file for more information.
63+
64+
The local development of this function follows the same steps as the other functions. Please refer to the [README.md](../README.md) file for more information.

functions-python/batch_process_dataset/src/main.py

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@
3232

3333
from shared.database_gen.sqlacodegen_models import Gtfsdatasets
3434
from shared.dataset_service.main import DatasetTraceService, DatasetTrace, Status
35-
from shared.database.database import with_db_session
35+
from shared.database.database import (
36+
with_db_session,
37+
create_refresh_materialized_view_task,
38+
)
3639
import logging
3740

3841
from shared.helpers.logger import init_logger, get_logger
@@ -254,25 +257,7 @@ def create_dataset(self, dataset_file: DatasetFile, db_session: Session):
254257
db_session.commit()
255258
self.logger.info(f"[{self.feed_stable_id}] Dataset created successfully.")
256259

257-
# Replace direct call to refresh_materialized_view with HTTP request to the refresh function
258-
refresh_url = os.getenv("FUNCTION_URL_REFRESH_MV")
259-
if not refresh_url:
260-
raise ValueError(
261-
"FUNCTION_URL_REFRESH_MV environment variable is not set"
262-
)
263-
264-
# Create an authorized request
265-
auth_req = requests.Request()
266-
267-
# Get an identity token for the target URL
268-
token = id_token.fetch_id_token(auth_req, refresh_url)
269-
270-
# Make the HTTP request with the ID token
271-
headers = {"Authorization": f"Bearer {token}"}
272-
response = http_requests.get(refresh_url, headers=headers)
273-
274-
response.raise_for_status()
275-
self.logger.info("Materialized view refresh event triggered successfully.")
260+
create_refresh_materialized_view_task()
276261
except Exception as e:
277262
raise Exception(f"Error creating dataset: {e}")
278263

functions-python/helpers/feed_status.py

Lines changed: 4 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
11
import logging
22
from datetime import datetime, timezone
33
from sqlalchemy import text
4-
import requests as http_requests
5-
import os
64
from shared.database_gen.sqlacodegen_models import Gtfsdataset, Feed
75
from typing import TYPE_CHECKING
86

97
if TYPE_CHECKING:
108
from sqlalchemy.orm import Session
11-
12-
from google.auth.transport import requests
13-
from google.oauth2 import id_token
9+
from shared.database.database import (
10+
create_refresh_materialized_view_task,
11+
)
1412

1513

1614
# query to update the status of the feeds based on the service date range of the latest dataset
@@ -78,30 +76,10 @@ def get_filters(status: str):
7876
raise Exception(f"Error updating feed statuses: {e}")
7977

8078
try:
81-
session.commit()
82-
# Replace direct call to refresh_materialized_view with HTTP request to the refresh function
83-
refresh_url = os.getenv("FUNCTION_URL_REFRESH_MV")
84-
if not refresh_url:
85-
raise ValueError("FUNCTION_URL_REFRESH_MV environment variable is not set")
86-
87-
# Create an authorized request
88-
auth_req = requests.Request()
89-
90-
# Get an identity token for the target URL
91-
token = id_token.fetch_id_token(auth_req, refresh_url)
92-
93-
# Make the HTTP request with the ID token
94-
headers = {"Authorization": f"Bearer {token}"}
95-
response = http_requests.get(refresh_url, headers=headers)
96-
97-
response.raise_for_status()
98-
logging.info("Materialized view refresh event triggered successfully.")
79+
create_refresh_materialized_view_task()
9980
logging.info("Feed Database changes for status committed.")
10081
logging.info("Status Changes: %s", diff_counts)
101-
session.close()
10282
return diff_counts
10383
except Exception as e:
10484
logging.error("Error committing changes:", e)
105-
session.rollback()
106-
session.close()
10785
raise Exception(f"Error creating dataset: {e}")

functions-python/reverse_geolocation/src/reverse_geolocation_processor.py

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import flask
99
import pandas as pd
10-
import requests as http_requests
1110
import shapely.geometry
1211
from geoalchemy2 import WKTElement
1312
from geoalchemy2.shape import to_shape
@@ -24,7 +23,11 @@
2423
geopolygons_as_string,
2524
)
2625
from parse_request import parse_request_parameters
27-
from shared.database.database import with_db_session
26+
from shared.database.database import (
27+
with_db_session,
28+
create_refresh_materialized_view_task,
29+
)
30+
2831
from shared.database_gen.sqlacodegen_models import (
2932
Geopolygon,
3033
Feed,
@@ -37,9 +40,6 @@
3740
)
3841
from shared.helpers.logger import get_logger
3942

40-
from google.auth.transport import requests
41-
from google.oauth2 import id_token
42-
4343

4444
@with_db_session
4545
def get_cached_geopolygons(
@@ -379,23 +379,7 @@ def extract_location_aggregates(
379379
# Commit the changes to the database before refreshing the materialized view
380380
db_session.commit()
381381

382-
# Replace direct call to refresh_materialized_view with HTTP request to the refresh function
383-
refresh_url = os.getenv("FUNCTION_URL_REFRESH_MV")
384-
if not refresh_url:
385-
raise ValueError("FUNCTION_URL_REFRESH_MV environment variable is not set")
386-
387-
# Create an authorized request
388-
auth_req = requests.Request()
389-
390-
# Get an identity token for the target URL
391-
token = id_token.fetch_id_token(auth_req, refresh_url)
392-
393-
# Make the HTTP request with the ID token
394-
headers = {"Authorization": f"Bearer {token}"}
395-
response = http_requests.get(refresh_url, headers=headers)
396-
397-
response.raise_for_status()
398-
logger.info("Materialized view refresh event triggered successfully.")
382+
create_refresh_materialized_view_task()
399383

400384

401385
@with_db_session

functions-python/tasks_executor/README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ Example:
3131
"after_date": "2025-06-01"
3232
}
3333
}
34+
{
35+
"task": "refresh_materialized_view",
36+
"payload": {
37+
"dry_run": true
38+
}
39+
}
3440
```
3541

3642
To get the list of supported tasks use:

0 commit comments

Comments
 (0)