Skip to content

Commit 9e37f39

Browse files
anna-grim and anna-grim authored
Feat s3 loading (#174)
* refactor: improved txt reader * remove print * feat: load swcs from s3 --------- Co-authored-by: anna-grim <anna.grim@alleninstitute.org>
1 parent f8fa4d9 commit 9e37f39

File tree

4 files changed

+118
-3
lines changed

4 files changed

+118
-3
lines changed

src/segmentation_skeleton_metrics/data_handling/graph_loading.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -588,7 +588,7 @@ def build_labels_graph(self, connections_path):
588588
labels_graph.add_nodes_from(self.valid_labels)
589589

590590
# Main
591-
for line in util.read_txt(connections_path):
591+
for line in util.read_txt(connections_path).splitlines():
592592
ids = line.split(",")
593593
id_1 = util.get_segment_id(ids[0])
594594
id_2 = util.get_segment_id(ids[1])

src/segmentation_skeleton_metrics/data_handling/swc_loading.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
these attributes in the same order.
1919
"""
2020

21+
from botocore import UNSIGNED
22+
from botocore.client import Config
2123
from collections import deque
2224
from concurrent.futures import (
2325
as_completed,
@@ -29,6 +31,7 @@
2931
from tqdm import tqdm
3032
from zipfile import ZipFile
3133

34+
import boto3
3235
import numpy as np
3336
import os
3437

@@ -433,6 +436,33 @@ def read_from_s3(self, s3_path):
433436
"""
434437
Reads and parses SWC files from an S3 directory.
435438
439+
Parameters
440+
----------
441+
s3_path : str
442+
Path to a directory in an S3 bucket containing SWC files or ZIPs
443+
of SWC files to be read.
444+
445+
Returns
446+
-------
447+
swc_dicts : Dequeue[dict]
448+
Dictionaries whose keys and values are the attribute names and
449+
values from an SWC file.
450+
"""
451+
# List filenames
452+
bucket_name, prefix = util.parse_cloud_path(s3_path)
453+
swc_paths = util.list_s3_filenames(bucket_name, prefix, ".swc")
454+
zip_paths = util.list_s3_filenames(bucket_name, prefix, ".zip")
455+
456+
# Call reader
457+
if len(swc_paths) > 0:
458+
return self.read_from_s3_swcs(bucket_name, swc_paths)
459+
if len(zip_paths) > 0:
460+
return self.read_from_s3_zips(bucket_name, zip_paths)
461+
462+
def read_from_s3_swcs(self, bucket_name, swc_paths):
463+
"""
464+
Reads and parses SWC files from an S3 directory.
465+
436466
Parameters
437467
----------
438468
s3_path : str
@@ -459,6 +489,86 @@ def read_from_s3(self, s3_path):
459489
swc_dicts.append(result)
460490
return swc_dicts
461491

492+
def read_from_s3_zips(self, bucket_name, zip_paths):
493+
"""
494+
Reads SWC files stored in a list of ZIP archives stored in an S3
495+
bucket.
496+
497+
Parameters
498+
----------
499+
bucket_name : str
500+
Name of bucket containing SWC files.
501+
zip_paths : str
502+
Path to ZIP archive containing SWC files to be read.
503+
504+
Returns
505+
-------
506+
swc_dicts : Dequeue[dict]
507+
Dictionaries whose keys and values are the attribute names and
508+
values from an SWC file.
509+
"""
510+
with ProcessPoolExecutor() as executor:
511+
# Submit processes
512+
processes = list()
513+
for zip_path in zip_paths:
514+
processes.append(
515+
executor.submit(
516+
self.read_from_s3_zip, bucket_name, zip_path
517+
)
518+
)
519+
520+
# Store results
521+
pbar = tqdm(total=len(processes), desc="Read SWCs")
522+
swc_dicts = deque()
523+
for process in as_completed(processes):
524+
result = process.result()
525+
if result:
526+
swc_dicts.extend(result)
527+
return swc_dicts
528+
529+
def read_from_s3_zip(self, bucket_name, path):
530+
"""
531+
Reads SWC files stored in a ZIP archive downloaded from an S3
532+
bucket.
533+
534+
Parameters
535+
----------
536+
bucket_name : str
537+
Name of bucket containing SWC files.
538+
path : str
539+
Path to ZIP archive containing SWC files to be read.
540+
541+
Returns
542+
-------
543+
swc_dicts : Dequeue[dict]
544+
Dictionaries whose keys and values are the attribute names and
545+
values from an SWC file.
546+
"""
547+
# Initialize cloud reader
548+
s3 = boto3.client("s3", config=Config(signature_version=UNSIGNED))
549+
zip_obj = s3.get_object(Bucket=bucket_name, Key=path)
550+
zip_content = zip_obj["Body"].read()
551+
552+
# Parse ZIP
553+
swc_dicts = deque()
554+
with ZipFile(BytesIO(zip_content), "r") as zip_file:
555+
with ThreadPoolExecutor() as executor:
556+
# Assign threads for reading files
557+
threads = [
558+
executor.submit(
559+
self.read_from_zipped_file, zip_file, filename
560+
)
561+
for filename in zip_file.namelist()
562+
if self.confirm_read(filename)
563+
]
564+
565+
# Collect results
566+
for thread in as_completed(threads):
567+
result = thread.result()
568+
if result:
569+
swc_dicts.append(result)
570+
return swc_dicts
571+
462572
def confirm_read(self, filename):
463573
"""
464574
Checks whether the swc_id corresponding to the given filename is

src/segmentation_skeleton_metrics/skeleton_metrics.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -922,7 +922,12 @@ def __call__(self, gt_graphs, fragment_graphs, merge_sites):
922922
DataFrame where the indices are the dictionary keys and values are
923923
stored under a column called "self.name".
924924
"""
925-
pbar = self.get_pbar(len(merge_sites.index))
925+
# Check if merge sites is non-empty
926+
if len(merge_sites) == 0:
927+
return _
928+
929+
# Compute metric
930+
pbar = self.get_pbar(len(merge_sites))
926931
pair_to_length = dict()
927932
for i in merge_sites.index:
928933
# Extract site info

src/segmentation_skeleton_metrics/utils/util.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ def is_s3_path(path):
398398
return path.startswith("s3://")
399399

400400

401-
def list_s3_paths(bucket_name, prefix, extension=""):
401+
def list_s3_filenames(bucket_name, prefix, extension=""):
402402
"""
403403
Lists all object keys in a public S3 bucket under a given prefix,
404404
optionally filters by file extension.

0 commit comments

Comments (0)