Skip to content

Commit 181c30f

Browse files
authored
fix: mitigate 429 errors in cloud function execution for validation reports (#883)
1 parent cab579d commit 181c30f

File tree

6 files changed

+126
-14
lines changed

6 files changed

+126
-14
lines changed

functions-python/validation_report_processor/function_config.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
}
1414
],
1515
"ingress_settings": "ALLOW_INTERNAL_AND_GCLB",
16-
"max_instance_request_concurrency": 1,
17-
"max_instance_count": 5,
16+
"max_instance_request_concurrency": 8,
17+
"max_instance_count": 1,
1818
"min_instance_count": 0,
1919
"available_cpu": 1
2020
}

infra/functions-python/main.tf

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ terraform {
2323
#
2424

2525
locals {
26+
x_number_of_concurrent_instance = 4
27+
deployment_timestamp = formatdate("YYYYMMDDhhmmss", timestamp())
2628
function_tokens_config = jsondecode(file("${path.module}../../../functions-python/tokens/function_config.json"))
2729
function_tokens_zip = "${path.module}/../../functions-python/tokens/.dist/tokens.zip"
2830

@@ -346,6 +348,34 @@ resource "google_cloudfunctions2_function" "extract_location_batch" {
346348
}
347349

348350
# 3. functions/validation_report_processor cloud function
351+
# Create a queue for the cloud tasks
352+
# The 2X rate is defined as 4*2 concurrent dispatches and 1 dispatch per second
353+
# The name of the queue need to be dynamic due to GCP limitations
354+
# references:
355+
# - https://cloud.google.com/tasks/docs/deleting-appengine-queues-and-tasks#deleting_queues
356+
# - https://issuetracker.google.com/issues/263947953
357+
resource "google_cloud_tasks_queue" "cloud_tasks_2x_rate_queue" {
358+
name = "cloud-tasks-2x-rate-queue-${var.environment}-${local.deployment_timestamp}"
359+
location = var.gcp_region
360+
361+
rate_limits {
362+
max_concurrent_dispatches = local.x_number_of_concurrent_instance * 2
363+
max_dispatches_per_second = 1
364+
}
365+
366+
retry_config {
367+
# This will make the cloud task retry for ~two hours
368+
max_attempts = 120
369+
min_backoff = "20s"
370+
max_backoff = "60s"
371+
max_doublings = 2
372+
}
373+
}
374+
375+
output "processing_report_cloud_task_name" {
376+
value = google_cloud_tasks_queue.cloud_tasks_2x_rate_queue.name
377+
}
378+
349379
resource "google_cloudfunctions2_function" "process_validation_report" {
350380
name = local.function_process_validation_report_config.name
351381
description = local.function_process_validation_report_config.description

infra/main.tf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ module "workflows" {
116116
gcp_region = var.gcp_region
117117
environment = var.environment
118118
validator_endpoint = var.validator_endpoint
119+
processing_report_cloud_task_name = module.functions-python.processing_report_cloud_task_name
119120
}
120121

121122
module "feed-api-load-balancer" {

infra/workflows/main.tf

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,32 @@ resource "google_project_iam_member" "workflows_invoker" {
5252
member = "serviceAccount:${google_service_account.workflows_service_account.email}"
5353
}
5454

55+
# This permission is added to allow the workflow to act as the service account and generate tokens.
56+
# This is required for the workflow to call a cloud tasks that generates a token to call the a cloud run service or function.
57+
resource "google_project_iam_member" "service_account_workflow_act_as_binding" {
58+
project = var.project_id
59+
role = "roles/iam.serviceAccountUser" #iam.serviceAccounts.actAs
60+
member = "serviceAccount:${google_service_account.workflows_service_account.email}"
61+
}
62+
5563
resource "google_project_iam_member" "cloud_run_invoker" {
5664
project = var.project_id
5765
role = "roles/run.invoker"
5866
member = "serviceAccount:${google_service_account.workflows_service_account.email}"
5967
}
6068

69+
resource "google_project_iam_member" "service_account_workflow_cloudtasks_enqueuer" {
70+
project = var.project_id
71+
role = "roles/cloudtasks.enqueuer"
72+
member = "serviceAccount:${google_service_account.workflows_service_account.email}"
73+
}
74+
75+
resource "google_project_iam_member" "service_account_workflow_cloudtasks_viewer" {
76+
project = var.project_id
77+
role = "roles/cloudtasks.viewer"
78+
member = "serviceAccount:${google_service_account.workflows_service_account.email}"
79+
}
80+
6181
# Workflow to execute the GTFS Validator
6282
resource "google_workflows_workflow" "gtfs_validator_execution" {
6383
name = "gtfs_validator_execution"
@@ -70,6 +90,7 @@ resource "google_workflows_workflow" "gtfs_validator_execution" {
7090
reports_bucket_name = lower(var.environment) == "prod" ? var.reports_bucket_name : "stg-${var.reports_bucket_name}"
7191
validator_endpoint = var.validator_endpoint
7292
environment = lower(var.environment)
93+
processing_report_cloud_task_name = var.processing_report_cloud_task_name
7394
}
7495
source_contents = file("${path.module}../../../workflows/gtfs_validator_execution.yml")
7596
}

infra/workflows/vars.tf

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,8 @@ variable "validator_endpoint" {
5151
type = string
5252
description = "URL of the validator endpoint"
5353
}
54+
55+
variable "processing_report_cloud_task_name" {
56+
type = string
57+
description = "The cloud task name to call the process report task"
58+
}

workflows/gtfs_validator_execution.yml

Lines changed: 67 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ main:
1919
- datasetID: ${text.split(data.resourceName, "/")[6]}
2020
- region: ${resource.labels.location}
2121
- projectID: ${resource.labels.project_id}
22+
- environment: ${sys.get_env("environment")}
23+
# The cloud tasks are sensitive to deletion.
24+
# This is why we need to have this as a parameter set at deploying time.
25+
- cloudTaskName: ${sys.get_env("processing_report_cloud_task_name")}
26+
- serviceAccountEmail: ${"workflows-service-account@mobility-feeds-" + environment + ".iam.gserviceaccount.com"}
2227
- url: ${"https://storage.googleapis.com/" + datasetsBucket + "/" + feedID + "/" + datasetID + "/" + datasetID + ".zip"}
2328
- headers:
2429
Content-Type: application/json
@@ -221,23 +226,73 @@ main:
221226
switch:
222227
- condition: ${byPassDbUpdate}
223228
next: successfulExecution
224-
- updateDatabase:
225-
call: http.post
226-
args:
227-
url: ${"https://" + region + "-" + projectID + ".cloudfunctions.net/process-validation-report"}
228-
auth:
229-
type: OIDC
230-
body:
229+
- createPayload:
230+
assign:
231+
- payloadTask:
231232
feed_id: ${feedID}
232233
dataset_id: ${datasetID}
233234
validator_version: ${validatorVersion}
234-
headers: ${headers}
235-
result: updateDatabaseResponse
236-
- logUpdateDatabaseResponse:
235+
- payloadTaskBase64: '${base64.encode(json.encode(payloadTask))}'
236+
- enqueueTask:
237+
call: googleapis.cloudtasks.v2beta3.projects.locations.queues.tasks.create
238+
args:
239+
parent: ${"projects/" + projectID + "/locations/" + region + "/queues/" + cloudTaskName}
240+
body:
241+
task:
242+
httpRequest:
243+
url: ${"https://" + region + "-" + projectID + ".cloudfunctions.net/process-validation-report"}
244+
httpMethod: POST
245+
oidcToken:
246+
serviceAccountEmail: ${serviceAccountEmail}
247+
body: ${payloadTaskBase64}
248+
headers:
249+
Content-Type: application/json
250+
result: taskResponse
251+
- logTaskCreation:
237252
call: sys.log
238253
args:
239-
text: ${updateDatabaseResponse}
240-
severity: INFO
254+
text: ${"Created task for feed " + feedID + " Task ID:" + taskResponse.name}
255+
severity: INFO
256+
# The waitForTaskCompletion will wait for HTTP error status 404 from google API.
257+
# The cloud tasks API doesn't return a status field and when the task success it HTTP returns 404(tasks not found).
258+
- waitForTaskCompletion:
259+
try:
260+
steps:
261+
- getTaskStatus:
262+
call: googleapis.cloudtasks.v2beta3.projects.locations.queues.tasks.get
263+
args:
264+
name: ${taskResponse.name}
265+
result: taskStatus
266+
- printTaskStatus:
267+
call: sys.log
268+
args:
269+
text: ${taskStatus}
270+
severity: INFO
271+
next: retryTaskStatus
272+
except:
273+
as: error
274+
steps:
275+
- printExceptTaskStatus:
276+
call: sys.log
277+
args:
278+
text: ${"HTTP error raised while calling get tasks endpoint:" + error.message}
279+
severity: INFO
280+
- handle404:
281+
switch:
282+
- condition: ${error.code == 404}
283+
next: logTaskCompletion
284+
- condition: ${error.code != 404}
285+
raise: ${error}
286+
- retryTaskStatus:
287+
call: sys.sleep
288+
args:
289+
seconds: 30
290+
next: waitForTaskCompletion
291+
- logTaskCompletion:
292+
call: sys.log
293+
args:
294+
text: ${"Completed task for feed " + feedID + " Task ID:" + taskResponse.name}
295+
severity: INFO
241296
next: successfulExecution
242297
- fileExistenceTimeout:
243298
steps:

0 commit comments

Comments
 (0)