Commit c480b2e

feat: added batch fill for dataset size and files (#1302)
1 parent 159bc76 commit c480b2e

12 files changed, +541 −23 lines changed


functions-python/batch_process_dataset/src/main.py

Lines changed: 4 additions & 2 deletions

```diff
@@ -28,7 +28,6 @@
 from cloudevents.http import CloudEvent
 from google.cloud import storage
 from sqlalchemy import func
-
 from shared.common.gcp_utils import create_refresh_materialized_view_task
 from shared.database_gen.sqlacodegen_models import Gtfsdataset, Gtfsfile

@@ -37,9 +36,10 @@
 import logging

 from shared.helpers.logger import init_logger, get_logger
-from shared.helpers.utils import download_and_get_hash
+from shared.helpers.utils import download_and_get_hash, get_hash_from_file
 from sqlalchemy.orm import Session

+
 init_logger()


@@ -179,6 +179,8 @@ def upload_file_to_storage(
             id=str(uuid.uuid4()),
             file_name=file_name,
             file_size_bytes=os.path.getsize(file_path),
+            hosted_url=file_blob.public_url if public else None,
+            hash=get_hash_from_file(file_path),
         )
     )
     return blob, extracted_files
```

functions-python/helpers/utils.py

Lines changed: 17 additions & 0 deletions

```diff
@@ -16,6 +16,7 @@
 import hashlib
 import logging
 import os
+import ssl

 import requests
 import urllib3
@@ -78,6 +79,17 @@ def download_url_content(url, with_retry=False):
         raise e


+def get_hash_from_file(file_path, hash_algorithm="sha256", chunk_size=8192):
+    """
+    Returns the hash of a file
+    """
+    hash_object = hashlib.new(hash_algorithm)
+    with open(file_path, "rb") as f:
+        for chunk in iter(lambda: f.read(chunk_size), b""):
+            hash_object.update(chunk)
+    return hash_object.hexdigest()
+
+
 def download_and_get_hash(
     url,
     file_path,
@@ -87,6 +99,7 @@ def download_and_get_hash(
     api_key_parameter_name=None,
     credentials=None,
     logger=None,
+    trusted_certs=False,  # If True, disables SSL verification
 ):
     """
     Downloads the content of a URL and stores it in a file and returns the hash of the file
@@ -117,6 +130,10 @@
     if authentication_type == 2 and api_key_parameter_name and credentials:
         headers[api_key_parameter_name] = credentials

+    if trusted_certs:
+        ctx.check_hostname = False
+        ctx.verify_mode = ssl.CERT_NONE
+
     with urllib3.PoolManager(ssl_context=ctx) as http:
         with http.request(
             "GET", url, preload_content=False, headers=headers, redirect=True
```

functions-python/tasks_executor/README.md

Lines changed: 3 additions & 4 deletions

````diff
@@ -24,6 +24,8 @@ Example:
     "filter_statuses": ["active", "inactive", "future"]
   }
 }
+```
+```json
 {
   "task": "rebuild_missing_bounding_boxes",
   "payload": {
@@ -40,12 +42,9 @@ Example:
 ```

 To get the list of supported tasks use:
-``
+```json
 {
   "name": "list_tasks",
   "payload": {}
 }
-
-```
-
 ```
````

functions-python/tasks_executor/function_config.json

Lines changed: 4 additions & 0 deletions

```diff
@@ -11,6 +11,10 @@
   "secret_environment_variables": [
     {
       "key": "FEEDS_DATABASE_URL"
+    },
+    {
+      "key": "FEEDS_CREDENTIALS",
+      "secret": "FEEDS_CREDENTIALS"
     }
   ],
   "ingress_settings": "ALLOW_ALL",
```

functions-python/tasks_executor/requirements.txt

Lines changed: 1 addition & 0 deletions

```diff
@@ -22,6 +22,7 @@ google-cloud-workflows
 google-cloud-pubsub
 google-cloud-tasks
 flask
+google-cloud-storage

 # Configuration
 python-dotenv==1.0.0
```

functions-python/tasks_executor/src/main.py

Lines changed: 7 additions & 0 deletions

```diff
@@ -22,6 +22,9 @@
 from tasks.refresh_feedsearch_view.refresh_materialized_view import (
     refresh_materialized_view_handler,
 )
+from tasks.dataset_files.rebuild_missing_dataset_files import (
+    rebuild_missing_dataset_files_handler,
+)
 from tasks.validation_reports.rebuild_missing_validation_reports import (
     rebuild_missing_validation_reports_handler,
 )
@@ -56,6 +59,10 @@
         "description": "Refreshes the materialized view.",
         "handler": refresh_materialized_view_handler,
     },
+    "rebuild_missing_dataset_files": {
+        "description": "Rebuilds missing dataset files for GTFS datasets.",
+        "handler": rebuild_missing_dataset_files_handler,
+    },
 }

```
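
With the handler registered above, the new task can be triggered through the tasks executor using the request envelope shown in its README earlier in this commit; the payload fields mirror the task README below. An illustrative request body:

```json
{
  "task": "rebuild_missing_dataset_files",
  "payload": {
    "dry_run": true,
    "after_date": "2025-07-01",
    "latest_only": true
  }
}
```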

Lines changed: 72 additions & 0 deletions
New file:

````markdown
# Rebuild Missing Dataset Files

This task rebuilds missing extracted files in GTFS datasets.
It downloads datasets from their `hosted_url`, extracts all files, computes zipped and unzipped sizes, calculates hashes, uploads the files to a GCS bucket, and updates the database.

---

## Task ID

Use task ID: `rebuild_missing_dataset_files`

---

## Usage

The function accepts the following payload:

```json
{
  "dry_run": true,            // [optional] If true, do not upload or modify the database (default: true)
  "after_date": "YYYY-MM-DD", // [optional] Only include datasets downloaded after this ISO date
  "latest_only": true         // [optional] If true, only process the latest version of each dataset (default: true)
}
```

### Example:

```json
{
  "dry_run": false,
  "after_date": "2025-07-01",
  "latest_only": true
}
```

---

## What It Does

For each GTFS dataset with missing file information (missing zipped/unzipped sizes or missing extracted files), this function:

1. Downloads the `.zip` file from its `hosted_url`
2. Computes the zipped size in bytes
3. Extracts all GTFS files locally
4. Computes the unzipped size in bytes
5. Uploads each extracted file to a GCS bucket with the structure:

   ```
   {feed-stable-id}/{dataset-stable-id}/extracted/{file_name}
   ```

6. Makes each file publicly accessible and stores its GCS URL
7. Computes SHA256 hashes for each file
8. Stores metadata in the `Gtfsfile` table for later use

---

## GCP Environment Variables

The function requires the following environment variables:

| Variable               | Description                                                    |
| ---------------------- | -------------------------------------------------------------- |
| `DATASETS_BUCKET_NAME` | The name of the GCS bucket used to store extracted GTFS files  |

---

## Additional Notes

* This function **disables SSL verification** when downloading files, as the sources are trusted internally.
* Commits to the database occur in batches of 5 datasets to improve performance and avoid large transaction blocks.
* If `dry_run` is enabled, no downloads, uploads, or DB modifications are performed. Only the number of affected datasets is logged.
* The function is safe to rerun. It will only affect datasets missing required file metadata.
````
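
To make the numbered steps in the README concrete, here is a minimal, self-contained sketch of the per-dataset flow. It is not the actual task code: the function name and signature are illustrative, and the `Gtfsfile` database write is replaced by a returned dict.

```python
import hashlib
import os
import tempfile
import urllib.request
import zipfile

from google.cloud import storage


def rebuild_dataset_files(
    feed_stable_id: str,
    dataset_stable_id: str,
    hosted_url: str,
    bucket_name: str,
    dry_run: bool = True,
) -> dict:
    """Illustrative sketch of rebuilding a single dataset's extracted files."""
    if dry_run:
        # Dry run: report only, no download, upload, or DB changes.
        return {"dataset": dataset_stable_id, "dry_run": True}

    client = storage.Client()
    bucket = client.bucket(bucket_name)
    results = {"dataset": dataset_stable_id, "files": []}

    with tempfile.TemporaryDirectory() as tmp_dir:
        # 1-2. Download the .zip from its hosted_url and record the zipped size.
        zip_path = os.path.join(tmp_dir, "dataset.zip")
        urllib.request.urlretrieve(hosted_url, zip_path)
        results["zipped_size_bytes"] = os.path.getsize(zip_path)

        # 3-4. Extract all GTFS files locally and compute the unzipped size.
        extract_dir = os.path.join(tmp_dir, "extracted")
        with zipfile.ZipFile(zip_path) as zf:
            zf.extractall(extract_dir)
        results["unzipped_size_bytes"] = sum(
            os.path.getsize(os.path.join(root, name))
            for root, _, names in os.walk(extract_dir)
            for name in names
        )

        # 5-8. Upload each extracted file under
        # {feed-stable-id}/{dataset-stable-id}/extracted/{file_name},
        # make it public, and keep its URL, size, and SHA-256 hash
        # (the real task stores these as Gtfsfile rows).
        for file_name in sorted(os.listdir(extract_dir)):
            local_path = os.path.join(extract_dir, file_name)
            blob = bucket.blob(
                f"{feed_stable_id}/{dataset_stable_id}/extracted/{file_name}"
            )
            blob.upload_from_filename(local_path)
            blob.make_public()
            with open(local_path, "rb") as f:
                file_hash = hashlib.sha256(f.read()).hexdigest()
            results["files"].append(
                {
                    "file_name": file_name,
                    "file_size_bytes": os.path.getsize(local_path),
                    "hosted_url": blob.public_url,
                    "hash": file_hash,
                }
            )
    return results
```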
