
Commit 49ec4e8

Add support for extracting data from zarr v3
1 parent e3f24f5 commit 49ec4e8

File tree

2 files changed: +80, -11 lines

flamingo_tools/s3_utils.py

Lines changed: 18 additions & 5 deletions
@@ -115,14 +115,23 @@ def get_s3_path(
         bucket_name, service_endpoint, credential_file
     )
 
-    s3_filesystem = create_s3_target(url=service_endpoint, anon=False, credential_file=credential_file)
+    zarr_major_version = int(zarr.__version__.split(".")[0])
+    s3_filesystem = create_s3_target(
+        url=service_endpoint, anon=False, credential_file=credential_file, asynchronous=zarr_major_version == 3,
+    )
 
     zarr_path = f"{bucket_name}/{input_path}"
 
-    if not s3_filesystem.exists(zarr_path):
+    if zarr_major_version == 2 and not s3_filesystem.exists(zarr_path):
         print(f"Error: S3 path {zarr_path} does not exist!")
 
-    s3_path = zarr.storage.FSStore(zarr_path, fs=s3_filesystem)
+    # The approach for opening a dataset from S3 differs in zarr v2 and zarr v3.
+    if zarr_major_version == 2:
+        s3_path = zarr.storage.FSStore(zarr_path, fs=s3_filesystem)
+    elif zarr_major_version == 3:
+        s3_path = zarr.storage.FsspecStore(fs=s3_filesystem, path=zarr_path)
+    else:
+        raise RuntimeError(f"Unsupported zarr version {zarr_major_version}")
 
     return s3_path, s3_filesystem
 
@@ -153,6 +162,7 @@ def create_s3_target(
     url: Optional[str] = None,
     anon: Optional[str] = False,
     credential_file: Optional[str] = None,
+    asynchronous: bool = False,
 ) -> s3fs.core.S3FileSystem:
     """Create file system for S3 bucket based on a service endpoint and an optional credential file.
     If the credential file is not provided, the s3fs.S3FileSystem function checks the environment variables
 
@@ -162,14 +172,17 @@ def create_s3_target(
         url: Service endpoint for S3 bucket
         anon: Option for anon argument of S3FileSystem
         credential_file: File path to credentials
+        asynchronous: Whether to open the file system in async mode.
 
     Returns:
         s3_filesystem
     """
     client_kwargs = {"endpoint_url": SERVICE_ENDPOINT if url is None else url}
     if credential_file is not None:
         key, secret = read_s3_credentials(credential_file)
-        s3_filesystem = s3fs.S3FileSystem(key=key, secret=secret, client_kwargs=client_kwargs)
+        s3_filesystem = s3fs.S3FileSystem(
+            key=key, secret=secret, client_kwargs=client_kwargs, asynchronous=asynchronous
+        )
     else:
-        s3_filesystem = s3fs.S3FileSystem(anon=anon, client_kwargs=client_kwargs)
+        s3_filesystem = s3fs.S3FileSystem(anon=anon, client_kwargs=client_kwargs, asynchronous=asynchronous)
     return s3_filesystem
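
With this change, get_s3_path returns a store object that matches the installed zarr major version, so callers do not need to branch on it themselves. A minimal usage sketch, mirroring the call site in the script below (the dataset path is a placeholder, not a value from this repository):

import zarr
from flamingo_tools.s3_utils import BUCKET_NAME, SERVICE_ENDPOINT, get_s3_path

# get_s3_path builds an s3fs filesystem (async under zarr v3, since FsspecStore
# drives fsspec through its async interface) and wraps it in the matching store
# class: FSStore for zarr v2, FsspecStore for zarr v3. zarr.open accepts both.
s3_store, fs = get_s3_path(
    "my-dataset/images/ome-zarr/channel.ome.zarr",  # placeholder path inside the bucket
    bucket_name=BUCKET_NAME,
    service_endpoint=SERVICE_ENDPOINT,
)
f = zarr.open(s3_store, mode="r")
print(f["s0"].shape)  # assumes an OME-Zarr pyramid with full-resolution level "s0"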

scripts/measurements/synapse_colocalization.py

Lines changed: 62 additions & 6 deletions
@@ -4,7 +4,7 @@
 import numpy as np
 import pandas as pd
 
-from flamingo_tools.s3_utils import BUCKET_NAME, create_s3_target
+from flamingo_tools.s3_utils import BUCKET_NAME, SERVICE_ENDPOINT, create_s3_target, get_s3_path
 from flamingo_tools.validation import match_detections
 
 COCHLEA = "M_LR_000215_R"
 
@@ -53,7 +53,7 @@ def _run_colocalization(riba_table, ctbp2_table, max_dist=2.0):
     return matches_riba, unmatched_riba, unmatched_ctbp2
 
 
-def check_and_filter_synapses():
+def _get_synapse_tables():
     name_ctbp2 = "CTBP2_synapse_v3_ihc_v4b"
     name_riba = "RibA_synapse_v3_ihc_v4b"
 
@@ -62,13 +62,20 @@ def check_and_filter_synapses():
     info = json.loads(content.read())
     sources = info["sources"]
 
-    # TODO load from S3 instead
-    ihc_labels = pd.read_csv("./ihc_counts/ihc-annotation.tsv", sep="\t")
-    valid_ihcs = ihc_labels.label_id[ihc_labels.ihc == "is_ihc"].values
+    ihc_table_path = sources["IHC_v4b"]["segmentation"]["tableData"]["tsv"]["relativePath"]
+    table_content = s3.open(os.path.join(BUCKET_NAME, COCHLEA, ihc_table_path, "default.tsv"), mode="rb")
+    ihc_table = pd.read_csv(table_content, sep="\t")
+    valid_ihcs = ihc_table.label_id[ihc_table.is_ihc == 1].values
 
     riba_table = _load_table(s3, sources[name_riba], valid_ihcs)
     ctbp2_table = _load_table(s3, sources[name_ctbp2], valid_ihcs)
 
+    return riba_table, ctbp2_table, valid_ihcs
+
+
+def check_and_filter_synapses():
+    riba_table, ctbp2_table, valid_ihcs = _get_synapse_tables()
+
     # Save the single synapse marker tables.
     _save_ihc_table(riba_table, "RibA")
     _save_ihc_table(ctbp2_table, "CTBP2")
 
@@ -91,8 +98,57 @@ def check_and_filter_synapses():
     _save_ihc_table(coloc_table, "coloc")
 
 
+def check_synapse_predictions():
+    import napari
+    import z5py
+    import zarr
+
+    pred_path_ctbp2 = "/mnt/vast-nhr/projects/nim00007/data/moser/cochlea-lightsheet/predictions/M_LR_000215_R/CTBP2_synapses_v3/predictions.zarr"  # noqa
+    # pred_path_riba = "/mnt/vast-nhr/projects/nim00007/data/moser/cochlea-lightsheet/predictions/M_LR_000215_R/RibA_synapses_v3"  # noqa
+
+    # MoBIE positions are in micrometers and (x, y, z) order; convert to voxels in (z, y, x).
+    location_mobie = [836.654754589637, 1255.8010489404858, 677.1537312920972]
+    resolution = 0.38
+    location = [int(loc / resolution) for loc in location_mobie[::-1]]
+
+    # Build a bounding box with the given halo around the location.
+    halo = (32, 256, 256)
+    start = np.array([loc - ha for loc, ha in zip(location, halo)])
+    stop = np.array([loc + ha for loc, ha in zip(location, halo)])
+    bb = tuple(slice(sta, sto) for sta, sto in zip(start, stop))
+
+    print("Loading tables ...")
+    _, ctbp2_table, _ = _get_synapse_tables()
+    ids = ctbp2_table.spot_id.values
+    coords = ctbp2_table[["z", "y", "x"]].values / resolution
+    # Keep only the detections that fall inside the crop.
+    mask = np.logical_and(
+        (coords > start[None, :]).all(axis=1),
+        (coords < stop[None, :]).all(axis=1),
+    )
+    ids = ids[mask]
+    det_ctbp2 = coords[mask]
+    det_ctbp2 -= start[None, :]
+    print("Found", len(det_ctbp2), "detections in the crop")
+
+    print("Loading image data ...")
+    s3_store, fs = get_s3_path(
+        f"{COCHLEA}/images/ome-zarr/CTBP2.ome.zarr", bucket_name=BUCKET_NAME, service_endpoint=SERVICE_ENDPOINT
+    )
+    f = zarr.open(s3_store, mode="r")
+    ctbp2 = f["s0"][bb]
+
+    print("Loading prediction ...")
+    with z5py.File(pred_path_ctbp2, "r") as f:
+        pred_ctbp2 = f["prediction"][bb]
+
+    v = napari.Viewer()
+    v.add_image(ctbp2)
+    v.add_image(pred_ctbp2)
+    v.add_points(det_ctbp2)
+    napari.run()
+
+
 def main():
-    check_and_filter_synapses()
+    # check_and_filter_synapses()
+    check_synapse_predictions()
 
 
 if __name__ == "__main__":
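
The bounding-box construction in check_synapse_predictions follows a reusable pattern: convert a MoBIE position (micrometers, (x, y, z) axis order) to voxel coordinates in (z, y, x) order, then pad it with a halo on each side. A minimal standalone sketch of that pattern; the helper name crop_around_location is illustrative, not part of this repository:

def crop_around_location(location_um, resolution_um, halo):
    # Reverse (x, y, z) -> (z, y, x) and convert micrometers to voxel indices.
    center = [int(loc / resolution_um) for loc in location_um[::-1]]
    # Pad the center with the halo to obtain a slice tuple for array indexing.
    return tuple(slice(c - h, c + h) for c, h in zip(center, halo))

# Example with the location, resolution and halo used above:
# 836.65 um / 0.38 um per voxel is about 2201 voxels along x.
bb = crop_around_location(
    [836.654754589637, 1255.8010489404858, 677.1537312920972], 0.38, (32, 256, 256)
)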
