Skip to content

Commit 18d92ab

Browse files
committed
fixed broken tests
1 parent 2845dda commit 18d92ab

File tree

8 files changed

+118
-129
lines changed

8 files changed

+118
-129
lines changed

api/src/shared/common/gcp_utils.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import logging
2+
import os
3+
from google.cloud import tasks_v2
4+
from google.protobuf.timestamp_pb2 import Timestamp
5+
6+
7+
def create_refresh_materialized_view_task():
8+
"""
9+
Asynchronously refresh a materialized view.
10+
Ensures deduplication by generating a unique task name.
11+
12+
Returns:
13+
dict: Response message and status code.
14+
"""
15+
from google.protobuf import timestamp_pb2
16+
from datetime import datetime, timedelta
17+
18+
try:
19+
logging.info("Creating materialized view refresh task.")
20+
now = datetime.now()
21+
22+
# BOUNCE WINDOW: next :00 or :30
23+
minute = now.minute
24+
if minute < 30:
25+
bucket_time = now.replace(minute=30, second=0, microsecond=0)
26+
else:
27+
bucket_time = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
28+
29+
timestamp_str = bucket_time.strftime("%Y-%m-%d-%H-%M")
30+
task_name = f"refresh-materialized-view-{timestamp_str}"
31+
32+
# Convert to protobuf timestamp
33+
proto_time = timestamp_pb2.Timestamp()
34+
proto_time.FromDatetime(bucket_time)
35+
36+
# Cloud Tasks setup
37+
client = tasks_v2.CloudTasksClient()
38+
project = os.getenv("PROJECT_ID")
39+
location = os.getenv("LOCATION")
40+
queue = os.getenv("MATERIALIZED_VIEW_QUEUE")
41+
url = (
42+
f"https://{os.getenv('GCP_REGION')}-"
43+
f"{os.getenv('PROJECT_ID')}.cloudfunctions.net/"
44+
f"tasks-executor-{os.getenv('ENVIRONMENT_NAME')}"
45+
)
46+
47+
task_name = client.task_path(project, location, queue, task_name)
48+
49+
# Enqueue the task
50+
try:
51+
create_http_task_with_name(
52+
client=client,
53+
body=b"",
54+
url=url,
55+
project_id=project,
56+
gcp_region=location,
57+
queue_name=queue,
58+
task_name=task_name,
59+
task_time=proto_time,
60+
http_method=tasks_v2.HttpMethod.GET,
61+
)
62+
logging.info(f"Scheduled refresh materialized view task for {timestamp_str}")
63+
return {"message": f"Refresh task for {timestamp_str} scheduled."}, 200
64+
except Exception as e:
65+
if "ALREADY_EXISTS" in str(e):
66+
logging.info(f"Task already exists for {timestamp_str}, skipping.")
67+
68+
except Exception as error:
69+
error_msg = f"Error enqueuing task: {error}"
70+
logging.error(error_msg)
71+
return {"error": error_msg}, 500
72+
73+
74+
def create_http_task_with_name(
75+
client: "tasks_v2.CloudTasksClient",
76+
body: bytes,
77+
url: str,
78+
project_id: str,
79+
gcp_region: str,
80+
queue_name: str,
81+
task_name: str,
82+
task_time: Timestamp,
83+
http_method: "tasks_v2.HttpMethod",
84+
):
85+
"""Creates a GCP Cloud Task."""
86+
87+
token = tasks_v2.OidcToken(service_account_email=os.getenv("SERVICE_ACCOUNT_EMAIL"))
88+
89+
task = tasks_v2.Task(
90+
name=task_name,
91+
schedule_time=task_time,
92+
http_request=tasks_v2.HttpRequest(
93+
url=url,
94+
http_method=http_method,
95+
oidc_token=token,
96+
body=body,
97+
headers={"Content-Type": "application/json"},
98+
),
99+
)
100+
client.create_task(parent=client.queue_path(project_id, gcp_region, queue_name), task=task)

api/src/shared/database/database.py

Lines changed: 1 addition & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from sqlalchemy.orm import sessionmaker
2222
import logging
2323

24+
2425
from shared.common.logging_utils import get_env_logging_level
2526

2627

@@ -116,78 +117,6 @@ def mapper_configure_listener(mapper, class_):
116117
event.listen(mapper, "mapper_configured", mapper_configure_listener)
117118

118119

119-
def create_refresh_materialized_view_task():
120-
"""
121-
Asynchronously refresh a materialized view.
122-
Ensures deduplication by generating a unique task name.
123-
124-
Returns:
125-
dict: Response message and status code.
126-
"""
127-
from google.protobuf import timestamp_pb2
128-
from datetime import datetime, timedelta
129-
from google.cloud import tasks_v2
130-
131-
try:
132-
logging.info("Creating materialized view refresh task.")
133-
now = datetime.now()
134-
135-
# BOUNCE WINDOW: next :00 or :30
136-
minute = now.minute
137-
if minute < 30:
138-
bucket_time = now.replace(minute=30, second=0, microsecond=0)
139-
else:
140-
bucket_time = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
141-
142-
timestamp_str = bucket_time.strftime("%Y-%m-%d-%H-%M")
143-
task_name = f"refresh-materialized-view-{timestamp_str}"
144-
145-
# Convert to protobuf timestamp
146-
proto_time = timestamp_pb2.Timestamp()
147-
proto_time.FromDatetime(bucket_time)
148-
149-
# Cloud Tasks setup
150-
client = tasks_v2.CloudTasksClient()
151-
project = os.getenv("PROJECT_ID")
152-
location = os.getenv("LOCATION")
153-
queue = os.getenv("MATERIALIZED_VIEW_QUEUE")
154-
url = (
155-
f"https://{os.getenv('GCP_REGION')}-"
156-
f"{os.getenv('PROJECT_ID')}.cloudfunctions.net/"
157-
f"tasks-executor-{os.getenv('ENVIRONMENT_NAME')}"
158-
)
159-
160-
task_name = client.task_path(project, location, queue, task_name)
161-
162-
# Convert to protobuf timestamp
163-
proto_time = timestamp_pb2.Timestamp()
164-
proto_time.FromDatetime(bucket_time)
165-
166-
# Enqueue the task
167-
try:
168-
create_http_task_with_name(
169-
client=client,
170-
body=b"",
171-
url=url,
172-
project_id=project,
173-
gcp_region=location,
174-
queue_name=queue,
175-
task_name=task_name,
176-
task_time=proto_time,
177-
http_method=tasks_v2.HttpMethod.GET,
178-
)
179-
logging.info(f"Scheduled refresh materialized view task for {timestamp_str}")
180-
return {"message": f"Refresh task for {timestamp_str} scheduled."}, 200
181-
except Exception as e:
182-
if "ALREADY_EXISTS" in str(e):
183-
logging.info(f"Task already exists for {timestamp_str}, skipping.")
184-
185-
except Exception as error:
186-
error_msg = f"Error enqueuing task: {error}"
187-
logging.error(error_msg)
188-
return {"error": error_msg}, 500
189-
190-
191120
def refresh_materialized_view(session: "Session", view_name: str) -> bool:
192121
"""
193122
Refresh Materialized view by name.
@@ -201,47 +130,6 @@ def refresh_materialized_view(session: "Session", view_name: str) -> bool:
201130
return False
202131

203132

204-
def create_http_task_with_name(
205-
client, # type: tasks_v2.CloudTasksClient
206-
body: bytes,
207-
url: str,
208-
project_id: str,
209-
gcp_region: str,
210-
queue_name: str,
211-
task_name: str,
212-
task_time: str,
213-
http_method,
214-
):
215-
from google.cloud import tasks_v2
216-
217-
"""Creates a GCP Cloud Task."""
218-
219-
token = (tasks_v2.OidcToken(service_account_email=os.getenv("SERVICE_ACCOUNT_EMAIL")),)
220-
221-
task = tasks_v2.Task(
222-
http_request=tasks_v2.HttpRequest(
223-
url=url,
224-
http_method=http_method,
225-
oidc_token=token,
226-
body=body,
227-
headers={"Content-Type": "application/json"},
228-
)
229-
)
230-
task = {
231-
"name": task_name,
232-
"http_request": {
233-
"http_method": http_method,
234-
"url": url,
235-
"headers": {
236-
"Content-Type": "application/json",
237-
"Authorization": f"Bearer {token}",
238-
},
239-
},
240-
"schedule_time": task_time,
241-
}
242-
client.create_task(parent=client.queue_path(project_id, gcp_region, queue_name), task=task)
243-
244-
245133
def with_db_session(func=None, db_url: str | None = None):
246134
"""
247135
Decorator to handle the session management for the decorated function.

functions-python/backfill_dataset_service_date_range/src/main.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,10 @@
22
import os
33
import functions_framework
44

5+
from shared.common.gcp_utils import create_refresh_materialized_view_task
56
from shared.helpers.logger import init_logger
67

7-
from shared.database.database import (
8-
with_db_session,
9-
create_refresh_materialized_view_task,
10-
)
8+
from shared.database.database import with_db_session
119

1210
from sqlalchemy.orm import joinedload, Session
1311
from sqlalchemy import or_, func

functions-python/batch_process_dataset/src/main.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,11 @@
2929
from google.cloud import storage
3030
from sqlalchemy import func
3131

32+
from shared.common.gcp_utils import create_refresh_materialized_view_task
3233
from shared.database_gen.sqlacodegen_models import Gtfsdataset, Gtfsfile
3334

3435
from shared.dataset_service.main import DatasetTraceService, DatasetTrace, Status
35-
from shared.database.database import (
36-
with_db_session,
37-
create_refresh_materialized_view_task,
38-
)
36+
from shared.database.database import with_db_session
3937
import logging
4038

4139
from shared.helpers.logger import init_logger, get_logger

functions-python/helpers/feed_status.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@
66

77
if TYPE_CHECKING:
88
from sqlalchemy.orm import Session
9-
from shared.database.database import (
10-
create_refresh_materialized_view_task,
11-
)
9+
from shared.common.gcp_utils import create_refresh_materialized_view_task
1210

1311

1412
# query to update the status of the feeds based on the service date range of the latest dataset

functions-python/helpers/utils.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,12 @@ def create_http_task(
151151
gcp_region: str,
152152
queue_name: str,
153153
) -> None:
154-
from shared.database.database import create_http_task_with_name
154+
from shared.common.gcp_utils import create_http_task_with_name
155+
from google.cloud import tasks_v2
156+
from google.protobuf import timestamp_pb2
157+
158+
proto_time = timestamp_pb2.Timestamp()
159+
proto_time.GetCurrentTime()
155160

156161
create_http_task_with_name(
157162
client=client,
@@ -160,4 +165,7 @@ def create_http_task(
160165
project_id=project_id,
161166
gcp_region=gcp_region,
162167
queue_name=queue_name,
168+
task_name="task_name",
169+
task_time=proto_time,
170+
http_method=tasks_v2.HttpMethod.POST,
163171
)

functions-python/process_validation_report/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ geoalchemy2==0.14.7
1616

1717
# Google specific packages for this function
1818
cloudevents~=1.10.1
19+
google-cloud-tasks
1920

2021
# Configuration
2122
python-dotenv==1.0.0

functions-python/reverse_geolocation/src/reverse_geolocation_processor.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,8 @@
2323
geopolygons_as_string,
2424
)
2525
from parse_request import parse_request_parameters
26-
from shared.database.database import (
27-
with_db_session,
28-
create_refresh_materialized_view_task,
29-
)
26+
from shared.common.gcp_utils import create_refresh_materialized_view_task
27+
from shared.database.database import with_db_session
3028

3129
from shared.database_gen.sqlacodegen_models import (
3230
Geopolygon,

0 commit comments

Comments
 (0)