Skip to content

Commit d9e5938

Browse files
committed
cloud task for refresh materialized view
1 parent 1366ec3 commit d9e5938

File tree

2 files changed

+104
-8
lines changed
  • functions-python/refresh_materialized_view/src
  • infra/functions-python

2 files changed

+104
-8
lines changed
Lines changed: 85 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,90 @@
11
import logging
22
import functions_framework
3-
from flask import Request
3+
from flask import Request, jsonify
4+
from google.cloud import tasks_v2
5+
from datetime import datetime
46
from shared.helpers.logger import init_logger
5-
from shared.database.database import with_db_session, refresh_materialized_view
7+
from shared.database.database import with_db_session
68

79
init_logger()
810

911

12+
@functions_framework.http
13+
def refresh_materialized_view_function(request: Request):
14+
"""
15+
Enqueues a Cloud Task to refresh a materialized view asynchronously.
16+
17+
Returns:
18+
tuple: (response_message, status_code)
19+
"""
20+
try:
21+
logging.info("Starting materialized view refresh function.")
22+
23+
# Generate deduplication key based on current timestamp
24+
timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M")
25+
task_name = f"refresh-feed-search-{timestamp}"
26+
27+
# Cloud Tasks client setup
28+
client = tasks_v2.CloudTasksClient()
29+
project_id = os.getenv("PROJECT_ID")
30+
queue = os.getenv("QUEUE_NAME")
31+
location = os.getenv("LOCATION")
32+
url = os.getenv("FUNCTION_URL")
33+
34+
parent = client.queue_path(project_id, location, queue)
35+
36+
# Task payload
37+
payload = {"view_name": "feedsearch"}
38+
39+
task = {
40+
"name": client.task_path(project_id, location, queue, task_name),
41+
"http_request": {
42+
"http_method": tasks_v2.HttpMethod.POST,
43+
"url": url,
44+
"headers": {"Content-Type": "application/json"},
45+
"body": jsonify(payload).data,
46+
},
47+
}
48+
49+
# Enqueue the task
50+
response = client.create_task(request={"parent": parent, "task": task})
51+
logging.info(f"Task {response.name} enqueued successfully.")
52+
53+
return {"message": f"Task {response.name} enqueued successfully."}, 200
54+
55+
except Exception as error:
56+
error_msg = f"Error enqueuing task: {error}"
57+
logging.error(error_msg)
58+
return {"error": error_msg}, 500
59+
60+
1061
@with_db_session
1162
@functions_framework.http
12-
def refresh_materialized_view_function(request: Request, db_session):
63+
def refresh_materialized_view_task(request: Request, db_session):
1364
"""
1465
Refreshes a materialized view using the CONCURRENTLY command to avoid
15-
table locks.
66+
table locks. This function is triggered by a Cloud Task.
1667
1768
Returns:
1869
tuple: (response_message, status_code)
1970
"""
2071
try:
21-
logging.info("Starting materialized view refresh function.")
72+
logging.info("Starting materialized view refresh task.")
2273

23-
view_name = "my_materialized_view"
24-
logging.info(f"Refreshing materialized view: {view_name}")
74+
data = request.get_json()
75+
view_name = data.get("view_name")
76+
deduplication_key = data.get("deduplication_key")
77+
78+
logging.info(
79+
"Refreshing materialized view: "
80+
f"{view_name} with key: {deduplication_key}"
81+
)
2582

2683
# Call the refresh function
2784
success = refresh_materialized_view(db_session, view_name)
2885

2986
if success:
30-
success_msg = f"Successfully refreshed materialized view: {view_name}"
87+
success_msg = "Successfully refreshed materialized view: " f"{view_name}"
3188
logging.info(success_msg)
3289
return {"message": success_msg}, 200
3390
else:
@@ -39,3 +96,23 @@ def refresh_materialized_view_function(request: Request, db_session):
3996
error_msg = f"Error refreshing materialized view: {error}"
4097
logging.error(error_msg)
4198
return {"error": error_msg}, 500
99+
100+
101+
def refresh_materialized_view(db_session, view_name):
102+
"""
103+
Refreshes the materialized view in the database.
104+
105+
Args:
106+
db_session: Database session object.
107+
view_name (str): Name of the materialized view to refresh.
108+
109+
Returns:
110+
bool: True if refresh was successful, False otherwise.
111+
"""
112+
try:
113+
db_session.execute(f"REFRESH MATERIALIZED VIEW CONCURRENTLY {view_name};")
114+
db_session.commit()
115+
return True
116+
except Exception as error:
117+
logging.error("Error refreshing materialized view " f"{view_name}: {error}")
118+
return False

infra/functions-python/main.tf

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1542,4 +1542,23 @@ resource "google_cloudfunctions2_function" "refresh_materialized_view" {
15421542
}
15431543
}
15441544
}
1545+
}
1546+
1547+
15. functions/refresh_materialized_view
1548+
# Task queue to invoke refresh_materialized_view function
1549+
resource "google_cloud_tasks_queue" "refresh_materialized_view_queue" {
1550+
name = "refresh-materialized-view-queue-${var.environment}"
1551+
location = var.gcp_region
1552+
1553+
rate_limits {
1554+
max_concurrent_dispatches = 10
1555+
max_dispatches_per_second = 5
1556+
}
1557+
1558+
retry_config {
1559+
max_attempts = 5
1560+
min_backoff = "10s"
1561+
max_backoff = "60s"
1562+
max_doublings = 2
1563+
}
15451564
}

0 commit comments

Comments
 (0)