Skip to content

Commit 5103ca7

Browse files
authored
refactor: simplified
1 parent c9e45bd commit 5103ca7

File tree

2 files changed

+157
-34
lines changed

2 files changed

+157
-34
lines changed

src/aind_exaspim_image_compression/utils/img_util.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,24 @@
1010

1111
from bm4d import bm4d
1212
from concurrent.futures import ThreadPoolExecutor
13+
from imagecodecs.numcodecs import Jpegxl
1314
from itertools import product
1415
from numcodecs import Blosc, register_codec
1516
from ome_zarr.writer import write_multiscale
1617
from scipy.ndimage import uniform_filter
18+
from typing import Any
1719
from xarray_multiscale import multiscale, windowed_mode
1820

1921
import gcsfs
2022
import matplotlib.pyplot as plt
2123
import numpy as np
24+
import os
2225
import s3fs
2326
import tifffile
2427
import zarr
2528

29+
from aind_exaspim_image_compression.utils import util
30+
2631

2732
# --- Image Reader ---
2833
def read(img_path):
@@ -68,6 +73,7 @@ def _read_zarr(img_path):
6873
np.ndarray
6974
Loaded image volume.
7075
"""
76+
register_codec(Jpegxl)
7177
if _is_gcs_path(img_path):
7278
fs = gcsfs.GCSFileSystem(anon=False)
7379
store = zarr.storage.FSStore(img_path, fs=fs)
@@ -442,7 +448,7 @@ def compress_patch(idx):
442448

443449

444450
def compress_and_decompress_jpeg(
445-
img, codec, patch_shape=(128, 128, 64), max_workers=32
451+
img, codec, patch_shape=(32, 256, 256), max_workers=32
446452
):
447453
# Helper routine
448454
def process_patch(idx):
@@ -491,12 +497,11 @@ def init_ome_zarr(
491497
compressor=Blosc(cname="zstd", clevel=5, shuffle=Blosc.SHUFFLE),
492498
):
493499
# Setup output store
494-
register_codec(compressor)
495500
store = zarr.DirectoryStore(output_path, dimension_separator="/")
496501
zgroup = zarr.group(store=store)
497502

498503
# Create top-level dataset
499-
print("Creating ome-zarr image with shape:", img.shape)
504+
print("Creating OMEZARR Image with Shape:", img.shape)
500505
output_zarr = zgroup.create_dataset(
501506
name=0,
502507
shape=img.shape,
@@ -526,7 +531,6 @@ def write_ome_zarr(
526531
pyramid = [level.data for level in pyramid]
527532

528533
# Prepare Zarr store
529-
register_codec(compressor)
530534
store = zarr.DirectoryStore(output_path, dimension_separator="/")
531535
zgroup = zarr.open(store=store, mode="w")
532536

src/aind_exaspim_image_compression/utils/util.py

Lines changed: 149 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -253,59 +253,52 @@ def copy_gcs_directory(bucket_name, source_prefix, destination_prefix):
253253
bucket.copy_blob(blob, bucket, new_blob_name)
254254

255255

256-
def get_gcs_directory_size(bucket_name, prefix):
256+
def find_subprefix_with_keyword(bucket_name, prefix, keyword):
257257
"""
258-
Calculates the total size of all objects under a given prefix in a GCS
259-
bucket.
258+
Finds the first GCS subprefix under a given prefix that contains a
259+
specified keyword.
260260
261261
Parameters
262262
----------
263263
bucket_name : str
264264
Name of the GCS bucket.
265265
prefix : str
266-
Path prefix within the bucket.
266+
The prefix to search under.
267+
keyword : str
268+
Keyword to look for within the subprefixes.
267269
268270
Returns
269271
-------
270-
float
271-
Total size in gigabytes (GB) of all objects under the given prefix.
272+
str
273+
First subprefix containing the keyword.
272274
"""
273-
# Download blobs
274-
client = storage.Client()
275-
bucket = client.bucket(bucket_name)
276-
blobs = client.list_blobs(bucket, prefix=prefix)
277-
278-
# Compute size of blobs
279-
total_size = 0
280-
for blob in blobs:
281-
total_size += blob.size
282-
283-
return total_size / 1024 ** 3
275+
for subprefix in list_gcs_subprefixes(bucket_name, prefix):
276+
if keyword in subprefix:
277+
return subprefix
278+
raise Exception(f"Prefix with keyword '{keyword}' not found in {prefix}")
284279

285280

286-
def find_subprefix_with_keyword(bucket_name, prefix, keyword):
281+
def get_gcs_directory_size(bucket_name, prefix):
287282
"""
288-
Finds the first GCS subprefix under a given prefix that contains a
289-
specified keyword.
283+
Calculate the total size of a GCS "directory" (i.e., objects under a prefix),
284+
and return it in gigabytes (GB).
290285
291286
Parameters
292287
----------
293288
bucket_name : str
294289
Name of the GCS bucket.
295290
prefix : str
296-
The prefix to search under.
297-
keyword : str
298-
Keyword to look for within the subprefixes.
291+
GCS path prefix (e.g., 'my_folder/' to list everything under that directory).
299292
300293
Returns
301294
-------
302-
str
303-
First subprefix containing the keyword.
295+
float
296+
Total size in gigabytes.
304297
"""
305-
for subprefix in list_gcs_subprefixes(bucket_name, prefix):
306-
if keyword in subprefix:
307-
return subprefix
308-
raise Exception(f"Prefix with keyword '{keyword}' not found in {prefix}")
298+
client = storage.Client()
299+
bucket = client.bucket(bucket_name)
300+
blobs = bucket.list_blobs(prefix=prefix)
301+
return sum(blob.size for blob in blobs) / (1024 ** 3)
309302

310303

311304
def list_block_paths(brain_id):
@@ -427,9 +420,135 @@ def upload_directory_to_gcs(bucket_name, source_dir, destination_dir):
427420

428421

429422
# --- S3 utils ---
423+
def exists_in_prefix(bucket_name, prefix, name):
424+
"""
425+
Checks if a given filename is in a prefix.
426+
427+
Parameters
428+
----------
429+
bucket_name : str
430+
Name of the S3 bucket to search.
431+
prefix : str
432+
S3 prefix to search within.
433+
name : str
434+
Filename to search for.
435+
436+
Returns
437+
-------
438+
bool
439+
Indication of whether a given file is in a prefix.
440+
"""
441+
prefixes = list_s3_prefixes(bucket_name, prefix)
442+
return sum([1 for prefix in prefixes if name in prefix]) > 0
443+
444+
445+
def list_s3_prefixes(bucket_name, prefix):
446+
"""
447+
Lists all immediate subdirectories of a given S3 path (prefix).
448+
449+
Parameters
450+
-----------
451+
bucket_name : str
452+
Name of the S3 bucket to search.
453+
prefix : str
454+
S3 prefix to search within.
455+
456+
Returns
457+
-------
458+
List[str]
459+
List of immediate subdirectories under the specified prefix.
460+
"""
461+
# Check prefix is valid
462+
if not prefix.endswith("/"):
463+
prefix += "/"
464+
465+
# Call the list_objects_v2 API
466+
s3 = boto3.client("s3")
467+
response = s3.list_objects_v2(
468+
Bucket=bucket_name, Prefix=prefix, Delimiter="/"
469+
)
470+
if "CommonPrefixes" in response:
471+
return [cp["Prefix"] for cp in response["CommonPrefixes"]]
472+
else:
473+
return list()
474+
475+
476+
def list_s3_bucket_prefixes(bucket_name, keyword=None):
477+
"""
478+
Lists all top-level prefixes (directories) in an S3 bucket, optionally
479+
filtering by a keyword.
480+
481+
Parameters
482+
-----------
483+
bucket_name : str
484+
Name of the S3 bucket to search.
485+
keyword : str, optional
486+
Keyword used to filter the prefixes. Default is None.
487+
488+
Returns
489+
--------
490+
prefixes : List[str]
491+
A list of top-level prefixes (directories) in the S3 bucket. If a
492+
keyword is provided, only the matching prefixes are returned.
493+
"""
494+
# Initializations
495+
prefixes = list()
496+
continuation_token = None
497+
s3 = boto3.client("s3")
498+
499+
# Main
500+
keyword = keyword.lower()
501+
while True:
502+
# Call the list_objects_v2 API
503+
list_kwargs = {"Bucket": bucket_name, "Delimiter": "/"}
504+
if continuation_token:
505+
list_kwargs["ContinuationToken"] = continuation_token
506+
response = s3.list_objects_v2(**list_kwargs)
507+
508+
# Collect the top-level prefixes
509+
if "CommonPrefixes" in response:
510+
for prefix in response["CommonPrefixes"]:
511+
if keyword and keyword in prefix["Prefix"].lower():
512+
prefixes.append(prefix["Prefix"])
513+
elif keyword is None:
514+
prefixes.append(prefix["Prefix"])
515+
516+
# Check if there are more pages to fetch
517+
if response.get("IsTruncated"):
518+
continuation_token = response.get("NextContinuationToken")
519+
else:
520+
break
521+
return prefixes
522+
523+
524+
def is_file_in_prefix(bucket_name, prefix, filename):
525+
"""
526+
Checks if a specific file exists within a given S3 prefix.
527+
528+
Parameters
529+
----------
530+
bucket_name : str
531+
Name of the S3 bucket to be searched.
532+
prefix : str
533+
S3 prefix (path) under which to look for the file.
534+
filename : str
535+
Name of the file to search for within the specified prefix.
536+
537+
Returns
538+
-------
539+
bool
540+
Returns "True" if the file exists within the given prefix,
541+
otherwise "False".
542+
"""
543+
for sub_prefix in list_s3_prefixes(bucket_name, prefix):
544+
if filename in sub_prefix:
545+
return True
546+
return False
547+
548+
430549
def write_to_s3(local_path, bucket_name, prefix):
431550
"""
432-
Writes a single file on local machine to an S3 bucket.
551+
Writes a single file on local machine to an S3 bucket.
433552
434553
Parameters
435554
----------

0 commit comments

Comments
 (0)