Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
b2638c5
Formatting in zarr.common
aliddell Sep 19, 2025
e8a4262
Don't #include <fstream> in sink.hh
aliddell Sep 19, 2025
9bf74e4
Implement some platform-specific functions
aliddell Sep 19, 2025
ac91c84
Add an align function to zarr.common
aliddell Sep 19, 2025
4da078b
Vector-write chunks
aliddell Sep 19, 2025
ce6ef9f
Move align_to_system to `Sink` (for S3 purposes)
aliddell Sep 22, 2025
bc49326
Fix offset bug in S3 sink
aliddell Sep 22, 2025
7c188d2
Merge remote-tracking branch 'upstream/main' into with-vectorized-wri…
aliddell Sep 24, 2025
363c20a
Add click to benchmark.py; add some parameters, write JSON output
aliddell Sep 24, 2025
2c7334f
Add benchmarking workflow
aliddell Sep 24, 2025
d44cd21
Fix charset error in benchmark script
aliddell Sep 24, 2025
9adec3a
Temporarily build wheels
aliddell Sep 24, 2025
271f408
ugh
aliddell Sep 24, 2025
eaa7e20
No more builds
aliddell Sep 24, 2025
28974a0
Merge branch 'main' into with-vectorized-writing
aliddell Sep 24, 2025
0411e63
Update changelog
aliddell Sep 24, 2025
4e33aac
(wip) get timing
aliddell Sep 24, 2025
3633b79
Implement FileHandle class for Windows and POSIX
aliddell Sep 29, 2025
13b6640
Only allow pushing jobs to the thread pool from the main thread
aliddell Sep 29, 2025
70c8cd2
Committing everything
aliddell Sep 29, 2025
4194b80
Increase timeout in tests for slowpoke Windows
aliddell Sep 29, 2025
6cf4275
Merge remote-tracking branch 'upstream/main' into handle-pool
aliddell Sep 29, 2025
5a38dad
add profiler
aliddell Sep 30, 2025
2ced080
Merge branch 'with-vectorized-writing' into handle-pool-with-vectors
aliddell Sep 30, 2025
126ad72
Merge remote-tracking branch 'upstream/main' into handle-pool-with-ve…
aliddell Oct 3, 2025
42cf4e5
Remove "profile.cpp" from tests
aliddell Oct 3, 2025
9e529fd
Do some aligning in a test
aliddell Oct 6, 2025
c31dd1e
Merge remote-tracking branch 'upstream/main' into with-vectorized-wri…
aliddell Oct 17, 2025
977d1af
Merge remote-tracking branch 'upstream/main' into with-vectorized-wri…
aliddell Oct 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- File handles are now managed by a pool to centrally limit the number of open files (#161)
- Vectorized file writing to improve performance when writing many small chunks (#156)
- File handles are now managed by a pool to centrally limit the number of open files (#161)

### Removed

Expand Down
3 changes: 1 addition & 2 deletions benchmarks/benchmark.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "acquire-zarr>=0.5.2",
# "acquire-zarr>=0.2.4",
# "zarr",
# "rich",
# "tensorstore",
Expand Down Expand Up @@ -203,7 +203,6 @@ def compare(

print("\nRunning acquire-zarr test:")
az_path = "acquire_zarr_test.zarr"
print("I'm saving to ", Path(az_path).absolute())

# Pre-generate the data (timing excluded)
data = CyclicArray(
Expand Down
115 changes: 107 additions & 8 deletions src/streaming/file.sink.cpp
Original file line number Diff line number Diff line change
@@ -1,27 +1,57 @@
#include "file.sink.hh"
#include "macros.hh"
#include "zarr.common.hh"

#include <string_view>

size_t
get_page_size();

size_t
get_sector_size(const std::string&);

size_t
align_to_system_size(size_t, size_t, size_t);

void*
make_flags();
make_flags(bool);

void
destroy_flags(void*);
destroy_flags(void* flags);

bool
seek_and_write(void* handle, size_t offset, ConstByteSpan data);

bool
write_vectors(void*,
size_t,
size_t,
size_t,
const std::vector<std::vector<uint8_t>>&);

bool
flush_file(void* handle);

namespace {
// only use vectorized writes if >= 8 threads
constexpr size_t VECTORIZE_THRESHOLD = 8;
const bool CAN_WRITE_VECTORIZED =
std::thread::hardware_concurrency() > VECTORIZE_THRESHOLD;
} // namespace

zarr::FileSink::FileSink(std::string_view filename,
std::shared_ptr<FileHandlePool> file_handle_pool)
: file_handle_pool_(file_handle_pool)
, filename_(filename)
, flags_(make_flags())
, flags_(make_flags(CAN_WRITE_VECTORIZED))
, vectorized_(CAN_WRITE_VECTORIZED)
, page_size_(0)
, sector_size_(0)
{
EXPECT(file_handle_pool_ != nullptr, "File handle pool not provided.");

page_size_ = get_page_size();
sector_size_ = get_sector_size(filename_);
}

zarr::FileSink::~FileSink()
Expand All @@ -37,14 +67,23 @@ zarr::FileSink::write(size_t offset, ConstByteSpan data)
return true;
}

auto handle = file_handle_pool_->get_handle(filename_, flags_);
if (handle == nullptr) {
LOG_ERROR("Failed to get file handle for ", filename_);
return false;
}
std::unique_ptr<FileHandle> handle;

bool retval = false;
try {
if (vectorized_) {
destroy_flags(flags_);
flags_ = nullptr;
flags_ = make_flags(false);
vectorized_ = false;
}

handle = file_handle_pool_->get_handle(filename_, flags_);
if (handle == nullptr) {
LOG_ERROR("Failed to get file handle for ", filename_);
return false;
}

retval = seek_and_write(handle->get(), offset, data);
} catch (const std::exception& exc) {
LOG_ERROR("Failed to write to file ", filename_, ": ", exc.what());
Expand All @@ -55,6 +94,66 @@ zarr::FileSink::write(size_t offset, ConstByteSpan data)
return retval;
}

bool
zarr::FileSink::write(size_t offset,
const std::vector<std::vector<uint8_t>>& buffers)
{
if (buffers.empty()) {
return true;
}

// fallback to non-vectorized (consolidated) write if not supported
if (!CAN_WRITE_VECTORIZED) {
size_t consolidated_size = 0;
for (const auto& buffer : buffers) {
consolidated_size += buffer.size();
}
std::vector<uint8_t> consolidated(consolidated_size, 0);

consolidated_size = 0;
for (const auto& buffer : buffers) {
std::ranges::copy(buffer, consolidated.data() + consolidated_size);
consolidated_size += buffer.size();
}

return write(offset, consolidated);
}

bool retval = false;
std::unique_ptr<FileHandle> handle;

try {
if (!vectorized_) {
destroy_flags(flags_);
flags_ = nullptr;
flags_ = make_flags(true);
vectorized_ = true;
}

handle = file_handle_pool_->get_handle(filename_, flags_);
if (handle == nullptr) {
LOG_ERROR("Failed to get file handle for ", filename_);
return false;
}

offset = align_to_system_size(offset);
retval = write_vectors(
handle->get(), offset, page_size_, sector_size_, buffers);
} catch (const std::exception& exc) {
LOG_ERROR("Failed to write to file ", filename_, ": ", exc.what());
}

file_handle_pool_->return_handle(std::move(handle));

return retval;
}

size_t
zarr::FileSink::align_to_system_size(size_t size)
{
return ::align_to_system_size(size, page_size_, sector_size_);
}

bool
zarr::FileSink::flush_()
{
Expand Down
12 changes: 9 additions & 3 deletions src/streaming/file.sink.hh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include "file.handle.hh"
#include "sink.hh"

#include <fstream>
#include <string_view>

namespace zarr {
Expand All @@ -15,14 +14,21 @@ class FileSink : public Sink
~FileSink() override;

bool write(size_t offset, ConstByteSpan data) override;
bool write(size_t offset,
const std::vector<std::vector<uint8_t>>& buffers) override;
size_t align_to_system_size(size_t size) override;

protected:
bool flush_() override;

private:
std::shared_ptr<FileHandlePool> file_handle_pool_;
std::string filename_; // keep a copy of the filename for reopening

std::string filename_;
void* flags_;
void* flags_; // platform-specific flags for opening the file
void* handle_; // platform-specific file handle
bool vectorized_; // whether to use vectorized writes
size_t page_size_; // cached system page size
size_t sector_size_; // cached system sector size
};
} // namespace zarr
81 changes: 76 additions & 5 deletions src/streaming/posix/platform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,32 @@
std::string
get_last_error_as_string()
{
return strerror(errno);
if (auto* err = strerror(errno); err != nullptr) {
return std::string(err);
}
return "";
}

size_t
get_page_size()
{
return sysconf(_SC_PAGESIZE);
}

size_t
get_sector_size(const std::string& /*path*/)
{
return 0; // no additional alignment needed on POSIX
}

size_t
align_to_system_size(const size_t size, const size_t, const size_t)
{
return size; // no additional alignment needed on POSIX
}

void*
make_flags()
make_flags(bool /* vectorized */)
{
auto* flags = new int;
*flags = O_WRONLY | O_CREAT;
Expand Down Expand Up @@ -46,12 +67,12 @@ init_handle(const std::string& filename, void* flags)
{
auto* fd = new int;

*fd = open(filename.data(), *static_cast<int*>(flags), 0644);
*fd = open(filename.c_str(), *static_cast<int*>(flags), 0644);
if (*fd < 0) {
const auto err = get_last_error_as_string();
delete fd;
throw std::runtime_error("Failed to open file: '" +
std::string(filename) + "': " + err);
throw std::runtime_error("Failed to open file: '" + filename +
"': " + err);
}
return fd;
}
Expand Down Expand Up @@ -105,4 +126,54 @@ destroy_handle(void* handle)
}
delete fd;
}
}

void
reopen_handle(void*, const std::string&, bool)
{
// no-op for POSIX implementation, as the same flags are used for sequential
// or vectorized writes
}

bool
write_vectors(void* handle,
size_t offset,
size_t /* page_size */,
size_t /* sector_size */,
const std::vector<std::vector<uint8_t>>& buffers)
{
CHECK(handle);
const auto* fd = static_cast<int*>(handle);

std::vector<iovec> iovecs(buffers.size());

for (auto i = 0; i < buffers.size(); ++i) {
auto* iov = &iovecs[i];
memset(iov, 0, sizeof(iovec));
iov->iov_base =
const_cast<void*>(static_cast<const void*>(buffers[i].data()));
iov->iov_len = buffers[i].size();
}

ssize_t total_bytes = 0;
for (const auto& buffer : buffers) {
total_bytes += static_cast<ssize_t>(buffer.size());
}

const ssize_t bytes_written = pwritev(*fd,
iovecs.data(),
static_cast<int>(iovecs.size()),
static_cast<off_t>(offset));

if (bytes_written != total_bytes) {
LOG_ERROR("Failed to write file: ",
get_last_error_as_string(),
". Expected bytes written: ",
total_bytes,
", actual: ",
bytes_written);
return false;
}

return true;
}
25 changes: 25 additions & 0 deletions src/streaming/s3.sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,31 @@ zarr::S3Sink::write(size_t offset, ConstByteSpan data)
return true;
}

bool
zarr::S3Sink::write(size_t offset,
const std::vector<std::vector<uint8_t>>& buffers)
{
if (buffers.empty()) {
return true;
}

for (const auto& buffer : buffers) {
if (!write(offset, buffer)) {
return false;
}
offset += buffer.size();
}

return true;
}

size_t
zarr::S3Sink::align_to_system_size(size_t size)
{
// S3 does not require alignment
return size;
}

bool
zarr::S3Sink::put_object_()
{
Expand Down
3 changes: 3 additions & 0 deletions src/streaming/s3.sink.hh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ class S3Sink : public Sink
std::shared_ptr<S3ConnectionPool> connection_pool);

bool write(size_t offset, ConstByteSpan data) override;
bool write(size_t offset,
const std::vector<std::vector<uint8_t>>& buffers) override;
size_t align_to_system_size(size_t size) override;

protected:
bool flush_() override;
Expand Down
23 changes: 21 additions & 2 deletions src/streaming/sink.hh
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,30 @@ class Sink
* @brief Write data to the sink.
* @param offset The offset in the sink to write to.
* @param data The buffer to write to the sink.
* @param bytes_of_buf The number of bytes to write from @p buf.
* @return True if the write was successful, false otherwise.
* @return True if the data was written successfully, false otherwise.
*/
[[nodiscard]] virtual bool write(size_t offset, ConstByteSpan data) = 0;

/**
* @brief Write multiple buffers to the sink.
* @param[in, out] offset The offset in the sink to write to. May be aligned
* to the system sector size, in which case the aligned offset is written
* back to this parameter.
* @param buffers The buffers to write to the sink.
* @return True if the data was written successfully, false otherwise.
*/
[[nodiscard]] virtual bool write(
size_t offset,
const std::vector<std::vector<uint8_t>>& buffers) = 0;

/**
* @brief Align a size to the system sector size (or don't align if not
* appropriate.)
* @param size The size to align.
* @return The aligned size.
*/
virtual size_t align_to_system_size(size_t size) = 0;

protected:
/**
* @brief Flush any buffered data to the sink.
Expand Down
Loading
Loading