Skip to content

Commit 1391702

Browse files
committed
added create_http_task_with_deduplication function
1 parent 8ba561d commit 1391702

File tree

2 files changed

+46
-38
lines changed

2 files changed

+46
-38
lines changed

api/src/shared/database/database.py

Lines changed: 15 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
)
2121
from sqlalchemy.orm import sessionmaker
2222
import logging
23+
from shared.helpers.utils import create_http_task
2324

2425
from shared.common.logging_utils import get_env_logging_level
2526

@@ -124,8 +125,6 @@ def create_refresh_materialized_view_task():
124125
Returns:
125126
dict: Response message and status code.
126127
"""
127-
from google.auth.transport.requests import Request
128-
from google.oauth2 import id_token
129128
from google.cloud import tasks_v2
130129
from google.protobuf import timestamp_pb2
131130
from datetime import datetime, timedelta
@@ -148,44 +147,22 @@ def create_refresh_materialized_view_task():
148147
proto_time = timestamp_pb2.Timestamp()
149148
proto_time.FromDatetime(bucket_time)
150149

151-
# Cloud Tasks setup
152-
client = tasks_v2.CloudTasksClient()
153-
project = os.getenv("PROJECT_ID")
154-
location = os.getenv("LOCATION")
155-
queue = os.getenv("MATERIALIZED_VIEW_QUEUE")
156-
url = (
157-
f"https://{os.getenv('GCP_REGION')}-"
158-
f"{os.getenv('PROJECT_ID')}.cloudfunctions.net/"
159-
f"tasks-executor-{os.getenv('ENVIRONMENT_NAME')}"
160-
)
161-
162-
parent = client.queue_path(project, location, queue)
163-
task_name = client.task_path(project, location, queue, task_name)
164-
165-
# Convert to protobuf timestamp
166-
proto_time = timestamp_pb2.Timestamp()
167-
proto_time.FromDatetime(bucket_time)
168-
169-
# Fetch an identity token for the target URL
170-
auth_req = Request()
171-
token = id_token.fetch_id_token(auth_req, url)
172-
173-
task = {
174-
"name": task_name,
175-
"http_request": {
176-
"http_method": tasks_v2.HttpMethod.GET,
177-
"url": url,
178-
"headers": {
179-
"Content-Type": "application/json",
180-
"Authorization": f"Bearer {token}",
181-
},
182-
},
183-
"schedule_time": proto_time,
184-
}
185-
186150
# Enqueue the task
187151
try:
188-
client.create_task(request={"parent": parent, "task": task})
152+
create_http_task(
153+
client=tasks_v2.CloudTasksClient(),
154+
task_name=task_name,
155+
url=(
156+
f"https://{os.getenv('GCP_REGION')}-"
157+
f"{os.getenv('PROJECT_ID')}.cloudfunctions.net/"
158+
f"tasks-executor-{os.getenv('ENVIRONMENT_NAME')}"
159+
),
160+
project_id=os.getenv("PROJECT_ID"),
161+
gcp_region=os.getenv("GCP_REGION"),
162+
queue_name=os.getenv("MATERIALIZED_VIEW_QUEUE"),
163+
schedule_time=proto_time,
164+
body={"dry_run": False},
165+
)
189166
logging.info(f"Scheduled refresh materialized view task for {timestamp_str}")
190167
return {"message": f"Refresh task for {timestamp_str} scheduled."}, 200
191168
except Exception as e:

functions-python/helpers/utils.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,3 +168,34 @@ def create_http_task(
168168
client.create_task(
169169
parent=client.queue_path(project_id, gcp_region, queue_name), task=task
170170
)
171+
172+
173+
def create_http_task_with_deduplication(
174+
client,
175+
task_name,
176+
url: str,
177+
project_id: str,
178+
gcp_region: str,
179+
queue_name: str,
180+
schedule_time,
181+
body: bytes,
182+
) -> None:
183+
"""Creates a GCP Cloud Task."""
184+
from google.cloud import tasks_v2
185+
186+
task = tasks_v2.Task(
187+
name=task_name,
188+
schedule_time=schedule_time,
189+
http_request=tasks_v2.HttpRequest(
190+
url=url,
191+
http_method=tasks_v2.HttpMethod.POST,
192+
oidc_token=tasks_v2.OidcToken(
193+
service_account_email=os.getenv("SERVICE_ACCOUNT_EMAIL")
194+
),
195+
body=body,
196+
headers={"Content-Type": "application/json"},
197+
),
198+
)
199+
client.create_task(
200+
parent=client.queue_path(project_id, gcp_region, queue_name), task=task
201+
)

0 commit comments

Comments
 (0)