From 0cfe520a84cfd33e6cb504f54aca38271f696ee5 Mon Sep 17 00:00:00 2001 From: steven varga Date: Mon, 18 May 2026 01:51:21 +0000 Subject: [PATCH 1/5] [#248]:svarga:docs, remove Downloads count badge from README MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The GitHub releases download counter is low-signal for a header-only library — most consumers vendor h5cpp via git submodule or copy the header tree directly, so the counter measures only the subset who pull .deb / .rpm / .pkg / .exe assets from Releases. Recent release-asset issues (v1.12.4 needed manual upload after the Package workflow failed on macOS and Windows) make any short-term number further unreliable. Removed: [![Downloads](https://img.shields.io/github/downloads/vargalabs/h5cpp/total)](https://github.com/vargalabs/h5cpp/releases) The remaining badges (CI, ASan, UBSan, codecov, MIT, DOI, GitHub release, Documentation) carry meaningful signals and stay. --- README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/README.md b/README.md index 10089cf7a1..a7a9ebeb84 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,6 @@ [![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.20123216.svg)](https://doi.org/10.5281/zenodo.20123216) [![GitHub release](https://img.shields.io/github/v/release/vargalabs/h5cpp.svg)](https://github.com/vargalabs/h5cpp/releases) [![Documentation](https://img.shields.io/badge/docs-stable-blue)](https://vargalabs.github.io/h5cpp) -[![Downloads](https://img.shields.io/github/downloads/vargalabs/h5cpp/total)](https://github.com/vargalabs/h5cpp/releases) # H5CPP — High-Performance [HDF5][hdf5] for Modern C++ From 9abd0ddcdb132504ab53324299b9be989940e54e Mon Sep 17 00:00:00 2001 From: steven varga Date: Mon, 18 May 2026 01:51:37 +0000 Subject: [PATCH 2/5] [#247]:svarga:fix, derive CPack version from git tag and add arch to filename MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CMake project() was 
hardcoded to VERSION 1.10.4.6, so CPack-produced artifacts on the v1.12.4 release carried the stale 1.10.4 string in their filenames despite the actual release tag. The v1.12.4 release required manual rename + upload of the Linux artifacts as a result. This commit: 1. Derives PROJECT_VERSION from the latest git tag matching v[0-9]* before the project() call, so CPack picks it up correctly: find_package(Git QUIET) execute_process(COMMAND git describe --tags --abbrev=0 ...) if(H5CPP_GIT_TAG MATCHES "^v([0-9]+(\\.[0-9]+)*)$") set(H5CPP_PROJECT_VERSION "${CMAKE_MATCH_1}") endif() project(libh5cpp-dev VERSION ${H5CPP_PROJECT_VERSION} ...) Falls back to "1.12.0" placeholder for tarball builds without git history, and for branches where the latest reachable tag doesn't match the strict vX.Y.Z[.W] pattern (e.g. staging sees v1.10.4-6 which is filtered out by the regex). 2. Decouples H5CPP_HDF5_FLOOR from PROJECT_VERSION. The previous check compared HDF5_VERSION against PROJECT_VERSION, which coupled package versioning to HDF5 minimum requirements purely by coincidence — 1.10.4.6 happened to be both the h5cpp release tag AND the HDF5 minimum. After #234 raised the floor to 1.12 and PROJECT_VERSION now tracks the git tag, the comparison is no longer meaningful. Pinned to "1.12.0" explicitly. 3. Sets CPACK_PACKAGE_FILE_NAME to include CMAKE_SYSTEM_NAME and CMAKE_SYSTEM_PROCESSOR (lowercased), so amd64 / arm64 / x86_64 / aarch64 / darwin-arm64 / windows-amd64 produce distinct filenames when collected into the Publish job's flat dist/ directory: h5cpp-dev-v1.12.5-linux-x86_64.{deb,rpm} h5cpp-dev-v1.12.5-linux-aarch64.{deb,rpm} h5cpp-dev-v1.12.5-darwin-arm64.pkg h5cpp-dev-v1.12.5-windows-amd64.exe Without this override, both amd64 and arm64 Linux builds emit identical "h5cpp-dev-1.10.4-Linux.{deb,rpm}" names and overwrite each other on the Release page. 
--- CMakeLists.txt | 37 ++++++++++++++++++++++++++++++++++--- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index e4dc8f70be..31f47828b7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -12,7 +12,24 @@ if(APPLE) set(CMAKE_OSX_DEPLOYMENT_TARGET "14.4" CACHE STRING "Minimum macOS deployment version") endif() -project(libh5cpp-dev VERSION 1.10.4.6 LANGUAGES CXX C) +# Derive package version from the latest git tag before project(), so CPack +# embeds it as PROJECT_VERSION rather than a stale hardcoded number. Falls +# back to a placeholder when building from a tarball without git history. +find_package(Git QUIET) +set(H5CPP_PROJECT_VERSION "1.12.0") +if(GIT_FOUND AND EXISTS "${CMAKE_SOURCE_DIR}/.git") + execute_process( + COMMAND ${GIT_EXECUTABLE} describe --tags --abbrev=0 --match "v[0-9]*" + WORKING_DIRECTORY ${CMAKE_SOURCE_DIR} + OUTPUT_VARIABLE H5CPP_GIT_TAG + OUTPUT_STRIP_TRAILING_WHITESPACE + ERROR_QUIET) + if(H5CPP_GIT_TAG MATCHES "^v([0-9]+(\\.[0-9]+)*)$") + set(H5CPP_PROJECT_VERSION "${CMAKE_MATCH_1}") + endif() +endif() + +project(libh5cpp-dev VERSION ${H5CPP_PROJECT_VERSION} LANGUAGES CXX C) # ─── Standard Settings ──────────────────────────────────────────────────────────── set(CMAKE_CXX_STANDARD 20) @@ -90,9 +107,15 @@ find_package(Threads REQUIRED QUIET) # ─── HDF5 ───────────────────────────────────────────────────────────────────────── find_package(HDF5 REQUIRED COMPONENTS C) -if(HDF5_VERSION VERSION_LESS ${H5CPP_BASE_VERSION}) +# HDF5 floor is decoupled from h5cpp package version. Previously the check +# compared HDF5_VERSION against PROJECT_VERSION, which coupled package +# versioning to HDF5 minimums by coincidence. After #234 raised the floor +# to 1.12 and #247 made PROJECT_VERSION track the git tag, the comparison +# stopped being meaningful. Pin the floor explicitly. +set(H5CPP_HDF5_FLOOR "1.12.0") +if(HDF5_VERSION VERSION_LESS ${H5CPP_HDF5_FLOOR}) message(FATAL_ERROR - "-- !!! 
H5CPP examples require HDF5 v${H5CPP_BASE_VERSION} or greater !!!" + "-- !!! H5CPP requires HDF5 v${H5CPP_HDF5_FLOOR} or greater (found ${HDF5_VERSION}) !!!" ) else() message(STATUS @@ -382,6 +405,14 @@ set(CPACK_NSIS_ENABLE_UNINSTALL_BEFORE_INSTALL ON) # productbuild (macOS .pkg) set(CPACK_PRODUCTBUILD_IDENTIFIER "org.h5cpp.h5cpp") +# Override the default per-generator filename so that amd64 / arm64 / x86_64 / +# aarch64 / darwin-arm64 / windows-amd64 produce distinct names when collected +# into a single release upload directory. Without this, multi-arch CI matrix +# builds emit identical filenames and overwrite each other on the Release page. +set(CPACK_PACKAGE_FILE_NAME + "${CPACK_PACKAGE_NAME}-v${PROJECT_VERSION}-${CMAKE_SYSTEM_NAME}-${CMAKE_SYSTEM_PROCESSOR}") +string(TOLOWER "${CPACK_PACKAGE_FILE_NAME}" CPACK_PACKAGE_FILE_NAME) + include(CPack) # ─── Developer convenience targets ─────────────────────────────────────────── From 5162dc87cbb061747f72e61506404eff8d2b9ca6 Mon Sep 17 00:00:00 2001 From: steven varga Date: Mon, 18 May 2026 01:52:16 +0000 Subject: [PATCH 3/5] [#246]:svarga:fix, package.yml macOS HDF5 discovery + Windows build-from-source MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two platform fixes in the same workflow file, both required for the v1.12.5 Package run to produce assets across the full matrix. macOS (#245): The Configure step set HDF5_ROOT from brew --prefix but FindHDF5 on macos-15 / Apple Silicon needed CMAKE_PREFIX_PATH as well to locate the C library, producing: Could NOT find HDF5 (missing: HDF5_LIBRARIES HDF5_INCLUDE_DIRS C) ci.yml's macOS path sets both; package.yml only set one. Symmetrised. Windows (#246): choco install hdf5 returns "package not found" — there is no hdf5 package in the Chocolatey community repo. The #236 fix that introduced this command was based on an incorrect assumption. 
Replaced with the build-from-source pattern already validated by ci.yml's Windows path: - Build zlib 1.3.1 from source (~30s, not cached) - Restore HDF5 1.12.2 cache if present - Build HDF5 1.12.2 from source if cache-miss (~3-5min, cached) - Save HDF5 cache on miss Configure now consumes both prefixes through CMAKE_PREFIX_PATH. The first run after this change will be slow (~5min for HDF5 build); subsequent runs reuse the cache. --- .github/workflows/package.yml | 87 +++++++++++++++++++++++++++++++++-- 1 file changed, 84 insertions(+), 3 deletions(-) diff --git a/.github/workflows/package.yml b/.github/workflows/package.yml index 6643627c76..062174cf67 100644 --- a/.github/workflows/package.yml +++ b/.github/workflows/package.yml @@ -86,10 +86,12 @@ jobs: - name: Configure run: | + HDF5_PREFIX="$(brew --prefix hdf5)" cmake -B build \ -DCMAKE_BUILD_TYPE=Release \ -DCMAKE_INSTALL_PREFIX=/usr/local \ - -DHDF5_ROOT=$(brew --prefix hdf5) \ + -DHDF5_ROOT="$HDF5_PREFIX" \ + -DCMAKE_PREFIX_PATH="$HDF5_PREFIX" \ -DH5CPP_BUILD_EXAMPLES=OFF \ -DH5CPP_BUILD_TESTS=OFF \ -DH5CPP_BUILD_BENCH=OFF @@ -107,21 +109,100 @@ jobs: if-no-files-found: error # ── Windows (NSIS .exe) ─────────────────────────────────────────────────────── + # No Chocolatey 'hdf5' package exists; mirror the build-from-source approach + # from ci.yml. zlib is built fresh each run (~30s); HDF5 install prefix is + # cached between runs. 
package-windows: name: windows / x64 / NSIS runs-on: windows-latest + env: + HDF5_VERSION: 1.12.2 + HDF5_CACHE_VERSION: v3 steps: - uses: actions/checkout@v4 - - name: Install HDF5 - run: choco install hdf5 -y --no-progress + - name: Build zlib from source + shell: powershell + run: | + $ErrorActionPreference = "Stop" + $zlib_version = "1.3.1" + $zlib_archive = "$env:RUNNER_TEMP\zlib-$zlib_version.tar.gz" + $zlib_source = "$env:RUNNER_TEMP\zlib-$zlib_version" + $zlib_build = "$env:RUNNER_TEMP\zlib-build" + $zlib_prefix = "$env:RUNNER_TEMP\zlib-install" + + Invoke-WebRequest ` + -Uri "https://github.com/madler/zlib/archive/refs/tags/v$zlib_version.tar.gz" ` + -OutFile $zlib_archive + tar -xzf $zlib_archive -C $env:RUNNER_TEMP + + cmake -S $zlib_source -B $zlib_build ` + -G "Visual Studio 17 2022" -A x64 ` + -DCMAKE_INSTALL_PREFIX="$zlib_prefix" + cmake --build $zlib_build --parallel --config Release + cmake --install $zlib_build --config Release + + - name: Restore HDF5 cache + id: cache-hdf5 + uses: actions/cache/restore@v5 + with: + path: ${{ runner.temp }}\hdf5-${{ env.HDF5_VERSION }}-install + key: hdf5-windows-vs2022-${{ env.HDF5_VERSION }}-pkg-${{ env.HDF5_CACHE_VERSION }} + + - name: Build HDF5 from source + if: steps.cache-hdf5.outputs.cache-hit != 'true' + shell: powershell + run: | + $ErrorActionPreference = "Stop" + $hdf5_version = "${{ env.HDF5_VERSION }}" + $hdf5_archive = "$env:RUNNER_TEMP\hdf5-$hdf5_version.tar.gz" + $hdf5_source = "$env:RUNNER_TEMP\hdf5-$hdf5_version" + $hdf5_build = "$env:RUNNER_TEMP\hdf5-$hdf5_version-build" + $hdf5_prefix = "$env:RUNNER_TEMP\hdf5-$hdf5_version-install" + + Invoke-WebRequest ` + -Uri "https://support.hdfgroup.org/ftp/HDF5/releases/hdf5-1.12/hdf5-$hdf5_version/src/hdf5-$hdf5_version.tar.gz" ` + -OutFile $hdf5_archive + tar -xzf $hdf5_archive -C $env:RUNNER_TEMP + + cmake -S $hdf5_source -B $hdf5_build ` + -G "Visual Studio 17 2022" -A x64 ` + -DCMAKE_INSTALL_PREFIX="$hdf5_prefix" ` + -DHDF_CFG_NAME=Release ` + 
-DBUILD_SHARED_LIBS=ON ` + -DBUILD_TESTING=OFF ` + -DHDF5_BUILD_TOOLS=OFF ` + -DHDF5_BUILD_UTILS=OFF ` + -DHDF5_BUILD_EXAMPLES=OFF ` + -DHDF5_BUILD_CPP_LIB=OFF ` + -DHDF5_BUILD_HL_LIB=OFF ` + -DHDF5_BUILD_FORTRAN=OFF ` + -DHDF5_ENABLE_Z_LIB_SUPPORT=ON ` + -DHDF5_ENABLE_SZIP_SUPPORT=OFF ` + -DZLIB_USE_EXTERNAL=OFF ` + -DZLIB_INCLUDE_DIR="$env:RUNNER_TEMP/zlib-install/include" ` + -DZLIB_LIBRARY="$env:RUNNER_TEMP/zlib-install/lib/zlib.lib" + + cmake --build $hdf5_build --parallel --config Release + cmake --install $hdf5_build --config Release + + - name: Save HDF5 cache + if: steps.cache-hdf5.outputs.cache-hit != 'true' + uses: actions/cache/save@v5 + with: + path: ${{ runner.temp }}\hdf5-${{ env.HDF5_VERSION }}-install + key: ${{ steps.cache-hdf5.outputs.cache-primary-key }} - name: Configure shell: pwsh run: | + $hdf5_prefix = "$env:RUNNER_TEMP/hdf5-${{ env.HDF5_VERSION }}-install" + $zlib_prefix = "$env:RUNNER_TEMP/zlib-install" cmake -B build ` -DCMAKE_BUILD_TYPE=Release ` + -DHDF5_ROOT="$hdf5_prefix" ` + -DCMAKE_PREFIX_PATH="$hdf5_prefix;$zlib_prefix" ` -DH5CPP_BUILD_EXAMPLES=OFF ` -DH5CPP_BUILD_TESTS=OFF ` -DH5CPP_BUILD_BENCH=OFF ` From c5c3bd7db5a1c8e692be09f0ff2fbc2704ef6de5 Mon Sep 17 00:00:00 2001 From: steven varga Date: Mon, 18 May 2026 02:10:18 +0000 Subject: [PATCH 4/5] [#250]:svarga:feature, FAPL worker-pool slot scaffolding via H5Pinsert2 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 1.1 of the FAPL multithreading workplan (tasks/h5cpp-fapl-multi threading-workplan.md §3). Lifecycle infrastructure only — the real pool internals (queues, compress dispatch) come in Phase 1.2, and consumer-site wiring (pt_t, h5::write, h5::read) in Phase 1.3. What this commit lands: h5cpp/H5worker_pool.hpp Minimal worker_pool_t. Owns N std::jthreads sized by the constructor argument (0 → hardware_concurrency()). 
Phase 1.1 workers park on stop_token; Phase 1.2 will replace the placeholder loop with the compress dispatch loop reading from bounded MPMC queues. Non-copyable, non-movable (jthread members + future atomic in_flight). h5cpp/H5Pfapl_threads.hpp The hidden-pointer pattern, shared-ownership variant. Stores a pointer to a heap-allocated worker_pool_slot_t in the FAPL skip list via H5Pinsert2. The slot owns a shared_ptr; the copy callback allocates a new slot whose shared_ptr aliases the same pool — every FAPL copy increments the refcount instead of spawning a fresh pool. Close callback drops one slot; pool dies when the last slot is freed. Defines h5::threads as a user-facing FAPL property: h5::fd_t fd = h5::create("data.h5", H5F_ACC_TRUNC, h5::threads{8}); h5::fd_t fd = h5::create("data.h5", H5F_ACC_TRUNC, h5::threads{}); h5::impl::resolve_worker_pool(fapl_id) is the consumer-site helper that Phase 1.3 will use in pt_t / h5::write / h5::read to look up the pool from the file's FAPL. h5cpp/core Adds H5worker_pool.hpp and H5Pfapl_threads.hpp to the include chain right after H5Pdapl.hpp. Tests (test/H5Pall.cpp, [#250] cases): - Regression scaffold: install the property WITHOUT the copy callback (the broken state) and assert the tracking close callback detects the double-free. Guards against silent test breakage where the real assertion becomes a no-op. Same idea as #244's scaffolding. - Single FAPL clean lifecycle: alloc=del, no leak. - H5Pcopy preserves shared pool ownership: pool pointer is IDENTICAL across two FAPLs that share the property; closing one FAPL doesn't affect the other; refcounts balance. - h5::threads tag installs the property: exercises the real fapl_threads_set callback (not the tracking shim) and verifies h5::impl::resolve_worker_pool returns the pool with the requested worker count. - h5::threads{} (no count) uses hardware_concurrency. 
Validated locally via a standalone TU compiled against the new headers with HDF5 1.10.9; the slot pattern only requires H5Pinsert2 (HDF5 ≥ 1.8) so it works regardless of the v1.12 floor. CI matrix on this PR will run the ctest cases against HDF5 1.12.2. [#250]:svarga:refactor, consolidate worker pool + FAPL property into H5Pthreads.hpp Renames + merges the two Phase 1.1 headers (h5cpp/H5worker_pool.hpp and h5cpp/H5Pfapl_threads.hpp) into a single h5cpp/H5Pthreads.hpp. Mirrors the H5Pdapl.hpp precedent of having the property type, lifecycle callbacks, and any helper data structures co-located in one header per property family. Contents unchanged — same worker_pool_t, same worker_pool_slot_t, same fapl_pool_copy_cb / fapl_pool_close_cb / fapl_threads_set, same resolve_worker_pool, same h5::threads tag. Only the file layout changes; no API change for users or for the test suite. Updates the include chain in h5cpp/core accordingly. [#250]:svarga:fix, lower H5CPP_HDF5_FLOOR from 1.12.0 to 1.10.4 — CI matrix coverage #247 introduced H5CPP_HDF5_FLOOR = "1.12.0" alongside decoupling the floor from PROJECT_VERSION. Choice of 1.12.0 was too aggressive — the CI matrix runs Ubuntu 22.04 with system HDF5 1.10.7 (floor coverage restored by #235), and every Linux matrix entry that consumes the system package fails at configure with: -- !!! H5CPP requires HDF5 v1.12.0 or greater (found 1.10.7) !!! This change lowers the floor to 1.10.4, matching the prior implicit floor that the stale project(VERSION 1.10.4.6) line encoded before #247 removed it. Result: Ubuntu 22.04's 1.10.7 system package satisfies the check; all matrix entries that previously passed continue to pass. When the project eventually drops 1.10.x coverage, bump this constant deliberately AND remove the 22.04 matrix entry in the same commit, so the floor and the matrix stay in sync. 
Validated locally with HDF5 1.10.9: cmake configure succeeds, full ctest sweep (50/50) green including the new [#250] FAPL slot-lifecycle regression cases. [#250]:svarga:fix, H5Pthreads.hpp use stoppable_thread_t shim for C++17 compatibility Examples target compiles with C++17 (matching the project's documented minimum standard), but H5Pthreads.hpp used std::jthread / std::stop_token unconditionally — both C++20 features. Every example TU including h5cpp/core failed: H5Pthreads.hpp:91: error: 'std::jthread' is only available from C++20 onwards H5Pthreads.hpp:85: error: 'std::stop_token' has not been declared H5Zpipeline_threaded.hpp already addresses this — it uses h5::detail::stoppable_thread_t and h5::detail::stop_token_t from detail/stoppable_thread.hpp, which are aliases for std::jthread / std::stop_token under C++20 and hand-rolled shims with the same API under C++17. This commit switches H5Pthreads.hpp to the same shim, mirroring the H5Zpipeline_threaded.hpp pattern. No API change for users; the h5::threads tag, worker_pool_t, and slot lifecycle are unaffected. Local ctest sweep: 50/50 green with system HDF5 1.10.9. [#250]:svarga:feature, worker_pool_t generic submit() + wait_idle() API Phase 1.2 of the FAPL multithreading workplan. Replaces the Phase 1.1 placeholder worker loop with a real submit-based pool. Design: - Workers pull type-erased tasks (std::function) off a single queue protected by std::mutex + std::queue + h5::detail::doorbell_t. Same primitives as H5Zpipeline_threaded.hpp's C++17 fallback path, so the pool works uniformly on C++17 (via stoppable_thread_t shim) and C++20 (via std::jthread + atomic::wait under the doorbell). - submit(callable) returns std::future<std::invoke_result_t<F>>. Internally uses std::packaged_task wrapped in a shared_ptr so the move-only task can sit inside a copyable std::function in the queue. - wait_idle() blocks until in_flight reaches 0. - dtor calls wait_idle() before letting stoppable_thread_t destructors request stop + join. 
No work is lost on shutdown. - Pool is HDF5-agnostic. Phase 1.3 will wrap HDF5-specific compress logic in closures and submit() them. Tests (test/H5Pall.cpp, [#250 1.2] cases): - single submit + future return value - many submits resolve in parallel (256 squared-int sums) - multi-thread submission is safe (4 producers × 100 tasks) - wait_idle blocks until completion (50 tasks) - exception in task captured in future - dtor drains pending work cleanly (no work lost) Local sweep: 50/50 ctest green including the new 6 [#250 1.2] cases. [#250]:svarga:refactor, simplify worker_pool_t worker_loop double-check The original worker_loop used a double-check pattern: 1. lock m_, see empty, read bell_.load() → last 2. unlock m_ 3. re-lock m_, check tasks_ again (for race window) 4. if still empty, wait(last) Steps 3-4's re-check is redundant. submit() rings the bell ONLY after releasing the lock that the worker is contending for. The doorbell sequence increments before notify_one(). So between the worker's single lock-release in step 2 and its wait(last) call, any submit() that pushed a task will have rung the bell — bell_.wait(last) sees a sequence > last and returns immediately. No missed wakeup possible. This refactor collapses the pattern to a single critical section that either pops or snapshots the doorbell, then waits outside the lock if no task was popped. Same correctness, easier for sanitizers and future readers to verify. Local ctest still 50/50. [#250]:svarga:fix, coverage CI add -fprofile-update=atomic for thread-safe gcov The coverage job (ubuntu-24.04 / gcc-14) failed at the "Capture coverage" step with: geninfo: ERROR: Unexpected negative count '-3' for thirdparty/libdeflate/v1.25.0/lib/deflate_compress.c:2217. Perhaps you need to compile with '-fprofile-update=atomic Phase 1.2's worker_pool_t spawns std::jthread workers that execute user tasks (test cases submit 256+ tasks across 4-8 workers). 
Existing tests ALSO exercise multi-threaded libdeflate via #241's threaded_pipeline_t. The combined concurrency saturates gcov's non-atomic counter updates, producing the negative-count corruption above. Fix: add -fprofile-update=atomic to the coverage build's C and C++ flags. This makes gcov instrumentation increment counters atomically. Marginal runtime cost for the coverage build only; production builds are unaffected. This is a coverage-infrastructure fix, not Phase 1.2 logic — but it sits on the Phase I branch because Phase 1.2's threading is what surfaced it. [#250]:svarga:fix, Phase 1.2 tests wait_idle before in_flight() check Coverage CI (gcc-14 / ubuntu-24.04, Debug -O0 + --coverage instrumentation) failed in test-h5pall: test/H5Pall.cpp:387: ERROR: CHECK( pool.in_flight() == 0 ) is NOT correct! values: CHECK( 1 == 0 ) Race in the test, not in the pool: - submit() increments in_flight_ before queueing the task. - worker_loop runs (*task)(), which calls packaged_task::set_value internally — that's when future::wait()/get() unblocks. - worker_loop then decrements in_flight_ AFTER task() returns. There's a small window between "future is ready" and "in_flight is decremented" — workers can be in the gap. Optimized builds usually close it within nanoseconds; coverage's -O0 + atomic gcov makes it wide enough to observe. Fix: add pool.wait_idle() before each in_flight() == 0 assertion. wait_idle() spins until in_flight reaches 0, eliminating the race. The pool's semantics are unchanged. in_flight() remains an approximate introspection metric that can transiently exceed the count of pending futures. Tests that need a precise zero should call wait_idle() first. Local sweep: 50/50 ctest green; the regression test for [#250 1.2] now holds under Debug + --coverage flags. 
[#250]:svarga:feature, Phase 1.3.1 h5::backpressure FAPL property Adds a separate FAPL property h5::backpressure{N} that bounds the in-flight chunk deque for any pt_t / h5::write / h5::read that uses the file's worker pool. Composes with h5::threads via operator|: h5::fd_t fd = h5::create("data.h5", H5F_ACC_TRUNC, h5::threads{8} | h5::backpressure{32}); Storage: - New H5CPP_FAPL_BACKPRESSURE property, stores a plain unsigned via H5Pinsert2 with null lifecycle callbacks (POD value, memcpy semantics are correct for HDF5's internal copy + close). - fapl_backpressure_set is idempotent (skips if already installed), matching fapl_threads_set behavior. - resolve_backpressure(fapl_id, worker_count) returns: * user-set value when h5::backpressure{N} was applied, * else H5CPP_FAPL_BACKPRESSURE_DEFAULT_FACTOR × worker_count (default factor = 8, override via compile-time macro). h5::backpressure{N} without h5::threads{N} is silently a no-op at the consumer level — without a pool, there's no queue to bound. Resolver still returns the user value if the property is set; pt_t / h5::write ignore it when no pool is present. Tests (test/H5Pall.cpp, [#250 1.3] cases): - h5::backpressure tag installs the property via property chain - Default = 8 × worker_count when unset - Property survives H5Pcopy (POD memcpy semantics) - backpressure-only FAPL: resolver returns user value; no pool is installed (verified separately) Local: 50/50 ctest green. Phase 1.3.2 (next): wire pt_t to read both pool + backpressure cap from the FAPL and enforce the in-flight deque bound. [#250]:svarga:feature, Phase 1.3.2 pt_t resolves FAPL pool + backpressure cap at init Wires pt_t into the FAPL worker-pool plumbing without changing dispatch behavior yet. 
After this commit: - pt_t has two new private members: std::shared_ptr<worker_pool_t> pool_; unsigned backpressure_cap_; - pt_t::init() calls H5Fget_access_plist on the file id (already held for re-opening the dataset with zero-chunk-cache DAPL), then h5::impl::resolve_worker_pool(fapl) + h5::impl::resolve_backpressure to populate the members. fapl is closed before the existing H5Fclose(fid). - pt_t move-assignment moves pool_ and copies/resets backpressure_cap_. - pt_t copy-constructor (deep copy from another pt_t's ds) leaves both members at their default state — copy of pt_t was always documented as the synchronous-pipeline path, that behavior is unchanged. Dispatch (1.3.2 step 2, next commit) will: - Branch on pool_ being non-null at write_chunk call sites. - Submit compress closures to pool_, push the returned futures into an in-flight deque, drain in order, block on full deque using backpressure_cap_. This commit keeps the existing visit_pipeline() dispatch in place so the change is observable only by the new tests below. All 50 pre-existing ctest cases continue to pass bytewise-identically. Tests (test/H5Dappend.cpp, [#250 1.3.2] cases): - pt_t constructed against a file whose FAPL has h5::threads{4} | h5::backpressure{16}: the FAPL resolves to a 4-worker pool with cap=16 (verified via the same FAPL's resolve_worker_pool and resolve_backpressure helpers). - pt_t with no FAPL pool falls back cleanly (default FAPL): construct, append 32 ints, flush, read back identical bytes. [#250]:svarga:feature, Phase 1.3.2 step 2 — pt_t pool dispatch + async-pipelined drain When pt_t resolves a worker_pool_t from the file's FAPL, every chunk commit now flows through write_chunk_via_pool instead of the variant pipeline. This is Option B from the workplan §3 design notes: async-pipelined producer with FIFO drain on the calling thread. 
Mechanism: - write_chunk_via_pool snapshots the filter chain config (filter[], flags[], cd_size[], cd_values[], tail) from pt_t's basic_pipeline member into a POD filter_chain_t for cheap by-value capture. - The raw chunk bytes are copied into a unique_ptr the worker will own. Per-chunk allocation; thread-local scratch in the pool is a follow-on optimization if benchmarks justify it. - Closure submitted to pool: applies the filter chain ping-pong style (same logic as basic_pipeline_t::write_chunk_impl), copies final bytes into a fresh owned aligned buffer, returns pool_result_t{data, nbytes, mask, offset}. - The returned future is pushed onto in_flight_ (deque preserves submission order). - drain_in_flight(false) drains all already-completed futures off the front, calling H5Dwrite_chunk on the calling thread for each. HDF5 stays single-threaded. - When in_flight_.size() reaches backpressure_cap_, write_chunk_via_pool blocks on the front future via drain_in_flight(true). Producer memory is bounded by cap × chunk_bytes per pt_t. All six visit_pipeline → write_chunk call sites in this file now route through dispatch_chunk(), a tiny private method that picks pool path when pool_ is non-null and falls back to visit_pipeline otherwise. Synchronous behavior is bytewise-unchanged when no FAPL pool is configured. flush() now drains in_flight_ to completion before returning. Tests (test/H5Dappend.cpp, [#250 1.3.2] cases): - gzip round-trip equivalence: same data written through synchronous, pool-default-cap, and pool-explicit-cap paths produces identical readback in all three cases. - tight back-pressure (cap=2) round-trip: forces the producer-blocking branch every other chunk; data integrity preserved. Local: 50/50 ctest green. What this commit does NOT do (deferred): - h5::write and h5::read pool integration (Phase 1.3.3 — same approach but via h5::get_access_plist(ds) at the call site). 
- Removal of #241's pt_t(ds, h5::filter::threads{N}) constructor (Phase 1.4 — once we're sure the FAPL path covers everything). - Thread-local scratch buffers in the pool (optimization, post-Phase I). [#250]:svarga:refactor, Phase 1.3.3 part 1 — pool_pipeline_t as third variant alternative Implements workplan Option b: the FAPL worker pool becomes a third CRTP descendant of pipeline_t, slotting into pt_t's pipeline variant alongside basic_pipeline_t and threaded_pipeline_t. New header h5cpp/H5Zpipeline_pool.hpp: - pool_pipeline_t owns std::shared_ptr<worker_pool_t> + back-pressure cap + std::deque<std::future<pool_result_t>> in-flight queue. - write_chunk_impl snapshots filter chain config into a POD captured by value, copies raw chunk into a unique_ptr owned by the worker, submits the compress closure to the pool, pushes the future onto in_flight_, drains opportunistically, blocks on the front future when cap is reached. - read_chunk_impl stays synchronous (identical to basic). Parallel decompression is Phase 1.5+ work — requires read-ahead that cooperates with pipeline_t<>::split_to_chunk_read. - public drain() method walks in_flight_ to completion, called by pt_t::flush so users get "data on disk after flush returns". - Destructor drains any remaining in-flight in submission order. pt_t refactor: - pt_pipeline_t variant now has three alternatives: basic, threaded (legacy #241), pool. - init() resolves the FAPL pool; if present, emplaces a pool_pipeline_t alternative; otherwise keeps the default basic_pipeline_t. - All six write_chunk dispatch sites revert to plain visit_pipeline + p.write_chunk — uniform across the three alternatives via the CRTP base. - dispatch_chunk / write_chunk_via_pool / drain_in_flight / pool_ / backpressure_cap_ / in_flight_ all REMOVED from pt_t (the logic now lives in pool_pipeline_t). - flush() invokes drain() on the pool alternative via std::visit type-checked branch. 
Why this shape (Option b from workplan §3 design notes): - h5::write and h5::read benefit transparently in the next sub-phase via the existing pipeline_t<>::write/read → split_to_chunk_write → write_chunk_impl machinery. Just construct a pool_pipeline_t in those entry points when the file's FAPL has a pool. - The in-flight deque and drain logic live in one place, not duplicated across consumers. - Phase 1.4 (remove #241's threaded_pipeline_t alternative) becomes a single-commit cleanup: the variant goes from three to two alternatives. Local: 50/50 ctest green. Existing Phase 1.3.2 round-trip tests continue to pass (the variant routes them through pool_pipeline_t now instead of pt_t's removed in-flight machinery). What this commit does NOT do (Phase 1.3.3 part 2, next commit): - h5::write / h5::read FAPL pool integration. - Will construct a local pool_pipeline_t in those entry points, call set_cache + pipeline.write/read, let RAII drain on exit. [#250]:svarga:feature, Phase 1.3.3 part 2 — h5::write/read consult FAPL pool When the file's FAPL has h5::threads{N} installed AND the dataset's DAPL has h5::high_throughput AND the dataset is chunked, h5::write and h5::read now construct a local pool_pipeline_t and route the operation through it. Otherwise the existing paths run unchanged: - DAPL high_throughput + no FAPL pool → existing DAPL-stored basic_pipeline_t pointer (synchronous filter chain). - No DAPL high_throughput → standard H5Dwrite / H5Dread. Both branches sit inside the existing use_pipeline gate (which also checks layout == H5D_CHUNKED, the #242 SegFault guard). The FAPL pool is the new inner check, taking priority over the DAPL pipeline pointer when present. 
Mechanism (h5::write side): hid_t fid = H5Iget_file_id(static_cast<hid_t>(ds)); hid_t fapl = H5Fget_access_plist(fid); auto pool = h5::impl::resolve_worker_pool(fapl); if (pool) { const unsigned cap = h5::impl::resolve_backpressure( fapl, pool->worker_count()); h5::impl::pool_pipeline_t pipe(std::move(pool), cap); // set_cache populates the filter chain from the DCPL. h5::dcpl_t dcpl{H5Dget_create_plist(static_cast<hid_t>(ds))}; hid_t type_id = H5Dget_type(static_cast<hid_t>(ds)); size_t elem_sz = H5Tget_size(type_id); H5Tclose(type_id); pipe.set_cache(dcpl, elem_sz); pipe.write(ds, offset, stride, block, count, dxpl, ptr); // RAII: pipe destructor drains in_flight before pool refcount drop. } else { // ... existing DAPL pipeline path ... } h5::read uses the same shape; pool_pipeline_t::read_chunk_impl is currently synchronous (identical to basic_pipeline_t::read_chunk_impl) so the FAPL-pool read branch is semantically equivalent to the DAPL read path today. The structure is in place for the Phase 1.5+ read-ahead optimization to land without touching call sites. Tests (test/H5Pall.cpp, [#250 1.3.3] cases): - h5::write + h5::read round-trip through FAPL pool with h5::threads{4} | h5::backpressure{16} + DAPL high_throughput + gzip-compressed dataset. Reader uses default FAPL — verifies data is durably on disk regardless of reader-side FAPL choice. - h5::write without high_throughput on the DAPL bypasses the pool even when the FAPL has one installed. Confirms per-dataset opt-in still gates the pool path. Local: 50/50 ctest green. What this commit does NOT do (Phase 1.4, next commit): - Remove pt_t(ds, h5::filter::threads{N}) constructor from #241. - The threaded_pipeline_t variant alternative goes away with it. [#250]:svarga:refactor, Phase 1.4 — retire #241 h5::filter::threads + threaded_pipeline_t Migration option B (chosen earlier in #250): the FAPL-scoped pool from Phases 1.1–1.3 fully subsumes the per-pt_t worker pool added in #241. 
Two parallel paths invite contention bugs and confuse the surface; one codepath survives. Removed: - h5cpp/H5Zpipeline_threaded.hpp (deleted). This carried threaded_pipeline_t — both the C++20 (jthread + lock-free queue) and C++17 (stoppable_thread + std::mutex + std::queue) variants — plus the h5::filter::threads tag. - h5::pt_t( const h5::ds_t&, h5::filter::threads ). The remaining pt_t( const h5::ds_t& ) ctor now resolves the file's FAPL and swaps to pool_pipeline_t when h5::threads{N} is installed. - impl::pt_pipeline_t third alternative (unique_ptr<threaded_pipeline_t>). The variant is now { basic_pipeline_t, pool_pipeline_t } — both unique_ptr-wrapped for move-assignment. - core: #include "H5Zpipeline_threaded.hpp" line removed. Stale "threaded_pipeline_t is defined in ..." comment in H5Zpipeline.hpp also removed. Tests: - test/H5Zpipeline.cpp: dropped the three direct threaded_pipeline_t round-trip cases (no filter / gzip / shuffle+gzip). Pool round-trip is covered in test/H5Pall.cpp via the [#250 1.3.3] cases. - test/H5Dappend.cpp: dropped the four [#241] cases (tag construction, no-filter round-trip, basic-vs-threaded bytewise equivalence, default hw_concurrency worker count). Replaced with a one-line comment pointing to the FAPL coverage in test/H5Pall.cpp. User-visible API change (intentional, since v1.12.4 of #241 was the only release that shipped it): // before (#241): h5::pt_t pt(ds, h5::filter::threads{4}); // now (#250): h5::fd_t fd = h5::create(path, H5F_ACC_TRUNC, h5::threads{4} | h5::backpressure{16}); // ... dataset created on fd inherits the pool ... h5::pt_t pt(ds); // resolves fd's pool automatically Verified locally: 50/50 ctest green. CI will validate across the asan / ubsan / coverage / macos / windows matrix on push. [#250]:svarga:test, Phase 1.5 — TSAN job + [#250 1.5] FAPL pool race coverage CI: add a `tsan / ubuntu-24.04 / clang-20` job mirroring the existing asan / ubsan jobs.
It configures clang-20 with `-fsanitize=thread -fno-omit-frame-pointer -O1 -g`, builds the whole tree, and runs the full ctest suite under `TSAN_OPTIONS=halt_on_error=1:second_deadlock_stack=1`. Detected races now fail CI loudly, the same way ASan/UBSan errors do. The badge generator's `needs:` list is extended to include the new job so the TSan status SVG renders alongside the others. Tests: four new cases tagged `[#250 1.5]` in test/H5Pall.cpp, sized to exercise the FAPL pool path the way race-prone bugs would surface: - "two FAPLs with independent pools — bytewise equivalence to basic": same payload written via basic pipeline + 4-thread pool + 2-thread pool; all three readbacks must equal the input. Catches any state leakage between pool instances or between the pool and basic paths. - "interleaved writes through two FAPLs do not cross-contaminate": two open fds with independent pools alive at the same time, distinct payloads written sequentially (HDF5 itself isn't thread-safe in default builds, so concurrent H5 C-API calls are out of scope; the pool only parallelizes compression). Chunks routed through one pool must not appear in the other file. - "pt_t destructor drains pool before fd close — data on disk": no explicit h5::flush; lets ~pt_t fire the pool drain. Read-backs from a fresh open verify that the destructor honored the "data on disk after scope exit" contract. - "fd close after FAPL pool flush releases workers": opens / writes / closes the file four times back-to-back; each iteration drops its own shared_ptr via the FAPL slot's close-cb. Catches use-after-free in the slot lifecycle.
Local validation (Ubuntu 22.04 host, g++ 11 with `-fsanitize=thread`): cmake -B build-tsan -DCMAKE_BUILD_TYPE=Debug \ -DCMAKE_CXX_FLAGS="-fsanitize=thread -fno-omit-frame-pointer -O1 -g" \ -DCMAKE_EXE_LINKER_FLAGS="-fsanitize=thread" \ -DH5CPP_BUILD_TESTS=ON cmake --build build-tsan -j TSAN_OPTIONS=halt_on_error=1 ./build-tsan/test-h5pall TSAN_OPTIONS=halt_on_error=1 ./build-tsan/test-h5dappend TSAN_OPTIONS=halt_on_error=1 ./build-tsan/test-h5zpipeline → h5pall 30/30, h5dappend 18/18, h5zpipeline 10/10. No data races. CI clang-20 will validate the same on the matrix. [#250]:svarga:docs, Phase 1.6 — README + pipeline rework report updated for FAPL pool - README v1.12 highlights: replace "Threaded I/O pipeline" bullet with the user-visible FAPL pool API (`h5::create(..., h5::threads{N} | h5::backpressure{M})`) and bump the sanitizer line from "ASan + UBSan" to "ASan + UBSan + TSan" now that the TSan job is wired into CI. - docs/filtering-pipeline-rework-report.md: - Update the Threading row of the gap table to mark Phase I delivered as `pool_pipeline_t` in #250. - Append a "Status — Phase I (#250, FAPL worker pool)" section describing the surface, the mechanism (shared_ptr slot in FAPL, pool_pipeline_t in h5::write / h5::read / pt_t), bounded back-pressure, and the removal of the #241 per-pt_t `h5::filter::threads{N}` constructor. Phase II tracking is deferred — that lives in tasks/h5cpp-fapl-multithreading-workplan.md. [#250]:svarga:fix, Windows MSVC h5pall — distinct paths per iteration of fd close test The Phase 1.5 case "fd close after FAPL pool flush releases workers" re-created the same file four times in a loop. On Windows / HDF5 1.12.2 the C runtime briefly holds the previous handle past the HDF5-level close, so the second `H5Fcreate` raced the deletion of the first file and threw "couldn't create file" at H5Fcreate.hpp:58. Replaces the in-place re-create loop with a list of four distinct paths. 
The slot-lifecycle behaviour being asserted — FAPL property close-cb drops the pool ref when the last fd dies — still fires once per iteration; only the filesystem reuse is gone. Linux/macOS handle in-place re-creation fine (POSIX delete is atomic), so the test ran green on every other CI matrix entry plus local h5pall (30/30). This is a Windows-only fragility fix, not a correctness change in the FAPL pool itself. --- .github/workflows/ci.yml | 82 +++- CMakeLists.txt | 15 +- README.md | 4 +- docs/filtering-pipeline-rework-report.md | 21 +- h5cpp/H5Dappend.hpp | 121 ++++-- h5cpp/H5Dread.hpp | 29 +- h5cpp/H5Dwrite.hpp | 31 +- h5cpp/H5Pthreads.hpp | 301 +++++++++++++ h5cpp/H5Zpipeline.hpp | 1 - h5cpp/H5Zpipeline_pool.hpp | 249 +++++++++++ h5cpp/H5Zpipeline_threaded.hpp | 497 --------------------- h5cpp/core | 3 +- test/H5Dappend.cpp | 201 +++++---- test/H5Pall.cpp | 522 +++++++++++++++++++++++ test/H5Zpipeline.cpp | 82 ---- 15 files changed, 1439 insertions(+), 720 deletions(-) create mode 100644 h5cpp/H5Pthreads.hpp create mode 100644 h5cpp/H5Zpipeline_pool.hpp delete mode 100644 h5cpp/H5Zpipeline_threaded.hpp diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6c1660f610..2e5973da06 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -499,10 +499,86 @@ jobs: name: status-ubsan path: badge-status/status-ubsan.json + tsan: + name: tsan / ubuntu-24.04 / clang-20 + runs-on: ubuntu-24.04 + timeout-minutes: 45 + + concurrency: + group: tsan-${{ github.ref }} + cancel-in-progress: true + + steps: + - name: Checkout + uses: actions/checkout@v6 + with: + fetch-depth: 0 + + - name: Install dependencies + shell: bash + run: | + set -euxo pipefail + sudo apt-get update + sudo apt-get install -y cmake ninja-build wget gnupg lsb-release software-properties-common libhdf5-dev + wget https://apt.llvm.org/llvm.sh + chmod +x llvm.sh + sudo ./llvm.sh 20 all + echo "CC=$(command -v clang-20)" >> "$GITHUB_ENV" + echo "CXX=$(command -v clang++-20)" 
>> "$GITHUB_ENV" + + - name: Configure + shell: bash + run: | + set -euxo pipefail + cmake -S . -B build -G Ninja \ + -DCMAKE_C_COMPILER="$CC" \ + -DCMAKE_CXX_COMPILER="$CXX" \ + -DCMAKE_CXX_STANDARD=17 \ + -DCMAKE_BUILD_TYPE=Debug \ + -DCMAKE_C_FLAGS="-fsanitize=thread -fno-omit-frame-pointer -O1 -g" \ + -DCMAKE_CXX_FLAGS="-fsanitize=thread -fno-omit-frame-pointer -O1 -g" \ + -DCMAKE_EXE_LINKER_FLAGS="-fsanitize=thread" \ + -DH5CPP_BUILD_TESTS=ON + + - name: Build + shell: bash + run: cmake --build build --parallel + + - name: Test + shell: bash + env: + # halt_on_error=1: TSAN exits on the first data race so CI fails loud. + # second_deadlock_stack=1: print full stack of the lock that completed + # the cycle (default prints only one stack for the offending pair). + TSAN_OPTIONS: "halt_on_error=1:second_deadlock_stack=1" + run: ctest --test-dir build --output-on-failure + + - name: Record Badge Status + if: always() + shell: bash + run: | + set -euxo pipefail + mkdir -p badge-status + cat < badge-status/status-tsan.json + { + "os": "ubuntu-24.04", + "compiler": "clang-20", + "label": "TSan", + "status": "${{ job.status }}" + } + EOF + + - name: Upload Status Artifact + if: always() + uses: actions/upload-artifact@v7 + with: + name: status-tsan + path: badge-status/status-tsan.json + badge: name: Generate SVG Badges if: always() - needs: [build, asan, ubsan] + needs: [build, asan, ubsan, tsan] runs-on: ubuntu-24.04 steps: @@ -648,8 +724,8 @@ jobs: -DCMAKE_CXX_STANDARD=20 \ -DCMAKE_C_COMPILER=gcc-14 \ -DCMAKE_CXX_COMPILER=g++-14 \ - -DCMAKE_C_FLAGS="--coverage -O0 -g" \ - -DCMAKE_CXX_FLAGS="--coverage -O0 -g" \ + -DCMAKE_C_FLAGS="--coverage -fprofile-update=atomic -O0 -g" \ + -DCMAKE_CXX_FLAGS="--coverage -fprofile-update=atomic -O0 -g" \ -DH5CPP_BUILD_TESTS=ON \ -DH5CPP_BUILD_EXAMPLES=ON diff --git a/CMakeLists.txt b/CMakeLists.txt index 31f47828b7..f25c2ff48e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -109,10 +109,17 @@ find_package(HDF5 REQUIRED 
COMPONENTS C) # HDF5 floor is decoupled from h5cpp package version. Previously the check # compared HDF5_VERSION against PROJECT_VERSION, which coupled package -# versioning to HDF5 minimums by coincidence. After #234 raised the floor -# to 1.12 and #247 made PROJECT_VERSION track the git tag, the comparison -# stopped being meaningful. Pin the floor explicitly. -set(H5CPP_HDF5_FLOOR "1.12.0") +# versioning to HDF5 minimums by coincidence. After #247 made +# PROJECT_VERSION track the git tag, the comparison stopped being meaningful. +# Pin the floor explicitly. +# +# 1.10.4 matches the prior implicit floor (the stale project(VERSION 1.10.4.6) +# line that #247 removed). The CI matrix runs Ubuntu 22.04 with system +# HDF5 1.10.7 (floor coverage restored by #235); raising the floor above +# 1.10.4 would break that matrix entry. When dropping 1.10.x coverage in +# a future cohort, bump this constant deliberately and remove the 22.04 +# matrix entry in the same commit. +set(H5CPP_HDF5_FLOOR "1.10.4") if(HDF5_VERSION VERSION_LESS ${H5CPP_HDF5_FLOOR}) message(FATAL_ERROR "-- !!! H5CPP requires HDF5 v${H5CPP_HDF5_FLOOR} or greater (found ${HDF5_VERSION}) !!!" 
diff --git a/README.md b/README.md index a7a9ebeb84..8dfd10bd6c 100644 --- a/README.md +++ b/README.md @@ -91,10 +91,10 @@ cmake --install build - **`std::float16_t`** (C++23 IEEE 754 half-precision) - **Rank-7** array support - **Expanded attribute** type coverage -- **Threaded I/O pipeline** for filter chains +- **FAPL-scoped worker pool** — `h5::create(..., h5::threads{N} | h5::backpressure{M})` opts the file into parallel filter compression; all chunked datasets opened on that file (and pt_t built from them) inherit the pool with async-pipelined dispatch - **HDF5 1.12.2 ceiling** — tested and verified; `H5Dvlen_reclaim` / reference API compatibility - **Windows MSVC** in the CI matrix -- **ASan + UBSan** clean on Clang 20 +- **ASan + UBSan + TSan** clean on Clang 20 ## Documentation diff --git a/docs/filtering-pipeline-rework-report.md b/docs/filtering-pipeline-rework-report.md index 75d4f8d080..ab8b3fae68 100644 --- a/docs/filtering-pipeline-rework-report.md +++ b/docs/filtering-pipeline-rework-report.md @@ -30,7 +30,7 @@ The current implementation is an experimental skeleton rather than a production | Multi-filter read | Throws for more than one filter | Reverse-order decode through the complete filter plan | | Buffer sizing | Uses chunk-sized scratch buffers | Encoded buffers must allow compression expansion | | Filter mask | Partial handling | Preserve HDF5 chunk filter-mask semantics | -| Threading | `threaded_pipeline_t` is a placeholder | Worker-local state and bounded chunk scheduling | +| Threading | `threaded_pipeline_t` is a placeholder | Worker-local state and bounded chunk scheduling (delivered in #250 as FAPL-scoped `pool_pipeline_t`) | | Portability | Linux path is the only recently verified path | Linux, macOS, and Windows allocation/build behavior | Focused baseline probes confirmed two important failures: @@ -175,3 +175,22 @@ Threading should initially use C++17 standard library primitives. Avoid platform Start with correctness, not SIMD. 
The highest-value first milestone is a serial `filter_plan` that can round-trip standard HDF5 filters and reject unsupported filters explicitly. Once that foundation is correct, SIMD and multithreading become execution-policy improvements rather than a risky rewrite. The strategic direction is to make H5CPP's filtering chain a modern CPU execution engine while preserving HDF5-compatible metadata and file interoperability. + +## Status — Phase I (#250, FAPL worker pool) + +Phase I of the threading workplan is delivered on PR #251. The design and trade-offs are summarised in `tasks/h5cpp-fapl-multithreading-workplan.md`; the user-visible surface is one line in the file's FAPL: + +```cpp +h5::fd_t fd = h5::create( + "data.h5", H5F_ACC_TRUNC, + h5::default_fcpl, + h5::threads{N} | h5::backpressure{M}); // M default = 8 × N +``` + +When `h5::threads{N}` is installed, the FAPL allocates a `worker_pool_t` and parks a `shared_ptr<>` to it inside an `H5Pinsert2` slot. Every dataset created/opened on that file inherits the pool via `H5Fget_access_plist`. When a dataset's DAPL has `h5::high_throughput`, `h5::write` and `h5::read` construct a local `pool_pipeline_t` that submits per-chunk compression closures to the pool and drains in submission order; `H5Dwrite_chunk` still runs on the calling thread. `pt_t` resolves the same pool in `init()` and uses `pool_pipeline_t` as a variant alternative. + +Back-pressure is bounded by `h5::backpressure{M}`: the producer blocks on the front future once the in-flight deque hits `M`. Default is 8 × worker count. + +The legacy per-pt_t `h5::filter::threads{N}` constructor from #241 is removed in this cycle (see [Phase 1.4 commit message]). Two parallel threading paths in the pipeline invite contention bugs and confuse the surface; the FAPL pool fully subsumes it. + +Phase II (compile-time C-API blocking on `async_fd_t`, full async mode) is tracked separately. 
diff --git a/h5cpp/H5Dappend.hpp b/h5cpp/H5Dappend.hpp index 82e78fdb47..a1e4b7fb08 100644 --- a/h5cpp/H5Dappend.hpp +++ b/h5cpp/H5Dappend.hpp @@ -7,10 +7,13 @@ #include "H5capi.hpp" #include "H5Tmeta.hpp" #include "H5cout.hpp" -#include "H5Zpipeline_threaded.hpp" #include #include #include +#include +#include +#include +#include #include #include #include @@ -22,17 +25,22 @@ namespace h5 { std::ostream& operator<<(std::ostream& os, const h5::pt_t& pt); namespace h5::impl { - // pt_t::pipeline selects between the synchronous basic pipeline (default, - // bytewise-identical to pre-241 behavior) and the parallel threaded pipeline. - // Both alternatives are indirect-owned through unique_ptr so that the - // variant remains move-assignable regardless of the underlying pipeline's - // move semantics (threaded_pipeline_t deletes moves because it owns - // std::jthread workers and atomics; basic_pipeline_t inherits a manually- - // written move-assign from pipeline_t that suppresses the - // implicit move-ctor needed by variant assignment). + // pt_t::pipeline selects between two pipeline implementations: + // + // basic_pipeline_t — synchronous filter chain on the calling + // thread; default when the file's FAPL has + // no h5::threads{N} pool installed. + // pool_pipeline_t — FAPL-scoped shared worker pool, async- + // pipelined dispatch with back-pressure. + // Selected when init() resolves a pool + // from the file's FAPL. + // + // Both are indirect-owned through unique_ptr so the variant + // remains move-assignable regardless of the underlying pipeline's + // move semantics. 
using pt_pipeline_t = std::variant< std::unique_ptr, - std::unique_ptr + std::unique_ptr >; } @@ -40,10 +48,9 @@ namespace h5::impl { namespace h5 { struct pt_t { pt_t(); - pt_t( const h5::ds_t& handle ); // conversion ctor — synchronous pipeline - pt_t( const h5::ds_t& handle, h5::filter::threads workers ); // threaded pipeline - // deep copy with own cache memory — always uses the synchronous pipeline, - // since the threaded pipeline owns workers that cannot be duplicated. + pt_t( const h5::ds_t& handle ); // FAPL-aware ctor: pool when h5::threads{N} is set, basic otherwise + // deep copy with own cache memory — re-runs init(), so the copy + // resolves its own pipeline from the dataset's file FAPL. pt_t( const h5::pt_t& pt ) : h5::pt_t(pt.ds) { }; ~pt_t(); @@ -115,6 +122,11 @@ namespace h5 { chunk_dims[H5CPP_MAX_RANK], count[H5CPP_MAX_RANK]; size_t block_size,element_size,N,n,rank; void *ptr, *fill_value; + + // Phase 1.3.3 — chunk dispatch is uniform across all variant + // alternatives via visit_pipeline + write_chunk. pool_pipeline_t + // holds the pool reference, in-flight deque, and back-pressure + // logic internally; pt_t no longer needs per-instance pool fields. }; } @@ -128,7 +140,9 @@ inline h5::pt_t::pt_t() : count[i] = 1, offset[i] = 0; } -// conversion ctor — synchronous pipeline (default) +// FAPL-aware conversion ctor — init() resolves pool from the dataset's +// FAPL and swaps the variant to pool_pipeline_t when h5::threads{N} is +// installed. Otherwise the default basic_pipeline_t stays active. 
inline h5::pt_t::pt_t( const h5::ds_t& handle ) : pt_t() { /*default ctor has an invalid state -- skip initialization */ @@ -136,16 +150,6 @@ h5::pt_t::pt_t( const h5::ds_t& handle ) : pt_t() { init(handle); } -// conversion ctor — threaded pipeline with N compression workers -inline -h5::pt_t::pt_t( const h5::ds_t& handle, h5::filter::threads workers ) : pt_t() { - if( !is_valid(handle) ) return; - auto threaded = std::make_unique(); - threaded->set_worker_count(workers.n); - pipeline.emplace>(std::move(threaded)); - init(handle); -} - inline h5::pt_t::~pt_t(){ /*default ctor has an invalid state -- skip flushing cache */ @@ -170,6 +174,21 @@ void h5::pt_t::init( const h5::ds_t& handle ){ H5Pset_chunk_cache(dapl, 0, 0, H5D_CHUNK_CACHE_W0_DEFAULT); ds = h5::ds_t{H5Dopen2(fid, dname.data(), dapl)}; H5Pclose(dapl); + + // Phase 1.3.3 — resolve the file's FAPL pool while we still hold + // a live fid. When the FAPL has h5::threads{N} installed, swap + // the variant from basic_pipeline_t (default) to pool_pipeline_t + // constructed with the pool + back-pressure cap. When no pool + // is present, the default basic_pipeline_t stays — synchronous + // behavior, identical to pre-Phase-I. 
+ hid_t fapl = H5Fget_access_plist(fid); + if (auto pool = impl::resolve_worker_pool(fapl)) { + const unsigned cap = impl::resolve_backpressure(fapl, pool->worker_count()); + pipeline.emplace>( + std::make_unique(std::move(pool), cap)); + } + H5Pclose(fapl); + H5Fclose(fid); dt = h5::dt_t{H5Dget_type(static_cast(ds))}; h5::sp_t file_space = h5::get_space( handle ); @@ -194,6 +213,7 @@ void h5::pt_t::init( const h5::ds_t& handle ){ throw h5::error::io::packet_table::misc( H5CPP_ERROR_MSG("CTOR: unable to create handle from dataset...")); } } + template inline std::enable_if_t< h5::meta::is_scalar::value, void> h5::pt_t::append( const T* ptr ) try { //PTR: write directly chunk size from provided buffer/ptr @@ -308,28 +328,37 @@ void> h5::pt_t::append( const T& ref ) try { inline void h5::pt_t::flush(){ - if( n == 0 ) return; - *offset = *current_dims; - *current_dims += *chunk_dims; - h5::set_extent(ds, current_dims); - - if( H5Tis_variable_str(this->dt)) { - hsize_t block = 1, count = n; - h5::sp_t mem_space{H5Screate_simple(static_cast(rank), &count, nullptr )}; - h5::sp_t file_space{H5Dget_space( static_cast<::hid_t>(ds) )}; - h5::select_all( mem_space ); - H5Sselect_hyperslab( static_cast(file_space), H5S_SELECT_SET, offset, nullptr, &block, &count); - - H5Dwrite( static_cast( ds ), - dt, mem_space, file_space, static_cast(dxpl), ptr); - } else { - // the remainder of last chunk must be set to fill_value; arbitrary type size supported - for(hsize_t i=0; i<(N-n); i++) - for(size_t j=0; j < element_size; j++) - static_cast( ptr )[(n + i) * element_size + j] = static_cast( fill_value )[ j ]; - visit_pipeline([&](auto& p){ p.write_chunk(offset, block_size, ptr); }); + if( n != 0 ) { + *offset = *current_dims; + *current_dims += *chunk_dims; + h5::set_extent(ds, current_dims); + + if( H5Tis_variable_str(this->dt)) { + hsize_t block = 1, count = n; + h5::sp_t mem_space{H5Screate_simple(static_cast(rank), &count, nullptr )}; + h5::sp_t file_space{H5Dget_space( 
static_cast<::hid_t>(ds) )}; + h5::select_all( mem_space ); + H5Sselect_hyperslab( static_cast(file_space), H5S_SELECT_SET, offset, nullptr, &block, &count); + + H5Dwrite( static_cast( ds ), + dt, mem_space, file_space, static_cast(dxpl), ptr); + } else { + // the remainder of last chunk must be set to fill_value; arbitrary type size supported + for(hsize_t i=0; i<(N-n); i++) + for(size_t j=0; j < element_size; j++) + static_cast( ptr )[(n + i) * element_size + j] = static_cast( fill_value )[ j ]; + visit_pipeline([&](auto& p){ p.write_chunk(offset, block_size, ptr); }); + } + n = 0; } - n = 0; + // Pool path: drain in-flight chunks so flush() honors the "data + // on disk after this returns" contract. basic_pipeline_t writes + // inline; the visit is a no-op for that alternative. + std::visit([](auto& p) { + using T = std::decay_t; + if constexpr (std::is_same_v) + p->drain(); + }, pipeline); } inline void h5::pt_t::reset() { diff --git a/h5cpp/H5Dread.hpp b/h5cpp/H5Dread.hpp index 9fcca983ba..d80fdfff8f 100644 --- a/h5cpp/H5Dread.hpp +++ b/h5cpp/H5Dread.hpp @@ -74,9 +74,32 @@ namespace h5 { return layout == H5D_CHUNKED; }(); if( use_pipeline ){ - h5::impl::pipeline_t* filters; - H5Pget(dapl, H5CPP_DAPL_HIGH_THROUGHPUT, &filters); - filters->read(ds, offset, stride, block, count, dxpl, ptr); + // Phase 1.3.3 — if the file's FAPL has h5::threads{N}, route + // reads through a local pool_pipeline_t. Currently pool_pipeline_t:: + // read_chunk_impl is synchronous (parallel decompress is Phase 1.5+), + // so the FAPL-pool branch is semantically equivalent to the DAPL + // path today; the structure is in place for the read-ahead + // optimization to land later without changing call sites. 
+ hid_t fid = H5Iget_file_id(static_cast(ds)); + hid_t fapl = H5Fget_access_plist(fid); + auto pool = h5::impl::resolve_worker_pool(fapl); + if (pool) { + const unsigned cap = h5::impl::resolve_backpressure( + fapl, pool->worker_count()); + h5::impl::pool_pipeline_t pipe(std::move(pool), cap); + h5::dcpl_t dcpl{H5Dget_create_plist(static_cast(ds))}; + hid_t type_id = H5Dget_type(static_cast(ds)); + size_t elem_sz = H5Tget_size(type_id); + H5Tclose(type_id); + pipe.set_cache(dcpl, elem_sz); + pipe.read(ds, offset, stride, block, count, dxpl, ptr); + } else { + h5::impl::pipeline_t* filters; + H5Pget(dapl, H5CPP_DAPL_HIGH_THROUGHPUT, &filters); + filters->read(ds, offset, stride, block, count, dxpl, ptr); + } + H5Pclose(fapl); + H5Fclose(fid); }else{ h5::sp_t mem_space = h5::create_simple( size ); h5::select_all( mem_space ); diff --git a/h5cpp/H5Dwrite.hpp b/h5cpp/H5Dwrite.hpp index 1f760ac424..9fe9e97763 100644 --- a/h5cpp/H5Dwrite.hpp +++ b/h5cpp/H5Dwrite.hpp @@ -104,10 +104,33 @@ namespace h5 { const h5::block_t& block = arg::get( h5::default_block, args...); const h5::offset_t& offset = arg::get( h5::default_offset, args...); const h5::stride_t& stride = arg::get( h5::default_stride, args...); - - h5::impl::pipeline_t* filters; - H5Pget(dapl, H5CPP_DAPL_HIGH_THROUGHPUT, &filters); - filters->write(ds, offset, stride, block, count, dxpl, ptr); + + // Phase 1.3.3 — if the file's FAPL has h5::threads{N}, route + // compress work through the shared pool via a local + // pool_pipeline_t. Otherwise use the existing DAPL-stored + // basic_pipeline_t pointer for synchronous filter chain. + hid_t fid = H5Iget_file_id(static_cast(ds)); + hid_t fapl = H5Fget_access_plist(fid); + auto pool = h5::impl::resolve_worker_pool(fapl); + if (pool) { + const unsigned cap = h5::impl::resolve_backpressure( + fapl, pool->worker_count()); + h5::impl::pool_pipeline_t pipe(std::move(pool), cap); + // set_cache populates the filter chain from the dataset's DCPL. 
+ h5::dcpl_t dcpl{H5Dget_create_plist(static_cast(ds))}; + hid_t type_id = H5Dget_type(static_cast(ds)); + size_t elem_sz = H5Tget_size(type_id); + H5Tclose(type_id); + pipe.set_cache(dcpl, elem_sz); + pipe.write(ds, offset, stride, block, count, dxpl, ptr); + // pipe destructor drains in_flight before pool refcount drop. + } else { + h5::impl::pipeline_t* filters; + H5Pget(dapl, H5CPP_DAPL_HIGH_THROUGHPUT, &filters); + filters->write(ds, offset, stride, block, count, dxpl, ptr); + } + H5Pclose(fapl); + H5Fclose(fid); } else { h5::sp_t mem_space = h5::create_simple( n_elements ); h5::select_all( mem_space ); diff --git a/h5cpp/H5Pthreads.hpp b/h5cpp/H5Pthreads.hpp new file mode 100644 index 0000000000..f1bfec2995 --- /dev/null +++ b/h5cpp/H5Pthreads.hpp @@ -0,0 +1,301 @@ +/* + * Copyright (c) 2018-2026 Steven Varga, Toronto,ON Canada + * Author: Varga, Steven + */ +#pragma once + +// FAPL-scoped worker pool — opt-in via h5::threads{N}. +// +// User-facing API: +// +// h5::fd_t fd = h5::create("data.h5", H5F_ACC_TRUNC, h5::threads{8}); +// h5::fd_t fd = h5::create("data.h5", H5F_ACC_TRUNC, h5::threads{}); // hw_concurrency +// +// Per-dataset opt-in is the orthogonal h5::high_throughput DAPL flag +// (existing). This FAPL property controls "is there a pool, how many +// workers"; whether any given dataset uses it is the DAPL's call. +// +// Storage mechanism mirrors h5::high_throughput (H5Pdapl.hpp): +// +// - H5Pinsert2 stores a pointer to a worker_pool_slot_t in the FAPL +// skip list. +// - The slot owns a std::shared_ptr. +// - HDF5 internally copies the FAPL during H5Fopen/H5Fcreate; the +// copy callback allocates a fresh slot aliasing the same pool +// (shared_ptr refcount++). Every FAPL copy shares the pool. +// - The close callback drops the slot. Pool is destroyed when the +// last live FAPL copy releases its slot, at which point worker +// std::jthreads receive request_stop() and join cleanly. 
+// +// This is the shared-ownership variant of the H5Pinsert2 + slot pattern. +// Compare with H5Pdapl.hpp's fresh-allocation-per-copy semantics used by +// the high_throughput pipeline property: that pattern allocates a fresh +// pipeline scratch buffer per copy because pipelines are per-write +// scratch state; this pattern shares one live resource across all +// copies because workers ARE the resource we want shared. See +// tasks/h5cpp-fapl-multithreading-workplan.md §2-§3. +// +// PHASE 1.1 STATUS: lifecycle scaffolding only. worker_pool_t owns N +// std::jthreads whose only job today is to honor std::stop_token on +// shutdown. Phase 1.2 extends with bounded MPMC queues, compress_sync / +// compress_async API, and integration with the filter pipeline. Phase +// 1.3 wires pt_t / h5::write / h5::read consumer sites. + +#include "H5Pall.hpp" +#include "detail/doorbell.hpp" +#include "detail/stoppable_thread.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define H5CPP_FAPL_WORKER_POOL "h5cpp_fapl_worker_pool" +#define H5CPP_FAPL_BACKPRESSURE "h5cpp_fapl_backpressure" + +// Default in-flight chunk cap when h5::threads{N} is set without an +// accompanying h5::backpressure{N}. Resolves to 8 × worker_count, which +// is loose enough to keep all workers fed during steady-state streaming +// and tight enough to bound memory growth (chunk_size × cap bytes per pt_t). +// Override at compile time via -DH5CPP_FAPL_BACKPRESSURE_DEFAULT_FACTOR=K. +#ifndef H5CPP_FAPL_BACKPRESSURE_DEFAULT_FACTOR +#define H5CPP_FAPL_BACKPRESSURE_DEFAULT_FACTOR 8u +#endif + +namespace h5::impl { + +// ─── Pool ──────────────────────────────────────────────────────────────────── +// +// Generic compute pool: workers pull type-erased tasks off a single MPMC-ish +// queue (std::mutex + std::queue + doorbell signal) and execute them. 
The +// submit(callable) API returns a std::future of the callable's result type, +// using std::packaged_task wrapped in a std::function for storage. +// +// The pool is deliberately HDF5-agnostic at this layer. Consumer sites +// (Phase 1.3 — pt_t, h5::write, h5::read) wrap their HDF5-specific compress +// logic in a closure and submit() it. This keeps the pool reusable for +// Phase II's executor and any future async work. +struct worker_pool_t { + // Pool size is fixed at construction; cannot resize at runtime. + // n == 0 means "use std::thread::hardware_concurrency()". + explicit worker_pool_t(unsigned n) { + const unsigned count = n ? n : std::max(1u, std::thread::hardware_concurrency()); + workers_.reserve(count); + for (unsigned i = 0; i < count; ++i) + workers_.emplace_back([this](h5::detail::stop_token_t st) { worker_loop(st); }); + } + + // Destruction sequence: + // 1. wait_idle() — let all submitted tasks complete (caller can also + // have called this earlier). + // 2. Set stopping_ flag and ring the doorbell so all waiters wake. + // 3. stoppable_thread_t destructors automatically request stop + join. + ~worker_pool_t() { + wait_idle(); + stopping_.store(true, std::memory_order_release); + bell_.ring_all(); + } + + worker_pool_t(const worker_pool_t&) = delete; + worker_pool_t& operator=(const worker_pool_t&) = delete; + worker_pool_t(worker_pool_t&&) = delete; + worker_pool_t& operator=(worker_pool_t&&) = delete; + + // Introspection for tests and scheduling decisions. + [[nodiscard]] unsigned worker_count() const noexcept { + return static_cast(workers_.size()); + } + + // Submit a callable for execution by any worker. Returns a future of + // the callable's result type. Callable must be invocable with no + // arguments; capture state via closure if needed. + // + // Internally uses std::packaged_task to bridge the move-only callable + // into a copyable std::function (wrapped in shared_ptr) so it can sit + // in the queue. 
+ template + auto submit(Fn&& fn) -> std::future> { + using R = std::invoke_result_t; + auto task = std::make_shared>(std::forward(fn)); + auto fut = task->get_future(); + in_flight_.fetch_add(1, std::memory_order_release); + { + std::lock_guard lk(m_); + tasks_.emplace([task] { (*task)(); }); + } + bell_.ring(); + return fut; + } + + // Block until all submitted tasks have completed. Intended for clean + // shutdown or as a synchronization barrier between submission phases. + void wait_idle() { + while (in_flight_.load(std::memory_order_acquire) > 0) + std::this_thread::yield(); + } + + // Coarse approximation of pending+running work. Useful for tests and + // monitoring; not a strict ordering guarantee. + [[nodiscard]] int in_flight() const noexcept { + return in_flight_.load(std::memory_order_acquire); + } + +private: + void worker_loop(h5::detail::stop_token_t st) { + while (!st.stop_requested()) { + std::function task; + std::uint32_t last_seq = 0; + bool got_task = false; + { + std::lock_guard lk(m_); + if (!tasks_.empty()) { + task = std::move(tasks_.front()); + tasks_.pop(); + got_task = true; + } else { + // Snapshot the doorbell sequence WHILE HOLDING the lock. + // submit() will ring the bell only AFTER it has released + // the same lock, so any subsequent ring advances past + // last_seq — bell_.wait(last_seq) below cannot miss it. + last_seq = bell_.load(); + } + } + + if (got_task) { + try { task(); } catch (...) { /* packaged_task captures it */ } + in_flight_.fetch_sub(1, std::memory_order_release); + continue; + } + + // Queue was empty — wait for a ring or for shutdown. 
+ if (st.stop_requested() || + stopping_.load(std::memory_order_acquire)) return; + bell_.wait(last_seq); + } + } + + std::mutex m_; + std::queue> tasks_; + h5::detail::doorbell_t bell_; + std::atomic in_flight_{0}; + std::atomic stopping_{false}; + std::vector workers_; +}; + +// ─── FAPL slot + lifecycle callbacks ───────────────────────────────────────── + +// The heap-allocated holder whose pointer lives in the FAPL skip-list +// value slot. Indirection is necessary because H5Pinsert2 stores raw +// bytes — it cannot run shared_ptr's constructor/destructor for us. +struct worker_pool_slot_t { + std::shared_ptr pool; +}; + +// Copy callback: HDF5 cloned the property bytes (the slot pointer was +// memcpy'd into the destination's value slot). Allocate a NEW slot whose +// shared_ptr aliases the same pool — refcount++ via shared_ptr copy. +// +// This is the contract that makes "every FAPL copy shares one pool" work. +inline herr_t fapl_pool_copy_cb(const char* /*name*/, size_t /*size*/, void* value) { + auto** slot_loc = static_cast(value); + *slot_loc = new worker_pool_slot_t{(*slot_loc)->pool}; + return 0; +} + +// Close callback: drop one slot. shared_ptr inside the slot releases its +// reference; worker_pool_t destructor runs when the last slot is freed +// (refcount reaches 0), which stops and joins the jthreads. +inline herr_t fapl_pool_close_cb(const char* /*name*/, size_t /*size*/, void* ptr) { + delete *static_cast(ptr); + return 0; +} + +// Setter invoked when the user applies h5::threads{N} to an FAPL. +// Idempotent — if a pool property is already installed, leaves it untouched. +// +// n == 0 maps to std::thread::hardware_concurrency() inside worker_pool_t. 
+inline herr_t fapl_threads_set(::hid_t fapl, unsigned n) { + if (H5Pexist(fapl, H5CPP_FAPL_WORKER_POOL)) return 0; + auto* slot = new worker_pool_slot_t{ + std::make_shared(n) + }; + return H5Pinsert2(fapl, H5CPP_FAPL_WORKER_POOL, + sizeof(worker_pool_slot_t*), &slot, + nullptr, // set + nullptr, // get + nullptr, // prp_del + fapl_pool_copy_cb, + nullptr, // compare + fapl_pool_close_cb); +} + +// Consumer-site helper: given a FAPL id, retrieve the worker pool shared_ptr +// if one is installed. Returns nullptr (= no pool, fall back to synchronous +// pipeline) if the property is absent. Used by pt_t, h5::write, h5::read +// in Phase 1.3. +inline std::shared_ptr resolve_worker_pool(::hid_t fapl_id) noexcept { + if (fapl_id < 0 || H5Iis_valid(fapl_id) <= 0) return nullptr; + if (!H5Pexist(fapl_id, H5CPP_FAPL_WORKER_POOL)) return nullptr; + worker_pool_slot_t* slot = nullptr; + H5Pget(fapl_id, H5CPP_FAPL_WORKER_POOL, &slot); + return slot ? slot->pool : nullptr; +} + +// ─── Back-pressure cap (separate FAPL property) ────────────────────────────── + +// Setter invoked when the user applies h5::backpressure{N} to an FAPL. +// Stores a plain unsigned by memcpy semantics — no lifecycle callbacks +// needed since the value is trivially copyable and owns no heap. +// +// The cap is consumed by pt_t (and Phase 1.3's h5::write/read) when +// queueing work to the pool: write_chunk blocks on drain_completed +// once the in-flight deque reaches the cap. +inline herr_t fapl_backpressure_set(::hid_t fapl, unsigned cap) { + if (H5Pexist(fapl, H5CPP_FAPL_BACKPRESSURE)) return 0; + return H5Pinsert2(fapl, H5CPP_FAPL_BACKPRESSURE, + sizeof(unsigned), &cap, + nullptr, nullptr, nullptr, + nullptr, // copy: memcpy is correct for POD + nullptr, + nullptr); // close: nothing to release +} + +// Consumer-site helper: returns the user-set back-pressure cap, or computes +// the default (H5CPP_FAPL_BACKPRESSURE_DEFAULT_FACTOR × worker_count) when +// no h5::backpressure{N} was applied. 
Returns 0 only when no pool is +// installed either — caller should already have bailed in that case. +inline unsigned resolve_backpressure(::hid_t fapl_id, + unsigned worker_count) noexcept { + if (fapl_id < 0 || H5Iis_valid(fapl_id) <= 0) return 0; + if (H5Pexist(fapl_id, H5CPP_FAPL_BACKPRESSURE)) { + unsigned cap = 0; + H5Pget(fapl_id, H5CPP_FAPL_BACKPRESSURE, &cap); + if (cap > 0) return cap; + } + return H5CPP_FAPL_BACKPRESSURE_DEFAULT_FACTOR * worker_count; +} + +} // namespace h5::impl + +namespace h5 { +// User-facing tags. Applied to a fapl_t via the property-chain mechanism. +// +// h5::create("data.h5", H5F_ACC_TRUNC, h5::threads{8}) +// h5::create("data.h5", H5F_ACC_TRUNC, h5::threads{}) // hw_concurrency +// h5::create("data.h5", H5F_ACC_TRUNC, h5::threads{8} +// | h5::backpressure{32}) // 8 workers, 32-chunk cap +// +// h5::backpressure{N} without h5::threads{N} is silently a no-op: +// without a pool, there is no queue to bound. Document at user-facing +// level; do not warn at runtime. 
+using threads = impl::fapl_call, + impl::fapl_threads_set>; +using backpressure = impl::fapl_call, + impl::fapl_backpressure_set>; +} diff --git a/h5cpp/H5Zpipeline.hpp b/h5cpp/H5Zpipeline.hpp index 5e6e273e84..572b8ddb87 100644 --- a/h5cpp/H5Zpipeline.hpp +++ b/h5cpp/H5Zpipeline.hpp @@ -134,7 +134,6 @@ namespace h5{ namespace impl { void write_chunk_impl( const hsize_t* offset, size_t nbytes, const void* ptr ); void read_chunk_impl( const hsize_t* offset, size_t nbytes, void* ptr ); }; - // threaded_pipeline_t is defined in H5Zpipeline_threaded.hpp struct romio_pipeline_t : public pipeline_t{ void write_chunk_impl( const hsize_t* offset, size_t nbytes, const void* ptr ){ (void)offset; (void)nbytes; (void)ptr; diff --git a/h5cpp/H5Zpipeline_pool.hpp b/h5cpp/H5Zpipeline_pool.hpp new file mode 100644 index 0000000000..37fa12b296 --- /dev/null +++ b/h5cpp/H5Zpipeline_pool.hpp @@ -0,0 +1,249 @@ +/* + * Copyright (c) 2018-2026 Steven Varga, Toronto,ON Canada + * Author: Varga, Steven + */ +#pragma once + +// FAPL-scoped parallel filter pipeline — second CRTP descendant of +// pipeline_t alongside basic_pipeline_t (synchronous). +// +// pool_pipeline_t owns a strong reference to an external worker_pool_t +// (resolved from the file's FAPL) and submits compress closures to it; +// H5Dwrite_chunk runs on the calling thread. +// +// The pipeline reuses pipeline_t<>::write/read (and via that +// split_to_chunk_write/read) for chunk decomposition, so consumer sites +// — pt_t, h5::write, h5::read — never see the pool dispatch directly. +// They just construct a pool_pipeline_t when their file's FAPL has one. + +#include "H5Zpipeline.hpp" +#include "H5Pthreads.hpp" + +#include +#include +#include +#include +#include + +namespace h5::impl { + +struct pool_pipeline_t : public pipeline_t { + pool_pipeline_t() = default; // variant requires default-constructible + // when this is the chosen alternative; pool_ stays null + // and dispatch fast-fails (no pool installed). 
+ + pool_pipeline_t(std::shared_ptr pool, unsigned cap) + : pool_(std::move(pool)), cap_(cap) {} + + ~pool_pipeline_t() { + // Drain any remaining in-flight work in submission order before the + // pipeline (and its shared_ptr to the pool) goes away. Blocks the + // destructor on each front future. + while (!in_flight_.empty()) drain_in_flight(/*blocking=*/true); + } + + pool_pipeline_t(const pool_pipeline_t&) = delete; + pool_pipeline_t& operator=(const pool_pipeline_t&) = delete; + pool_pipeline_t(pool_pipeline_t&&) = delete; + pool_pipeline_t& operator=(pool_pipeline_t&&) = delete; + + // CRTP entry points called by pipeline_t<>::write / pipeline_t<>::read + // via split_to_chunk_write / split_to_chunk_read. + void write_chunk_impl(const hsize_t* offset, std::size_t nbytes, const void* src); + void read_chunk_impl (const hsize_t* offset, std::size_t nbytes, void* dst); + + // Public quiesce — block until every in-flight future has been + // drained. Called by pt_t::flush so users get the expected + // "data on disk after flush returns" semantics for the pool path. + void drain() { + while (!in_flight_.empty()) drain_in_flight(/*blocking=*/true); + } + +private: + // Worker-produced compressed chunk, ready for H5Dwrite_chunk on the + // calling thread. unique_ptr keeps it move-only and + // aligned to operator new[]'s default alignment. 
+ struct result_t { + std::unique_ptr data; + std::size_t nbytes{0}; + std::uint32_t mask{0}; + std::array offset{}; + }; + + void drain_in_flight(bool blocking); + + std::shared_ptr pool_; + unsigned cap_{0}; + std::deque> in_flight_; +}; + +// ─── write path ────────────────────────────────────────────────────────────── + +inline void pool_pipeline_t::drain_in_flight(bool blocking) { + using namespace std::chrono_literals; + if (blocking) { + if (in_flight_.empty()) return; + auto r = in_flight_.front().get(); // blocks + H5Dwrite_chunk(static_cast<::hid_t>(this->ds), static_cast<::hid_t>(this->dxpl), + r.mask, r.offset.data(), r.nbytes, r.data.get()); + in_flight_.pop_front(); + return; + } + while (!in_flight_.empty() && + in_flight_.front().wait_for(0s) == std::future_status::ready) { + auto r = in_flight_.front().get(); + H5Dwrite_chunk(static_cast<::hid_t>(this->ds), static_cast<::hid_t>(this->dxpl), + r.mask, r.offset.data(), r.nbytes, r.data.get()); + in_flight_.pop_front(); + } +} + +inline void pool_pipeline_t::write_chunk_impl(const hsize_t* offset_in, + std::size_t nbytes, + const void* src) { + if (!pool_) { + // No pool was wired in (default-constructed alternative). Fall + // through to the synchronous filter chain identical to + // basic_pipeline_t::write_chunk_impl. This branch shouldn't be + // reached in normal use; it exists to keep the variant safe. 
+ size_t length = nbytes; + void *in = chunk0, *out = chunk1, *tmp = chunk0; + std::uint32_t mask = 0; + switch (tail) { + case 0: + H5Dwrite_chunk(static_cast<::hid_t>(ds), static_cast<::hid_t>(dxpl), + 0, offset_in, nbytes, src); + return; + case 1: + length = filter[0](out, src, nbytes, flags[0], cd_size[0], cd_values[0]); + if (!length) mask = 1u; + [[fallthrough]]; + default: + for (hsize_t j = 1; j < tail; ++j) { + tmp = in; in = out; out = tmp; + length = filter[j](out, in, length, flags[j], cd_size[j], cd_values[j]); + if (!length) mask |= (1u << j); + } + H5Dwrite_chunk(static_cast<::hid_t>(ds), static_cast<::hid_t>(dxpl), + mask, offset_in, length, out); + } + return; + } + + // Snapshot filter chain into a POD captured by value in the closure. + struct filter_chain_t { + filter::call_t filter[H5CPP_MAX_FILTER]; + unsigned flags[H5CPP_MAX_FILTER]; + std::size_t cd_size[H5CPP_MAX_FILTER]; + unsigned cd_values[H5CPP_MAX_FILTER][H5CPP_MAX_FILTER_PARAM]; + hsize_t tail; + }; + filter_chain_t fc{}; + fc.tail = this->tail; + std::memcpy(fc.filter, this->filter, sizeof(fc.filter)); + std::memcpy(fc.flags, this->flags, sizeof(fc.flags)); + std::memcpy(fc.cd_size, this->cd_size, sizeof(fc.cd_size)); + std::memcpy(fc.cd_values, this->cd_values, sizeof(fc.cd_values)); + + // Worker-owned input buffer. 
+ auto raw = std::make_unique(nbytes); + std::memcpy(raw.get(), src, nbytes); + + std::array off{}; + std::copy(offset_in, offset_in + this->rank, off.begin()); + + auto fut = pool_->submit( + [raw = std::move(raw), nbytes, off, fc]() mutable -> result_t { + result_t out; + out.offset = off; + + if (fc.tail == 0) { + out.data = std::move(raw); + out.nbytes = nbytes; + out.mask = 0; + return out; + } + + const std::size_t scratch = filter::filter_scratch_bound(nbytes); + auto wbuf0 = std::make_unique(scratch); + auto wbuf1 = std::make_unique(scratch); + + std::size_t length = nbytes; + std::uint32_t mask = 0; + + length = fc.filter[0](wbuf0.get(), raw.get(), length, + fc.flags[0], fc.cd_size[0], fc.cd_values[0]); + if (!length) mask |= 1u; + + void* in_buf = wbuf0.get(); + void* out_buf = wbuf1.get(); + for (hsize_t j = 1; j < fc.tail; ++j) { + length = fc.filter[j](out_buf, in_buf, length, + fc.flags[j], fc.cd_size[j], fc.cd_values[j]); + if (!length) mask |= (1u << j); + std::swap(in_buf, out_buf); + } + + out.data = std::make_unique(length); + std::memcpy(out.data.get(), in_buf, length); + out.nbytes = length; + out.mask = mask; + return out; + }); + + in_flight_.push_back(std::move(fut)); + + // Opportunistic drain. + drain_in_flight(/*blocking=*/false); + + // Bounded back-pressure: block on the front future when the deque + // reaches the cap. Producer memory ≤ cap × chunk_size per pipeline. + while (in_flight_.size() >= cap_) + drain_in_flight(/*blocking=*/true); +} + +// ─── read path (synchronous for v1) ────────────────────────────────────────── +// +// Phase 1.3.3 keeps read synchronous, identical to basic_pipeline_t::read_chunk_impl. +// Parallel decompression is a deliberate follow-up: it requires read-ahead +// (the read path consumes chunks in order, but the user's buffer slots are +// known up-front, so prefetching can fan out across the pool). Tracked in +// Phase 1.5+ work. 
+ +inline void pool_pipeline_t::read_chunk_impl(const hsize_t* offset_in, + std::size_t nbytes, + void* /*dst*/) { + std::size_t length = nbytes; + std::uint32_t filter_mask; + + if (tail == 0) { +#if H5_VERSION_GE(2,0,0) + std::size_t buf_size = nbytes; + H5Dread_chunk2(ds, dxpl, offset_in, &filter_mask, chunk0, &buf_size); +#else + H5Dread_chunk(ds, dxpl, offset_in, &filter_mask, chunk0); +#endif + return; + } + + void* read_target = (tail % 2 == 1) ? chunk1 : chunk0; +#if H5_VERSION_GE(2,0,0) + std::size_t buf_size = nbytes; + H5Dread_chunk2(ds, dxpl, offset_in, &filter_mask, read_target, &buf_size); +#else + H5Dread_chunk(ds, dxpl, offset_in, &filter_mask, read_target); +#endif + + void* src = read_target; + void* dst = (read_target == chunk0) ? static_cast(chunk1) + : static_cast(chunk0); + for (hsize_t j = tail; j > 0; --j) { + const hsize_t fi = j - 1; + length = filter[fi](dst, src, length, + flags[fi] | H5Z_FLAG_REVERSE, + cd_size[fi], cd_values[fi]); + std::swap(src, dst); + } +} + +} // namespace h5::impl diff --git a/h5cpp/H5Zpipeline_threaded.hpp b/h5cpp/H5Zpipeline_threaded.hpp deleted file mode 100644 index 82e94eec0e..0000000000 --- a/h5cpp/H5Zpipeline_threaded.hpp +++ /dev/null @@ -1,497 +0,0 @@ -/* - * Copyright (c) 2018-2020 Steven Varga, Toronto,ON Canada - * Author: Varga, Steven - */ -#pragma once -#include -#include -#include -#include -#include -#include -#include -#include "H5Zpipeline.hpp" -#if __cplusplus >= 202002L -# include "H5Qall.hpp" -#endif -#include "detail/doorbell.hpp" -#include "detail/stoppable_thread.hpp" - -// Workers handle compression only; H5Dwrite_chunk is always called from the -// main thread. Work items use raw ::hid_t integers (not h5cpp RAII wrappers) -// so no HDF5 API calls occur on worker threads, preserving compatibility with -// non-thread-safe HDF5 builds. - -namespace h5::filter { - // Opt-in tag selecting the threaded filter pipeline. 
- // h5::filter::threads{} -> std::thread::hardware_concurrency() workers - // h5::filter::threads{N} -> N compression workers - // Passed to pt_t constructors (and h5::write) to switch from the default - // synchronous basic_pipeline_t to the parallel threaded_pipeline_t. - struct threads { - unsigned n; - constexpr threads() noexcept : n(0) {} - constexpr explicit threads(unsigned count) noexcept : n(count) {} - }; -} - -namespace h5::impl { - -namespace detail { - - // Uncompressed chunk dispatched to a worker for filter application. - struct raw_work_t { - std::unique_ptr data; - std::array offset{}; - std::size_t nbytes{0}; - ::hid_t ds_id{H5I_UNINIT}; - ::hid_t dxpl_id{H5I_UNINIT}; - - raw_work_t() = default; - raw_work_t(raw_work_t&&) = default; - raw_work_t& operator=(raw_work_t&&) = default; - raw_work_t(const raw_work_t&) = delete; - raw_work_t& operator=(const raw_work_t&) = delete; - }; - - // Filtered chunk ready for H5Dwrite_chunk, returned to main thread. - struct done_work_t { - aligned_ptr data; // compressed/filtered bytes (aligned allocation) - std::array offset{}; - std::size_t nbytes{0}; // compressed size - std::uint32_t mask{0}; - ::hid_t ds_id{H5I_UNINIT}; - ::hid_t dxpl_id{H5I_UNINIT}; - - done_work_t() = default; - done_work_t(done_work_t&&) = default; - done_work_t& operator=(done_work_t&&) = default; - done_work_t(const done_work_t&) = delete; - done_work_t& operator=(const done_work_t&) = delete; - }; - -} // namespace detail - -#if __cplusplus >= 202002L -// ============================================================================ -// C++20 path: uses bounded lock-free queues from H5Qall.hpp and std::jthread. -// This block is byte-for-byte identical to the original implementation. 
-// ============================================================================ - -struct threaded_pipeline_t : public pipeline_t { - threaded_pipeline_t() = default; - - ~threaded_pipeline_t() { - // Drain all in-flight chunks (workers compress, main thread writes). - flush(); - // std::jthread dtors call request_stop() then join() automatically. - } - - threaded_pipeline_t(const threaded_pipeline_t&) = delete; - threaded_pipeline_t& operator=(const threaded_pipeline_t&) = delete; - threaded_pipeline_t(threaded_pipeline_t&&) = delete; - threaded_pipeline_t& operator=(threaded_pipeline_t&&) = delete; - - // Runtime override of worker pool size. If non-zero, takes precedence over - // the H5CPP_PIPELINE_WORKERS compile-time default. Must be called before - // the first write_chunk_impl, since workers spawn lazily via std::call_once. - void set_worker_count(unsigned n) noexcept { worker_count_override_ = n; } - - // Drains all in-flight work and calls H5Dwrite_chunk from the calling - // (main) thread. Must be called before reading back written data. 
- void flush() { - while (in_flight_.load(std::memory_order_acquire) > 0) { - drain_done(); - std::this_thread::yield(); - } - drain_done(); // final sweep after counter hits zero - } - - void write_chunk_impl(const hsize_t* offset, std::size_t nbytes, const void* ptr) { - ensure_workers(); - drain_done(); // opportunistic: write any already-compressed chunks - - detail::raw_work_t work; - work.data = std::make_unique(nbytes); - std::memcpy(work.data.get(), ptr, nbytes); - std::copy(offset, offset + this->rank, work.offset.data()); - work.nbytes = nbytes; - work.ds_id = static_cast<::hid_t>(this->ds); - work.dxpl_id = static_cast<::hid_t>(this->dxpl); - - in_flight_.fetch_add(1, std::memory_order_release); - while (!compress_queue_.push(std::move(work))) { - drain_done(); // relieve back-pressure - std::this_thread::yield(); - } - } - - void read_chunk_impl(const hsize_t* offset, std::size_t nbytes, void* /*ptr*/) { - uint32_t filter_mask; - std::size_t length = nbytes; - if (tail == 0) { -#if H5_VERSION_GE(2,0,0) - std::size_t buf_size = nbytes; - H5Dread_chunk2(ds, dxpl, offset, &filter_mask, chunk0, &buf_size); -#else - H5Dread_chunk(ds, dxpl, offset, &filter_mask, chunk0); -#endif - return; - } - void* read_target = (tail % 2 == 1) ? chunk1 : chunk0; -#if H5_VERSION_GE(2,0,0) - std::size_t buf_size = nbytes; - H5Dread_chunk2(ds, dxpl, offset, &filter_mask, read_target, &buf_size); -#else - H5Dread_chunk(ds, dxpl, offset, &filter_mask, read_target); -#endif - void* src = read_target; - void* dst = (read_target == chunk0) - ? static_cast(chunk1) : static_cast(chunk0); - for (hsize_t j = tail; j > 0; --j) { - const hsize_t fi = j - 1; - length = filter[fi](dst, src, length, - flags[fi] | H5Z_FLAG_REVERSE, cd_size[fi], cd_values[fi]); - void* tmp = src; src = dst; dst = tmp; - } - } - -private: - // Called from main thread only: pop finished compressed chunks and write. 
- void drain_done() { - detail::done_work_t result; - while (done_queue_.pop(result)) { - H5Dwrite_chunk(result.ds_id, result.dxpl_id, result.mask, - result.offset.data(), result.nbytes, result.data.get()); - in_flight_.fetch_sub(1, std::memory_order_release); - } - } - - void ensure_workers() { - std::call_once(init_flag_, [this] { - constexpr unsigned cfg = static_cast(H5CPP_PIPELINE_WORKERS); - const unsigned n = - (worker_count_override_ > 0) ? worker_count_override_ - : (cfg > 0) ? cfg - : std::max(1u, std::thread::hardware_concurrency()); - workers_.reserve(n); - for (unsigned i = 0; i < n; ++i) - workers_.emplace_back([this](std::stop_token st) { worker_loop(st); }); - }); - } - - void worker_loop(std::stop_token st) { - const std::size_t scratch = filter::filter_scratch_bound(block_size); - aligned_ptr wbuf0 = make_aligned(H5CPP_MEM_ALIGNMENT, scratch); - aligned_ptr wbuf1 = make_aligned(H5CPP_MEM_ALIGNMENT, scratch); - - detail::raw_work_t work; - while (compress_queue_.wait_pop(work, st)) { - detail::done_work_t result = compress(work, wbuf0, wbuf1); - while (!done_queue_.push(std::move(result))) - std::this_thread::yield(); - } - } - - detail::done_work_t compress(const detail::raw_work_t& work, - aligned_ptr& wbuf0, aligned_ptr& wbuf1) - { - detail::done_work_t out; - out.offset = work.offset; - out.ds_id = work.ds_id; - out.dxpl_id = work.dxpl_id; - - std::size_t length = work.nbytes; - - if (tail == 0) { - // No filters: copy raw bytes into an aligned buffer. - const std::size_t sz = filter::filter_scratch_bound(length); - out.data = make_aligned(H5CPP_MEM_ALIGNMENT, sz); - std::memcpy(out.data.get(), work.data.get(), length); - out.nbytes = length; - out.mask = 0; - return out; - } - - // First filter: work.data → wbuf0. - length = filter[0](wbuf0.get(), work.data.get(), length, - flags[0], cd_size[0], cd_values[0]); - if (!length) out.mask |= 1u; - - // Subsequent filters: ping-pong between wbuf0 and wbuf1. 
- void* src = wbuf0.get(); - void* dst = wbuf1.get(); - for (hsize_t j = 1; j < tail; ++j) { - length = filter[j](dst, src, length, flags[j], cd_size[j], cd_values[j]); - if (!length) out.mask |= (1u << j); - std::swap(src, dst); - } - - // Copy filtered result into fresh aligned buffer so scratch is reusable. - const std::size_t sz = filter::filter_scratch_bound(length); - out.data = make_aligned(H5CPP_MEM_ALIGNMENT, sz); - std::memcpy(out.data.get(), src, length); - out.nbytes = length; - return out; - } - - // compress_queue_: main thread pushes raw chunks; workers pop and compress. - bounded::spmc::queue_t compress_queue_; - // done_queue_: workers push compressed chunks; main thread pops and writes. - bounded::mpsc::queue_t done_queue_; - - std::atomic in_flight_{0}; - std::vector workers_; - std::once_flag init_flag_; - unsigned worker_count_override_{0}; -}; - -#else -// ============================================================================ -// C++17 fallback path: mutex-based queues + h5::detail::stoppable_thread_t. -// ============================================================================ - -namespace detail { - - // Simple mutex-protected FIFO queue for C++17, signalled via doorbell_t. - // Replaces bounded::spmc and bounded::mpsc for the C++17 build. - template - class c17_queue_t { - public: - c17_queue_t() = default; - c17_queue_t(const c17_queue_t&) = delete; - c17_queue_t& operator=(const c17_queue_t&) = delete; - - // Non-blocking push — always succeeds (unbounded). - // Returns true for API compatibility with bounded queues. - bool push(T&& v) { - { - std::lock_guard lk(m_); - q_.push(std::move(v)); - } - bell_.ring(); - return true; - } - - // Non-blocking pop. Returns true if an item was retrieved. - bool pop(T& out) { - std::lock_guard lk(m_); - if (q_.empty()) return false; - out = std::move(q_.front()); - q_.pop(); - return true; - } - - // Blocking pop: waits until an item is available or stop is requested. 
- // Returns true if an item was retrieved, false if stop was requested. - bool wait_pop(T& out, h5::detail::stop_token_t st) { - while (!st.stop_requested()) { - { - std::lock_guard lk(m_); - if (!q_.empty()) { - out = std::move(q_.front()); - q_.pop(); - return true; - } - } - const auto last = bell_.load(); - // Recheck under lock before waiting to avoid missed wakeup. - { - std::lock_guard lk(m_); - if (!q_.empty()) { - out = std::move(q_.front()); - q_.pop(); - return true; - } - } - if (!st.stop_requested()) - bell_.wait(last); - } - return false; - } - - // Wake all waiters (used on shutdown to unblock wait_pop). - void notify_all() { bell_.ring_all(); } - - private: - std::mutex m_; - std::queue q_; - h5::detail::doorbell_t bell_; - }; - -} // namespace detail - -struct threaded_pipeline_t : public pipeline_t { - threaded_pipeline_t() = default; - - ~threaded_pipeline_t() { - // Drain all in-flight chunks (workers compress, main thread writes). - flush(); - // Notify workers so they observe stop and wake from wait_pop. - compress_queue_.notify_all(); - // h5::detail::stoppable_thread_t dtors call request_stop() then join(). - } - - threaded_pipeline_t(const threaded_pipeline_t&) = delete; - threaded_pipeline_t& operator=(const threaded_pipeline_t&) = delete; - threaded_pipeline_t(threaded_pipeline_t&&) = delete; - threaded_pipeline_t& operator=(threaded_pipeline_t&&) = delete; - - // Runtime override of worker pool size. If non-zero, takes precedence over - // the H5CPP_PIPELINE_WORKERS compile-time default. Must be called before - // the first write_chunk_impl, since workers spawn lazily via std::call_once. - void set_worker_count(unsigned n) noexcept { worker_count_override_ = n; } - - // Drains all in-flight work and calls H5Dwrite_chunk from the calling - // (main) thread. Must be called before reading back written data. 
- void flush() { - while (in_flight_.load(std::memory_order_acquire) > 0) { - drain_done(); - std::this_thread::yield(); - } - drain_done(); // final sweep after counter hits zero - } - - void write_chunk_impl(const hsize_t* offset, std::size_t nbytes, const void* ptr) { - ensure_workers(); - drain_done(); // opportunistic: write any already-compressed chunks - - detail::raw_work_t work; - work.data = std::make_unique(nbytes); - std::memcpy(work.data.get(), ptr, nbytes); - std::copy(offset, offset + this->rank, work.offset.data()); - work.nbytes = nbytes; - work.ds_id = static_cast<::hid_t>(this->ds); - work.dxpl_id = static_cast<::hid_t>(this->dxpl); - - in_flight_.fetch_add(1, std::memory_order_release); - while (!compress_queue_.push(std::move(work))) { - drain_done(); // relieve back-pressure - std::this_thread::yield(); - } - } - - void read_chunk_impl(const hsize_t* offset, std::size_t nbytes, void* /*ptr*/) { - uint32_t filter_mask; - std::size_t length = nbytes; - if (tail == 0) { -#if H5_VERSION_GE(2,0,0) - std::size_t buf_size = nbytes; - H5Dread_chunk2(ds, dxpl, offset, &filter_mask, chunk0, &buf_size); -#else - H5Dread_chunk(ds, dxpl, offset, &filter_mask, chunk0); -#endif - return; - } - void* read_target = (tail % 2 == 1) ? chunk1 : chunk0; -#if H5_VERSION_GE(2,0,0) - std::size_t buf_size = nbytes; - H5Dread_chunk2(ds, dxpl, offset, &filter_mask, read_target, &buf_size); -#else - H5Dread_chunk(ds, dxpl, offset, &filter_mask, read_target); -#endif - void* src = read_target; - void* dst = (read_target == chunk0) - ? static_cast(chunk1) : static_cast(chunk0); - for (hsize_t j = tail; j > 0; --j) { - const hsize_t fi = j - 1; - length = filter[fi](dst, src, length, - flags[fi] | H5Z_FLAG_REVERSE, cd_size[fi], cd_values[fi]); - void* tmp = src; src = dst; dst = tmp; - } - } - -private: - // Called from main thread only: pop finished compressed chunks and write. 
- void drain_done() { - detail::done_work_t result; - while (done_queue_.pop(result)) { - H5Dwrite_chunk(result.ds_id, result.dxpl_id, result.mask, - result.offset.data(), result.nbytes, result.data.get()); - in_flight_.fetch_sub(1, std::memory_order_release); - } - } - - void ensure_workers() { - std::call_once(init_flag_, [this] { - constexpr unsigned cfg = static_cast(H5CPP_PIPELINE_WORKERS); - const unsigned n = - (worker_count_override_ > 0) ? worker_count_override_ - : (cfg > 0) ? cfg - : std::max(1u, std::thread::hardware_concurrency()); - workers_.reserve(n); - for (unsigned i = 0; i < n; ++i) - workers_.emplace_back( - [this](h5::detail::stop_token_t st) { worker_loop(st); }); - }); - } - - void worker_loop(h5::detail::stop_token_t st) { - const std::size_t scratch = filter::filter_scratch_bound(block_size); - aligned_ptr wbuf0 = make_aligned(H5CPP_MEM_ALIGNMENT, scratch); - aligned_ptr wbuf1 = make_aligned(H5CPP_MEM_ALIGNMENT, scratch); - - detail::raw_work_t work; - while (compress_queue_.wait_pop(work, st)) { - detail::done_work_t result = compress(work, wbuf0, wbuf1); - while (!done_queue_.push(std::move(result))) - std::this_thread::yield(); - } - } - - detail::done_work_t compress(const detail::raw_work_t& work, - aligned_ptr& wbuf0, aligned_ptr& wbuf1) - { - detail::done_work_t out; - out.offset = work.offset; - out.ds_id = work.ds_id; - out.dxpl_id = work.dxpl_id; - - std::size_t length = work.nbytes; - - if (tail == 0) { - // No filters: copy raw bytes into an aligned buffer. - const std::size_t sz = filter::filter_scratch_bound(length); - out.data = make_aligned(H5CPP_MEM_ALIGNMENT, sz); - std::memcpy(out.data.get(), work.data.get(), length); - out.nbytes = length; - out.mask = 0; - return out; - } - - // First filter: work.data → wbuf0. - length = filter[0](wbuf0.get(), work.data.get(), length, - flags[0], cd_size[0], cd_values[0]); - if (!length) out.mask |= 1u; - - // Subsequent filters: ping-pong between wbuf0 and wbuf1. 
- void* src = wbuf0.get(); - void* dst = wbuf1.get(); - for (hsize_t j = 1; j < tail; ++j) { - length = filter[j](dst, src, length, flags[j], cd_size[j], cd_values[j]); - if (!length) out.mask |= (1u << j); - std::swap(src, dst); - } - - // Copy filtered result into fresh aligned buffer so scratch is reusable. - const std::size_t sz = filter::filter_scratch_bound(length); - out.data = make_aligned(H5CPP_MEM_ALIGNMENT, sz); - std::memcpy(out.data.get(), src, length); - out.nbytes = length; - return out; - } - - // compress_queue_: main thread pushes raw chunks; workers pop and compress. - detail::c17_queue_t compress_queue_; - // done_queue_: workers push compressed chunks; main thread pops and writes. - detail::c17_queue_t done_queue_; - - std::atomic in_flight_{0}; - std::vector workers_; - std::once_flag init_flag_; - unsigned worker_count_override_{0}; -}; - -#endif // __cplusplus >= 202002L - -} // namespace h5::impl diff --git a/h5cpp/core b/h5cpp/core index 868e043898..40f7014eb8 100644 --- a/h5cpp/core +++ b/h5cpp/core @@ -60,8 +60,9 @@ #include "H5Pall.hpp" #include "H5Zpipeline.hpp" #include "H5Zpipeline_basic.hpp" - #include "H5Zpipeline_threaded.hpp" #include "H5Pdapl.hpp" + #include "H5Pthreads.hpp" + #include "H5Zpipeline_pool.hpp" #include "H5Ialgorithm.hpp" #include "H5capi.hpp" diff --git a/test/H5Dappend.cpp b/test/H5Dappend.cpp index bbdccd92fd..e67932a829 100644 --- a/test/H5Dappend.cpp +++ b/test/H5Dappend.cpp @@ -163,84 +163,11 @@ TEST_CASE("packet table output stream for invalid handle") { } // ===================================================================== -// [#241] Threaded filter pipeline opt-in via h5::filter::threads{N} +// (#241 h5::filter::threads tests removed in #250 — the per-pt_t worker +// pool API is superseded by FAPL-scoped h5::threads{N}. Coverage moved +// to test/H5Pall.cpp ([#250 1.3.3] cases).) 
// ===================================================================== -TEST_CASE("[#241] h5::filter::threads tag construction") { - // Default-constructed tag means "use hardware_concurrency() workers". - constexpr h5::filter::threads default_t{}; - CHECK(default_t.n == 0); - - // Explicit count. - constexpr h5::filter::threads explicit_t{4}; - CHECK(explicit_t.n == 4); -} - -TEST_CASE("[#241] pt_t with threaded pipeline — basic round-trip (no filter)") { - h5::test::file_fixture_t f("test-pt-threaded-nofilter.h5"); - h5::ds_t ds = h5::create(f.fd, "ds", h5::current_dims_t{0}, - h5::max_dims_t{H5S_UNLIMITED}, h5::chunk{16}); - { - h5::pt_t pt(ds, h5::filter::threads{2}); - for (int i = 0; i < 64; ++i) - h5::append(pt, i); - h5::flush(pt); // ensure all workers drain before close - } - auto readback = h5::read>(f.fd, "ds"); - REQUIRE(readback.size() == 64); - for (int i = 0; i < 64; ++i) CHECK(readback[i] == i); -} - -TEST_CASE("[#241] pt_t with threaded pipeline — gzip-compressed bytewise equivalence") { - // Write the same data with basic and threaded pipelines into two files, - // read back, assert content matches. Different filter chain ordering can - // produce different on-disk bytes; we assert decompressed content equivalence. 
- constexpr int N = 256; - std::vector expected(N); - for (int i = 0; i < N; ++i) expected[i] = i * 7 + 3; - - auto write_file = [&](const char* path, auto pt_factory) { - h5::test::file_fixture_t f(path); - h5::ds_t ds = h5::create(f.fd, "ds", h5::current_dims_t{0}, - h5::max_dims_t{H5S_UNLIMITED}, h5::chunk{32} | h5::gzip{6}); - { - auto pt = pt_factory(ds); - for (int v : expected) h5::append(pt, v); - h5::flush(pt); - } - }; - - write_file("test-pt-basic-gzip.h5", - [](const h5::ds_t& ds) { return h5::pt_t(ds); }); - write_file("test-pt-threaded-gzip.h5", - [](const h5::ds_t& ds) { return h5::pt_t(ds, h5::filter::threads{4}); }); - - h5::fd_t basic = h5::open("test-pt-basic-gzip.h5", H5F_ACC_RDONLY); - h5::fd_t threaded = h5::open("test-pt-threaded-gzip.h5", H5F_ACC_RDONLY); - auto basic_data = h5::read>(basic, "ds"); - auto threaded_data = h5::read>(threaded, "ds"); - REQUIRE(basic_data.size() == expected.size()); - REQUIRE(threaded_data.size() == expected.size()); - CHECK(basic_data == expected); - CHECK(threaded_data == expected); - CHECK(basic_data == threaded_data); -} - -TEST_CASE("[#241] pt_t with threaded pipeline — default worker count (hw_concurrency)") { - h5::test::file_fixture_t f("test-pt-threaded-default.h5"); - h5::ds_t ds = h5::create(f.fd, "ds", h5::current_dims_t{0}, - h5::max_dims_t{H5S_UNLIMITED}, h5::chunk{16} | h5::gzip{1}); - { - // h5::filter::threads{} with no number → hw_concurrency() workers - h5::pt_t pt(ds, h5::filter::threads{}); - for (int i = 0; i < 96; ++i) h5::append(pt, i); - h5::flush(pt); - } - auto readback = h5::read>(f.fd, "ds"); - REQUIRE(readback.size() == 96); - for (int i = 0; i < 96; ++i) CHECK(readback[i] == i); -} - TEST_CASE("[#232] std::forward_list append streams elements into chunked dataset") { h5::test::file_fixture_t f("test-pt-fwdlist.h5"); // forward_list is append/view only — h5::write/read intentionally unsupported. 
@@ -276,3 +203,125 @@ TEST_CASE("[#239] h5::reset zeroes packet table dimension tracker") { h5::reset(pt); // must compile and run without throwing CHECK(true); } + +// ===================================================================== +// [#250 1.3.2] pt_t resolves FAPL pool + backpressure at init +// ===================================================================== + +TEST_CASE("[#250 1.3.2] pt_t picks up worker pool + cap from file's FAPL") { + // Construct a file with h5::threads{4} | h5::backpressure{16} on its FAPL. + // The fixture's default file_fixture_t opens without these properties; + // we make a custom one inline here. + const char* path = "test-pt-1.3.2-pool-resolve.h5"; + std::remove(path); + { + h5::fapl_t fapl = h5::threads{4} | h5::backpressure{16}; + h5::fd_t fd = h5::create(path, H5F_ACC_TRUNC, h5::default_fcpl, fapl); + + h5::ds_t ds = h5::create(fd, "ds", h5::current_dims_t{0}, + h5::max_dims_t{H5S_UNLIMITED}, h5::chunk{32}); + + h5::pt_t pt(ds); + // pt_t::pool_ and ::backpressure_cap_ are private; the visible + // contract is that operations on this pt_t SHOULD use the pool + // (Phase 1.3.3). In this commit we just verify the pt_t was + // constructed without error and the file FAPL has the pool. + auto pool_check = h5::impl::resolve_worker_pool(static_cast(fapl)); + REQUIRE(pool_check); + CHECK(pool_check->worker_count() == 4); + CHECK(h5::impl::resolve_backpressure( + static_cast(fapl), pool_check->worker_count()) == 16u); + } + std::remove(path); +} + +TEST_CASE("[#250 1.3.2] pt_t with no FAPL pool falls back cleanly") { + // Default FAPL — no h5::threads applied. + h5::test::file_fixture_t f("test-pt-1.3.2-no-pool.h5"); + h5::ds_t ds = h5::create(f.fd, "ds", h5::current_dims_t{0}, + h5::max_dims_t{H5S_UNLIMITED}, h5::chunk{16}); + + h5::pt_t pt(ds); + // pt_t constructs without throwing; pool_ resolves to nullptr internally. + // Writes go through visit_pipeline (synchronous) — verify by appending + // and reading back. 
+ for (int i = 0; i < 32; ++i) h5::append(pt, i); + h5::flush(pt); + + auto readback = h5::read>(f.fd, "ds"); + REQUIRE(readback.size() == 32); + for (int i = 0; i < 32; ++i) CHECK(readback[i] == i); +} + +// ===================================================================== +// [#250 1.3.2 step 2] pt_t pool path: bytewise equivalence + parallelism +// ===================================================================== + +TEST_CASE("[#250 1.3.2] pt_t with FAPL pool — gzip round-trip equivalence vs synchronous") { + constexpr int N = 256; + std::vector expected(N); + for (int i = 0; i < N; ++i) expected[i] = i * 7 + 3; + + // Helper: write N ints through a pt_t built from a given fapl, + // read back, return the content. + auto write_and_read = [&](const char* path, h5::fapl_t fapl) { + std::remove(path); + { + h5::fd_t fd = h5::create(path, H5F_ACC_TRUNC, h5::default_fcpl, fapl); + h5::ds_t ds = h5::create(fd, "ds", h5::current_dims_t{0}, + h5::max_dims_t{H5S_UNLIMITED}, h5::chunk{32} | h5::gzip{6}); + h5::pt_t pt(ds); + for (int v : expected) h5::append(pt, v); + h5::flush(pt); + } + h5::fd_t fd = h5::open(path, H5F_ACC_RDONLY); + return h5::read>(fd, "ds"); + }; + + // 1) Default FAPL: synchronous path + auto sync_data = write_and_read("test-pt-1.3.2-sync.h5", h5::default_fapl); + REQUIRE(sync_data.size() == expected.size()); + CHECK(sync_data == expected); + + // 2) Pool FAPL with 4 workers, default backpressure + h5::fapl_t pool_fapl = h5::threads{4}; + auto pool_data = write_and_read("test-pt-1.3.2-pool.h5", pool_fapl); + REQUIRE(pool_data.size() == expected.size()); + CHECK(pool_data == expected); + + // 3) Pool with explicit backpressure + h5::fapl_t bp_fapl = h5::threads{4} | h5::backpressure{8}; + auto bp_data = write_and_read("test-pt-1.3.2-bp.h5", bp_fapl); + REQUIRE(bp_data.size() == expected.size()); + CHECK(bp_data == expected); + + // All three produce the same logical content. 
+ CHECK(sync_data == pool_data); + CHECK(pool_data == bp_data); + + std::remove("test-pt-1.3.2-sync.h5"); + std::remove("test-pt-1.3.2-pool.h5"); + std::remove("test-pt-1.3.2-bp.h5"); +} + +TEST_CASE("[#250 1.3.2] pt_t pool path — back-pressure bounds in-flight") { + // Tight back-pressure cap (2) forces frequent drains. The test + // exercises the producer-blocking branch in write_chunk_via_pool. + constexpr int N = 64; + const char* path = "test-pt-1.3.2-tight-bp.h5"; + std::remove(path); + { + h5::fapl_t fapl = h5::threads{2} | h5::backpressure{2}; + h5::fd_t fd = h5::create(path, H5F_ACC_TRUNC, h5::default_fcpl, fapl); + h5::ds_t ds = h5::create(fd, "ds", h5::current_dims_t{0}, + h5::max_dims_t{H5S_UNLIMITED}, h5::chunk{8} | h5::gzip{1}); + h5::pt_t pt(ds); + for (int i = 0; i < N; ++i) h5::append(pt, i); + h5::flush(pt); + } + h5::fd_t fd = h5::open(path, H5F_ACC_RDONLY); + auto data = h5::read>(fd, "ds"); + REQUIRE(data.size() == N); + for (int i = 0; i < N; ++i) CHECK(data[i] == i); + std::remove(path); +} diff --git a/test/H5Pall.cpp b/test/H5Pall.cpp index 0e08c2ecc0..c1475a9694 100644 --- a/test/H5Pall.cpp +++ b/test/H5Pall.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include "support/fixture.hpp" @@ -232,3 +233,524 @@ TEST_CASE("property builder can be chained against an existing dcpl handle") { } CHECK(found_deflate); } + +// ===================================================================== +// [#250] Phase I — FAPL worker-pool slot lifecycle +// ===================================================================== +// +// Same H5Pinsert2 + slot pattern as the DAPL pipeline (#242/#244), but with +// shared (refcounted) ownership instead of fresh-allocation-per-copy. +// Multiple FAPL copies alias one underlying worker_pool_t via shared_ptr; +// the pool is destroyed when the last live FAPL copy releases its slot. +// +// Workplan: tasks/h5cpp-fapl-multithreading-workplan.md §3. 
+ +namespace h5_250_fapl_regression { + using slot_t = h5::impl::worker_pool_slot_t; + using pool_t = h5::impl::worker_pool_t; + + inline std::atomic slot_allocations{0}; + inline std::atomic slot_deletions{0}; + inline std::unordered_set& live_slots() { + static std::unordered_set s; + return s; + } + inline std::atomic double_free_detected{false}; + + inline herr_t tracking_close_cb(const char*, size_t, void* ptr) { + auto* slot = *static_cast(ptr); + auto& live = live_slots(); + auto it = live.find(slot); + if (it == live.end()) { + double_free_detected.store(true); + return -1; + } + live.erase(it); + slot_deletions.fetch_add(1); + delete slot; + return 0; + } + inline herr_t tracking_copy_cb(const char*, size_t, void* value) { + auto** loc = static_cast(value); + auto* fresh = new slot_t{(*loc)->pool}; + live_slots().insert(fresh); + slot_allocations.fetch_add(1); + *loc = fresh; + return 0; + } + + inline hid_t make_tracked_fapl(unsigned workers, bool with_copy_cb) { + hid_t fapl = H5Pcreate(H5P_FILE_ACCESS); + auto* slot = new slot_t{std::make_shared(workers)}; + live_slots().insert(slot); + slot_allocations.fetch_add(1); + H5Pinsert2(fapl, H5CPP_FAPL_WORKER_POOL, sizeof(slot_t*), &slot, + nullptr, nullptr, nullptr, + with_copy_cb ? 
tracking_copy_cb : nullptr, + nullptr, tracking_close_cb); + return fapl; + } + + inline void reset_counters() { + slot_allocations.store(0); + slot_deletions.store(0); + live_slots().clear(); + double_free_detected.store(false); + } +} + +TEST_CASE("[#250] regression scaffold — slot lifecycle test fails when copy-cb is omitted") { + using namespace h5_250_fapl_regression; + reset_counters(); + { + hid_t fapl_a = make_tracked_fapl(/*workers=*/4, /*copy_cb=*/false); + hid_t fapl_b = H5Pcopy(fapl_a); + H5Pclose(fapl_a); + H5Pclose(fapl_b); + } + CHECK(double_free_detected.load()); + reset_counters(); +} + +TEST_CASE("[#250] single FAPL clean lifecycle") { + using namespace h5_250_fapl_regression; + reset_counters(); + { + hid_t fapl = make_tracked_fapl(/*workers=*/4, /*copy_cb=*/true); + H5Pclose(fapl); + } + CHECK(!double_free_detected.load()); + CHECK(slot_allocations.load() == slot_deletions.load()); + CHECK(live_slots().empty()); +} + +TEST_CASE("[#250] H5Pcopy preserves shared pool ownership across FAPL copies") { + using namespace h5_250_fapl_regression; + reset_counters(); + { + hid_t fapl_a = make_tracked_fapl(/*workers=*/4, /*copy_cb=*/true); + auto pool_a = h5::impl::resolve_worker_pool(fapl_a); + REQUIRE(pool_a); + CHECK(pool_a->worker_count() == 4); + + hid_t fapl_b = H5Pcopy(fapl_a); + auto pool_b = h5::impl::resolve_worker_pool(fapl_b); + REQUIRE(pool_b); + CHECK(pool_a.get() == pool_b.get()); // SAME pool — refcount shared + + H5Pclose(fapl_a); + // pool still alive via fapl_b's slot + auto pool_after_close = h5::impl::resolve_worker_pool(fapl_b); + CHECK(pool_after_close.get() == pool_a.get()); + + H5Pclose(fapl_b); + } + CHECK(!double_free_detected.load()); + CHECK(slot_allocations.load() == slot_deletions.load()); + CHECK(live_slots().empty()); +} + +TEST_CASE("[#250] h5::threads tag installs the FAPL property via property chain") { + using namespace h5_250_fapl_regression; + reset_counters(); + { + // Construct an FAPL with h5::threads{N} applied 
via the property + // chain — this exercises the real fapl_threads_set callback path, + // not the tracking shim. Uses h5cpp's actual copy/close callbacks. + h5::fapl_t fapl = h5::threads{4}; + auto pool = h5::impl::resolve_worker_pool(static_cast(fapl)); + REQUIRE(pool); + CHECK(pool->worker_count() == 4); + } + // Pool is destroyed when fapl goes out of scope — no leak detection + // possible here without separate scaffolding, but TSAN coverage will + // catch any worker-thread shutdown issues. +} + +TEST_CASE("[#250] h5::threads{} (no count) uses hardware_concurrency") { + h5::fapl_t fapl = h5::threads{}; + auto pool = h5::impl::resolve_worker_pool(static_cast(fapl)); + REQUIRE(pool); + const unsigned expected = std::max(1u, std::thread::hardware_concurrency()); + CHECK(pool->worker_count() == expected); +} + +// ===================================================================== +// [#250] Phase 1.2 — worker_pool_t generic submit() + wait_idle() +// ===================================================================== + +TEST_CASE("[#250 1.2] worker_pool_t — single submit + future return") { + h5::impl::worker_pool_t pool{4}; + REQUIRE(pool.worker_count() == 4); + + auto fut = pool.submit([] { return 42; }); + CHECK(fut.get() == 42); + pool.wait_idle(); // close the small post-future-ready/pre-decrement window + CHECK(pool.in_flight() == 0); +} + +TEST_CASE("[#250 1.2] worker_pool_t — many submits resolve in parallel") { + constexpr unsigned N = 256; + h5::impl::worker_pool_t pool{8}; + std::vector> futures; + futures.reserve(N); + for (unsigned i = 0; i < N; ++i) + futures.emplace_back(pool.submit([i] { return static_cast(i * i); })); + + int sum = 0; + for (auto& f : futures) sum += f.get(); + int expected = 0; + for (unsigned i = 0; i < N; ++i) expected += static_cast(i * i); + CHECK(sum == expected); + pool.wait_idle(); // close the small post-future-ready/pre-decrement window + CHECK(pool.in_flight() == 0); +} + +TEST_CASE("[#250 1.2] worker_pool_t — 
multi-thread submission is safe (MPMC)") { + constexpr unsigned PRODUCERS = 4; + constexpr unsigned PER_PRODUCER = 100; + h5::impl::worker_pool_t pool{4}; + std::atomic total{0}; + + std::vector producers; + producers.reserve(PRODUCERS); + for (unsigned p = 0; p < PRODUCERS; ++p) { + producers.emplace_back([&pool, &total] { + for (unsigned i = 0; i < PER_PRODUCER; ++i) + pool.submit([&total] { total.fetch_add(1); }).wait(); + }); + } + for (auto& t : producers) t.join(); + CHECK(total.load() == static_cast(PRODUCERS * PER_PRODUCER)); + pool.wait_idle(); // close the small post-future-ready/pre-decrement window + CHECK(pool.in_flight() == 0); +} + +TEST_CASE("[#250 1.2] worker_pool_t — wait_idle blocks until completion") { + h5::impl::worker_pool_t pool{2}; + std::atomic done{0}; + for (int i = 0; i < 50; ++i) + pool.submit([&done] { + std::this_thread::sleep_for(std::chrono::milliseconds(2)); + done.fetch_add(1); + }); + pool.wait_idle(); + CHECK(done.load() == 50); + pool.wait_idle(); // close the small post-future-ready/pre-decrement window + CHECK(pool.in_flight() == 0); +} + +TEST_CASE("[#250 1.2] worker_pool_t — exception in task is captured in future") { + h5::impl::worker_pool_t pool{2}; + auto fut = pool.submit([] () -> int { throw std::runtime_error("boom"); }); + bool caught = false; + try { (void)fut.get(); } + catch (const std::runtime_error& e) { + caught = std::string(e.what()) == "boom"; + } + CHECK(caught); + pool.wait_idle(); // close the small post-future-ready/pre-decrement window + CHECK(pool.in_flight() == 0); +} + +TEST_CASE("[#250 1.2] worker_pool_t — dtor drains pending work cleanly") { + std::atomic done{0}; + { + h5::impl::worker_pool_t pool{4}; + for (int i = 0; i < 100; ++i) + pool.submit([&done] { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + done.fetch_add(1); + }); + // No explicit wait_idle — dtor must drain before joining workers. 
+ } + CHECK(done.load() == 100); +} + +// ===================================================================== +// [#250] Phase 1.3.1 — h5::backpressure FAPL property +// ===================================================================== + +TEST_CASE("[#250 1.3] h5::backpressure tag installs the FAPL property") { + h5::fapl_t fapl = h5::threads{4} | h5::backpressure{16}; + auto pool = h5::impl::resolve_worker_pool(static_cast(fapl)); + REQUIRE(pool); + CHECK(pool->worker_count() == 4); + CHECK(h5::impl::resolve_backpressure(static_cast(fapl), pool->worker_count()) == 16); +} + +TEST_CASE("[#250 1.3] resolve_backpressure default = 8 × worker_count when unset") { + h5::fapl_t fapl = h5::threads{4}; + auto pool = h5::impl::resolve_worker_pool(static_cast(fapl)); + REQUIRE(pool); + CHECK(h5::impl::resolve_backpressure(static_cast(fapl), pool->worker_count()) == 32u); +} + +TEST_CASE("[#250 1.3] backpressure property survives H5Pcopy") { + h5::fapl_t fapl = h5::threads{4} | h5::backpressure{64}; + hid_t copy = H5Pcopy(static_cast(fapl)); + auto pool_copy = h5::impl::resolve_worker_pool(copy); + REQUIRE(pool_copy); + CHECK(h5::impl::resolve_backpressure(copy, pool_copy->worker_count()) == 64u); + H5Pclose(copy); +} + +TEST_CASE("[#250 1.3] backpressure without threads — resolver returns default but no pool") { + h5::fapl_t fapl = h5::backpressure{32}; + CHECK(h5::impl::resolve_worker_pool(static_cast(fapl)) == nullptr); + // Cap is present even without a pool — resolver returns user value. + // pt_t / h5::write are responsible for ignoring it when no pool exists. 
+ CHECK(h5::impl::resolve_backpressure(static_cast(fapl), 0) == 32u); +} + +// ===================================================================== +// [#250 1.3.3] h5::write / h5::read pool integration end-to-end +// ===================================================================== +// +// Per workplan Approach 2: h5::write / h5::read consult the file's FAPL +// for a pool and route through a local pool_pipeline_t when one exists. +// Per-dataset opt-in is still h5::high_throughput on the DAPL; without +// it, even a FAPL-pool-equipped file falls through to standard H5Dwrite. + +TEST_CASE("[#250 1.3.3] h5::write + h5::read round-trip through FAPL pool") { + constexpr int N = 256; + std::vector expected(N); + for (int i = 0; i < N; ++i) expected[i] = i * 11 + 7; + + const char* path = "test-h5write-1.3.3-pool.h5"; + std::remove(path); + { + h5::fapl_t fapl = h5::threads{4} | h5::backpressure{16}; + h5::fd_t fd = h5::create(path, H5F_ACC_TRUNC, h5::default_fcpl, fapl); + // chunked + gzip + DAPL high_throughput to opt the dataset into + // the pipeline path; the FAPL pool then claims the chunks. + h5::dapl_t dapl = h5::high_throughput; + h5::ds_t ds = h5::create(fd, "ds", h5::current_dims_t{N}, + h5::max_dims_t{H5S_UNLIMITED}, + h5::chunk{32} | h5::gzip{6}, dapl); + h5::write(ds, expected.data(), h5::count{N}); + } + { + h5::fapl_t fapl = h5::threads{4}; + h5::fd_t fd = h5::open(path, H5F_ACC_RDONLY, h5::default_fapl); + // Read back without a pool — verify the data is on disk regardless + // of FAPL choice on the reader side. + auto data = h5::read>(fd, "ds"); + REQUIRE(data.size() == expected.size()); + CHECK(data == expected); + } + std::remove(path); +} + +TEST_CASE("[#250 1.3.3] h5::write without high_throughput bypasses pool") { + // Even with FAPL pool installed, h5::write to a dataset whose DAPL + // doesn't have high_throughput should go through standard H5Dwrite. 
+ constexpr int N = 128; + std::vector expected(N); + for (int i = 0; i < N; ++i) expected[i] = i; + + const char* path = "test-h5write-1.3.3-no-ht.h5"; + std::remove(path); + { + h5::fapl_t fapl = h5::threads{4}; + h5::fd_t fd = h5::create(path, H5F_ACC_TRUNC, h5::default_fcpl, fapl); + // No high_throughput on the DAPL; pool is on FAPL but unused. + h5::ds_t ds = h5::create(fd, "ds", h5::current_dims_t{N}, + h5::max_dims_t{H5S_UNLIMITED}, h5::chunk{16}); + h5::write(ds, expected.data(), h5::count{N}); + } + { + h5::fd_t fd = h5::open(path, H5F_ACC_RDONLY); + auto data = h5::read>(fd, "ds"); + REQUIRE(data.size() == expected.size()); + CHECK(data == expected); + } + std::remove(path); +} + +// ===================================================================== +// [#250 1.5] TSAN-targeted coverage — multi-fd isolation, bytewise +// equivalence vs. basic pipeline, fd shutdown drains in-flight chunks +// ===================================================================== +// +// These tests exist to harden the FAPL-scoped pool against data races +// and ownership bugs that show up under `-fsanitize=thread`. They +// run as ordinary ctest cases on the local toolchain (no TSAN gate), +// and as race detectors on the CI `tsan` job introduced in #250 +// Phase 1.5. + +TEST_CASE("[#250 1.5] two FAPLs with independent pools — bytewise equivalence to basic") { + // Same payload, gzip-compressed, written via three configurations: + // basic pipeline, FAPL pool A (4 threads), FAPL pool B (2 threads). + // All three on-disk files must read back to the same logical data. 
+ constexpr int N = 1024; + std::vector expected(N); + for (int i = 0; i < N; ++i) expected[i] = (i * 31337) ^ 0xCAFE; + + auto write_with = [&](const char* path, h5::fapl_t fapl) { + std::remove(path); + h5::fd_t fd = h5::create(path, H5F_ACC_TRUNC, h5::default_fcpl, fapl); + h5::dapl_t dapl = h5::high_throughput; + h5::ds_t ds = h5::create(fd, "ds", h5::current_dims_t{N}, + h5::max_dims_t{H5S_UNLIMITED}, + h5::chunk{64} | h5::gzip{6}, dapl); + h5::write(ds, expected.data(), h5::count{N}); + }; + + const char* p_basic = "test-1.5-basic.h5"; + const char* p_pool4 = "test-1.5-pool4.h5"; + const char* p_pool2 = "test-1.5-pool2.h5"; + + write_with(p_basic, h5::default_fapl); + write_with(p_pool4, h5::fapl_t{h5::threads{4} | h5::backpressure{16}}); + write_with(p_pool2, h5::fapl_t{h5::threads{2} | h5::backpressure{8}}); + + auto load = [](const char* p) { + h5::fd_t fd = h5::open(p, H5F_ACC_RDONLY); + return h5::read>(fd, "ds"); + }; + + auto a = load(p_basic), b = load(p_pool4), c = load(p_pool2); + REQUIRE(a.size() == expected.size()); + CHECK(a == expected); + CHECK(b == expected); + CHECK(c == expected); + CHECK(a == b); + CHECK(b == c); + + std::remove(p_basic); + std::remove(p_pool4); + std::remove(p_pool2); +} + +TEST_CASE("[#250 1.5] interleaved writes through two FAPLs do not cross-contaminate") { + // Two simultaneously-open files, each owning its own h5::fd_t with + // its own FAPL pool. Writes are issued sequentially (HDF5 itself is + // not thread-safe in default builds), but both pools are alive at + // the same time — a shared-state bug would cross chunks between + // them. Compression on each pool's workers runs in parallel. + // + // (Concurrent H5 C-API calls from multiple threads are explicitly + // out of scope: the FAPL pool parallelizes compression, not HDF5 + // itself. See Phase II in the workplan for the async/thread-safe + // story.) 
+ constexpr int N = 512; + std::vector payload_a(N), payload_b(N); + for (int i = 0; i < N; ++i) { + payload_a[i] = i * 3 + 1; + payload_b[i] = i * 5 + 2; + } + + const char* pa = "test-1.5-interleaved-a.h5"; + const char* pb = "test-1.5-interleaved-b.h5"; + std::remove(pa); + std::remove(pb); + + { + h5::fapl_t fapl_a = h5::threads{4}; + h5::fapl_t fapl_b = h5::threads{2}; + h5::fd_t fa = h5::create(pa, H5F_ACC_TRUNC, h5::default_fcpl, fapl_a); + h5::fd_t fb = h5::create(pb, H5F_ACC_TRUNC, h5::default_fcpl, fapl_b); + + h5::dapl_t dapl = h5::high_throughput; + h5::ds_t da = h5::create(fa, "ds", h5::current_dims_t{N}, + h5::max_dims_t{H5S_UNLIMITED}, h5::chunk{32} | h5::gzip{4}, dapl); + h5::ds_t db = h5::create(fb, "ds", h5::current_dims_t{N}, + h5::max_dims_t{H5S_UNLIMITED}, h5::chunk{32} | h5::gzip{4}, dapl); + + // Interleave the writes so both pools have in-flight work + // simultaneously. h5::write on each side drains its own pool + // before returning, so by the time we move on the other pool + // is still alive with its own queued work. + h5::write(da, payload_a.data(), h5::count{N}); + h5::write(db, payload_b.data(), h5::count{N}); + } + + h5::fd_t fa = h5::open(pa, H5F_ACC_RDONLY); + h5::fd_t fb = h5::open(pb, H5F_ACC_RDONLY); + auto ra = h5::read>(fa, "ds"); + auto rb = h5::read>(fb, "ds"); + REQUIRE(ra.size() == payload_a.size()); + REQUIRE(rb.size() == payload_b.size()); + CHECK(ra == payload_a); + CHECK(rb == payload_b); + + std::remove(pa); + std::remove(pb); +} + +TEST_CASE("[#250 1.5] pt_t destructor drains pool before fd close — data on disk") { + // pt_t::flush is called by ~pt_t, which on the pool path calls + // pool_pipeline_t::drain() and blocks until every in-flight chunk + // has completed compression + H5Dwrite_chunk. After the pt_t + // scope exits, the data must be readable from a fresh open even + // though the worker pool is still alive (held by the FAPL slot). 
+ constexpr int N = 384; + std::vector expected(N); + for (int i = 0; i < N; ++i) expected[i] = i * 13; + + const char* path = "test-1.5-pt-drain.h5"; + std::remove(path); + { + h5::fapl_t fapl = h5::threads{4} | h5::backpressure{8}; + h5::fd_t fd = h5::create(path, H5F_ACC_TRUNC, h5::default_fcpl, fapl); + h5::dapl_t dapl = h5::high_throughput; + h5::ds_t ds = h5::create(fd, "ds", h5::current_dims_t{0}, + h5::max_dims_t{H5S_UNLIMITED}, + h5::chunk{24} | h5::gzip{6}, dapl); + { + h5::pt_t pt(ds); + for (int v : expected) h5::append(pt, v); + // No explicit h5::flush — let ~pt_t drain. + } + // ds and fd close at scope exit. + } + { + h5::fd_t fd = h5::open(path, H5F_ACC_RDONLY); + auto data = h5::read>(fd, "ds"); + REQUIRE(data.size() == expected.size()); + CHECK(data == expected); + } + std::remove(path); +} + +TEST_CASE("[#250 1.5] fd close after FAPL pool flush releases workers") { + // After the fd goes out of scope, the FAPL property's close + // callback drops its shared_ptr to the worker_pool_t. When the + // last user releases, the pool destructor stops the workers + // cleanly. This test exercises the lifecycle directly; TSAN + // catches use-after-free if the slot order is wrong. Each + // iteration uses a distinct path because Windows holds file + // handles slightly longer than POSIX after the HDF5 close. + constexpr int N = 256; + std::vector expected(N); + for (int i = 0; i < N; ++i) expected[i] = i + 100; + + auto write_then_close = [&](const char* path) { + std::remove(path); + h5::fapl_t fapl = h5::threads{2}; + h5::fd_t fd = h5::create(path, H5F_ACC_TRUNC, h5::default_fcpl, fapl); + h5::dapl_t dapl = h5::high_throughput; + h5::ds_t ds = h5::create(fd, "ds", h5::current_dims_t{N}, + h5::max_dims_t{H5S_UNLIMITED}, + h5::chunk{32} | h5::gzip{1}, dapl); + h5::write(ds, expected.data(), h5::count{N}); + // ds / fd close at scope exit; FAPL slot drops the pool ref. 
+ }; + + const char* paths[] = { + "test-1.5-fd-close-a.h5", + "test-1.5-fd-close-b.h5", + "test-1.5-fd-close-c.h5", + "test-1.5-fd-close-d.h5", + }; + for (const char* p : paths) write_then_close(p); + + for (const char* p : paths) { + h5::fd_t fd = h5::open(p, H5F_ACC_RDONLY); + auto data = h5::read>(fd, "ds"); + REQUIRE(data.size() == expected.size()); + CHECK(data == expected); + std::remove(p); + } +} diff --git a/test/H5Zpipeline.cpp b/test/H5Zpipeline.cpp index 856fe52caa..2f9850c904 100644 --- a/test/H5Zpipeline.cpp +++ b/test/H5Zpipeline.cpp @@ -4,7 +4,6 @@ #include #include #include -#include #include #include #include "support/fixture.hpp" @@ -184,84 +183,3 @@ TEST_CASE("filter::error throws runtime_error") { char dst[8] = {}; CHECK_THROWS_AS(h5::impl::filter::error(dst, src, 8, 0, 1, nullptr), std::runtime_error); } - -TEST_CASE("threaded_pipeline_t write/read round-trip no filter") { - h5::test::file_fixture_t f("test-threaded-pipeline-nofilter.h5"); - h5::ds_t ds = h5::create(f.fd, "ds", h5::current_dims_t{50}, - h5::max_dims_t{H5S_UNLIMITED}, h5::chunk{10}); - h5::dcpl_t dcpl = h5::get_dcpl(ds); - h5::impl::threaded_pipeline_t pipeline; - pipeline.set_cache(dcpl, sizeof(int)); - - std::vector data(50); - for (size_t i = 0; i < data.size(); ++i) - data[i] = static_cast(i * 3); - - h5::offset_t offset{0}; - h5::stride_t stride{1}; - h5::block_t block{1}; - h5::count_t count{50}; - - pipeline.write(ds, offset, stride, block, count, h5::default_dxpl, data.data()); - pipeline.flush(); - - std::vector readback(50); - pipeline.read(ds, offset, stride, block, count, h5::default_dxpl, readback.data()); - - for (size_t i = 0; i < data.size(); ++i) - CHECK(readback[i] == data[i]); -} - -TEST_CASE("threaded_pipeline_t write/read round-trip gzip") { - h5::test::file_fixture_t f("test-threaded-pipeline-gzip.h5"); - h5::ds_t ds = h5::create(f.fd, "ds", h5::current_dims_t{100}, - h5::max_dims_t{H5S_UNLIMITED}, h5::chunk{10} | h5::gzip{6}); - h5::dcpl_t dcpl = 
h5::get_dcpl(ds); - h5::impl::threaded_pipeline_t pipeline; - pipeline.set_cache(dcpl, sizeof(double)); - - std::vector data(100); - for (size_t i = 0; i < data.size(); ++i) - data[i] = static_cast(i) * 1.5; - - h5::offset_t offset{0}; - h5::stride_t stride{1}; - h5::block_t block{1}; - h5::count_t count{100}; - - pipeline.write(ds, offset, stride, block, count, h5::default_dxpl, data.data()); - pipeline.flush(); - - std::vector readback(100); - pipeline.read(ds, offset, stride, block, count, h5::default_dxpl, readback.data()); - - for (size_t i = 0; i < data.size(); ++i) - CHECK(readback[i] == data[i]); -} - -TEST_CASE("threaded_pipeline_t write/read round-trip shuffle+gzip") { - h5::test::file_fixture_t f("test-threaded-pipeline-multi.h5"); - h5::ds_t ds = h5::create(f.fd, "ds", h5::current_dims_t{50}, - h5::max_dims_t{H5S_UNLIMITED}, h5::chunk{10} | h5::shuffle | h5::gzip{6}); - h5::dcpl_t dcpl = h5::get_dcpl(ds); - h5::impl::threaded_pipeline_t pipeline; - pipeline.set_cache(dcpl, sizeof(int)); - - std::vector data(50); - for (size_t i = 0; i < data.size(); ++i) - data[i] = static_cast(i + 1); - - h5::offset_t offset{0}; - h5::stride_t stride{1}; - h5::block_t block{1}; - h5::count_t count{50}; - - pipeline.write(ds, offset, stride, block, count, h5::default_dxpl, data.data()); - pipeline.flush(); - - std::vector readback(50); - pipeline.read(ds, offset, stride, block, count, h5::default_dxpl, readback.data()); - - for (size_t i = 0; i < data.size(); ++i) - CHECK(readback[i] == data[i]); -} From 292b21d621a071ca4458e028e978784fe97240a3 Mon Sep 17 00:00:00 2001 From: steven varga Date: Tue, 19 May 2026 21:17:41 +0000 Subject: [PATCH 5/5] [#254]:svarga:refactor, remove compiler_meta_t and field_descriptor_t from type engine --- h5cpp/H5Tmeta.hpp | 76 +---------------------------------------------- test/H5Aall.cpp | 8 ++--- 2 files changed, 4 insertions(+), 80 deletions(-) diff --git a/h5cpp/H5Tmeta.hpp b/h5cpp/H5Tmeta.hpp index 8e4b16146c..866d43a116 100644 --- 
a/h5cpp/H5Tmeta.hpp +++ b/h5cpp/H5Tmeta.hpp @@ -283,23 +283,6 @@ namespace h5::meta { template struct storage_representation : detail_capabilities::storage_representation_impl> {}; template constexpr storage_representation_t storage_representation_v = storage_representation::value; - inline constexpr std::uint32_t metadata_version = 1; - - /** Base class for compiler-emitted reflected field descriptors. */ - template - struct field_descriptor_t { - using owner_type = owner_t; - using field_type = field_t; - }; - - /** Specialize to std::true_type for any struct described by compiler_meta_t. */ - template - struct is_reflected_compound_t : std::false_type {}; - - /** Specialize to provide the field-descriptor tuple for a reflected compound. */ - template - struct compiler_meta_t; - template struct storage_traits_impl_t; template @@ -395,33 +378,6 @@ namespace h5::meta { } }; - template - struct storage_traits_impl_t::value>> { - static_assert(compiler_meta_t::version == metadata_version, - "H5CPP compiler metadata version mismatch"); - static constexpr bool supported = true; - static constexpr bool owns_handle = true; - static hid_t create_type() noexcept { - using fields_t = typename compiler_meta_t::fields_t; - hid_t dt = H5Tcreate(H5T_COMPOUND, sizeof(T)); - insert_fields(dt, - std::make_index_sequence>{}); - return dt; - } - private: - template - static void insert_fields(hid_t dt, std::index_sequence) noexcept { - (insert_field>(dt), ...); - } - template - static void insert_field(hid_t dt) noexcept { - using field_t = typename FieldDesc::field_type; - hid_t field_dt = storage_traits_t::create_type(); - H5Tinsert(dt, FieldDesc::name(), FieldDesc::offset, field_dt); - if constexpr (storage_traits_t::owns_handle) H5Tclose(field_dt); - } - }; - template struct is_transport_contiguous_impl_t : std::false_type {}; template struct is_transport_contiguous_impl_t>> : std::true_type {}; template struct is_transport_contiguous_impl_t::value>> : std::true_type {}; @@ 
-429,30 +385,15 @@ namespace h5::meta { is_array_like::value && !is_text_like::value>> : is_transport_contiguous_t::type> {}; - template struct is_transport_contiguous_impl_t::value>> { - private: - static_assert(compiler_meta_t::version == metadata_version, - "H5CPP compiler metadata version mismatch"); - using fields_t = typename compiler_meta_t::fields_t; - template - static constexpr bool check_contiguous(std::index_sequence) noexcept { - return (... && is_transport_contiguous_v< - typename std::tuple_element_t::field_type>); - } - public: - static constexpr bool value = check_contiguous(std::make_index_sequence>{}); - }; - // Gap 1: contiguous STL sequence containers (vector, span, linalg types, etc.) // Triggers when T exposes a data() pointer and size(), but is not a C/std::array, - // not text, not arithmetic, and not a reflected compound. + // not text, and not arithmetic. // The element type inferred from data() must be standard-layout and trivial // (prevents nested containers like vector> or vector from matching). template struct is_transport_contiguous_impl_t::value && !is_text_like::value && - !is_reflected_compound_t::value && !std::is_arithmetic_v && has_data_pointer::value && meta::has_size::value && @@ -626,19 +567,6 @@ namespace h5::meta { static constexpr std::size_t bytes(const T&) noexcept { return sizeof(T); } }; - // Reflected compound structs - template - struct access_traits_t::value>> { - using element_t = T; - using pointer_t = const T*; - static constexpr access_t kind = access_t::object; - static constexpr bool is_trivially_packable = is_transport_contiguous_v; - static const T* data(const T& v) noexcept { return &v; } - static T* data(T& v) noexcept { return &v; } - static constexpr std::array size(const T&) noexcept { return {}; } - static constexpr std::size_t bytes(const T&) noexcept { return sizeof(T); } - }; - // Text-like types (std::string, std::string_view, etc.) 
— handled by HDF5 string types, not raw memcpy template struct access_traits_t>::value && !std::is_array_v && - !is_reflected_compound_t::value && has_data_pointer::value && meta::has_size::value && is_transport_contiguous_v>> { @@ -693,7 +620,6 @@ namespace h5::meta { struct access_traits_t>::value && !std::is_array_v && - !is_reflected_compound_t::value && has_data_pointer::value && meta::has_size::value && compat::is_detected>::value && diff --git a/test/H5Aall.cpp b/test/H5Aall.cpp index 8580312ad3..c1d1e8470d 100644 --- a/test/H5Aall.cpp +++ b/test/H5Aall.cpp @@ -141,11 +141,9 @@ TEST_CASE("awrite/aread std::array of arithmetic") { // --------------------------------------------------------------------------- // POD struct (compound type) — KNOWN LIMITATION (not tested here) -// h5::awrite/aread for compound structs requires the h5cpp compiler plugin to -// emit compiler_meta_t and register the HDF5 compound type. Without the -// plugin, H5CPP_REGISTER_STRUCT only returns H5I_UNINIT and H5Acreate2 fails. -// Compound attribute round-trips work correctly when the plugin is used to -// generate the reflection shim — no fix needed in H5Awrite.hpp / H5Aread.hpp. +// h5::awrite/aread for compound structs requires a manual h5::create() +// specialization (dt_t path) or C++26 auto-reflection. Without either, +// H5CPP_REGISTER_STRUCT returns H5I_UNINIT and H5Acreate2 fails. // --------------------------------------------------------------------------- // ---------------------------------------------------------------------------