Skip to content

Commit 1e7f3a9

Browse files
authored
fix: 1308 refresh view cloud task not created when a new dataset is downloaded (#1316)
1 parent 125e8b8 commit 1e7f3a9

File tree

4 files changed

+67
-24
lines changed

4 files changed

+67
-24
lines changed

api/src/shared/common/gcp_utils.py

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -34,41 +34,35 @@ def create_refresh_materialized_view_task():
3434
proto_time.FromDatetime(bucket_time)
3535

3636
# Cloud Tasks setup
37-
client = tasks_v2.CloudTasksClient()
3837
project = os.getenv("PROJECT_ID")
39-
location = os.getenv("LOCATION")
4038
queue = os.getenv("MATERIALIZED_VIEW_QUEUE")
41-
url = (
42-
f"https://{os.getenv('GCP_REGION')}-"
43-
f"{os.getenv('PROJECT_ID')}.cloudfunctions.net/"
44-
f"tasks-executor-{os.getenv('ENVIRONMENT_NAME')}"
45-
)
46-
47-
task_name = client.task_path(project, location, queue, task_name)
39+
logging.debug("Queue name from env: %s", queue)
40+
gcp_region = os.getenv("GCP_REGION")
41+
environment_name = os.getenv("ENVIRONMENT")
42+
url = f"https://{gcp_region}-" f"{project}.cloudfunctions.net/" f"tasks-executor-{environment_name}"
4843

4944
# Enqueue the task
5045
try:
5146
create_http_task_with_name(
52-
client=client,
47+
client=tasks_v2.CloudTasksClient(),
5348
body=b"",
5449
url=url,
5550
project_id=project,
56-
gcp_region=location,
51+
gcp_region=gcp_region,
5752
queue_name=queue,
5853
task_name=task_name,
5954
task_time=proto_time,
6055
http_method=tasks_v2.HttpMethod.GET,
6156
)
62-
logging.info(f"Scheduled refresh materialized view task for {timestamp_str}")
63-
return {"message": f"Refresh task for {timestamp_str} scheduled."}, 200
57+
logging.info("Scheduled refresh materialized view task for %s", task_name)
58+
return {"message": "Refresh task for %s scheduled." % task_name}, 200
6459
except Exception as e:
6560
if "ALREADY_EXISTS" in str(e):
66-
logging.info(f"Task already exists for {timestamp_str}, skipping.")
61+
logging.info("Task already exists for %s, skipping.", task_name)
6762

6863
except Exception as error:
69-
error_msg = f"Error enqueuing task: {error}"
70-
logging.error(error_msg)
71-
return {"error": error_msg}, 500
64+
logging.error("Error enqueuing task: %s", error)
65+
return {"error": "Error enqueuing task: %s" % error}, 500
7266

7367

7468
def create_http_task_with_name(
@@ -83,11 +77,13 @@ def create_http_task_with_name(
8377
http_method: "tasks_v2.HttpMethod",
8478
):
8579
"""Creates a GCP Cloud Task."""
86-
8780
token = tasks_v2.OidcToken(service_account_email=os.getenv("SERVICE_ACCOUNT_EMAIL"))
8881

82+
parent = client.queue_path(project_id, gcp_region, queue_name)
83+
logging.info("Queue parent path: %s", parent)
84+
8985
task = tasks_v2.Task(
90-
name=task_name,
86+
name=f"{parent}/tasks/{task_name}",
9187
schedule_time=task_time,
9288
http_request=tasks_v2.HttpRequest(
9389
url=url,
@@ -97,4 +93,10 @@ def create_http_task_with_name(
9793
headers={"Content-Type": "application/json"},
9894
),
9995
)
100-
client.create_task(parent=client.queue_path(project_id, gcp_region, queue_name), task=task)
96+
logging.info("Task created with task_name: %s", task_name)
97+
try:
98+
response = client.create_task(parent=parent, task=task)
99+
except Exception as e:
100+
logging.error("Error creating task: %s", e)
101+
logging.error("response: %s", response)
102+
logging.info("Successfully created task in create_http_task_with_name")

functions-python/helpers/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ def create_http_task(
182182
project_id=project_id,
183183
gcp_region=gcp_region,
184184
queue_name=queue_name,
185-
task_name="task_name",
185+
task_name=None, # No specific task name provided
186186
task_time=proto_time,
187187
http_method=tasks_v2.HttpMethod.POST,
188188
)

infra/batch/main.tf

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ locals {
4141
public_hosted_datasets_url = lower(var.environment) == "prod" ? "https://${var.public_hosted_datasets_dns}" : "https://${var.environment}-${var.public_hosted_datasets_dns}"
4242
# 1day=86400, 7days=604800, 31days=2678400
4343
retention_duration_seconds = lower(var.environment) == "prod" ? 2678400 : 604800
44+
45+
deployment_timestamp = formatdate("YYYYMMDDhhmmss", timestamp())
4446
}
4547

4648
data "google_vpc_access_connector" "vpc_connector" {
@@ -229,6 +231,20 @@ resource "google_project_iam_member" "datastore_owner" {
229231
member = "serviceAccount:${google_service_account.functions_service_account.email}"
230232
}
231233

234+
# Grant the batch functions service account permission to enqueue Cloud Tasks
235+
resource "google_project_iam_member" "queue_enqueuer" {
236+
project = var.project_id
237+
role = "roles/cloudtasks.enqueuer"
238+
member = "serviceAccount:${google_service_account.functions_service_account.email}"
239+
}
240+
241+
# This permission is added to allow the function to act as the service account and generate tokens.
242+
resource "google_project_iam_member" "service_account_workflow_act_as_binding" {
243+
project = var.project_id
244+
role = "roles/iam.serviceAccountUser" #iam.serviceAccounts.actAs
245+
member = "serviceAccount:${google_service_account.functions_service_account.email}"
246+
}
247+
232248
resource "google_pubsub_topic" "pubsub_topic" {
233249
name = "datasets-batch-topic-${var.environment}"
234250
}
@@ -264,6 +280,11 @@ resource "google_cloudfunctions2_function" "pubsub_function" {
264280
DB_REUSE_SESSION = "True"
265281
ENVIRONMENT = var.environment
266282
PUBLIC_HOSTED_DATASETS_URL = local.public_hosted_datasets_url
283+
PROJECT_ID = var.project_id
284+
GCP_REGION = var.gcp_region
285+
SERVICE_ACCOUNT_EMAIL = google_service_account.functions_service_account.email
286+
MATERIALIZED_VIEW_QUEUE = google_cloud_tasks_queue.refresh_materialized_view_task_queue.name
287+
267288
}
268289
dynamic "secret_environment_variables" {
269290
for_each = local.function_batch_process_dataset_config.secret_environment_variables
@@ -288,6 +309,26 @@ resource "google_cloudfunctions2_function" "pubsub_function" {
288309
}
289310
}
290311

312+
# Task queue to invoke refresh_materialized_view
313+
resource "google_cloud_tasks_queue" "refresh_materialized_view_task_queue" {
314+
project = var.project_id
315+
location = var.gcp_region
316+
name = "refresh-materialized-view-task-queue-${var.environment}-${local.deployment_timestamp}"
317+
318+
rate_limits {
319+
max_concurrent_dispatches = 1
320+
max_dispatches_per_second = 0.5
321+
}
322+
323+
retry_config {
324+
# ~22 minutes total: 120 + 240 + 480 + 480 = 1320s (initial attempt + 4 retries)
325+
max_attempts = 5
326+
min_backoff = "120s"
327+
max_backoff = "480s"
328+
max_doublings = 2
329+
}
330+
}
331+
291332
resource "google_cloudfunctions2_function_iam_member" "pubsub_function_invoker" {
292333
cloud_function = google_cloudfunctions2_function.pubsub_function.name
293334
project = var.project_id
@@ -374,4 +415,4 @@ resource "google_compute_global_forwarding_rule" "files_http_lb_rule_ipv4" {
374415
port_range = "443"
375416
ip_address = data.google_compute_global_address.files_http_lb_ipv4.address
376417
load_balancing_scheme = "EXTERNAL_MANAGED"
377-
}
418+
}

infra/functions-python/main.tf

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1422,10 +1422,10 @@ resource "google_cloud_tasks_queue" "refresh_materialized_view_task_queue" {
14221422
}
14231423

14241424
retry_config {
1425-
# This will make the cloud task retry for ~30 minutes
1425+
# ~22 minutes total: 120 + 240 + 480 + 480 = 1320s (initial attempt + 4 retries)
14261426
max_attempts = 5
14271427
min_backoff = "120s"
1428-
max_backoff = "120s"
1428+
max_backoff = "480s"
14291429
max_doublings = 2
14301430
}
14311431
}

0 commit comments

Comments (0)