2929 ThreadPoolExecutor ,
3030)
3131from google .cloud import storage
32- from io import StringIO
32+ from io import BytesIO , StringIO
3333from tqdm import tqdm
3434from zipfile import ZipFile
3535
@@ -186,7 +186,7 @@ def read_from_paths(self, paths):
186186 swc_dicts = deque ()
187187 for thread in as_completed (threads ):
188188 result = thread .result ()
189- if result is not None :
189+ if result :
190190 swc_dicts .append (result )
191191 pbar .update (1 )
192192 return swc_dicts
@@ -261,7 +261,7 @@ def read_from_zip(self, zip_path):
261261 swc_dicts = deque ()
262262 for thread in as_completed (threads ):
263263 result = thread .result ()
264- if result is not None :
264+ if result :
265265 swc_dicts .append (result )
266266 return swc_dicts
267267
@@ -333,7 +333,7 @@ def read_from_gcs_swcs(self, bucket_name, swc_paths):
333333 swc_dicts = deque ()
334334 for thread in as_completed (threads ):
335335 result = thread .result ()
336- if result is not None :
336+ if result :
337337 swc_dicts .append (result )
338338 pbar .update (1 )
339339 return swc_dicts
@@ -351,21 +351,22 @@ def read_from_gcs_swc(self, bucket_name, path):
351351
352352 def read_from_gcs_zips (self , bucket_name , zip_paths ):
353353 pbar = tqdm (total = len (zip_paths ), desc = "Read SWCs" )
354- with ProcessPoolExecutor () as executor :
354+ swc_dicts = deque ()
355+ with ThreadPoolExecutor () as executor :
355356 # Assign processes
356- processes = list ()
357- for path in zip_paths :
358- processes .append (
357+ threads = list ()
358+ for zip_path in zip_paths :
359+ threads .append (
359360 executor .submit (
360- self .read_from_gcs_zip , bucket_name , path
361+ self .read_from_gcs_zip , bucket_name , zip_path
361362 )
362363 )
363364
364365 # Store results
365- swc_dicts = deque ()
366- for process in as_completed (processes ):
367- swc_dicts .extend (process .result ())
366+ for thread in as_completed (threads ):
367+ swc_dicts .extend (thread .result ())
368368 pbar .update (1 )
369+ print ("# swcs:" , len (swc_dicts ))
369370 return swc_dicts
370371
371372 def read_from_gcs_zip (self , bucket_name , path ):
@@ -391,6 +392,7 @@ def read_from_gcs_zip(self, bucket_name, path):
391392 bucket = client .bucket (bucket_name )
392393
393394 # Parse Zip
395+ swc_dicts = deque ()
394396 zip_content = bucket .blob (path ).download_as_bytes ()
395397 with ZipFile (BytesIO (zip_content )) as zip_file :
396398 with ThreadPoolExecutor () as executor :
@@ -405,7 +407,6 @@ def read_from_gcs_zip(self, bucket_name, path):
405407 )
406408
407409 # Process results
408- swc_dicts = deque ()
409410 for thread in as_completed (threads ):
410411 result = thread .result ()
411412 if result :
0 commit comments