@@ -185,7 +185,9 @@ def read_from_paths(self, paths):
185185 # Store results
186186 swc_dicts = deque ()
187187 for thread in as_completed (threads ):
188- swc_dicts .append (thread .result ())
188+ result = thread .result ()
189+ if result is not None :
190+ swc_dicts .append (result )
189191 pbar .update (1 )
190192 return swc_dicts
191193
@@ -258,7 +260,9 @@ def read_from_zip(self, zip_path):
258260 # Store results
259261 swc_dicts = deque ()
260262 for thread in as_completed (threads ):
261- swc_dicts .append (thread .result ())
263+ result = thread .result ()
264+ if result is not None :
265+ swc_dicts .append (result )
262266 return swc_dicts
263267
264268 def read_from_zipped_file (self , zipfile , path ):
@@ -281,7 +285,7 @@ def read_from_zipped_file(self, zipfile, path):
281285 """
282286 content = util .read_zip (zipfile , path ).splitlines ()
283287 filename = os .path .basename (path )
284- return self .parse (content , filename )
288+ return self .parse (content , filename ) if len ( content ) > 40 else None
285289
286290 def read_from_gcs (self , gcs_dict ):
287291 """
@@ -301,32 +305,60 @@ def read_from_gcs(self, gcs_dict):
301305
302306 """
303307 # List filenames
304- bucket = storage . Client (). bucket ( gcs_dict [ "bucket_name" ])
305- swc_paths = util .list_gcs_filenames (bucket , gcs_dict [ "path" ] , ".swc" )
306- zip_paths = util .list_gcs_filenames (bucket , gcs_dict [ "path" ] , ".zip" )
308+
309+ swc_paths = util .list_gcs_filenames (gcs_dict , ".swc" )
310+ zip_paths = util .list_gcs_filenames (gcs_dict , ".zip" )
307311
308312 # Call reader
309313 if len (swc_paths ) > 0 :
310- return self .read_from_gcs_swcs (bucket , swc_paths )
314+ return self .read_from_gcs_swcs (gcs_dict [ "bucket_name" ] , swc_paths )
311315 if len (zip_paths ) > 0 :
312- return self .read_from_gcs_zips (bucket , zip_paths )
316+ return self .read_from_gcs_zips (gcs_dict [ "bucket_name" ] , zip_paths )
313317
314318 # Error
315319 raise Exception (f"GCS Pointer is invalid -{ gcs_dict } -" )
316320
317- def read_from_gcs_swcs (self , bucket , swc_paths ):
318- pass
def read_from_gcs_swcs(self, bucket_name, swc_paths):
    """
    Reads SWC files stored in a GCS bucket, one thread per file.

    Parameters
    ----------
    bucket_name : str
        Name of the GCS bucket; forwarded to "read_from_gcs_swc".
    swc_paths : list[str]
        Paths of the SWC files to read; each one is forwarded to
        "read_from_gcs_swc".

    Returns
    -------
    collections.deque
        Results of "read_from_gcs_swc" in completion order, with None
        results skipped.

    """
    pbar = tqdm(total=len(swc_paths), desc="Read SWCs")
    with ThreadPoolExecutor() as executor:
        # Assign threads
        # NOTE: every path is submitted. A stray "break" previously ended
        # this loop after the first path, so only one SWC file was read
        # even though the progress bar was sized for all of them.
        threads = [
            executor.submit(self.read_from_gcs_swc, bucket_name, path)
            for path in swc_paths
        ]

        # Store results
        swc_dicts = deque()
        for thread in as_completed(threads):
            result = thread.result()
            if result is not None:
                swc_dicts.append(result)
            pbar.update(1)
    return swc_dicts
340+
def read_from_gcs_swc(self, bucket_name, path):
    """
    Reads a single SWC file stored in a GCS bucket.

    Parameters
    ----------
    bucket_name : str
        Name of the GCS bucket that holds the file.
    path : str
        Path of the SWC file within the bucket; its basename is passed to
        the parser as the filename.

    Returns
    -------
    Whatever "self.parse" returns for the downloaded lines and filename.

    """
    # A fresh client is created on every call, as in the original code
    blob = storage.Client().bucket(bucket_name).blob(path)

    # Download the file as text and hand its lines to the parser
    lines = blob.download_as_text().splitlines()
    return self.parse(lines, os.path.basename(path))
351+
352+ def read_from_gcs_zips (self , bucket_name , zip_paths ):
322353 pbar = tqdm (total = len (zip_paths ), desc = "Read SWCs" )
323354 with ProcessPoolExecutor () as executor :
324355 # Assign processes
325356 processes = list ()
326357 for path in zip_paths :
327- zip_content = bucket .blob (path ).download_as_bytes ()
328358 processes .append (
329- executor .submit (self .read_from_gcs_zip , zip_content )
359+ executor .submit (
360+ self .read_from_gcs_zip , bucket_name , path
361+ )
330362 )
331363
332364 # Store results
@@ -336,7 +368,7 @@ def read_from_gcs_zips(self, bucket, zip_paths):
336368 pbar .update (1 )
337369 return swc_dicts
338370
339- def read_from_gcs_zip (self , zip_content ):
371+ def read_from_gcs_zip (self , bucket_name , path ):
340372 """
341373 Reads SWC files stored in a ZIP archive downloaded from a GCS
342374 bucket.
@@ -354,6 +386,12 @@ def read_from_gcs_zip(self, zip_content):
354386
355387
356388 """
389+ # Initialize cloud reader
390+ client = storage .Client ()
391+ bucket = client .bucket (bucket_name )
392+
393+ # Parse Zip
394+ zip_content = bucket .blob (path ).download_as_bytes ()
357395 with ZipFile (BytesIO (zip_content )) as zip_file :
358396 with ThreadPoolExecutor () as executor :
359397 # Assign threads
0 commit comments