diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b2320a0..b9fa618b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed -- File handles are now managed by a pool to centrally limit the number of open files (#161) +- Vectorized file writing to improve performance when writing many small chunks (#156) +- File handles are now managed by a pool to centrally limit the number of open files (#161) ### Removed diff --git a/benchmarks/benchmark.py b/benchmarks/benchmark.py index 1d0b3831..3fce1964 100644 --- a/benchmarks/benchmark.py +++ b/benchmarks/benchmark.py @@ -1,7 +1,7 @@ # /// script # requires-python = ">=3.10" # dependencies = [ -# "acquire-zarr>=0.5.2", +# "acquire-zarr>=0.2.4", # "zarr", # "rich", # "tensorstore", @@ -203,7 +203,6 @@ def compare( print("\nRunning acquire-zarr test:") az_path = "acquire_zarr_test.zarr" - print("I'm saving to ", Path(az_path).absolute()) # Pre-generate the data (timing excluded) data = CyclicArray( diff --git a/src/streaming/file.sink.cpp b/src/streaming/file.sink.cpp index 20fbeec3..030aba2c 100644 --- a/src/streaming/file.sink.cpp +++ b/src/streaming/file.sink.cpp @@ -1,27 +1,57 @@ #include "file.sink.hh" #include "macros.hh" +#include "zarr.common.hh" #include +size_t +get_page_size(); + +size_t +get_sector_size(const std::string&); + +size_t +align_to_system_size(size_t, size_t, size_t); + void* -make_flags(); +make_flags(bool); void -destroy_flags(void*); +destroy_flags(void* flags); bool seek_and_write(void* handle, size_t offset, ConstByteSpan data); +bool +write_vectors(void*, + size_t, + size_t, + size_t, + const std::vector>&); + bool flush_file(void* handle); +namespace { +// only use vectorized writes with more than 8 hardware threads +constexpr size_t VECTORIZE_THRESHOLD = 8; +const bool CAN_WRITE_VECTORIZED = + std::thread::hardware_concurrency() > VECTORIZE_THRESHOLD; +} // namespace + zarr::FileSink::FileSink(std::string_view 
filename, std::shared_ptr file_handle_pool) : file_handle_pool_(file_handle_pool) , filename_(filename) - , flags_(make_flags()) + , flags_(make_flags(CAN_WRITE_VECTORIZED)) + , vectorized_(CAN_WRITE_VECTORIZED) + , page_size_(0) + , sector_size_(0) { EXPECT(file_handle_pool_ != nullptr, "File handle pool not provided."); + + page_size_ = get_page_size(); + sector_size_ = get_sector_size(filename_); } zarr::FileSink::~FileSink() @@ -37,14 +67,23 @@ zarr::FileSink::write(size_t offset, ConstByteSpan data) return true; } - auto handle = file_handle_pool_->get_handle(filename_, flags_); - if (handle == nullptr) { - LOG_ERROR("Failed to get file handle for ", filename_); - return false; - } + std::unique_ptr handle; bool retval = false; try { + if (vectorized_) { + destroy_flags(flags_); + flags_ = nullptr; + flags_ = make_flags(false); + vectorized_ = false; + } + + handle = file_handle_pool_->get_handle(filename_, flags_); + if (handle == nullptr) { + LOG_ERROR("Failed to get file handle for ", filename_); + return false; + } + retval = seek_and_write(handle->get(), offset, data); } catch (const std::exception& exc) { LOG_ERROR("Failed to write to file ", filename_, ": ", exc.what()); @@ -55,6 +94,66 @@ zarr::FileSink::write(size_t offset, ConstByteSpan data) return retval; } +bool +zarr::FileSink::write(size_t offset, + const std::vector>& buffers) +{ + if (buffers.empty()) { + return true; + } + + // fallback to non-vectorized (consolidated) write if not supported + if (!CAN_WRITE_VECTORIZED) { + size_t consolidated_size = 0; + for (const auto& buffer : buffers) { + consolidated_size += buffer.size(); + } + std::vector consolidated(consolidated_size, 0); + + consolidated_size = 0; + for (const auto& buffer : buffers) { + std::ranges::copy(buffer, consolidated.data() + consolidated_size); + consolidated_size += buffer.size(); + } + + return write(offset, consolidated); + } + + bool retval = false; + std::unique_ptr handle; + + try { + if (!vectorized_) { + 
destroy_flags(flags_); + flags_ = nullptr; + flags_ = make_flags(true); + vectorized_ = true; + } + + handle = file_handle_pool_->get_handle(filename_, flags_); + if (handle == nullptr) { + LOG_ERROR("Failed to get file handle for ", filename_); + return false; + } + + offset = align_to_system_size(offset); + retval = write_vectors( + handle->get(), offset, page_size_, sector_size_, buffers); + } catch (const std::exception& exc) { + LOG_ERROR("Failed to write to file ", filename_, ": ", exc.what()); + } + + file_handle_pool_->return_handle(std::move(handle)); + + return retval; +} + +size_t +zarr::FileSink::align_to_system_size(size_t size) +{ + return ::align_to_system_size(size, page_size_, sector_size_); +} + bool zarr::FileSink::flush_() { diff --git a/src/streaming/file.sink.hh b/src/streaming/file.sink.hh index 49b06f38..bb7053bc 100644 --- a/src/streaming/file.sink.hh +++ b/src/streaming/file.sink.hh @@ -3,7 +3,6 @@ #include "file.handle.hh" #include "sink.hh" -#include #include namespace zarr { @@ -15,14 +14,21 @@ class FileSink : public Sink ~FileSink() override; bool write(size_t offset, ConstByteSpan data) override; + bool write(size_t offset, + const std::vector>& buffers) override; + size_t align_to_system_size(size_t size) override; protected: bool flush_() override; private: std::shared_ptr file_handle_pool_; + std::string filename_; // keep a copy of the filename for reopening - std::string filename_; - void* flags_; + void* flags_; // platform-specific flags for opening the file + void* handle_; // platform-specific file handle + bool vectorized_; // whether to use vectorized writes + size_t page_size_; // cached system page size + size_t sector_size_; // cached system sector size }; } // namespace zarr diff --git a/src/streaming/posix/platform.cpp b/src/streaming/posix/platform.cpp index cfe9bca5..063e79d5 100644 --- a/src/streaming/posix/platform.cpp +++ b/src/streaming/posix/platform.cpp @@ -12,11 +12,32 @@ std::string 
get_last_error_as_string() { - return strerror(errno); + if (auto* err = strerror(errno); err != nullptr) { + return std::string(err); + } + return ""; +} + +size_t +get_page_size() +{ + return sysconf(_SC_PAGESIZE); +} + +size_t +get_sector_size(const std::string& /*path*/) +{ + return 0; // no additional alignment needed on POSIX +} + +size_t +align_to_system_size(const size_t size, const size_t, const size_t) +{ + return size; // no additional alignment needed on POSIX } void* -make_flags() +make_flags(bool /* vectorized */) { auto* flags = new int; *flags = O_WRONLY | O_CREAT; @@ -46,12 +67,12 @@ init_handle(const std::string& filename, void* flags) { auto* fd = new int; - *fd = open(filename.data(), *static_cast(flags), 0644); + *fd = open(filename.c_str(), *static_cast(flags), 0644); if (*fd < 0) { const auto err = get_last_error_as_string(); delete fd; - throw std::runtime_error("Failed to open file: '" + - std::string(filename) + "': " + err); + throw std::runtime_error("Failed to open file: '" + filename + + "': " + err); } return fd; } @@ -105,4 +126,54 @@ destroy_handle(void* handle) } delete fd; } +} + +void +reopen_handle(void*, const std::string&, bool) +{ + // no-op for POSIX implementation, as the same flags are used for sequential + // or vectorized writes +} + +bool +write_vectors(void* handle, + size_t offset, + size_t /* page_size */, + size_t /* sector_size */, + const std::vector>& buffers) +{ + CHECK(handle); + const auto* fd = static_cast(handle); + + std::vector iovecs(buffers.size()); + + for (auto i = 0; i < buffers.size(); ++i) { + auto* iov = &iovecs[i]; + memset(iov, 0, sizeof(iovec)); + iov->iov_base = + const_cast(static_cast(buffers[i].data())); + iov->iov_len = buffers[i].size(); + } + + ssize_t total_bytes = 0; + for (const auto& buffer : buffers) { + total_bytes += static_cast(buffer.size()); + } + + const ssize_t bytes_written = pwritev(*fd, + iovecs.data(), + static_cast(iovecs.size()), + static_cast(offset)); + + if 
(bytes_written != total_bytes) { + LOG_ERROR("Failed to write file: ", + get_last_error_as_string(), + ". Expected bytes written: ", + total_bytes, + ", actual: ", + bytes_written); + return false; + } + + return true; } \ No newline at end of file diff --git a/src/streaming/s3.sink.cpp b/src/streaming/s3.sink.cpp index adf7a0f9..c5fcb17e 100644 --- a/src/streaming/s3.sink.cpp +++ b/src/streaming/s3.sink.cpp @@ -89,6 +89,31 @@ zarr::S3Sink::write(size_t offset, ConstByteSpan data) return true; } +bool +zarr::S3Sink::write(size_t offset, + const std::vector>& buffers) +{ + if (buffers.empty()) { + return true; + } + + for (const auto& buffer : buffers) { + if (!write(offset, buffer)) { + return false; + } + offset += buffer.size(); + } + + return true; +} + +size_t +zarr::S3Sink::align_to_system_size(size_t size) +{ + // S3 does not require alignment + return size; +} + bool zarr::S3Sink::put_object_() { diff --git a/src/streaming/s3.sink.hh b/src/streaming/s3.sink.hh index 0edeb51e..59d5f672 100644 --- a/src/streaming/s3.sink.hh +++ b/src/streaming/s3.sink.hh @@ -16,6 +16,9 @@ class S3Sink : public Sink std::shared_ptr connection_pool); bool write(size_t offset, ConstByteSpan data) override; + bool write(size_t offset, + const std::vector>& buffers) override; + size_t align_to_system_size(size_t size) override; protected: bool flush_() override; diff --git a/src/streaming/sink.hh b/src/streaming/sink.hh index 3c514d19..81663a4f 100644 --- a/src/streaming/sink.hh +++ b/src/streaming/sink.hh @@ -19,11 +19,30 @@ class Sink * @brief Write data to the sink. * @param offset The offset in the sink to write to. * @param data The buffer to write to the sink. - * @param bytes_of_buf The number of bytes to write from @p buf. - * @return True if the write was successful, false otherwise. + * @return True if the data was written successfully, false otherwise. 
*/ [[nodiscard]] virtual bool write(size_t offset, ConstByteSpan data) = 0; + /** + * @brief Write multiple buffers to the sink. + * @param offset The offset in the sink to write to. Implementations may + * align it to the system sector size before writing; it is passed by value, + * so the caller's copy is not updated. + * @param buffers The buffers to write to the sink. + * @return True if the data was written successfully, false otherwise. + */ + [[nodiscard]] virtual bool write( + size_t offset, + const std::vector>& buffers) = 0; + + /** + * @brief Align a size to the system sector size (or don't align if not + * appropriate.) + * @param size The size to align. + * @return The aligned size. + */ + virtual size_t align_to_system_size(size_t size) = 0; + protected: /** * @brief Flush any buffered data to the sink. diff --git a/src/streaming/win32/platform.cpp b/src/streaming/win32/platform.cpp index 1d033727..811053d4 100644 --- a/src/streaming/win32/platform.cpp +++ b/src/streaming/win32/platform.cpp @@ -4,28 +4,29 @@ #include #include +#include std::string get_last_error_as_string() { - auto error_message_id = ::GetLastError(); + const DWORD error_message_id = ::GetLastError(); if (error_message_id == 0) { - return std::string(); // No error message has been recorded + return ""; // No error message has been recorded } LPSTR message_buffer = nullptr; - const auto format = FORMAT_MESSAGE_ALLOCATE_BUFFER | - FORMAT_MESSAGE_FROM_SYSTEM | - FORMAT_MESSAGE_IGNORE_INSERTS; - const auto lang_id = MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT); - size_t size = FormatMessageA(format, - nullptr, - error_message_id, - lang_id, - reinterpret_cast(&message_buffer), - 0, - nullptr); + constexpr auto format = FORMAT_MESSAGE_ALLOCATE_BUFFER | + FORMAT_MESSAGE_FROM_SYSTEM | + FORMAT_MESSAGE_IGNORE_INSERTS; + constexpr auto lang_id = MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT); + const size_t size = FormatMessageA(format, + nullptr, + error_message_id, + lang_id, + 
reinterpret_cast(&message_buffer), + 0, + nullptr); std::string message(message_buffer, size); @@ -34,11 +35,60 @@ get_last_error_as_string() return message; } +size_t +get_page_size() +{ + SYSTEM_INFO si; + GetSystemInfo(&si); + EXPECT(si.dwPageSize > 0, "Could not get system page size"); + + return si.dwPageSize; +} + +size_t +get_sector_size(const std::string& path) +{ + // get volume root path + char volume_path[MAX_PATH]; + EXPECT(GetVolumePathNameA(path.c_str(), volume_path, MAX_PATH), + "Failed to get volume name for path '", + path, + "'"); + + DWORD sectors_per_cluster; + DWORD bytes_per_sector; + DWORD number_of_free_clusters; + DWORD total_number_of_clusters; + + EXPECT(GetDiskFreeSpaceA(volume_path, + &sectors_per_cluster, + &bytes_per_sector, + &number_of_free_clusters, + &total_number_of_clusters), + "Failed to get disk free space for volume: " + + std::string(volume_path)); + + EXPECT(bytes_per_sector > 0, "Could not get sector size"); + + return bytes_per_sector; +} + +size_t +align_to_system_size(const size_t size, + const size_t page_size, + const size_t sector_size) +{ + return zarr::align_to(zarr::align_to(size, page_size), sector_size); +} + void* -make_flags() +make_flags(bool vectorized) { auto* flags = new DWORD; *flags = FILE_FLAG_OVERLAPPED; + if (vectorized) { + *flags |= FILE_FLAG_NO_BUFFERING | FILE_FLAG_SEQUENTIAL_SCAN; + } return flags; } @@ -68,10 +118,10 @@ init_handle(const std::string& filename, void* flags) nullptr); if (*fd == INVALID_HANDLE_VALUE) { - const auto err = get_last_error_as_string(); + const std::string err = get_last_error_as_string(); delete fd; - throw std::runtime_error("Failed to open file: '" + - std::string(filename) + "': " + err); + throw std::runtime_error("Failed to open file: '" + filename + + "': " + err); } return fd; } @@ -89,7 +139,7 @@ seek_and_write(void* handle, size_t offset, ConstByteSpan data) OVERLAPPED overlapped = { 0 }; overlapped.hEvent = CreateEventA(nullptr, TRUE, FALSE, nullptr); - 
constexpr auto max_retries = 3; + constexpr size_t max_retries = 3; while (cur < end && retries < max_retries) { DWORD written = 0; const auto remaining = static_cast(end - cur); // may truncate @@ -121,7 +171,8 @@ bool flush_file(void* handle) { CHECK(handle); - if (const auto* fd = static_cast(handle); *fd != INVALID_HANDLE_VALUE) { + if (const auto* fd = static_cast(handle); + *fd != INVALID_HANDLE_VALUE) { return FlushFileBuffers(*fd); } return true; @@ -137,4 +188,97 @@ destroy_handle(void* handle) } delete fd; } +} + +void* +reopen_handle(void* handle, const std::string& filename, void* flags) +{ + destroy_handle(handle); + return init_handle(filename, flags); +} + +bool +write_vectors(void* handle, + size_t offset, + size_t page_size, + size_t sector_size, + const std::vector>& buffers) +{ + EXPECT(handle, "Expected nonnull pointer to file handle."); + const auto* fd = static_cast(handle); + if (fd == nullptr || *fd == INVALID_HANDLE_VALUE) { + throw std::runtime_error("Expected valid file handle"); + } + + size_t total_bytes_to_write = 0; + for (const auto& buffer : buffers) { + total_bytes_to_write += buffer.size(); + } + + const size_t offset_aligned = zarr::align_to(offset, get_page_size()); + if (offset_aligned != offset) { + LOG_ERROR("Aligned offset is not equalt to offset: ", + offset_aligned, + " != ", + offset); + return false; + } + + const size_t nbytes_aligned = + align_to_system_size(total_bytes_to_write, page_size, sector_size); + if (nbytes_aligned < total_bytes_to_write) { + LOG_ERROR("Aligned size is less than total bytes to write: ", + nbytes_aligned, + " < ", + total_bytes_to_write); + return false; + } + + auto* aligned_ptr = + static_cast(_aligned_malloc(nbytes_aligned, get_page_size())); + if (!aligned_ptr) { + return false; + } + + auto* cur = aligned_ptr; + for (const auto& buffer : buffers) { + std::ranges::copy(buffer, cur); + cur += buffer.size(); + } + + std::vector segments(nbytes_aligned / + get_page_size()); + + cur = 
aligned_ptr; + for (auto& segment : segments) { + memset(&segment, 0, sizeof(segment)); + segment.Buffer = PtrToPtr64(cur); + cur += get_page_size(); + } + + OVERLAPPED overlapped = { 0 }; + overlapped.Offset = static_cast(offset & 0xFFFFFFFF); + overlapped.OffsetHigh = static_cast(offset >> 32); + overlapped.hEvent = CreateEvent(nullptr, TRUE, FALSE, nullptr); + + DWORD bytes_written; + + if (!WriteFileGather( + *fd, segments.data(), nbytes_aligned, nullptr, &overlapped)) { + if (GetLastError() != ERROR_IO_PENDING) { + LOG_ERROR("Failed to write file: ", get_last_error_as_string()); + return false; + } + + // Wait for the operation to complete + if (!GetOverlappedResult(*fd, &overlapped, &bytes_written, TRUE)) { + LOG_ERROR("Failed to get overlapped result: ", + get_last_error_as_string()); + return false; + } + } + + _aligned_free(aligned_ptr); + + return true; } \ No newline at end of file diff --git a/src/streaming/zarr.common.cpp b/src/streaming/zarr.common.cpp index 52b7bb49..cefbd91d 100644 --- a/src/streaming/zarr.common.cpp +++ b/src/streaming/zarr.common.cpp @@ -160,3 +160,12 @@ zarr::regularize_key(const std::string_view key) return regularized_key; } + + +size_t +zarr::align_to(const size_t size, const size_t align) +{ + EXPECT(align > 0 && (align & (align - 1)) == 0, + "Alignment must be a power of two."); + return (size + align - 1) & ~(align - 1); +} diff --git a/src/streaming/zarr.common.hh b/src/streaming/zarr.common.hh index 3f7b605e..01d58bb4 100644 --- a/src/streaming/zarr.common.hh +++ b/src/streaming/zarr.common.hh @@ -105,4 +105,7 @@ regularize_key(const char* key); */ std::string regularize_key(std::string_view key); + +size_t +align_to(size_t size, size_t align); } // namespace zarr \ No newline at end of file diff --git a/tests/integration/CMakeLists.txt b/tests/integration/CMakeLists.txt index 5b50264b..58b83f6b 100644 --- a/tests/integration/CMakeLists.txt +++ b/tests/integration/CMakeLists.txt @@ -16,6 +16,7 @@ set(tests 
stream-pure-hcs-acquisition stream-mixed-flat-and-hcs-acquisition stream-with-ragged-final-shard + get-timing-info ) foreach (name ${tests}) @@ -48,5 +49,7 @@ foreach (name ${tests}) list(APPEND test_labels "s3") endif () - set_tests_properties(test-${tgt} PROPERTIES LABELS "${test_labels}") + if (NOT name MATCHES ".*timing.*") + set_tests_properties(test-${tgt} PROPERTIES LABELS "${test_labels}") + endif () endforeach () \ No newline at end of file diff --git a/tests/integration/get-timing-info.cpp b/tests/integration/get-timing-info.cpp new file mode 100644 index 00000000..85753ffb --- /dev/null +++ b/tests/integration/get-timing-info.cpp @@ -0,0 +1,185 @@ +#include "acquire.zarr.h" +#include "test.macros.hh" + +#include + +#include +#include +#include +#include +#include + +namespace fs = std::filesystem; + +namespace { +constexpr uint32_t frame_size = 2048; +const std::vector +frame_data(frame_size* frame_size, 0); + +const std::vector chunk_sizes{ 32, 64, 128, 256 }; +const std::vector chunks_per_shard{ 8, 16, 32, 64 }; +const std::vector layers_per_shard{ 1, 2, 4, 8, 16 }; + +} // namespace + +ZarrStream* +make_stream(uint32_t chunk_size, + uint32_t n_chunks_per_shard, + uint32_t n_layers_per_shard) +{ + ZarrStreamSettings settings{ .store_path = TEST ".zarr", + .version = ZarrVersion_3, + .overwrite = true }; + + EXPECT(ZarrStreamSettings_create_arrays(&settings, 1) == + ZarrStatusCode_Success, + "Failed to create array settings"); + EXPECT(ZarrArraySettings_create_dimension_array(settings.arrays, 5) == + ZarrStatusCode_Success, + "Failed to create dimension array"); + + settings.arrays->data_type = ZarrDataType_uint8; + + settings.arrays[0].dimensions[0] = + DIM("t", ZarrDimensionType_Time, 0, 1, n_layers_per_shard, nullptr, 1.0); + settings.arrays[0].dimensions[1] = + DIM("c", ZarrDimensionType_Channel, 1, 1, 1, nullptr, 1.0); + settings.arrays[0].dimensions[2] = DIM("z", + ZarrDimensionType_Space, + chunk_sizes.back(), + chunk_size, + n_chunks_per_shard, 
+ "millimeter", + 1.0); + settings.arrays[0].dimensions[3] = DIM("y", + ZarrDimensionType_Space, + frame_size, + chunk_size, + n_chunks_per_shard, + "micrometer", + 1.0); + settings.arrays[0].dimensions[4] = DIM("x", + ZarrDimensionType_Space, + frame_size, + chunk_size, + n_chunks_per_shard, + "micrometer", + 1.0); + + auto* stream = ZarrStream_create(&settings); + + // cleanup + ZarrStreamSettings_destroy_arrays(&settings); + + return stream; +} + +int +main() +{ + int retval = 1; + nlohmann::json results_arr = nlohmann::json::array(); + ZarrStream* stream = nullptr; + + try { + for (auto& layers : layers_per_shard) { + for (auto& cps : chunks_per_shard) { + for (auto& chunk_size : chunk_sizes) { + const size_t chunks_xyz = + (frame_size + chunk_size - 1) / chunk_size; + if (cps > chunks_xyz) { + continue; + } + + const auto n_chunks = chunks_xyz * chunks_xyz * chunks_xyz; + const auto n_frames = chunk_size * cps * layers; + + nlohmann::json j; + j["chunk_size"] = chunk_size; + j["chunks_per_shard"] = cps; + j["layers_per_shard"] = layers; + j["n_chunks"] = n_chunks; + j["frames_written"] = n_frames; + + std::cout + << "Testing chunk size " << chunk_size + << ", chunks per shard " << cps << ", layers per shard " + << layers << ", chunk count " << n_chunks << " (" + << n_frames << " frames)... " << std::flush << std::endl; + + stream = make_stream(chunk_size, cps, layers); + EXPECT(stream != nullptr, "Failed to create stream"); + + auto start = std::chrono::high_resolution_clock::now(); + + for (auto i = 0; i < n_frames; ++i) { + size_t bytes_written = 0; + ZarrStatusCode status = + ZarrStream_append(stream, + frame_data.data(), + frame_data.size(), + &bytes_written, + nullptr); + EXPECT(status == ZarrStatusCode_Success, + "Failed to append frame ", + i, + ", status code ", + int(status)); + EXPECT(bytes_written == frame_data.size(), + "Expected to write ", + frame_data.size(), + " bytes, but wrote ", + bytes_written); + std::cout << "." 
<< std::flush; + } + auto end_append = std::chrono::high_resolution_clock::now(); + std::chrono::duration elapsed_append = + end_append - start; + + std::cout << "\nFinalizing... " << std::flush; + ZarrStream_destroy(stream); + stream = nullptr; + std::cout << "done." << std::endl; + + auto end_destroy = + std::chrono::high_resolution_clock::now(); + std::chrono::duration elapsed_destroy = + end_destroy - start; + + const double fps = n_frames / elapsed_append.count(); + + j["elapsed_time_append"] = elapsed_append.count(); + j["elapsed_time_destroy"] = elapsed_destroy.count(); + + std::cout + << "Wrote " << n_frames << " frames in " + << elapsed_append.count() << " seconds (" << fps + << " fps); time to destroy: " << elapsed_destroy.count() + << " seconds" << std::endl; + + results_arr.push_back(j); + } + } + } + + retval = 0; + } catch (const std::exception& err) { + LOG_ERROR("Failed: ", err.what()); + } + + // write out results to file + std::ofstream results_file(TEST "-timing-results.json"); + results_file << results_arr.dump(2) << "\n"; + results_file.close(); + + // cleanup + if (stream != nullptr) { + ZarrStream_destroy(stream); + } + + if (fs::exists(TEST ".zarr")) { + fs::remove_all(TEST ".zarr"); + } + + return retval; +} diff --git a/tests/integration/stream-with-ragged-final-shard.cpp b/tests/integration/stream-with-ragged-final-shard.cpp index 0c65ab74..e9e8a76c 100644 --- a/tests/integration/stream-with-ragged-final-shard.cpp +++ b/tests/integration/stream-with-ragged-final-shard.cpp @@ -5,8 +5,65 @@ #include #include +#ifdef _WIN32 +#include +#endif + namespace fs = std::filesystem; +namespace { +size_t +align_to(size_t size, size_t align) +{ + if (align == 0) { + return size; + } + return (size + align - 1) & ~(align - 1); +} + +size_t +align_to_system_size(size_t size, const std::string& path) +{ +#ifdef _WIN32 + + // get page size + SYSTEM_INFO si; + GetSystemInfo(&si); + EXPECT(si.dwPageSize > 0, "Could not get system page size"); + size_t 
page_size = si.dwPageSize; + + // get sector size + char volume_path[MAX_PATH]; + EXPECT(GetVolumePathNameA(path.c_str(), volume_path, MAX_PATH), + "Failed to get volume name for path '", + path, + "'"); + + DWORD sectors_per_cluster; + DWORD bytes_per_sector; + DWORD number_of_free_clusters; + DWORD total_number_of_clusters; + + EXPECT(GetDiskFreeSpaceA(volume_path, + &sectors_per_cluster, + &bytes_per_sector, + &number_of_free_clusters, + &total_number_of_clusters), + "Failed to get disk free space for volume: " + + std::string(volume_path)); + + EXPECT(bytes_per_sector > 0, "Could not get sector size"); + + size_t sector_size = bytes_per_sector; + + return align_to(align_to(size, page_size), sector_size); + +#else + return size; // no additional alignment needed on POSIX +#endif +} +} + int main() { @@ -100,8 +157,9 @@ main() "Expected shard file does not exist: ", first_shard.string()); - constexpr auto expected_full_shard_size = - 16 * expected_chunk_size + table_size; + const size_t expected_full_shard_size = + 16 * align_to_system_size(expected_chunk_size, first_shard.string()) + + table_size; EXPECT(fs::file_size(first_shard) == expected_full_shard_size, "Expected ", first_shard.string(), @@ -115,8 +173,9 @@ main() "Expected shard file does not exist: ", last_shard.string()); - constexpr auto expected_partial_shard_size = - expected_chunk_size + table_size; + const size_t expected_partial_shard_size = + align_to_system_size(expected_chunk_size, last_shard.string()) + + table_size; EXPECT(fs::file_size(last_shard) == expected_partial_shard_size, "Expected ", last_shard.string(), diff --git a/tests/unit-tests/array-write-even.cpp b/tests/unit-tests/array-write-even.cpp index f60cc442..2b1bebaf 100644 --- a/tests/unit-tests/array-write-even.cpp +++ b/tests/unit-tests/array-write-even.cpp @@ -5,6 +5,7 @@ #include #include +#include namespace fs = std::filesystem; diff --git a/tests/unit-tests/array-write-ragged-append-dim.cpp 
b/tests/unit-tests/array-write-ragged-append-dim.cpp index f6d1d8f4..45172092 100644 --- a/tests/unit-tests/array-write-ragged-append-dim.cpp +++ b/tests/unit-tests/array-write-ragged-append-dim.cpp @@ -5,6 +5,7 @@ #include #include +#include namespace fs = std::filesystem; diff --git a/tests/unit-tests/array-write-ragged-internal-dim.cpp b/tests/unit-tests/array-write-ragged-internal-dim.cpp index 9c9d52f7..a2f73079 100644 --- a/tests/unit-tests/array-write-ragged-internal-dim.cpp +++ b/tests/unit-tests/array-write-ragged-internal-dim.cpp @@ -5,6 +5,7 @@ #include #include +#include namespace fs = std::filesystem; diff --git a/tests/unit-tests/file-sink-write.cpp b/tests/unit-tests/file-sink-write.cpp index a06d75ca..c8049792 100644 --- a/tests/unit-tests/file-sink-write.cpp +++ b/tests/unit-tests/file-sink-write.cpp @@ -3,7 +3,6 @@ #include #include -#include namespace fs = std::filesystem;