Draft

44 commits
30c4e10
Remove V2 tests
aliddell Oct 8, 2025
b2d95a7
Remove ZarrVersion and references to V2 in tests/benchmarks/examples
aliddell Oct 9, 2025
cda2d13
Remove V2Array and V2MultiscaleArray
aliddell Oct 9, 2025
217ce98
V3Array -> Array
aliddell Oct 9, 2025
d6159f3
V3MultiscaleArray -> MultiscaleArray
aliddell Oct 9, 2025
9481f08
Update changelog
aliddell Oct 9, 2025
ae6d911
wip
aliddell Oct 12, 2025
4d462a2
Reinstate ZarrVersion, but remove V2
aliddell Oct 14, 2025
94798e2
Check only V3 is passed into constructor/setter
aliddell Oct 14, 2025
e26da44
Test that setting version to 2 raises a RuntimeError
aliddell Oct 14, 2025
e74d079
Test that setting version to anything but 3 fails to validate (in C++)
aliddell Oct 14, 2025
b4fc900
Default to `ZarrVersion_3` when `ZarrStreamSettings.version == 0`
aliddell Oct 14, 2025
e2caa4b
Merge branch 'remove-zarr-v2' into kill-sink
aliddell Oct 14, 2025
67d4fef
Merge branch 'main' into kill-sink
aliddell Oct 14, 2025
077f0ec
Break out compression and updating chunk table into their own methods
aliddell Oct 14, 2025
7928c07
Precompute frames before flush
aliddell Oct 14, 2025
5b5749c
(wip): builds but fails
aliddell Oct 16, 2025
096c463
Merge remote-tracking branch 'upstream/main' into kill-sink
aliddell Oct 16, 2025
064b450
(wip): tests ok (slow)
aliddell Oct 16, 2025
5531bd2
#include <cstring> where appropriate
aliddell Oct 16, 2025
2f92a0c
kill -9 Sink
aliddell Oct 16, 2025
97fa8b4
Remove LockedBuffer
aliddell Oct 16, 2025
2c89501
Add overwrite flag to Python benchmark
aliddell Oct 16, 2025
3dee8c6
Be consistent in vector/span defs
aliddell Oct 16, 2025
fd7c9a8
Set env a different way in S3 Python tests
aliddell Oct 16, 2025
e4a07fc
Merge branch 'main' into kill-sink
aliddell Oct 16, 2025
840283d
Merge remote-tracking branch 'upstream/main' into kill-sink
aliddell Oct 17, 2025
839a0f9
(wip) yet another Array refactor
aliddell Oct 17, 2025
812dc7d
Merge remote-tracking branch 'upstream/main' into kill-sink
aliddell Oct 17, 2025
ae18135
(wip) reinstate try-catch in array-write-even.cpp
aliddell Oct 17, 2025
e33e60d
(wip) don't block FSArray::write until closing the shard (1/n)
aliddell Oct 21, 2025
e6df041
(wip)
aliddell Oct 23, 2025
51832c3
(wip) again
aliddell Oct 23, 2025
2e89474
(wip)
aliddell Oct 24, 2025
9990b6a
(wip) remove unused FSStorage::write_binary
aliddell Oct 27, 2025
21803ae
Log a more useful error message in zarr-stream-partial-append.cpp
aliddell Oct 27, 2025
2239435
Write FSArray shard tables in one go (don't miss unwritten chunks)
aliddell Oct 27, 2025
fdd348e
Fix S3Array table flushing
aliddell Oct 29, 2025
f904073
Move `ShardLayer` to `S3Array`
aliddell Oct 30, 2025
0f89a9c
Rename `Array::finalize_io_streams_` to `Array::finalize_append_shard_`
aliddell Oct 30, 2025
15efd33
(wip): allow thread creation from other threads
aliddell Oct 30, 2025
aac1154
(wip): collect shard file data in structs, wait to close shard files …
aliddell Oct 30, 2025
60f2b5b
(wip): update stream-raw-to-filesystem to match benchmark.py config
aliddell Oct 30, 2025
c818bd0
(wip): sprinkle some more omp fairydust
aliddell Oct 30, 2025
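Several of the commits above (reinstating `ZarrVersion` without V2, checking that only V3 is passed into the constructor/setter, and testing that requesting version 2 raises a `RuntimeError`) describe the intended version-handling behavior. Below is a minimal sketch of that behavior from the Python side; the class and enum names (`StreamSettings`, `ZarrVersion.V3`) are assumptions inferred from the commit messages and the API visible in the diffs that follow.

```python
# Hedged sketch only: names are assumptions, not the library's confirmed API.
import acquire_zarr as aqz
import pytest

settings = aqz.StreamSettings()
settings.version = aqz.ZarrVersion.V3  # the only value expected to validate

with pytest.raises(RuntimeError):
    settings.version = 2  # V2 support has been removed; the setter rejects it
```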
7 changes: 6 additions & 1 deletion .github/workflows/test.yml
@@ -165,7 +165,12 @@ jobs:
run: python -m pip install ".[testing]"

- name: Test Python
run: python -m pytest -v -k test_stream_data_to_s3
env:
ZARR_S3_ENDPOINT: ${{ env.MINIO_URL }}
ZARR_S3_BUCKET_NAME: ${{ env.MINIO_BUCKET }}
AWS_ACCESS_KEY_ID: ${{ env.MINIO_ACCESS_KEY }}
AWS_SECRET_ACCESS_KEY: ${{ env.MINIO_SECRET_KEY }}
run: python -m pytest -s -k test_stream_data_to_s3


test-python:
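The workflow change above exports the MinIO endpoint, bucket, and credentials as environment variables for the pytest invocation. A minimal sketch of how a test might assemble its S3 configuration from those variables is shown below; the variable names come from the workflow, while the helper function and its return shape are illustrative assumptions rather than the suite's actual fixture code.

```python
import os

def s3_settings_from_env():
    """Read the S3 configuration exported by the CI step above (sketch only)."""
    endpoint = os.environ.get("ZARR_S3_ENDPOINT")
    bucket = os.environ.get("ZARR_S3_BUCKET_NAME")
    if not endpoint or not bucket:
        return None  # caller can pytest.skip("S3 settings not set")
    # AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY are picked up from the
    # environment by the S3 client itself, so they are not returned here.
    return {"endpoint": endpoint, "bucket_name": bucket}
```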
13 changes: 11 additions & 2 deletions benchmarks/benchmark.py
@@ -148,6 +148,7 @@ def run_acquire_zarr_test(
data_type=aqz.DataType.UINT16,
)
],
overwrite=True,
)

# Create a ZarrStream for appending frames.
@@ -156,15 +157,23 @@
elapsed_times = []

total_start = time.perf_counter_ns()
chunk = np.empty((tchunk_size, 2048, 2048), dtype=np.uint16)
for i in range(data.shape[0]):
start_plane = time.perf_counter_ns()
stream.append(data[i])
chunk_idx = i % tchunk_size
chunk[chunk_idx] = data[i]
if chunk_idx == tchunk_size - 1:
stream.append(chunk)
elapsed = time.perf_counter_ns() - start_plane
elapsed_times.append(elapsed)
print(f"Acquire-zarr: Plane {i} written in {elapsed / 1e6:.3f} ms")

# Close (or flush) the stream to finalize writes.
del stream
start_close = time.perf_counter_ns()
stream.close()
elapsed = time.perf_counter_ns() - start_close
elapsed_times.append(elapsed)
print(f"Acquire-zarr: Final close took {elapsed / 1e6:.3f} ms")
total_elapsed = time.perf_counter_ns() - total_start
tot_ms = total_elapsed / 1e6
print(f"Acquire-zarr: Total write time: {tot_ms:.3f} ms")
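The benchmark change above stops appending one plane per call: it accumulates `tchunk_size` planes into a preallocated buffer, appends a full time-chunk at once, and times an explicit `stream.close()` instead of relying on `del stream`. A standalone sketch of that pattern follows; the trailing partial-chunk branch is an added assumption for generality, since the benchmark's frame count presumably divides evenly by `tchunk_size`.

```python
import numpy as np

def append_in_chunks(stream, data, tchunk_size):
    """Append a (t, y, x) array to an acquire-zarr stream one time-chunk at a time."""
    chunk = np.empty((tchunk_size, *data.shape[1:]), dtype=data.dtype)
    for i in range(data.shape[0]):
        chunk_idx = i % tchunk_size
        chunk[chunk_idx] = data[i]
        if chunk_idx == tchunk_size - 1:
            stream.append(chunk)  # hand the writer a full chunk in one call
    remainder = data.shape[0] % tchunk_size
    if remainder:
        stream.append(chunk[:remainder])  # flush any trailing partial chunk
    stream.close()  # finalize writes; remaining shards are flushed here
```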
4 changes: 2 additions & 2 deletions python/acquire-zarr-py.cpp
@@ -888,7 +888,7 @@ class PyZarrStreamSettings
settings_.overwrite = static_cast<int>(overwrite_);

if (s3_settings_) {
*(settings_.s3_settings) = *(s3_settings_->settings());
settings_.s3_settings = s3_settings_->settings();
}

// construct array lifetime props and set up arrays
Expand Down Expand Up @@ -1087,7 +1087,7 @@ class PyZarrStream
}

auto buf = contiguous_data.request();
auto* ptr = (uint8_t*)buf.ptr;
auto* ptr = static_cast<uint8_t*>(buf.ptr);

py::gil_scoped_release release;

50 changes: 27 additions & 23 deletions python/tests/test_stream.py
@@ -378,7 +378,6 @@ def test_stream_data_to_filesystem(
data[i, :, :] = i

stream.append(data)

stream.close() # close the stream, flush the files

chunk_size_bytes = data.dtype.itemsize
@@ -394,40 +393,40 @@
shard_size_bytes + table_size_bytes + 4
) # 4 bytes for crc32c checksum

for x in range(settings.arrays[0].dimensions[-1].array_size_px):
for y in range(settings.arrays[0].dimensions[-2].array_size_px):
for z in range(settings.arrays[0].dimensions[-3].array_size_px):
shard_file = store_path / "test.zarr" / "0" / "c" / str(z) / str(y) / str(x)
assert shard_file.is_file()
if compression_codec is None:
assert shard_file.stat().st_size == shard_size_bytes
else:
size = shard_file.stat().st_size
assert table_size_bytes < size <= shard_size_bytes

group = zarr.open(settings.store_path, mode="r")
array = group["0"]

assert array.shape == data.shape
for i in range(array.shape[0]):
assert np.array_equal(array[i, :, :], data[i, :, :])
assert np.array_equal(array[i, :, :], data[i, :, :]), f"Data mismatch at index {i}"

metadata = array.metadata
sharding_codec = metadata.codecs[0]
if compression_codec is not None:
cname = (
zblosc.BloscCname.lz4
if compression_codec == CompressionCodec.BLOSC_LZ4
else zblosc.BloscCname.zstd
)
blosc_codec = metadata.codecs[0].codecs[1]

assert len(sharding_codec.codecs) == 2
blosc_codec = sharding_codec.codecs[1]
assert blosc_codec.cname == cname
assert blosc_codec.clevel == 1
assert blosc_codec.shuffle == zblosc.BloscShuffle.shuffle

assert (
store_path / "test.zarr" / "0" / "c" / "0" / "0" / "0"
).is_file()
assert (
store_path / "test.zarr" / "0" / "c" / "0" / "0" / "0"
).stat().st_size <= shard_size_bytes
else:
assert len(metadata.codecs[0].codecs) == 1

assert (
store_path / "test.zarr" / "0" / "c" / "0" / "0" / "0"
).is_file()
assert (
store_path / "test.zarr" / "0" / "c" / "0" / "0" / "0"
).stat().st_size == shard_size_bytes
assert len(sharding_codec.codecs) == 1 # bytes codec


@pytest.mark.parametrize(
@@ -456,12 +455,12 @@ def test_stream_data_to_s3(
pytest.skip("S3 settings not set")

settings.store_path = f"{request.node.name}.zarr".replace("[", "").replace(
"]", ""
"]", "_"
)
settings.s3 = s3_settings
settings.data_type = np.uint16
settings.arrays[0].data_type = np.uint16
if compression_codec is not None:
settings.compression = CompressionSettings(
settings.arrays[0].compression = CompressionSettings(
compressor=Compressor.BLOSC1,
codec=compression_codec,
level=1,
@@ -501,18 +500,23 @@
assert np.array_equal(array[i, :, :], data[i, :, :])

metadata = array.metadata
assert len(metadata.codecs) == 1 # sharding codec
sharding_codec = metadata.codecs[0]

if compression_codec is not None:
cname = (
zblosc.BloscCname.lz4
if compression_codec == CompressionCodec.BLOSC_LZ4
else zblosc.BloscCname.zstd
)
blosc_codec = metadata.codecs[0].codecs[1]
assert len(sharding_codec.codecs) == 2

blosc_codec = sharding_codec.codecs[1]
assert blosc_codec.cname == cname
assert blosc_codec.clevel == 1
assert blosc_codec.shuffle == zblosc.BloscShuffle.shuffle
else:
assert len(metadata.codecs[0].codecs) == 1
assert len(sharding_codec.codecs) == 1 # bytes codec

# cleanup
s3 = s3fs.S3FileSystem(
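The `test_stream_data_to_filesystem` changes above assert on shard file sizes derived from the chunk geometry: under Zarr v3 sharding, each shard file holds its chunk data plus an index of 16 bytes per chunk (offset and byte count, 8 bytes each) and a 4-byte crc32c checksum of that index, matching the `# 4 bytes for crc32c checksum` comment in the test. A worked sketch of the arithmetic is given below; the dimension sizes are illustrative, not the test's actual settings.

```python
import numpy as np

# Illustrative geometry: a shard containing 4 chunks of 32 x 2048 x 2048 uint16.
chunk_shape = (32, 2048, 2048)
chunks_per_shard = 4
itemsize = np.dtype(np.uint16).itemsize

chunk_size_bytes = int(np.prod(chunk_shape)) * itemsize
shard_size_bytes = chunks_per_shard * chunk_size_bytes
table_size_bytes = chunks_per_shard * 2 * 8  # (offset, nbytes) per chunk, uint64 each

# Expected size of an uncompressed shard file: data + index table + crc32c.
expected_file_size = shard_size_bytes + table_size_bytes + 4
```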
22 changes: 14 additions & 8 deletions src/streaming/CMakeLists.txt
@@ -11,8 +11,6 @@ add_library(${tgt}
acquire.zarr.cpp
array.dimensions.hh
array.dimensions.cpp
locked.buffer.hh
locked.buffer.cpp
frame.queue.hh
frame.queue.cpp
downsampler.hh
@@ -29,19 +27,27 @@
s3.connection.cpp
file.handle.hh
file.handle.cpp
sink.hh
sink.cpp
file.sink.hh
file.sink.cpp
${PLATFORM_CPP}
s3.sink.hh
s3.sink.cpp
s3.object.hh
s3.object.cpp
array.base.hh
array.base.cpp
array.hh
array.cpp
multiscale.array.hh
multiscale.array.cpp
fs.storage.hh
fs.storage.cpp
fs.array.hh
fs.array.cpp
fs.multiscale.array.hh
fs.multiscale.array.cpp
s3.storage.hh
s3.storage.cpp
s3.array.hh
s3.array.cpp
s3.multiscale.array.hh
s3.multiscale.array.cpp
plate.hh
plate.cpp
$<TARGET_OBJECTS:acquire-logger-obj>
1 change: 1 addition & 0 deletions src/streaming/acquire.zarr.cpp
@@ -4,6 +4,7 @@
#include "zarr.stream.hh"

#include <bit> // bit_ceil
#include <cstring> // memcpy
#include <cstdint> // uint32_t
#include <unordered_set>
#include <vector>
96 changes: 1 addition & 95 deletions src/streaming/array.base.cpp
@@ -6,18 +6,12 @@
#include "multiscale.array.hh"

zarr::ArrayBase::ArrayBase(std::shared_ptr<ArrayConfig> config,
std::shared_ptr<ThreadPool> thread_pool,
std::shared_ptr<FileHandlePool> file_handle_pool,
std::shared_ptr<S3ConnectionPool> s3_connection_pool)
std::shared_ptr<ThreadPool> thread_pool)
: config_(config)
, thread_pool_(thread_pool)
, s3_connection_pool_(s3_connection_pool)
, file_handle_pool_(file_handle_pool)
{
CHECK(config_); // required
CHECK(thread_pool_); // required
EXPECT(s3_connection_pool_ != nullptr || file_handle_pool_ != nullptr,
"Either S3 connection pool or file handle pool must be provided.");
}

std::string
@@ -31,94 +25,6 @@ zarr::ArrayBase::node_path_() const
return key;
}

bool
zarr::ArrayBase::make_metadata_sinks_()
{
metadata_sinks_.clear();

try {
const auto sink_keys = metadata_keys_();
for (const auto& key : sink_keys) {
const std::string path = node_path_() + "/" + key;
std::unique_ptr<Sink> sink =
config_->bucket_name
? make_s3_sink(*config_->bucket_name, path, s3_connection_pool_)
: make_file_sink(path, file_handle_pool_);

if (sink == nullptr) {
LOG_ERROR("Failed to create metadata sink for ", key);
return false;
}
metadata_sinks_.emplace(key, std::move(sink));
}
} catch (const std::exception& exc) {
LOG_ERROR("Failed to create metadata sinks: ", exc.what());
return false;
}

return true;
}

bool
zarr::ArrayBase::write_metadata_()
{
if (!make_metadata_()) {
LOG_ERROR("Failed to make metadata.");
return false;
}

if (!make_metadata_sinks_()) {
LOG_ERROR("Failed to make metadata sinks.");
return false;
}

for (const auto& [key, metadata] : metadata_strings_) {
const auto it = metadata_sinks_.find(key);
if (it == metadata_sinks_.end()) {
LOG_ERROR("Metadata sink not found for key: ", key);
return false;
}

auto& sink = it->second;
if (!sink) {
LOG_ERROR("Metadata sink is null for key: ", key);
return false;
}

std::span data{ reinterpret_cast<const uint8_t*>(metadata.data()),
metadata.size() };
if (!sink->write(0, data)) {
LOG_ERROR("Failed to write metadata for key: ", key);
return false;
}
}

return true;
}

std::unique_ptr<zarr::ArrayBase>
zarr::make_array(std::shared_ptr<ArrayConfig> config,
std::shared_ptr<ThreadPool> thread_pool,
std::shared_ptr<FileHandlePool> file_handle_pool,
std::shared_ptr<S3ConnectionPool> s3_connection_pool)
{
// create a multiscale array at the dataset root (node_key is empty) or if
// we have a genuine multiscale dataset
const auto multiscale =
config->node_key.empty() || config->downsampling_method.has_value();

std::unique_ptr<ArrayBase> array;
if (multiscale) {
array = std::make_unique<MultiscaleArray>(
config, thread_pool, file_handle_pool, s3_connection_pool);
} else {
array = std::make_unique<Array>(
config, thread_pool, file_handle_pool, s3_connection_pool);
}

return array;
}

bool
zarr::finalize_array(std::unique_ptr<ArrayBase>&& array)
{