|
28 | 28 | ProcessPoolExecutor, |
29 | 29 | ThreadPoolExecutor, |
30 | 30 | ) |
| 31 | +from google.cloud import storage |
31 | 32 | from io import StringIO |
32 | 33 | from tqdm import tqdm |
33 | 34 | from zipfile import ZipFile |
@@ -95,6 +96,10 @@ def read(self, swc_pointer): |
95 | 96 | - "swc_id": name of SWC file, minus the ".swc". |
96 | 97 |
|
97 | 98 | """ |
| 99 | + # Dictionary with GCS specs |
| 100 | + if isinstance(swc_pointer, dict): |
| 101 | + return self.read_from_gcs(swc_pointer) |
| 102 | + |
98 | 103 | # List of paths to SWC files |
99 | 104 | if isinstance(swc_pointer, list): |
100 | 105 | return self.read_from_paths(swc_pointer) |
@@ -278,6 +283,97 @@ def read_from_zipped_file(self, zipfile, path): |
278 | 283 | filename = os.path.basename(path) |
279 | 284 | return self.parse(content, filename) |
280 | 285 |
|
def read_from_gcs(self, gcs_dict):
    """
    Reads SWC files stored in a GCS bucket, either as individual SWC
    files or inside ZIP archives.

    Parameters
    ----------
    gcs_dict : dict
        Dictionary with the keys "bucket_name" and "path" that specify
        where the SWC files or ZIP archives are located in a GCS bucket.

    Returns
    -------
    Deque[dict]
        Dictionaries whose keys and values are the attribute names and
        values from an SWC file (result of the ZIP reader). When ".swc"
        paths are listed, the result of "read_from_gcs_swcs" is returned
        instead.

    Raises
    ------
    ValueError
        If no paths are listed for either the ".swc" or the ".zip"
        extension at the given location.

    """
    bucket = storage.Client().bucket(gcs_dict["bucket_name"])
    path = gcs_dict["path"]

    # Individual SWC files take precedence over ZIP archives
    # NOTE(review): "read_from_gcs_swcs" is currently a placeholder that
    # returns None, so this branch yields None until it is implemented.
    swc_paths = util.list_gcs_filenames(bucket, path, ".swc")
    if len(swc_paths) > 0:
        return self.read_from_gcs_swcs(bucket, swc_paths)

    # List ZIP archives only when needed to avoid an extra bucket listing
    zip_paths = util.list_gcs_filenames(bucket, path, ".zip")
    if len(zip_paths) > 0:
        return self.read_from_gcs_zips(bucket, zip_paths)

    # Nothing readable was found at the given location
    raise ValueError(f"GCS Pointer is invalid -{gcs_dict}-")
| 316 | + |
def read_from_gcs_swcs(self, bucket, swc_paths):
    """
    Placeholder for reading individual SWC files stored in a GCS bucket.

    Parameters
    ----------
    bucket : object
        Bucket handle; the caller in "read_from_gcs" passes the object
        returned by "storage.Client().bucket(...)".
    swc_paths : list
        Paths listed for the ".swc" extension by the caller.

    Returns
    -------
    None
        Not implemented yet; nothing is read from the bucket.

    """
    # TODO: implement reading of individual SWC files from GCS
    return None
| 319 | + |
def read_from_gcs_zips(self, bucket, zip_paths):
    """
    Reads SWC files from ZIP archives stored in a GCS bucket. Each
    archive is downloaded in the calling process and its bytes are
    handed to a worker process that runs "read_from_gcs_zip".

    Parameters
    ----------
    bucket : object
        Bucket handle providing "blob(path).download_as_bytes()"; the
        caller in "read_from_gcs" passes the object returned by
        "storage.Client().bucket(...)".
    zip_paths : list
        Paths to the ZIP archives within the bucket.

    Returns
    -------
    Deque[dict]
        Dictionaries collected from all archives, in the order in which
        the worker processes complete.

    """
    swc_dicts = deque()

    # Context manager guarantees the progress bar is closed, including
    # when a download or a worker raises
    with tqdm(total=len(zip_paths), desc="Read SWCs") as pbar:
        with ProcessPoolExecutor() as executor:
            # Assign processes
            processes = [
                executor.submit(
                    self.read_from_gcs_zip,
                    bucket.blob(path).download_as_bytes(),
                )
                for path in zip_paths
            ]

            # Store results
            for process in as_completed(processes):
                swc_dicts.extend(process.result())
                pbar.update(1)
    return swc_dicts
| 338 | + |
def read_from_gcs_zip(self, zip_content):
    """
    Reads SWC files stored in a ZIP archive downloaded from a GCS
    bucket.

    Parameters
    ----------
    zip_content : bytes
        Content of a ZIP archive.

    Returns
    -------
    Deque[dict]
        Non-empty results of "read_from_zipped_file" for every file in
        the archive accepted by "confirm_read", in the order in which
        the reader threads complete.

    """
    # Imported locally: the module-level imports visible in this file
    # only bring in StringIO from io, so BytesIO would be undefined.
    from io import BytesIO

    swc_dicts = deque()
    with ZipFile(BytesIO(zip_content)) as zip_file:
        with ThreadPoolExecutor() as executor:
            # Assign threads
            threads = [
                executor.submit(
                    self.read_from_zipped_file, zip_file, filename
                )
                for filename in util.list_files_in_zip(zip_content)
                if self.confirm_read(filename)
            ]

            # Process results, skipping falsy ones
            for thread in as_completed(threads):
                result = thread.result()
                if result:
                    swc_dicts.append(result)
    return swc_dicts
| 376 | + |
281 | 377 | def confirm_read(self, filename): |
282 | 378 | """ |
283 | 379 | Checks whether the swc_id corresponding to the given filename is |
|
0 commit comments