Skip to content

Commit 08325b7

Browse files
anna-grim and anna-grim authored
feat: read gcs swcs (#107)
Co-authored-by: anna-grim <[email protected]>
1 parent ad4e267 commit 08325b7

File tree

2 files changed

+59
-21
lines changed

2 files changed

+59
-21
lines changed

src/segmentation_skeleton_metrics/utils/swc_util.py

Lines changed: 53 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,9 @@ def read_from_paths(self, paths):
185185
# Store results
186186
swc_dicts = deque()
187187
for thread in as_completed(threads):
188-
swc_dicts.append(thread.result())
188+
result = thread.result()
189+
if result is not None:
190+
swc_dicts.append(result)
189191
pbar.update(1)
190192
return swc_dicts
191193

@@ -258,7 +260,9 @@ def read_from_zip(self, zip_path):
258260
# Store results
259261
swc_dicts = deque()
260262
for thread in as_completed(threads):
261-
swc_dicts.append(thread.result())
263+
result = thread.result()
264+
if result is not None:
265+
swc_dicts.append(result)
262266
return swc_dicts
263267

264268
def read_from_zipped_file(self, zipfile, path):
@@ -281,7 +285,7 @@ def read_from_zipped_file(self, zipfile, path):
281285
"""
282286
content = util.read_zip(zipfile, path).splitlines()
283287
filename = os.path.basename(path)
284-
return self.parse(content, filename)
288+
return self.parse(content, filename) if len(content) > 40 else None
285289

286290
def read_from_gcs(self, gcs_dict):
287291
"""
@@ -301,32 +305,60 @@ def read_from_gcs(self, gcs_dict):
301305
302306
"""
303307
# List filenames
304-
bucket = storage.Client().bucket(gcs_dict["bucket_name"])
305-
swc_paths = util.list_gcs_filenames(bucket, gcs_dict["path"], ".swc")
306-
zip_paths = util.list_gcs_filenames(bucket, gcs_dict["path"], ".zip")
308+
309+
swc_paths = util.list_gcs_filenames(gcs_dict, ".swc")
310+
zip_paths = util.list_gcs_filenames(gcs_dict, ".zip")
307311

308312
# Call reader
309313
if len(swc_paths) > 0:
310-
return self.read_from_gcs_swcs(bucket, swc_paths)
314+
return self.read_from_gcs_swcs(gcs_dict["bucket_name"], swc_paths)
311315
if len(zip_paths) > 0:
312-
return self.read_from_gcs_zips(bucket, zip_paths)
316+
return self.read_from_gcs_zips(gcs_dict["bucket_name"], zip_paths)
313317

314318
# Error
315319
raise Exception(f"GCS Pointer is invalid -{gcs_dict}-")
316320

317-
def read_from_gcs_swcs(self, bucket, swc_paths):
318-
pass
321+
def read_from_gcs_swcs(self, bucket_name, swc_paths):
322+
pbar = tqdm(total=len(swc_paths), desc="Read SWCs")
323+
with ThreadPoolExecutor() as executor:
324+
# Assign threads
325+
threads = list()
326+
for path in swc_paths:
327+
threads.append(
328+
executor.submit(self.read_from_gcs_swc, bucket_name, path)
329+
)
330+
break
319331

320-
def read_from_gcs_zips(self, bucket, zip_paths):
321-
# Main
332+
# Store results
333+
swc_dicts = deque()
334+
for thread in as_completed(threads):
335+
result = thread.result()
336+
if result is not None:
337+
swc_dicts.append(result)
338+
pbar.update(1)
339+
return swc_dicts
340+
341+
def read_from_gcs_swc(self, bucket_name, path):
342+
# Initialize cloud reader
343+
client = storage.Client()
344+
bucket = client.bucket(bucket_name)
345+
blob = bucket.blob(path)
346+
347+
# Parse swc contents
348+
content = blob.download_as_text().splitlines()
349+
filename = os.path.basename(path)
350+
return self.parse(content, filename)
351+
352+
def read_from_gcs_zips(self, bucket_name, zip_paths):
322353
pbar = tqdm(total=len(zip_paths), desc="Read SWCs")
323354
with ProcessPoolExecutor() as executor:
324355
# Assign processes
325356
processes = list()
326357
for path in zip_paths:
327-
zip_content = bucket.blob(path).download_as_bytes()
328358
processes.append(
329-
executor.submit(self.read_from_gcs_zip, zip_content)
359+
executor.submit(
360+
self.read_from_gcs_zip, bucket_name, path
361+
)
330362
)
331363

332364
# Store results
@@ -336,7 +368,7 @@ def read_from_gcs_zips(self, bucket, zip_paths):
336368
pbar.update(1)
337369
return swc_dicts
338370

339-
def read_from_gcs_zip(self, zip_content):
371+
def read_from_gcs_zip(self, bucket_name, path):
340372
"""
341373
Reads SWC files stored in a ZIP archive downloaded from a GCS
342374
bucket.
@@ -354,6 +386,12 @@ def read_from_gcs_zip(self, zip_content):
354386
355387
356388
"""
389+
# Initialize cloud reader
390+
client = storage.Client()
391+
bucket = client.bucket(bucket_name)
392+
393+
# Parse Zip
394+
zip_content = bucket.blob(path).download_as_bytes()
357395
with ZipFile(BytesIO(zip_content)) as zip_file:
358396
with ThreadPoolExecutor() as executor:
359397
# Assign threads

src/segmentation_skeleton_metrics/utils/util.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
"""
1212

1313
from random import sample
14+
from google.cloud import storage
1415
from xlwt import Workbook
1516

1617
import os
@@ -187,16 +188,14 @@ def list_files_in_zip(zip_content):
187188
return zip_file.namelist()
188189

189190

190-
def list_gcs_filenames(bucket, prefix, extension):
191+
def list_gcs_filenames(gcs_dict, extension):
191192
"""
192193
Lists all files in a GCS bucket with the given extension.
193194
194195
Parameters
195196
----------
196-
bucket : google.cloud.client
197-
Name of bucket to be read from.
198-
prefix : str
199-
Path to directory in "bucket".
197+
gcs_dict : dict
198+
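Dictionary with the keys "bucket_name" (name of the GCS bucket to read from) and "path" (prefix under which blobs are listed).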
200199
extension : str
201200
File extension of filenames to be listed.
202201
@@ -206,7 +205,8 @@ def list_gcs_filenames(bucket, prefix, extension):
206205
Names of the blobs under the "path" prefix in "gcs_dict" that contain the given extension.
207206
208207
"""
209-
blobs = bucket.list_blobs(prefix=prefix)
208+
bucket = storage.Client().bucket(gcs_dict["bucket_name"])
209+
blobs = bucket.list_blobs(prefix=gcs_dict["path"])
210210
return [blob.name for blob in blobs if extension in blob.name]
211211

212212

0 commit comments

Comments
 (0)