Skip to content

Commit c01b6bd

Browse files
committed
applied bouncing mechanism
1 parent 3958575 commit c01b6bd

File tree

3 files changed

+38
-71
lines changed

3 files changed

+38
-71
lines changed

functions-python/refresh_materialized_view/src/main.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
import os
33
import json
44
from google.cloud import tasks_v2
5-
from datetime import datetime
5+
from datetime import datetime, timedelta
66
import functions_framework
7+
from shared.helpers import timezone
78
from shared.helpers.logger import init_logger
89
from shared.database.database import with_db_session
910

@@ -21,9 +22,16 @@ def refresh_materialized_view_function(request):
2122
"""
2223
try:
2324
logging.info("Starting materialized view refresh function.")
25+
now = datetime.now(timezone.utc)
26+
delay_minutes = 5
27+
# Round up to the next 5-minute mark (bounce window)
28+
delay_target = now + timedelta(minutes=delay_minutes)
29+
bucket_time = delay_target.replace(
30+
minute=(delay_target.minute // 5) * 5, second=0, microsecond=0
31+
)
2432

2533
# Generate deduplication key based on current timestamp
26-
timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M")
34+
timestamp = bucket_time.strftime("%Y-%m-%d-%H-%M")
2735
task_name = f"refresh-materialized-view-{timestamp}"
2836

2937
# Cloud Tasks client setup
@@ -46,6 +54,7 @@ def refresh_materialized_view_function(request):
4654
"headers": {"Content-Type": "application/json"},
4755
"body": json.dumps(payload).encode(),
4856
},
57+
"schedule_time": timestamp,
4958
}
5059

5160
# Enqueue the task
@@ -70,7 +79,7 @@ def refresh_materialized_view_task(request, db_session):
7079
dict: Response message and status code.
7180
"""
7281
try:
73-
logging.info("Starting materialized view refresh task.")
82+
logging.info("Materialized view refresh task initiated.")
7483

7584
data = request.json()
7685
view_name = data.get("view_name")
@@ -81,7 +90,6 @@ def refresh_materialized_view_task(request, db_session):
8190
f"{view_name} with key: {deduplication_key}"
8291
)
8392

84-
# Call the refresh function
8593
success = refresh_materialized_view(db_session, view_name)
8694

8795
if success:
Lines changed: 25 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,77 +1,35 @@
11
import pytest
2-
from unittest.mock import Mock, patch
3-
import sys
4-
import os
2+
from fastapi.testclient import TestClient
3+
from src.main import app
54

6-
# Add the src directory to the path
7-
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))
5+
client = TestClient(app)
86

9-
from main import refresh_materialized_view_function # noqa: E402
107

8+
@pytest.fixture
9+
def mock_environment_variables(monkeypatch):
10+
monkeypatch.setenv("PROJECT_ID", "test-project")
11+
monkeypatch.setenv("QUEUE_NAME", "test-queue")
12+
monkeypatch.setenv("LOCATION", "us-central1")
13+
monkeypatch.setenv("FUNCTION_URL", "http://localhost")
1114

12-
class TestRefreshMaterializedView:
13-
"""Test class for the refresh materialized view function."""
1415

15-
@patch("main.refresh_materialized_view")
16-
def test_refresh_view_success(self, mock_refresh):
17-
"""Test successful view refresh."""
18-
mock_refresh.return_value = True
16+
@pytest.fixture
17+
def mock_cloud_tasks_client(mocker):
18+
mock_client = mocker.Mock()
19+
mocker.patch("src.main.tasks_v2.CloudTasksClient", return_value=mock_client)
20+
return mock_client
1921

20-
# Mock request object
21-
request = Mock()
22-
request.method = "GET"
2322

24-
# Mock database session
25-
db_session = Mock()
23+
@pytest.mark.parametrize("endpoint", ["/refresh-materialized-view"])
24+
def test_refresh_materialized_view_function(endpoint):
25+
response = client.post(endpoint, json={})
26+
assert response.status_code == 200
27+
assert "Task" in response.json()["message"]
2628

27-
# Call the function
28-
response, status_code = refresh_materialized_view_function(request, db_session)
2929

30-
# Assertions
31-
assert "Successfully refreshed materialized view" in response["message"]
32-
mock_refresh.assert_called_once_with(db_session, "my_materialized_view")
33-
34-
@patch("main.refresh_materialized_view")
35-
def test_refresh_view_failure(self, mock_refresh):
36-
"""Test failure to refresh the view."""
37-
mock_refresh.return_value = False
38-
39-
# Mock request object
40-
request = Mock()
41-
request.method = "GET"
42-
43-
# Mock database session
44-
db_session = Mock()
45-
46-
# Call the function
47-
response, status_code = refresh_materialized_view_function(request, db_session)
48-
49-
# Assertions
50-
assert status_code == 500
51-
assert "Failed to refresh materialized view" in response["error"]
52-
mock_refresh.assert_called_once_with(db_session, "my_materialized_view")
53-
54-
@patch("main.refresh_materialized_view")
55-
def test_refresh_view_exception(self, mock_refresh):
56-
"""Test exception during view refresh."""
57-
mock_refresh.side_effect = Exception("Database connection failed")
58-
59-
# Mock request object
60-
request = Mock()
61-
request.method = "GET"
62-
63-
# Mock database session
64-
db_session = Mock()
65-
66-
# Call the function
67-
response, status_code = refresh_materialized_view_function(request, db_session)
68-
69-
# Assertions
70-
assert status_code == 500
71-
assert "Error refreshing materialized view" in response["error"]
72-
assert "Database connection failed" in response["error"]
73-
mock_refresh.assert_called_once_with(db_session, "my_materialized_view")
74-
75-
76-
if __name__ == "__main__":
77-
pytest.main([__file__])
30+
@pytest.mark.parametrize("endpoint", ["/refresh-materialized-view-task"])
31+
def test_refresh_materialized_view_task(endpoint):
32+
payload = {"view_name": "test_view", "deduplication_key": "test_key"}
33+
response = client.post(endpoint, json=payload)
34+
assert response.status_code == 200
35+
assert "Successfully refreshed materialized view" in response.json()["message"]

infra/functions-python/main.tf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1518,6 +1518,7 @@ resource "google_cloudfunctions2_function" "refresh_materialized_view" {
15181518
PROJECT_ID = var.project_id
15191519
QUEUE_NAME = google_cloud_tasks_queue.refresh_materialized_view_queue.name
15201520
LOCATION = var.gcp_region
1521+
FUNCTION_URL = "https://${var.gcp_region}-${var.project_id}.cloudfunctions.net/refresh_materialized_view"
15211522
}
15221523
available_memory = "512Mi"
15231524
timeout_seconds = 60

0 commit comments

Comments
 (0)