import os
import logging
import json
from time import sleep

from google.cloud import workflows_v1
from google.cloud.workflows import executions_v1
from google.cloud.workflows.executions_v1 import Execution

env = os.getenv("ENV", "dev").lower()
bucket_name = f"mobilitydata-datasets-{env}"

def execute_workflow(
    project: str,
    location: str = "northamerica-northeast1",
    workflow: str = "gtfs_validator_execution",
    input_data: dict = None,
) -> Execution:
    """
    Executes a workflow with the given input data and returns the resulting execution.
    @param project: The Google Cloud project id that contains the workflow to execute.
    @param location: The location of the workflow.
    @param workflow: The ID of the workflow to execute.
    @param input_data: A dictionary containing input data for the workflow.
    @return: The execution response.
    """
    execution_client = executions_v1.ExecutionsClient()
    workflows_client = workflows_v1.WorkflowsClient()
    parent = workflows_client.workflow_path(project, location, workflow)

    # Prepare the execution input as a JSON string.
    input_json = json.dumps(input_data) if input_data else "{}"

    # Create and configure the execution request with input data.
    execution_request = Execution(argument=input_json)
    response = execution_client.create_execution(
        parent=parent, execution=execution_request
    )
    logging.info(f"Created execution: {response.name}")
    # Fetch the current state of the newly created execution before returning it.
    execution = execution_client.get_execution(request={"name": response.name})
    return execution

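# A minimal usage sketch for execute_workflow (hypothetical project id and input;
# the "data" payload shape mirrors the one built in execute_workflows below):
#
#     execution = execute_workflow(
#         "mobility-feeds-dev",
#         input_data={"data": {"bypass_db_update": True}},
#     )
#     logging.info(f"Execution state: {execution.state}")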
def execute_workflows(
    latest_datasets,
    validator_endpoint=None,
    bypass_db_update=False,
    reports_bucket_name=None,
):
    """
    Execute the workflow for the latest datasets that need their validation report to be updated.
    :param latest_datasets: List of tuples containing the feed stable id and dataset stable id
    :param validator_endpoint: The URL of the validator
    :param bypass_db_update: Whether to bypass the database update
    :param reports_bucket_name: The name of the bucket where the reports are stored
    :return: List of dataset stable ids for which the workflow was executed
    """
    project_id = f"mobility-feeds-{env}"
    location = os.getenv("LOCATION", "northamerica-northeast1")
    execution_triggered_datasets = []
    batch_size = int(os.getenv("BATCH_SIZE", "5"))
    sleep_time = int(os.getenv("SLEEP_TIME", "5"))
    count = 0
    logging.info(f"Executing workflow for {len(latest_datasets)} datasets")
    for feed_id, dataset_id in latest_datasets:
        try:
            # Workflow input: protoPayload.resourceName points at the dataset zip
            # in the datasets bucket.
            input_data = {
                "data": {
                    "bypass_db_update": bypass_db_update,
                    "protoPayload": {
                        "resourceName": "projects/_/"
                        f"buckets/{bucket_name}/"
                        f"objects/{feed_id}/{dataset_id}/{dataset_id}.zip"
                    },
                    "resource": {
                        "labels": {"location": location, "project_id": project_id},
                    },
                }
            }
            if validator_endpoint:
                input_data["data"]["validator_endpoint"] = validator_endpoint
            if reports_bucket_name:
                input_data["data"]["reports_bucket_name"] = reports_bucket_name
            logging.info(f"Executing workflow for {feed_id}/{dataset_id}")
            execute_workflow(project_id, input_data=input_data)
            execution_triggered_datasets.append(dataset_id)
        except Exception as e:
            logging.error(
                f"Error while executing workflow for {feed_id}/{dataset_id}: {e}"
            )
        count += 1
        logging.info(f"Processed {count} of {len(latest_datasets)} datasets")
        if count % batch_size == 0:
            logging.info(
                f"Sleeping for {sleep_time} seconds before the next batch to avoid rate limiting..."
            )
            sleep(sleep_time)
    return execution_triggered_datasets
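
# A minimal usage sketch for execute_workflows (the feed and dataset stable ids are
# hypothetical placeholders; ENV, LOCATION, BATCH_SIZE and SLEEP_TIME are read from
# the environment as above):
#
#     latest_datasets = [
#         ("feed-stable-id-1", "dataset-stable-id-1"),
#         ("feed-stable-id-2", "dataset-stable-id-2"),
#     ]
#     triggered = execute_workflows(latest_datasets, bypass_db_update=True)
#     logging.info(f"Triggered workflow executions for datasets: {triggered}")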