44from datetime import datetime , timedelta
55import functions_framework
66from google .protobuf import timestamp_pb2
7+ from google .auth .transport .requests import Request
8+ from google .oauth2 import id_token
79
810from shared .helpers .logger import init_logger
911from shared .database .database import with_db_session
@@ -50,12 +52,19 @@ def refresh_materialized_view_function(request):
5052 proto_time = timestamp_pb2 .Timestamp ()
5153 proto_time .FromDatetime (bucket_time )
5254
55+ # Fetch an identity token for the target URL
56+ auth_req = Request ()
57+ token = id_token .fetch_id_token (auth_req , url )
58+
5359 task = {
5460 "name" : task_name ,
5561 "http_request" : {
5662 "http_method" : tasks_v2 .HttpMethod .GET ,
5763 "url" : url ,
58- "headers" : {"Content-Type" : "application/json" },
64+ "headers" : {
65+ "Content-Type" : "application/json" ,
66+ "Authorization" : f"Bearer { token } " ,
67+ },
5968 },
6069 "schedule_time" : proto_time ,
6170 }
@@ -66,9 +75,7 @@ def refresh_materialized_view_function(request):
6675 logging .info (
6776 f"Scheduled refresh materialized view task for { timestamp_str } "
6877 )
69- return {
70- "message" : f"Scheduled materialized view task for { timestamp_str } "
71- }, 200
78+ return {"message" : f"Refresh task for { timestamp_str } scheduled." }, 200
7279 except Exception as e :
7380 if "ALREADY_EXISTS" in str (e ):
7481 logging .info (f"Task already exists for { timestamp_str } , skipping." )
0 commit comments