Commit f0701a8

Add local testing and fix function response
1 parent 1024c95 commit f0701a8

6 files changed: +88 additions, -36 deletions

.gitignore

Lines changed: 4 additions & 1 deletion
@@ -77,4 +77,7 @@ coverage_reports
 tf.plan
 
 # CSV generation output files
-functions-python/**/*.csv
+functions-python/**/*.csv
+
+# Local emulators
+.cloudstorage

functions-python/batch_process_dataset/requirements_dev.txt

Lines changed: 2 additions & 1 deletion
@@ -2,4 +2,5 @@ Faker
 pytest~=7.4.3
 urllib3-mock
 requests-mock
-python-dotenv~=1.0.0
+python-dotenv~=1.0.0
+gcp-storage-emulator
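
The new gcp-storage-emulator dev dependency lets the storage code run against a local server instead of a real GCP bucket. As an illustrative sketch (the host, port, and bucket name below are arbitrary; the values this commit actually uses live in download_verifier.py further down), wiring it up looks roughly like this:

import os

from google.cloud import storage
from gcp_storage_emulator.server import create_server

# Illustrative values only; the verifier script in this commit uses localhost:9023 and a "verifier" bucket.
HOST, PORT, BUCKET = "localhost", 9023, "demo-bucket"

# Point google-cloud-storage at the emulator before creating any client.
os.environ["STORAGE_EMULATOR_HOST"] = f"http://{HOST}:{PORT}"

server = create_server(host=HOST, port=PORT, in_memory=True, default_bucket=BUCKET)
server.start()
try:
    blob = storage.Client().bucket(BUCKET).blob("hello.txt")
    blob.upload_from_string("hello from the emulator")
    print(blob.download_as_bytes())
finally:
    server.stop()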

functions-python/batch_process_dataset/src/main.py

Lines changed: 17 additions & 12 deletions
@@ -131,7 +131,11 @@ def download_content(self, temporary_file_path):
         return file_hash, is_zip
 
     def upload_file_to_storage(
-        self, source_file_path, dataset_stable_id, extracted_files_path
+        self,
+        source_file_path,
+        dataset_stable_id,
+        extracted_files_path,
+        public=True,
     ):
         """
         Uploads a file to the GCP bucket
@@ -146,7 +150,8 @@ def upload_file_to_storage(
         blob = bucket.blob(target_path)
         with open(source_file_path, "rb") as file:
             blob.upload_from_file(file)
-        blob.make_public()
+        if public:
+            blob.make_public()
 
         base_path, _ = os.path.splitext(source_file_path)
         extracted_files: List[Gtfsfile] = []
@@ -162,7 +167,8 @@ def upload_file_to_storage(
                 f"{self.feed_stable_id}/{dataset_stable_id}/extracted/{file_name}"
             )
             file_blob.upload_from_filename(file_path)
-            file_blob.make_public()
+            if public:
+                file_blob.make_public()
             self.logger.info(
                 f"Uploaded extracted file {file_name} to {file_blob.public_url}"
             )
@@ -175,7 +181,7 @@ def upload_file_to_storage(
             )
         return blob, extracted_files
 
-    def upload_dataset(self) -> DatasetFile or None:
+    def upload_dataset(self, public=True) -> DatasetFile or None:
         """
         Uploads a dataset to a GCP bucket as <feed_stable_id>/latest.zip and
         <feed_stable_id>/<feed_stable_id>-<upload_datetime>.zip
@@ -185,9 +191,7 @@ def upload_dataset(self) -> DatasetFile or None:
         try:
             self.logger.info("Accessing URL %s", self.producer_url)
             temp_file_path = self.generate_temp_filename()
-            file_sha256_hash, is_zip, extracted_files_path = self.download_content(
-                temp_file_path
-            )
+            file_sha256_hash, is_zip = self.download_content(temp_file_path)
             if not is_zip:
                 self.logger.error(
                     f"[{self.feed_stable_id}] The downloaded file from {self.producer_url} is not a valid ZIP file."
@@ -202,9 +206,7 @@ def upload_dataset(self) -> DatasetFile or None:
                 f"[{self.feed_stable_id}] Dataset has changed (hash {self.latest_hash}"
                 f"-> {file_sha256_hash}). Uploading new version."
             )
-            extracted_files_path = self.unzip_files(
-                extracted_files_path, temp_file_path
-            )
+            extracted_files_path = self.unzip_files(temp_file_path)
             self.logger.info(
                 f"Creating file {self.feed_stable_id}/latest.zip in bucket {self.bucket_name}"
             )
@@ -220,7 +222,10 @@ def upload_dataset(self) -> DatasetFile or None:
                 f" in bucket {self.bucket_name}"
             )
             _, extracted_files = self.upload_file_to_storage(
-                temp_file_path, dataset_stable_id, extracted_files_path
+                temp_file_path,
+                dataset_stable_id,
+                extracted_files_path,
+                public=public,
             )
 
             return DatasetFile(
@@ -242,7 +247,7 @@ def upload_dataset(self) -> DatasetFile or None:
             os.remove(temp_file_path)
         return None
 
-    def unzip_files(self, extracted_files_path, temp_file_path):
+    def unzip_files(self, temp_file_path):
         extracted_files_path = os.path.join(temp_file_path.split(".")[0], "extracted")
         self.logger.info(f"Unzipping files to {extracted_files_path}")
         # Create the directory for extracted files if it does not exist
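
Taken together: download_content now returns only (file_hash, is_zip), extraction is deferred to unzip_files(temp_file_path), and the new public flag lets callers skip blob.make_public(), which may not be supported by a local emulator or a bucket with uniform access. A hedged sketch of the updated calling convention (the constructor arguments are placeholders mirroring the verifier script below):

from main import DatasetProcessor

# Placeholder arguments copied from the verifier script in this commit.
processor = DatasetProcessor(
    producer_url="https://example.com/dataset.zip",
    feed_id="feed_id",
    feed_stable_id="feed_stable_id",
    execution_id=None,
    latest_hash="123",
    bucket_name="verifier",
    authentication_type=0,
    api_key_parameter_name=None,
    public_hosted_datasets_url=None,
)

temp_file_path = processor.generate_temp_filename()
file_hash, is_zip = processor.download_content(temp_file_path)  # no extracted path in the return anymore

# Pass public=False to skip make_public(), e.g. when uploading to the local emulator.
dataset_file = processor.upload_dataset(public=False)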

functions-python/batch_process_dataset/src/scripts/download_verifier.py

Lines changed: 49 additions & 8 deletions
@@ -2,6 +2,12 @@
 import os
 
 from main import DatasetProcessor
+from gcp_storage_emulator.server import create_server
+
+HOST = "localhost"
+PORT = 9023
+BUCKET_NAME = "verifier"
+PRODUCER_URL = "https://example.com/dataset.zip"  # Replace with actual producer URL
 
 
 def verify_download_content(producer_url: str):
@@ -27,17 +33,52 @@ def verify_download_content(producer_url: str):
     )
     tempfile = processor.generate_temp_filename()
     logging.info(f"Temp filename: {tempfile}")
-    file_hash, is_zip, extracted_files_path = processor.download_content(tempfile)
-    are_files_extracted = os.path.exists(extracted_files_path)
+    file_hash, is_zip = processor.download_content(tempfile)
     logging.info(f"File hash: {file_hash}")
-    logging.info(
-        f"File path: {extracted_files_path} "
-        f"- { 'Files extracted' if are_files_extracted else 'Files not extracted'}"
+
+
+def verify_upload_dataset(producer_url: str):
+    """
+    Verifies the upload_dataset is able to upload the dataset to the GCP storage emulator.
+    This is useful to simulate the upload code locally and test issues related with user-agent and uploaded content.
+    This function also tests the DatasetProcessor class methods for generating a temporary filename
+    and uploading the dataset.
+    :param producer_url:
+    :return:
+    """
+    processor = DatasetProcessor(
+        producer_url=producer_url,
+        feed_id="feed_id",
+        feed_stable_id="feed_stable_id",
+        execution_id=None,
+        latest_hash="123",
+        bucket_name=BUCKET_NAME,
+        authentication_type=0,
+        api_key_parameter_name=None,
+        public_hosted_datasets_url=None,
     )
-    logging.info(f"Downloaded file path: {extracted_files_path}")
+    tempfile = processor.generate_temp_filename()
+    logging.info(f"Temp filename: {tempfile}")
+    dataset_file = processor.upload_dataset(public=False)
+    logging.info(f"Dataset File: {dataset_file}")
 
 
 if __name__ == "__main__":
     logging.basicConfig(level=logging.INFO)
-    # Example usage: Replace the placeholder URL with the actual producer URL before running.
-    verify_download_content(producer_url="https://example.com/your-dataset.zip")
+    # Replace with actual producer URL
+    try:
+        os.environ["STORAGE_EMULATOR_HOST"] = f"http://{HOST}:{PORT}"
+        server = create_server(
+            host=HOST, port=PORT, in_memory=False, default_bucket=BUCKET_NAME
+        )
+        server.start()
+
+        verify_download_content(producer_url=PRODUCER_URL)
+        logging.info("Download content verification completed successfully.")
+        verify_upload_dataset(producer_url=PRODUCER_URL)
+        verify_upload_dataset(producer_url=PRODUCER_URL)
+    except Exception as e:
+        logging.error(f"Error verifying download content: {e}")
+    finally:
+        server.stop()
+        logging.info("Verification completed.")

functions-python/batch_process_dataset/tests/test_batch_process_dataset_main.py

Lines changed: 3 additions & 3 deletions
@@ -57,7 +57,7 @@ def test_upload_dataset_diff_hash(
         mock_blob.public_url = public_url
         mock_blob.path = public_url
         upload_file_to_storage.return_value = mock_blob, []
-        mock_download_url_content.return_value = file_hash, True, "path/file"
+        mock_download_url_content.return_value = file_hash, True
         mock_unzip_files.return_value = [mock_blob, mock_blob]
 
         processor = DatasetProcessor(
@@ -96,7 +96,7 @@ def test_upload_dataset_same_hash(
         mock_blob = MagicMock()
         mock_blob.public_url = public_url
         upload_file_to_storage.return_value = mock_blob
-        mock_download_url_content.return_value = file_hash, True, "path/file"
+        mock_download_url_content.return_value = file_hash, True
 
         processor = DatasetProcessor(
             public_url,
@@ -128,7 +128,7 @@ def test_upload_dataset_not_zip(
         mock_blob = MagicMock()
         mock_blob.public_url = public_url
         upload_file_to_storage.return_value = mock_blob
-        mock_download_url_content.return_value = file_hash, False, "path/file"
+        mock_download_url_content.return_value = file_hash, False
 
         processor = DatasetProcessor(
             public_url,
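
These mock updates only pin the new two-value return shape of download_content; the public flag itself is not exercised here. One way to cover it without mocking the storage client is to reuse the emulator added to requirements_dev.txt inside a test. This is an illustrative sketch, not part of the commit, and the fixture and test names are made up:

import os

import pytest
from google.cloud import storage
from gcp_storage_emulator.server import create_server

HOST, PORT, BUCKET = "localhost", 9023, "test-bucket"


@pytest.fixture()
def storage_emulator():
    # Route google-cloud-storage traffic to a throwaway in-memory emulator for the test.
    os.environ["STORAGE_EMULATOR_HOST"] = f"http://{HOST}:{PORT}"
    server = create_server(host=HOST, port=PORT, in_memory=True, default_bucket=BUCKET)
    server.start()
    yield
    server.stop()


def test_private_upload_round_trips(storage_emulator):
    # Upload without calling make_public() and confirm the object is still retrievable,
    # which is what upload_file_to_storage(public=False) relies on.
    blob = storage.Client().bucket(BUCKET).blob("feed_stable_id/latest.zip")
    blob.upload_from_string(b"zip-bytes")
    assert blob.download_as_bytes() == b"zip-bytes"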

functions-python/helpers/logger.py

Lines changed: 13 additions & 11 deletions
@@ -42,6 +42,7 @@ def filter(self, record):
 
 
 lock = threading.Lock()
+lock_logger = threading.Lock()
 _logging_initialized = False
 
 
@@ -73,14 +74,15 @@ def get_logger(name: str, stable_id: str = None):
     If stable_id is provided, the StableIdFilter is added.
     This method can be called multiple times for the same logger name without creating a side effect.
     """
-    # Create the logger with the provided name to avoid retuning the same logger instance
-    logger = (
-        logging.getLogger(name)
-        if not stable_id
-        else logging.getLogger(f"{name}_{stable_id}")
-    )
-    if stable_id and not any(
-        isinstance(log_filter, StableIdFilter) for log_filter in logger.filters
-    ):
-        logger.addFilter(StableIdFilter(stable_id))
-    return logger
+    with lock_logger:
+        # Create the logger with the provided name to avoid retuning the same logger instance
+        logger = (
+            logging.getLogger(f"{name}_{stable_id}")
+            if stable_id
+            else logging.getLogger(name)
+        )
+        if stable_id and not any(
+            isinstance(log_filter, StableIdFilter) for log_filter in logger.filters
+        ):
+            logger.addFilter(StableIdFilter(stable_id))
+        return logger
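
The new lock_logger serializes get_logger, so concurrent callers that share a stable_id cannot race between the filter check and addFilter and end up attaching duplicate StableIdFilter instances. A rough sketch of the scenario the lock guards against (the import path and the loop are illustrative, not part of the commit):

import threading

# Assumed import path; within the repo the helper lives at functions-python/helpers/logger.py.
from helpers.logger import get_logger

# Hammer get_logger from several threads with the same name/stable_id pair.
threads = [
    threading.Thread(target=get_logger, args=("batch_process_dataset", "feed_stable_id"))
    for _ in range(20)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

logger = get_logger("batch_process_dataset", "feed_stable_id")
# With the lock, exactly one StableIdFilter is attached; without it this could intermittently exceed 1.
assert len(logger.filters) == 1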
