Skip to content

Commit d6007d6

Browse files
committed
removed refresh function and used tasks executor
1 parent b175dbc commit d6007d6

File tree

12 files changed

+154
-320
lines changed

12 files changed

+154
-320
lines changed

api/src/shared/database/database.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,13 @@
2020
import logging
2121

2222
from shared.common.logging_utils import get_env_logging_level
23+
from google.protobuf import timestamp_pb2
24+
from google.auth.transport.requests import Request
25+
from google.auth import id_token
26+
from google.cloud import tasks_v2
27+
from datetime import datetime, timedelta
28+
29+
from shared.helpers.pub_sub import publish_messages
2330

2431

2532
def generate_unique_id() -> str:
@@ -87,6 +94,81 @@ def mapper_configure_listener(mapper, class_):
8794
event.listen(mapper, "mapper_configured", mapper_configure_listener)
8895

8996

97+
def _next_half_hour(moment):
    """Return the next :00 or :30 boundary following ``moment`` (seconds zeroed)."""
    if moment.minute < 30:
        return moment.replace(minute=30, second=0, microsecond=0)
    return moment.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)


def create_refresh_materialized_view_task():
    """
    Schedule a Cloud Task that calls the tasks executor to refresh the materialized view.

    Requests are debounced into 30-minute windows: the task is named after the
    next :00/:30 boundary (in UTC), so every call made within the same window
    resolves to the same task name and Cloud Tasks rejects the duplicates.

    Reads the PROJECT_ID, LOCATION, QUEUE_NAME, GCP_REGION and ENVIRONMENT_NAME
    environment variables.

    Returns:
        tuple: ``(body, status_code)`` where ``body`` is a dict holding either a
        ``"message"`` (status 200, task scheduled or already present) or an
        ``"error"`` (status 500, any failure including missing configuration).
    """
    # Local import: the module-level import only brings in datetime and timedelta.
    from datetime import timezone

    required_env = (
        "PROJECT_ID",
        "LOCATION",
        "QUEUE_NAME",
        "GCP_REGION",
        "ENVIRONMENT_NAME",
    )
    try:
        logging.info("Creating materialized view refresh task.")

        # Fail fast on missing configuration instead of building a queue path
        # or URL that contains the literal string "None".
        env = {name: os.getenv(name) for name in required_env}
        missing = [name for name, value in env.items() if not value]
        if missing:
            raise ValueError(
                f"Missing required environment variables: {', '.join(missing)}"
            )

        # BOUNCE WINDOW: next :00 or :30.
        # Use UTC explicitly: Timestamp.FromDatetime interprets naive datetimes
        # as UTC, so a naive *local* time would schedule the task at the wrong
        # instant on hosts whose timezone is not UTC. tzinfo is stripped so the
        # value is a naive-UTC datetime, which FromDatetime accepts.
        bucket_time = _next_half_hour(datetime.now(timezone.utc)).replace(tzinfo=None)
        timestamp_str = bucket_time.strftime("%Y-%m-%d-%H-%M")
        task_id = f"refresh-materialized-view-{timestamp_str}"

        # Convert to protobuf timestamp
        proto_time = timestamp_pb2.Timestamp()
        proto_time.FromDatetime(bucket_time)

        # Cloud Tasks setup
        project = env["PROJECT_ID"]
        location = env["LOCATION"]
        queue = env["QUEUE_NAME"]
        url = (
            f"https://{env['GCP_REGION']}-{project}.cloudfunctions.net/"
            f"tasks-executor-{env['ENVIRONMENT_NAME']}"
        )

        client = tasks_v2.CloudTasksClient()
        parent = client.queue_path(project, location, queue)
        task_path = client.task_path(project, location, queue, task_id)

        # Fetch an identity token for the target URL.
        # NOTE(review): the token is minted now but only used when the task
        # runs (up to ~30 minutes later, plus retries); consider letting Cloud
        # Tasks attach an OIDC token itself — confirm against token lifetime.
        token = id_token.fetch_id_token(Request(), url)

        task = {
            "name": task_path,
            "http_request": {
                "http_method": tasks_v2.HttpMethod.GET,
                "url": url,
                "headers": {
                    "Content-Type": "application/json",
                    "Authorization": f"Bearer {token}",
                },
            },
            "schedule_time": proto_time,
        }

        # Enqueue the task
        try:
            client.create_task(request={"parent": parent, "task": task})
        except Exception as e:
            # The client's AlreadyExists error may not contain the literal
            # "ALREADY_EXISTS" in its string form, so also match on the
            # exception class name. Anything else is a real failure.
            is_duplicate = (
                type(e).__name__ == "AlreadyExists" or "ALREADY_EXISTS" in str(e)
            )
            if not is_duplicate:
                raise
            logging.info(f"Task already exists for {timestamp_str}, skipping.")
            return {
                "message": f"Task already exists for {timestamp_str}, skipping."
            }, 200

        logging.info(f"Scheduled refresh materialized view task for {timestamp_str}")
        return {"message": f"Refresh task for {timestamp_str} scheduled."}, 200

    except Exception as error:
        error_msg = f"Error enqueuing task: {error}"
        logging.error(error_msg)
        return {"error": error_msg}, 500
170+
171+
90172
def refresh_materialized_view(session: "Session", view_name: str) -> bool:
91173
"""
92174
Refresh Materialized view by name.

functions-python/backfill_dataset_service_date_range/src/main.py

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44

55
from shared.helpers.logger import init_logger
66

7-
from shared.database.database import with_db_session
7+
from shared.database.database import (
8+
with_db_session,
9+
create_refresh_materialized_view_task,
10+
)
811

912
from sqlalchemy.orm import joinedload, Session
1013
from sqlalchemy import or_, func
@@ -147,27 +150,7 @@ def backfill_datasets(session: "Session"):
147150
try:
148151
changes_count = 0
149152
session.commit()
150-
# Replace direct call to refresh_materialized_view with HTTP request to the refresh function
151-
refresh_url = os.getenv("FUNCTION_URL_REFRESH_MV")
152-
if not refresh_url:
153-
raise ValueError(
154-
"FUNCTION_URL_REFRESH_MV environment variable is not set"
155-
)
156-
157-
# Create an authorized request
158-
auth_req = requests.Request()
159-
160-
# Get an identity token for the target URL
161-
token = id_token.fetch_id_token(auth_req, refresh_url)
162-
163-
# Make the HTTP request with the ID token
164-
headers = {"Authorization": f"Bearer {token}"}
165-
response = http_requests.get(refresh_url, headers=headers)
166-
167-
response.raise_for_status()
168-
logging.info(
169-
"Materialized view refresh event triggered successfully."
170-
)
153+
create_refresh_materialized_view_task()
171154
logging.info(f"{changes_count} elements committed.")
172155
except Exception as e:
173156
logging.error("Error committing changes:", e)

functions-python/refresh_materialized_view/.coveragerc

Whitespace-only changes.

functions-python/refresh_materialized_view/README.md

Lines changed: 0 additions & 76 deletions
This file was deleted.

functions-python/refresh_materialized_view/function_config.json

Lines changed: 0 additions & 20 deletions
This file was deleted.

functions-python/refresh_materialized_view/requirements.txt

Lines changed: 0 additions & 26 deletions
This file was deleted.

functions-python/refresh_materialized_view/requirements_dev.txt

Lines changed: 0 additions & 9 deletions
This file was deleted.

0 commit comments

Comments
 (0)