2020)
2121from sqlalchemy .orm import sessionmaker
2222import logging
23+ from google .cloud import tasks_v2
2324
2425from shared .common .logging_utils import get_env_logging_level
2526
@@ -124,9 +125,6 @@ def create_refresh_materialized_view_task():
124125 Returns:
125126 dict: Response message and status code.
126127 """
127- from google .auth .transport .requests import Request
128- from google .oauth2 import id_token
129- from google .cloud import tasks_v2
130128 from google .protobuf import timestamp_pb2
131129 from datetime import datetime , timedelta
132130
@@ -159,33 +157,25 @@ def create_refresh_materialized_view_task():
159157 f"tasks-executor-{ os .getenv ('ENVIRONMENT_NAME' )} "
160158 )
161159
162- parent = client .queue_path (project , location , queue )
163160 task_name = client .task_path (project , location , queue , task_name )
164161
165162 # Convert to protobuf timestamp
166163 proto_time = timestamp_pb2 .Timestamp ()
167164 proto_time .FromDatetime (bucket_time )
168165
169- # Fetch an identity token for the target URL
170- auth_req = Request ()
171- token = id_token .fetch_id_token (auth_req , url )
172-
173- task = {
174- "name" : task_name ,
175- "http_request" : {
176- "http_method" : tasks_v2 .HttpMethod .GET ,
177- "url" : url ,
178- "headers" : {
179- "Content-Type" : "application/json" ,
180- "Authorization" : f"Bearer { token } " ,
181- },
182- },
183- "schedule_time" : proto_time ,
184- }
185-
186166 # Enqueue the task
187167 try :
188- client .create_task (request = {"parent" : parent , "task" : task })
168+ create_http_task_with_name (
169+ client = client ,
170+ body = b"" ,
171+ url = url ,
172+ project_id = project ,
173+ gcp_region = location ,
174+ queue_name = queue ,
175+ task_name = task_name ,
176+ task_time = proto_time ,
177+ http_method = tasks_v2 .HttpMethod .GET ,
178+ )
189179 logging .info (f"Scheduled refresh materialized view task for { timestamp_str } " )
190180 return {"message" : f"Refresh task for { timestamp_str } scheduled." }, 200
191181 except Exception as e :
@@ -211,6 +201,45 @@ def refresh_materialized_view(session: "Session", view_name: str) -> bool:
211201 return False
212202
213203
204+ def create_http_task_with_name (
205+ client , # type: tasks_v2.CloudTasksClient
206+ body : bytes ,
207+ url : str ,
208+ project_id : str ,
209+ gcp_region : str ,
210+ queue_name : str ,
211+ task_name : str ,
212+ task_time : str ,
213+ http_method : tasks_v2 .HttpMethod ,
214+ ):
215+ """Creates a GCP Cloud Task."""
216+
217+ token = (tasks_v2 .OidcToken (service_account_email = os .getenv ("SERVICE_ACCOUNT_EMAIL" )),)
218+
219+ task = tasks_v2 .Task (
220+ http_request = tasks_v2 .HttpRequest (
221+ url = url ,
222+ http_method = http_method ,
223+ oidc_token = token ,
224+ body = body ,
225+ headers = {"Content-Type" : "application/json" },
226+ )
227+ )
228+ task = {
229+ "name" : task_name ,
230+ "http_request" : {
231+ "http_method" : http_method ,
232+ "url" : url ,
233+ "headers" : {
234+ "Content-Type" : "application/json" ,
235+ "Authorization" : f"Bearer { token } " ,
236+ },
237+ },
238+ "schedule_time" : task_time ,
239+ }
240+ client .create_task (parent = client .queue_path (project_id , gcp_region , queue_name ), task = task )
241+
242+
214243def with_db_session (func = None , db_url : str | None = None ):
215244 """
216245 Decorator to handle the session management for the decorated function.
0 commit comments