Skip to content

Commit b175dbc

Browse files
committed
used http call to the refresh function in other GCP functions
1 parent 06e5a32 commit b175dbc

File tree

4 files changed

+69
-19
lines changed

4 files changed

+69
-19
lines changed

functions-python/backfill_dataset_service_date_range/src/main.py

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from shared.helpers.logger import init_logger
66

7-
from shared.database.database import with_db_session, refresh_materialized_view
7+
from shared.database.database import with_db_session
88

99
from sqlalchemy.orm import joinedload, Session
1010
from sqlalchemy import or_, func
@@ -17,13 +17,14 @@
1717
from shared.database_gen.sqlacodegen_models import (
1818
Gtfsdataset,
1919
Validationreport,
20-
t_feedsearch,
2120
)
2221

23-
import requests
22+
import requests as http_requests
2423
import json
2524

2625
from google.cloud import storage
26+
from google.auth.transport import requests
27+
from google.oauth2 import id_token
2728

2829
env = os.getenv("ENV", "dev").lower()
2930
bucket_name = f"mobilitydata-datasets-{env}"
@@ -146,7 +147,27 @@ def backfill_datasets(session: "Session"):
146147
try:
147148
changes_count = 0
148149
session.commit()
149-
refresh_materialized_view(session, t_feedsearch.name)
150+
# Replace direct call to refresh_materialized_view with HTTP request to the refresh function
151+
refresh_url = os.getenv("FUNCTION_URL_REFRESH_MV")
152+
if not refresh_url:
153+
raise ValueError(
154+
"FUNCTION_URL_REFRESH_MV environment variable is not set"
155+
)
156+
157+
# Create an authorized request
158+
auth_req = requests.Request()
159+
160+
# Get an identity token for the target URL
161+
token = id_token.fetch_id_token(auth_req, refresh_url)
162+
163+
# Make the HTTP request with the ID token
164+
headers = {"Authorization": f"Bearer {token}"}
165+
response = http_requests.get(refresh_url, headers=headers)
166+
167+
response.raise_for_status()
168+
logging.info(
169+
"Materialized view refresh event triggered successfully."
170+
)
150171
logging.info(f"{changes_count} elements committed.")
151172
except Exception as e:
152173
logging.error("Error committing changes:", e)

functions-python/batch_process_dataset/src/main.py

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import json
1919
import os
2020
import random
21-
import requests
21+
import requests as http_requests
2222
import uuid
2323
import zipfile
2424
from dataclasses import dataclass
@@ -30,7 +30,7 @@
3030
from google.cloud import storage
3131
from sqlalchemy import func
3232

33-
from shared.database_gen.sqlacodegen_models import Gtfsdataset, t_feedsearch
33+
from shared.database_gen.sqlacodegen_models import Gtfsdatasets
3434
from shared.dataset_service.main import DatasetTraceService, DatasetTrace, Status
3535
from shared.database.database import with_db_session
3636
import logging
@@ -39,6 +39,10 @@
3939
from shared.helpers.utils import download_and_get_hash
4040
from sqlalchemy.orm import Session
4141

42+
from google.auth.transport import requests
43+
from google.oauth2 import id_token
44+
45+
4246
init_logger()
4347

4448

@@ -250,19 +254,25 @@ def create_dataset(self, dataset_file: DatasetFile, db_session: Session):
250254
db_session.commit()
251255
self.logger.info(f"[{self.feed_stable_id}] Dataset created successfully.")
252256

253-
# Replace direct call to refresh_materialized_view with HTTP request to GCP function
254-
function_url = os.getenv("FUNCTION_URL_REFRESH_MV")
255-
if not function_url:
257+
# Replace direct call to refresh_materialized_view with HTTP request to the refresh function
258+
refresh_url = os.getenv("FUNCTION_URL_REFRESH_MV")
259+
if not refresh_url:
256260
raise ValueError(
257261
"FUNCTION_URL_REFRESH_MV environment variable is not set"
258262
)
259263

260-
response = requests.get(f"{function_url}/refresh-materialized-view")
264+
# Create an authorized request
265+
auth_req = requests.Request()
266+
267+
# Get an identity token for the target URL
268+
token = id_token.fetch_id_token(auth_req, refresh_url)
269+
270+
# Make the HTTP request with the ID token
271+
headers = {"Authorization": f"Bearer {token}"}
272+
response = http_requests.get(refresh_url, headers=headers)
273+
261274
response.raise_for_status()
262-
self.logger.info(
263-
f"[{self.feed_stable_id}] Materialized view refresh event triggered "
264-
"successfully."
265-
)
275+
self.logger.info("Materialized view refresh event triggered successfully.")
266276
except Exception as e:
267277
raise Exception(f"Error creating dataset: {e}")
268278

functions-python/helpers/feed_status.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
import logging
22
from datetime import datetime, timezone
33
from sqlalchemy import text
4-
from shared.database_gen.sqlacodegen_models import Gtfsdataset, Feed, t_feedsearch
5-
from shared.database.database import refresh_materialized_view
4+
import requests as http_requests
5+
import os
6+
from shared.database_gen.sqlacodegen_models import Gtfsdataset, Feed
67
from typing import TYPE_CHECKING
78

89
if TYPE_CHECKING:
910
from sqlalchemy.orm import Session
1011

12+
from google.auth.transport import requests
13+
from google.oauth2 import id_token
14+
1115

1216
# query to update the status of the feeds based on the service date range of the latest dataset
1317
def update_feed_statuses_query(session: "Session", stable_feed_ids: list[str]):
@@ -75,7 +79,23 @@ def get_filters(status: str):
7579

7680
try:
7781
session.commit()
78-
refresh_materialized_view(session, t_feedsearch.name)
82+
# Replace direct call to refresh_materialized_view with HTTP request to the refresh function
83+
refresh_url = os.getenv("FUNCTION_URL_REFRESH_MV")
84+
if not refresh_url:
85+
raise ValueError("FUNCTION_URL_REFRESH_MV environment variable is not set")
86+
87+
# Create an authorized request
88+
auth_req = requests.Request()
89+
90+
# Get an identity token for the target URL
91+
token = id_token.fetch_id_token(auth_req, refresh_url)
92+
93+
# Make the HTTP request with the ID token
94+
headers = {"Authorization": f"Bearer {token}"}
95+
response = http_requests.get(refresh_url, headers=headers)
96+
97+
response.raise_for_status()
98+
logging.info("Materialized view refresh event triggered successfully.")
7999
logging.info("Feed Database changes for status committed.")
80100
logging.info("Status Changes: %s", diff_counts)
81101
session.close()

functions-python/reverse_geolocation/src/reverse_geolocation_processor.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
Osmlocationgroup,
3333
Feedosmlocationgroup,
3434
Location,
35-
t_feedsearch,
3635
Gtfsdataset,
3736
Gtfsfeed,
3837
)
@@ -380,7 +379,7 @@ def extract_location_aggregates(
380379
# Commit the changes to the database before refreshing the materialized view
381380
db_session.commit()
382381

383-
# Replace direct call to refresh_materialized_view with HTTP request to GCP function
382+
# Replace direct call to refresh_materialized_view with HTTP request to the refresh function
384383
refresh_url = os.getenv("FUNCTION_URL_REFRESH_MV")
385384
if not refresh_url:
386385
raise ValueError("FUNCTION_URL_REFRESH_MV environment variable is not set")

0 commit comments

Comments
 (0)