
Commit eaeaf1c

Merge branch 'main' into 1172-refresh-feedsearch-view-asynchronically
2 parents: 18d92ab + bf22847

File tree

10 files changed: +221 -33 lines


.gitignore

Lines changed: 4 additions & 1 deletion
@@ -77,4 +77,7 @@ coverage_reports
 tf.plan
 
 # CSV generation output files
-functions-python/**/*.csv
+functions-python/**/*.csv
+
+# Local emulators
+.cloudstorage

functions-python/batch_process_dataset/.coveragerc

Lines changed: 1 addition & 0 deletions
@@ -5,6 +5,7 @@ omit =
     */database_gen/*
     */dataset_service/*
     */shared/*
+    */scripts/*
 
 [report]
 exclude_lines =

functions-python/batch_process_dataset/requirements_dev.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@ Faker
22
pytest~=7.4.3
33
urllib3-mock
44
requests-mock
5-
python-dotenv~=1.0.0
5+
python-dotenv~=1.0.0
6+
gcp-storage-emulator

functions-python/batch_process_dataset/src/main.py

Lines changed: 30 additions & 17 deletions
@@ -130,18 +130,15 @@ def download_content(self, temporary_file_path):
             logger=self.logger,
         )
         is_zip = zipfile.is_zipfile(temporary_file_path)
-        if is_zip:
-            extracted_file_path = os.path.join(
-                temporary_file_path.split(".")[0], "extracted"
-            )
-            with zipfile.ZipFile(temporary_file_path, "r") as zip_ref:
-                zip_ref.extractall(os.path.dirname(extracted_file_path))
-            # List all files in the extracted directory
-            extracted_files = os.listdir(os.path.dirname(extracted_file_path))
-            self.logger.info(f"Extracted files: {extracted_files}")
         return file_hash, is_zip
 
-    def upload_file_to_storage(self, source_file_path, dataset_stable_id):
+    def upload_file_to_storage(
+        self,
+        source_file_path,
+        dataset_stable_id,
+        extracted_files_path,
+        public=True,
+    ):
         """
         Uploads a file to the GCP bucket
         """
@@ -155,12 +152,12 @@ def upload_file_to_storage(self, source_file_path, dataset_stable_id):
         blob = bucket.blob(target_path)
         with open(source_file_path, "rb") as file:
             blob.upload_from_file(file)
-        blob.make_public()
+        if public:
+            blob.make_public()
 
         base_path, _ = os.path.splitext(source_file_path)
-        extracted_files_path = os.path.join(base_path, "extracted")
         extracted_files: List[Gtfsfile] = []
-        if not os.path.exists(extracted_files_path):
+        if not extracted_files_path or not os.path.exists(extracted_files_path):
             self.logger.warning(
                 f"Extracted files path {extracted_files_path} does not exist."
             )
@@ -172,7 +169,8 @@ def upload_file_to_storage(self, source_file_path, dataset_stable_id):
                 f"{self.feed_stable_id}/{dataset_stable_id}/extracted/{file_name}"
             )
             file_blob.upload_from_filename(file_path)
-            file_blob.make_public()
+            if public:
+                file_blob.make_public()
             self.logger.info(
                 f"Uploaded extracted file {file_name} to {file_blob.public_url}"
             )
@@ -185,7 +183,7 @@ def upload_file_to_storage(self, source_file_path, dataset_stable_id):
         )
         return blob, extracted_files
 
-    def upload_dataset(self) -> DatasetFile or None:
+    def upload_dataset(self, public=True) -> DatasetFile or None:
         """
         Uploads a dataset to a GCP bucket as <feed_stable_id>/latest.zip and
         <feed_stable_id>/<feed_stable_id>-<upload_datetime>.zip
@@ -205,12 +203,12 @@ def upload_dataset(self) -> DatasetFile or None:
         self.logger.info(
             f"[{self.feed_stable_id}] File hash is {file_sha256_hash}."
         )
-
        if self.latest_hash != file_sha256_hash:
            self.logger.info(
                f"[{self.feed_stable_id}] Dataset has changed (hash {self.latest_hash}"
                f"-> {file_sha256_hash}). Uploading new version."
            )
+            extracted_files_path = self.unzip_files(temp_file_path)
            self.logger.info(
                f"Creating file {self.feed_stable_id}/latest.zip in bucket {self.bucket_name}"
            )
@@ -226,7 +224,10 @@ def upload_dataset(self) -> DatasetFile or None:
                f" in bucket {self.bucket_name}"
            )
            _, extracted_files = self.upload_file_to_storage(
-                temp_file_path, dataset_stable_id
+                temp_file_path,
+                dataset_stable_id,
+                extracted_files_path,
+                public=public,
            )
 
            return DatasetFile(
@@ -250,6 +251,18 @@ def upload_dataset(self) -> DatasetFile or None:
            os.remove(temp_file_path)
        return None
 
+    def unzip_files(self, temp_file_path):
+        extracted_files_path = os.path.join(temp_file_path.split(".")[0], "extracted")
+        self.logger.info(f"Unzipping files to {extracted_files_path}")
+        # Create the directory for extracted files if it does not exist
+        os.makedirs(extracted_files_path, exist_ok=True)
+        with zipfile.ZipFile(temp_file_path, "r") as zip_ref:
+            zip_ref.extractall(path=extracted_files_path)
+        # List all files in the extracted directory
+        extracted_files = os.listdir(extracted_files_path)
+        self.logger.info(f"Extracted files: {extracted_files}")
+        return extracted_files_path
+
    def generate_temp_filename(self):
        """
        Generates a temporary filename
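This change moves zip extraction out of download_content into the new unzip_files step, whose output path then flows into upload_file_to_storage along with the new public flag. A minimal standalone sketch of that extraction step, assuming only the standard library (the real method lives on DatasetProcessor and logs through self.logger):

```python
import os
import zipfile


def unzip_to_extracted_dir(zip_path: str) -> str:
    """Extract a dataset zip into a sibling '<name>/extracted' directory and return that path.

    Hypothetical helper mirroring the unzip_files step added in this commit.
    """
    extracted_files_path = os.path.join(zip_path.split(".")[0], "extracted")
    # Create the directory for extracted files if it does not exist
    os.makedirs(extracted_files_path, exist_ok=True)
    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        zip_ref.extractall(path=extracted_files_path)
    # List the extracted files, mirroring the logging in unzip_files
    print(f"Extracted files: {os.listdir(extracted_files_path)}")
    return extracted_files_path
```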
New file: 84 additions & 0 deletions

@@ -0,0 +1,84 @@
+import logging
+import os
+
+from main import DatasetProcessor
+from gcp_storage_emulator.server import create_server
+
+HOST = "localhost"
+PORT = 9023
+BUCKET_NAME = "verifier"
+PRODUCER_URL = "https://example.com/dataset.zip"  # Replace with actual producer URL
+
+
+def verify_download_content(producer_url: str):
+    """
+    Verifies the download_content is able to retrieve the file
+    This is useful to simulate the download code locally and test issues related with user-agent and downloaded content.
+    Not supported authenticated feeds currently.
+    """
+    logging.info("Verifying downloaded content... (not implemented)")
+
+    logging.info(f"Producer URL: {producer_url}")
+
+    processor = DatasetProcessor(
+        producer_url=producer_url,
+        feed_id=None,
+        feed_stable_id=None,
+        execution_id=None,
+        latest_hash=None,
+        bucket_name=None,
+        authentication_type=0,
+        api_key_parameter_name=None,
+        public_hosted_datasets_url=None,
+    )
+    tempfile = processor.generate_temp_filename()
+    logging.info(f"Temp filename: {tempfile}")
+    file_hash, is_zip = processor.download_content(tempfile)
+    logging.info(f"File hash: {file_hash}")
+
+
+def verify_upload_dataset(producer_url: str):
+    """
+    Verifies the upload_dataset is able to upload the dataset to the GCP storage emulator.
+    This is useful to simulate the upload code locally and test issues related with user-agent and uploaded content.
+    This function also tests the DatasetProcessor class methods for generating a temporary filename
+    and uploading the dataset.
+    :param producer_url:
+    :return:
+    """
+    processor = DatasetProcessor(
+        producer_url=producer_url,
+        feed_id="feed_id",
+        feed_stable_id="feed_stable_id",
+        execution_id=None,
+        latest_hash="123",
+        bucket_name=BUCKET_NAME,
+        authentication_type=0,
+        api_key_parameter_name=None,
+        public_hosted_datasets_url=None,
+    )
+    tempfile = processor.generate_temp_filename()
+    logging.info(f"Temp filename: {tempfile}")
+    dataset_file = processor.upload_dataset(public=False)
+    logging.info(f"Dataset File: {dataset_file}")
+
+
+if __name__ == "__main__":
+    logging.basicConfig(level=logging.INFO)
+    # Replace with actual producer URL
+    try:
+        os.environ["STORAGE_EMULATOR_HOST"] = f"http://{HOST}:{PORT}"
+        server = create_server(
+            host=HOST, port=PORT, in_memory=False, default_bucket=BUCKET_NAME
+        )
+        server.start()
+
+        verify_download_content(producer_url=PRODUCER_URL)
+        logging.info("Download content verification completed successfully.")
+        verify_upload_dataset(producer_url=PRODUCER_URL)
+        verify_upload_dataset(producer_url=PRODUCER_URL)
+    except Exception as e:
+        logging.error(f"Error verifying download content: {e}")
+    finally:
+        server.stop()
+        logging.info("Verification completed.")

functions-python/batch_process_dataset/tests/test_batch_process_dataset_main.py

Lines changed: 7 additions & 2 deletions
@@ -46,8 +46,9 @@ def create_cloud_event(mock_data):
 class TestDatasetProcessor(unittest.TestCase):
     @patch("main.DatasetProcessor.upload_file_to_storage")
     @patch("main.DatasetProcessor.download_content")
+    @patch("main.DatasetProcessor.unzip_files")
     def test_upload_dataset_diff_hash(
-        self, mock_download_url_content, upload_file_to_storage
+        self, mock_unzip_files, mock_download_url_content, upload_file_to_storage
     ):
         """
         Test upload_dataset method of DatasetProcessor class with different hash from the latest one
@@ -57,6 +58,7 @@ def test_upload_dataset_diff_hash(
         mock_blob.path = public_url
         upload_file_to_storage.return_value = mock_blob, []
         mock_download_url_content.return_value = file_hash, True
+        mock_unzip_files.return_value = [mock_blob, mock_blob]
 
         processor = DatasetProcessor(
             public_url,
@@ -178,6 +180,7 @@ def test_upload_dataset_download_exception(
     def test_upload_file_to_storage(self):
         bucket_name = "test-bucket"
         source_file_path = "path/to/source/file"
+        extracted_file_path = "path/to/source/file"
 
         mock_blob = Mock()
         mock_blob.public_url = public_url
@@ -204,7 +207,9 @@ def test_upload_file_to_storage(self):
             test_hosted_public_url,
         )
         dataset_id = faker.Faker().uuid4()
-        result, _ = processor.upload_file_to_storage(source_file_path, dataset_id)
+        result, _ = processor.upload_file_to_storage(
+            source_file_path, dataset_id, extracted_file_path
+        )
         self.assertEqual(result.public_url, public_url)
         mock_client.get_bucket.assert_called_with(bucket_name)
         mock_bucket.blob.assert_called_with(

functions-python/gbfs_validator/README.md

Lines changed: 8 additions & 1 deletion
@@ -30,7 +30,7 @@ The message published by the batch function to the Pub/Sub topic follows this format:
 
 ### Functionality Details
 
-- **`gbfs-validator-batch`**: Triggered per execution ID, this function iterates over all GBFS feeds, preparing and publishing individual messages to the Pub/Sub topic.
+- **`gbfs-validator-batch`**: Triggered per execution ID. When the request is a POST with a JSON body containing `feed_stable_ids`, it publishes events only for those feeds; otherwise, it publishes events for all feeds to the Pub/Sub topic.
 - **`gbfs-validator-pubsub`**: Triggered per feed, this function performs the following steps:
   1. **Access the autodiscovery URL and update versions**: The function accesses the autodiscovery URL to update the **GBFSVersions** table.
   2. **Measure latency and validate the feed**: For each version, the function measures the response latency and validates the feed. The validation summary is stored in GCP, and the total error count is extracted and saved in the **GBFSValidationReport**.
@@ -46,6 +46,13 @@ The `gbfs-validator-batch` function requires the following environment variables:
 - **`PROJECT_ID`**: The Google Cloud Project ID used to construct the full topic path.
 - **`FEEDS_DATABASE_URL`**: The database connection string for accessing the GBFS feeds.
 
+Optional request body parameters for the batch function:
+```json
+{
+  "feed_stable_ids": ["feed_id_1", "feed_id_2"]
+}
+```
+
 ### Pub/Sub Function Environment Variables
 
 The `gbfs-validator-pubsub` function requires the following environment variables:
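For illustration, a hypothetical client call matching the optional batch request body documented above (not part of this commit); the endpoint URL is an assumption and depends on where `gbfs-validator-batch` is deployed:

```python
import requests

# Hypothetical URL of the deployed gbfs-validator-batch HTTP function.
BATCH_FUNCTION_URL = "https://example.com/gbfs-validator-batch"

# POST a JSON body with feed_stable_ids to validate only those feeds;
# omitting the body triggers validation of all GBFS feeds.
response = requests.post(
    BATCH_FUNCTION_URL,
    json={"feed_stable_ids": ["feed_id_1", "feed_id_2"]},
    timeout=60,
)
print(response.status_code, response.text)
```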

functions-python/gbfs_validator/src/main.py

Lines changed: 34 additions & 4 deletions
@@ -101,9 +101,11 @@ def gbfs_validator_pubsub(cloud_event: CloudEvent):
 
 @with_db_session
 @functions_framework.http
-def gbfs_validator_batch(_, db_session: Session):
+def gbfs_validator_batch(request, db_session: Session):
     """
-    HTTP Cloud Function to trigger the GBFS Validator function for multiple datasets.
+    HTTP Cloud Function to trigger the GBFS Validator function for multiple datasets.
+    When the request is a POST request with a JSON body containing `feed_stable_ids`,
+    it processes only those feeds. Otherwise, it processes all feeds in the database.
     @param _: The request object.
     @return: The response of the function.
     """
@@ -113,9 +115,25 @@ def gbfs_validator_batch(_, db_session: Session):
         logging.error("PUBSUB_TOPIC_NAME environment variable not set.")
         return "PUBSUB_TOPIC_NAME environment variable not set.", 500
 
-    # Get all GBFS feeds from the database
     try:
-        gbfs_feeds = fetch_all_gbfs_feeds(db_session)
+        feed_stable_ids = None
+        if request and request.method == "POST" and request.is_json:
+            request_json = request.get_json()
+            feed_stable_ids = (
+                request_json.get("feed_stable_ids") if request_json else None
+            )
+        else:
+            logging.info("Request body not provided or not a valid JSON.")
+    except Exception as e:
+        logging.error("Error parsing request body: %s", e)
+        return "Invalid request body.", 400
+
+    try:
+        if feed_stable_ids:
+            gbfs_feeds = fetch_gbfs_feeds_by_stable_ids(db_session, feed_stable_ids)
+        else:
+            # Get all GBFS feeds from the database
+            gbfs_feeds = fetch_all_gbfs_feeds(db_session)
     except Exception:
         return "Error getting all GBFS feeds.", 500
 
@@ -150,3 +168,15 @@ def gbfs_validator_batch(_, db_session: Session):
         f"GBFS Validator batch function triggered successfully for {len(feeds_data)} feeds.",
         200,
     )
+
+
+def fetch_gbfs_feeds_by_stable_ids(db_session, feed_stable_ids):
+    """Fetch GBFS feeds by their IDs and not deprecated from the database"""
+    gbfs_feeds = (
+        db_session.query(Gbfsfeed)
+        .filter(
+            Gbfsfeed.stable_id.in_(feed_stable_ids), Gbfsfeed.status != "deprecated"
+        )
+        .all()
+    )
+    return gbfs_feeds
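A small hypothetical sketch of the request-parsing branch added above, using a stand-in object that exposes the same attributes the function reads (the real function receives a Flask request via functions_framework):

```python
from types import SimpleNamespace

# Hypothetical stand-in mimicking the attributes gbfs_validator_batch inspects.
fake_request = SimpleNamespace(
    method="POST",
    is_json=True,
    get_json=lambda: {"feed_stable_ids": ["gbfs-feed-1", "gbfs-feed-2"]},
)

feed_stable_ids = None
if fake_request and fake_request.method == "POST" and fake_request.is_json:
    body = fake_request.get_json()
    feed_stable_ids = body.get("feed_stable_ids") if body else None

# Only these feeds would be fetched and published; None means all feeds.
print(feed_stable_ids)
```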

functions-python/gbfs_validator/tests/test_gbfs_validator.py

Lines changed: 38 additions & 1 deletion
@@ -75,7 +75,6 @@ def test_gbfs_validator_batch(
     ):
         # Prepare mocks
         mock_session = MagicMock()
-        # mock_database.return_value.start_db_session.return_value = mock_session
 
         mock_publisher = MagicMock()
         mock_publisher_client.return_value = mock_publisher
@@ -179,3 +178,41 @@ def test_gbfs_validator_batch_publish_exception(
         # Call the function
         result = gbfs_validator_batch(None)
         self.assertEqual(result[1], 500)
+
+    @patch.dict(
+        os.environ,
+        {
+            "PUBSUB_TOPIC_NAME": "mock-topic",
+        },
+    )
+    @patch("main.pubsub_v1.PublisherClient")
+    @patch("main.fetch_gbfs_feeds_by_stable_ids")
+    def test_gbfs_validator_batch_by_feed_stable_ids(
+        self, fetch_gbfs_feeds_by_stable_ids, mock_publisher_client
+    ):
+        # Prepare mocks
+        mock_session = MagicMock()
+
+        mock_publisher = MagicMock()
+        mock_publisher_client.return_value = mock_publisher
+
+        mock_feed = MagicMock()
+        mock_feed.stable_id = "mock-stable-id"
+        mock_feed.id = str(uuid.uuid4())
+        mock_feed.auto_discovery_url = "http://mock-url.com"
+        mock_feed.gbfsversions = [MagicMock(version="1.0")]
+        mock_feed_2 = copy.deepcopy(mock_feed)
+        mock_feed_2.gbfsversions = []
+        fetch_gbfs_feeds_by_stable_ids.return_value = [mock_feed, mock_feed_2]
+        request = MagicMock()
+        request.method = "POST"
+        request.is_json = True
+        request.get_json.return_value = {
+            "feed_stable_ids": [mock_feed.id, mock_feed_2.id]
+        }
+        # Call the function
+        result = gbfs_validator_batch(request, db_session=mock_session)
+        self.assertEqual(result[1], 200)
+
+        fetch_gbfs_feeds_by_stable_ids.assert_called_once()
+        self.assertEqual(mock_publisher.publish.call_count, 2)
