import zipfile
from dataclasses import dataclass
from datetime import datetime
-from typing import Optional
+from typing import Optional, List

import functions_framework
from cloudevents.http import CloudEvent
from google.cloud import storage
from sqlalchemy import func

-from shared.database_gen.sqlacodegen_models import Gtfsdataset, t_feedsearch
+from shared.database_gen.sqlacodegen_models import Gtfsdataset, t_feedsearch, Gtfsfile
from shared.dataset_service.main import DatasetTraceService, DatasetTrace, Status
from shared.database.database import with_db_session, refresh_materialized_view
import logging
@@ -48,8 +48,10 @@ class DatasetFile:
4848 """
4949
5050 stable_id : str
51+ extracted_files : List [Gtfsfile ] = None
5152 file_sha256_hash : Optional [str ] = None
5253 hosted_url : Optional [str ] = None
54+ zipped_size : Optional [int ] = None
5355
5456
5557class DatasetProcessor :
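
For illustration, a fully populated DatasetFile under this change might look like
the following sketch (all ids, hashes, URLs, and sizes below are made-up values):

    DatasetFile(
        stable_id="mdb-1-202401010000",
        extracted_files=[
            Gtfsfile(id="<uuid>", file_name="stops.txt", file_size_bytes=52430),
        ],
        file_sha256_hash="3b0c44298fc1...",
        hosted_url="https://files.example.org/mdb-1/mdb-1-202401010000/mdb-1-202401010000.zip",
        zipped_size=1048576,
    )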
@@ -126,18 +128,60 @@ def download_content(self, temporary_file_path):
            logger=self.logger,
        )
        is_zip = zipfile.is_zipfile(temporary_file_path)
+        if is_zip:
+            extracted_file_path = os.path.join(
+                os.path.splitext(temporary_file_path)[0], "extracted"
+            )
+            with zipfile.ZipFile(temporary_file_path, "r") as zip_ref:
+                zip_ref.extractall(extracted_file_path)
+            # List all files in the extracted directory
+            extracted_files = os.listdir(extracted_file_path)
+            self.logger.info(f"Extracted files: {extracted_files}")
        return file_hash, is_zip

-    def upload_file_to_storage(self, source_file_path, target_path):
+    def upload_file_to_storage(self, source_file_path, dataset_stable_id):
        """
        Uploads a file to the GCP bucket
        """
        bucket = storage.Client().get_bucket(self.bucket_name)
-        blob = bucket.blob(target_path)
-        with open(source_file_path, "rb") as file:
-            blob.upload_from_file(file)
-        blob.make_public()
-        return blob
+        target_paths = [
+            f"{self.feed_stable_id}/latest.zip",
+            f"{self.feed_stable_id}/{dataset_stable_id}/{dataset_stable_id}.zip",
+        ]
+        blob = None
+        for target_path in target_paths:
+            blob = bucket.blob(target_path)
+            with open(source_file_path, "rb") as file:
+                blob.upload_from_file(file)
+            blob.make_public()
+
+        base_path, _ = os.path.splitext(source_file_path)
+        extracted_files_path = os.path.join(base_path, "extracted")
+        extracted_files: List[Gtfsfile] = []
+        if not os.path.exists(extracted_files_path):
+            self.logger.warning(
+                f"Extracted files path {extracted_files_path} does not exist."
+            )
+            return blob, extracted_files
+        for file_name in os.listdir(extracted_files_path):
+            file_path = os.path.join(extracted_files_path, file_name)
+            if os.path.isfile(file_path):
+                file_blob = bucket.blob(
+                    f"{self.feed_stable_id}/{dataset_stable_id}/extracted/{file_name}"
+                )
+                file_blob.upload_from_filename(file_path)
+                file_blob.make_public()
+                self.logger.info(
+                    f"Uploaded extracted file {file_name} to {file_blob.public_url}"
+                )
+                extracted_files.append(
+                    Gtfsfile(
+                        id=str(uuid.uuid4()),
+                        file_name=file_name,
+                        file_size_bytes=os.path.getsize(file_path),
+                    )
+                )
+        return blob, extracted_files

    def upload_dataset(self) -> DatasetFile or None:
        """
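
Assuming a feed stable id of "mdb-1" and a dataset stable id of
"mdb-1-202401010000" (hypothetical values), upload_file_to_storage now produces
one object per target path plus one object per regular file found at the top
level of the extracted directory, roughly:

    mdb-1/latest.zip
    mdb-1/mdb-1-202401010000/mdb-1-202401010000.zip
    mdb-1/mdb-1-202401010000/extracted/agency.txt
    mdb-1/mdb-1-202401010000/extracted/stops.txt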
@@ -168,9 +212,6 @@ def upload_dataset(self) -> DatasetFile or None:
        self.logger.info(
            f"Creating file {self.feed_stable_id}/latest.zip in bucket {self.bucket_name}"
        )
-        self.upload_file_to_storage(
-            temp_file_path, f"{self.feed_stable_id}/latest.zip"
-        )

        dataset_stable_id = self.create_dataset_stable_id(
            self.feed_stable_id, self.date
@@ -182,15 +223,18 @@ def upload_dataset(self) -> DatasetFile or None:
182223 f"Creating file: { dataset_full_path } "
183224 f" in bucket { self .bucket_name } "
184225 )
185- self .upload_file_to_storage (
186- temp_file_path ,
187- f"{ dataset_full_path } " ,
226+ _ , extracted_files = self .upload_file_to_storage (
227+ temp_file_path , dataset_stable_id
188228 )
189229
190230 return DatasetFile (
191231 stable_id = dataset_stable_id ,
192232 file_sha256_hash = file_sha256_hash ,
193233 hosted_url = f"{ self .public_hosted_datasets_url } /{ dataset_full_path } " ,
234+ extracted_files = extracted_files ,
235+ zipped_size = os .path .getsize (temp_file_path )
236+ if os .path .exists (temp_file_path )
237+ else None ,
194238 )
195239
196240 self .logger .info (
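
From the caller's side, upload_dataset now returns everything needed for the
database record in one DatasetFile. A minimal sketch of consuming it, assuming
a DatasetProcessor instance named processor (illustrative only):

    dataset_file = processor.upload_dataset()
    if dataset_file is not None:
        print(dataset_file.zipped_size)            # size of the zip on disk, or None
        print(len(dataset_file.extracted_files))   # one Gtfsfile per extracted file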
@@ -241,6 +285,15 @@ def create_dataset(self, dataset_file: DatasetFile, db_session: Session):
            hash=dataset_file.file_sha256_hash,
            downloaded_at=func.now(),
            hosted_url=dataset_file.hosted_url,
+            gtfsfiles=dataset_file.extracted_files
+            if dataset_file.extracted_files
+            else [],
+            zipped_size_bytes=dataset_file.zipped_size,
+            unzipped_size_bytes=sum(
+                ex.file_size_bytes for ex in dataset_file.extracted_files
+            )
+            if dataset_file.extracted_files
+            else None,
        )
        if latest_dataset:
            latest_dataset.latest = False
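
As a worked example of the new size bookkeeping (hypothetical numbers): a
dataset whose zip is 2,000,000 bytes and whose extracted files are stops.txt
(5,000,000 bytes) and trips.txt (11,000,000 bytes) is recorded with
zipped_size_bytes=2000000 and unzipped_size_bytes=16000000; a dataset with no
extracted files gets gtfsfiles=[] and unzipped_size_bytes=None.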