2020)
2121from sqlalchemy .orm import sessionmaker
2222import logging
23- from shared .helpers .utils import create_http_task
2423
2524from shared .common .logging_utils import get_env_logging_level
2625
@@ -125,6 +124,8 @@ def create_refresh_materialized_view_task():
125124 Returns:
126125 dict: Response message and status code.
127126 """
127+ from google .auth .transport .requests import Request
128+ from google .oauth2 import id_token
128129 from google .cloud import tasks_v2
129130 from google .protobuf import timestamp_pb2
130131 from datetime import datetime , timedelta
@@ -147,22 +148,44 @@ def create_refresh_materialized_view_task():
147148 proto_time = timestamp_pb2 .Timestamp ()
148149 proto_time .FromDatetime (bucket_time )
149150
151+ # Cloud Tasks setup
152+ client = tasks_v2 .CloudTasksClient ()
153+ project = os .getenv ("PROJECT_ID" )
154+ location = os .getenv ("LOCATION" )
155+ queue = os .getenv ("MATERIALIZED_VIEW_QUEUE" )
156+ url = (
157+ f"https://{ os .getenv ('GCP_REGION' )} -"
158+ f"{ os .getenv ('PROJECT_ID' )} .cloudfunctions.net/"
159+ f"tasks-executor-{ os .getenv ('ENVIRONMENT_NAME' )} "
160+ )
161+
162+ parent = client .queue_path (project , location , queue )
163+ task_name = client .task_path (project , location , queue , task_name )
164+
165+ # Convert to protobuf timestamp
166+ proto_time = timestamp_pb2 .Timestamp ()
167+ proto_time .FromDatetime (bucket_time )
168+
169+ # Fetch an identity token for the target URL
170+ auth_req = Request ()
171+ token = id_token .fetch_id_token (auth_req , url )
172+
173+ task = {
174+ "name" : task_name ,
175+ "http_request" : {
176+ "http_method" : tasks_v2 .HttpMethod .GET ,
177+ "url" : url ,
178+ "headers" : {
179+ "Content-Type" : "application/json" ,
180+ "Authorization" : f"Bearer { token } " ,
181+ },
182+ },
183+ "schedule_time" : proto_time ,
184+ }
185+
150186 # Enqueue the task
151187 try :
152- create_http_task (
153- client = tasks_v2 .CloudTasksClient (),
154- task_name = task_name ,
155- url = (
156- f"https://{ os .getenv ('GCP_REGION' )} -"
157- f"{ os .getenv ('PROJECT_ID' )} .cloudfunctions.net/"
158- f"tasks-executor-{ os .getenv ('ENVIRONMENT_NAME' )} "
159- ),
160- project_id = os .getenv ("PROJECT_ID" ),
161- gcp_region = os .getenv ("GCP_REGION" ),
162- queue_name = os .getenv ("MATERIALIZED_VIEW_QUEUE" ),
163- schedule_time = proto_time ,
164- body = {"dry_run" : False },
165- )
188+ client .create_task (request = {"parent" : parent , "task" : task })
166189 logging .info (f"Scheduled refresh materialized view task for { timestamp_str } " )
167190 return {"message" : f"Refresh task for { timestamp_str } scheduled." }, 200
168191 except Exception as e :
0 commit comments