perf: use sync methods for chunk encoding / decoding#3885
`PreparedWrite` models a set of per-chunk changes that would be applied to a stored chunk. `SupportsChunkPacking` is a protocol for array -> bytes codecs that can use `PreparedWrite` objects to update an existing chunk.
…into perf/prepared-write-v2
…into perf/prepared-write-v2
Codecov Report❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #3885 +/- ##
==========================================
+ Coverage 93.47% 93.73% +0.25%
==========================================
Files 90 91 +1
Lines 11967 12591 +624
==========================================
+ Hits 11186 11802 +616
- Misses 781 789 +8
…into perf/prepared-write-v2
@TomAugspurger how would this design work with CUDA codecs?
5d3064e to b67a5a0
a84a15a to 68a7cdc
```python
# Phase 1: fetch all chunks (IO, sequential)
raw_buffers: list[Buffer | None] = [
    bg.get_sync(prototype=cs.prototype)  # type: ignore[attr-defined]
    for bg, cs, *_ in batch
]

# Phase 2: decode (compute, optionally threaded)
def _decode_one(raw: Buffer | None, chunk_spec: ArraySpec) -> NDBuffer | None:
    if raw is None:
        return None
    return transform.decode_chunk(raw, chunk_spec)

specs = [cs for _, cs, *_ in batch]
if n_workers > 0 and len(batch) > 1:
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        decoded_list = list(pool.map(_decode_one, raw_buffers, specs))
else:
    decoded_list = [
        _decode_one(raw, spec) for raw, spec in zip(raw_buffers, specs, strict=True)
    ]
```
Why isn't this all multi-threaded i.e., the I/O as well?
I should benchmark this, but my expectation was that IO against memory storage and local storage is not compute-limited, so threads wouldn't remove a real bottleneck. For memory storage I'm sure this is true; I'm not sure about local storage, though.
Adds a SupportsSetRange protocol to zarr.abc.store for stores that allow overwriting a byte range within an existing value. Implementations are added for LocalStore (using file-handle seek+write) and MemoryStore (in-memory bytearray slice assignment).
This is the prerequisite for the partial-shard write fast path in ShardingCodec, which can patch individual inner-chunk slots without rewriting the entire shard blob when the inner codec chain is fixed-size.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
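To make the idea concrete, here is a minimal sketch of a byte-range-write protocol with an in-memory implementation based on bytearray slice assignment. The method name `set_range_sync` is mentioned elsewhere in this PR's history, but the signature used here and the `ToyMemoryStore` class are assumptions for illustration, not zarr's actual API.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class SupportsSetRange(Protocol):
    """Stores that can overwrite a byte range inside an existing value."""

    # Hypothetical signature: the real protocol may differ.
    def set_range_sync(self, key: str, value: bytes, start: int) -> None: ...


class ToyMemoryStore:
    """Illustrative in-memory store; this is not zarr's MemoryStore."""

    def __init__(self) -> None:
        self._data: dict[str, bytearray] = {}

    def set_sync(self, key: str, value: bytes) -> None:
        self._data[key] = bytearray(value)

    def get_sync(self, key: str) -> bytes:
        return bytes(self._data[key])

    def set_range_sync(self, key: str, value: bytes, start: int) -> None:
        buf = self._data[key]
        end = start + len(value)
        if start < 0 or end > len(buf):
            raise ValueError("byte range falls outside the existing value")
        # Equal-length slice assignment patches the bytes in place.
        buf[start:end] = value


store = ToyMemoryStore()
store.set_sync("c/0", b"aaaabbbbcccc")
store.set_range_sync("c/0", b"XXXX", start=4)
patched = store.get_sync("c/0")
```

Because the protocol is runtime-checkable, a caller could gate a fast path on `isinstance(store, SupportsSetRange)` and fall back to a full rewrite otherwise.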
V2Codec, BytesCodec, BloscCodec, etc. previously only implemented the async _decode_single / _encode_single methods. Add their sync counterparts (_decode_sync / _encode_sync) so that the upcoming SyncCodecPipeline can dispatch through them without spinning up an event loop.
For codecs that wrap external compressors (numcodecs.Zstd, numcodecs.Blosc, the V2 fallback chain), the sync versions just call the underlying compressor's blocking API directly instead of routing through asyncio.to_thread.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
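A minimal sketch of the sync/async split described in this commit message, assuming a toy codec where `zlib` stands in for a numcodecs compressor. The class `ToyZlibCodec` is hypothetical; only the method names `_encode_sync`, `_decode_sync`, `_encode_single`, and `_decode_single` come from the text.

```python
import asyncio
import zlib


class ToyZlibCodec:
    """Illustrative bytes -> bytes codec; zlib stands in for a real compressor."""

    def _encode_sync(self, data: bytes) -> bytes:
        # Sync core: call the compressor's blocking API directly.
        return zlib.compress(data)

    def _decode_sync(self, data: bytes) -> bytes:
        return zlib.decompress(data)

    async def _encode_single(self, data: bytes) -> bytes:
        # Async entry point: a thin wrapper that offloads the sync core.
        return await asyncio.to_thread(self._encode_sync, data)

    async def _decode_single(self, data: bytes) -> bytes:
        return await asyncio.to_thread(self._decode_sync, data)


codec = ToyZlibCodec()
payload = b"zarr" * 1000

# Sync path: no event loop involved.
sync_roundtrip = codec._decode_sync(codec._encode_sync(payload))


async def _async_roundtrip() -> bytes:
    return await codec._decode_single(await codec._encode_single(payload))


# Async path: same result, routed through an event loop and a worker thread.
async_roundtrip = asyncio.run(_async_roundtrip())
```

Keeping one sync core and deriving the async method from it means the two entry points cannot diverge in behavior.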
…arallelism
Adds SyncCodecPipeline alongside BatchedCodecPipeline. The new pipeline
runs codecs through their sync entry points (_decode_sync / _encode_sync)
and dispatches per-chunk work to a module-level thread pool sized by
the codec_pipeline.max_workers config (default = os.cpu_count()).
Each chunk's full lifecycle (fetch + decode + scatter for reads;
get-existing + merge + encode + set/delete for writes) runs as one
pool task — overlapping IO of one chunk with compute of another.
Scatter into the shared output buffer is thread-safe because chunks
have non-overlapping output selections.
The async wrappers (read/write) detect SupportsGetSync/SupportsSetSync
stores and dispatch to the sync fast path, passing the configured
max_workers. Other stores fall through to the async path, which still
uses asyncio.concurrent_map at async.concurrency.
Notes on perf:
- Default (None → cpu_count) is tuned for chunks ≥ ~512 KB.
- Small chunks (≤ 64 KB) regress 1.5-3x because pool dispatch overhead
(~30-50 µs/task) dominates per-chunk work. Workaround:
zarr.config.set({"codec_pipeline.max_workers": 1}).
- For large chunks on local/memory stores, IO+compute parallelism
yields 1.7-2.5x over BatchedCodecPipeline on direct-API reads and
~2.5x on roundtrip.
ChunkTransform encapsulates the sync codec chain. It caches resolved
ArraySpecs across calls with the same chunk_spec — combined with the
constant-ArraySpec optimization in indexing, hot-path overhead is
minimized.
Includes test scaffolding for the new pipeline (test_sync_codec_pipeline)
and config plumbing for the max_workers key.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
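The per-chunk lifecycle in the commit message above (fetch, decode, scatter as one pool task) can be sketched roughly as follows. This is a toy model under stated assumptions: the dict `store`, the `read_one` helper, and the flat `bytearray` output are all hypothetical stand-ins, and `zlib` replaces the real codec chain.

```python
from concurrent.futures import ThreadPoolExecutor
import zlib

CHUNK = 4  # decoded bytes per chunk (toy value)

# Toy "store": chunk index -> encoded chunk. Chunk 2 is missing on purpose.
store = {
    0: zlib.compress(b"aaaa"),
    1: zlib.compress(b"bbbb"),
    3: zlib.compress(b"dddd"),
}
FILL = b"\x00"
out = bytearray(CHUNK * 4)  # shared output buffer


def read_one(index: int) -> None:
    """Fetch + decode + scatter for one chunk, as a single pool task."""
    raw = store.get(index)  # "IO"
    decoded = FILL * CHUNK if raw is None else zlib.decompress(raw)  # compute
    # Each task writes a disjoint, equal-length slice of the output.
    out[index * CHUNK : (index + 1) * CHUNK] = decoded


with ThreadPoolExecutor(max_workers=2) as pool:
    # list(...) drains the iterator so worker exceptions surface here.
    list(pool.map(read_one, range(4)))
```

The scatter step needs no lock in this sketch only because the output selections do not overlap, which is the same condition the commit message relies on.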
Adds _encode_partial_sync and _decode_partial_sync to ShardingCodec.
For fixed-size inner codec chains and stores that implement
SupportsSetRange, partial writes patch individual inner-chunk slots
in-place instead of rewriting the whole shard:
- Reads existing shard index (one byte-range get).
- For each affected inner chunk: decodes the slot, merges the new
region, re-encodes.
- Writes each modified slot at its deterministic byte offset, then
rewrites just the index.
For variable-size inner codecs (e.g. with compression) or stores that
don't support byte-range writes, falls through to a full-shard rewrite
matching BatchedCodecPipeline semantics.
The partial-decode path computes a ReadPlan from the shard index and
issues one byte-range get per overlapping chunk, decoding only what
the read selection touches.
Both paths are dispatched from SyncCodecPipeline via the existing
supports_partial_decode / supports_partial_encode protocol checks.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
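As a rough illustration of why fixed-size inner chunks allow in-place patching, the sketch below models a shard as back-to-back slots of equal size. The names `SLOT_NBYTES`, `slot_offset`, and `patch_slot` are hypothetical, the codec is the identity, and the shard index that a real shard carries is left out.

```python
SLOT_NBYTES = 4  # fixed encoded size of one inner chunk (toy value)

# Toy shard blob: three fixed-size inner chunks back to back.
shard = bytearray(b"aaaabbbbcccc")


def slot_offset(slot: int) -> int:
    """Fixed-size inner chunks make every slot's byte offset computable."""
    return slot * SLOT_NBYTES


def patch_slot(blob: bytearray, slot: int, region: slice, new: bytes) -> None:
    start = slot_offset(slot)
    # Decode the slot (identity codec here), merge the new region, re-encode.
    chunk = bytearray(blob[start : start + SLOT_NBYTES])
    chunk[region] = new
    if len(chunk) != SLOT_NBYTES:
        raise ValueError("merged chunk changed size; in-place patch is invalid")
    # Overwrite only this slot; the other slots are left untouched.
    blob[start : start + SLOT_NBYTES] = chunk


patch_slot(shard, slot=1, region=slice(1, 3), new=b"XY")
```

With a compressing inner codec the re-encoded size can change, so slot offsets are no longer deterministic and the full-shard rewrite is the safe fallback.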
Two new test files:
test_codec_invariants — asserts contract-level properties that every
codec / shard / buffer combination must satisfy: round-trip exactness,
prototype propagation, fill-value handling, all-empty shard handling.
test_pipeline_parity — exhaustive matrix asserting that
SyncCodecPipeline and BatchedCodecPipeline produce semantically
identical results across codec configs, layouts (including
nested sharding), write sequences, and write_empty_chunks settings.
Three checks per cell:
1. Same array contents on read.
2. Same set of store keys after writes.
3. Each pipeline reads the other's output identically (catches
layout-divergence bugs).
These tests pinned the design throughout the SyncCodecPipeline +
partial-shard development.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
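The three checks per cell can be pictured with two toy pipelines that share an on-disk format but differ in implementation. `PipelineA`, `PipelineB`, and `check_parity` are hypothetical names; the real suite compares `SyncCodecPipeline` and `BatchedCodecPipeline` on zarr arrays.

```python
import zlib


class PipelineA:
    """One-shot zlib calls."""

    def write(self, store: dict, chunks: dict) -> None:
        for key, data in chunks.items():
            store[key] = zlib.compress(data)

    def read(self, store: dict) -> dict:
        return {key: zlib.decompress(raw) for key, raw in store.items()}


class PipelineB:
    """Streaming zlib objects: different code path, same format."""

    def write(self, store: dict, chunks: dict) -> None:
        for key, data in chunks.items():
            comp = zlib.compressobj()
            store[key] = comp.compress(data) + comp.flush()

    def read(self, store: dict) -> dict:
        out = {}
        for key, raw in store.items():
            decomp = zlib.decompressobj()
            out[key] = decomp.decompress(raw) + decomp.flush()
        return out


def check_parity(chunks: dict) -> bool:
    a, b = PipelineA(), PipelineB()
    store_a: dict = {}
    store_b: dict = {}
    a.write(store_a, chunks)
    b.write(store_b, chunks)
    assert a.read(store_a) == b.read(store_b)           # 1. same contents
    assert set(store_a) == set(store_b)                 # 2. same store keys
    assert b.read(store_a) == a.read(store_b) == chunks  # 3. cross-read
    return True


parity_ok = check_parity({"c/0": b"abc" * 10, "c/1": b""})
```

The cross-read is the check that catches two pipelines that each round-trip their own output but disagree on layout.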
Adds .gitignore entries for .claude/, CLAUDE.md, and docs/superpowers/ so local IDE/agent planning artifacts don't get committed by accident.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
aa111a2 to 1be5563
```python
selected = decoded[chunk_selection]
if drop_axes:
    selected = selected.squeeze(axis=drop_axes)
out[out_selection] = selected
```
It might be worth experimenting with moving the assignment `out[out_selection] = selected` outside the thread-pool execution since, IIRC, it holds the GIL and is probably non-trivial time-wise.
The memory usage will probably go up a bit though....
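One way to picture the suggestion, as a sketch only: workers decode, and the calling thread performs the scatter as results arrive. The toy data, `zlib` decode, and flat `bytearray` output are assumptions, not the PR's code.

```python
from concurrent.futures import ThreadPoolExecutor
import zlib

CHUNK = 4
# Four toy encoded chunks: b"AAAA", b"BBBB", b"CCCC", b"DDDD".
encoded = [zlib.compress(bytes([65 + i]) * CHUNK) for i in range(4)]
out = bytearray(CHUNK * len(encoded))

with ThreadPoolExecutor(max_workers=2) as pool:
    # Workers only decode; pool.map yields results in input order.
    for index, decoded in enumerate(pool.map(zlib.decompress, encoded)):
        # The scatter runs on the calling thread, outside the pool tasks.
        out[index * CHUNK : (index + 1) * CHUNK] = decoded
```

Decoded chunks now wait in memory until the calling thread consumes them, which is the memory cost raised in the reply.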
Code fixes:
- ShardingCodec.evolve_from_array_spec now threads the spec through the inner chain via evolve_codecs. The unthreaded evolve survived on the real array-creation path after the transform builders were fixed, baking an endian-stripped BytesCodec into the evolved instance behind any dtype-changing inner codec. Regression test goes through evolve_from_array_spec and then builds the inner transform.
- Async _decode_shard_index/_encode_shard_index fall back to the async pipeline when an index codec is not sync-capable, instead of failing every path for third-party async-only index codecs.
- Removed the redundant hand-rolled dict caches inside the chunk transform builders; the instance-local lru_cache wrappers are the single memoization mechanism. _shard_index_size is now lru_cached too.
- The encoded-index-size guard lives once, in _assemble_shard, instead of duplicated in the sync and async encoders.
- _get_inner_pipeline cache key includes codec_pipeline.batch_size, which from_codecs captures at construction.
- Coordinate arrays built via np.indices instead of np.array(list(np.ndindex(...))) in the partial-write loaders.
- _merge_chunk_array docstring states the view-aliasing contract; the guard comment sits on the check it annotates.
- The socketpair unraisable filter covers family=(1|2) (Windows emulates socketpair with AF_INET), and its comment owns the tradeoff that the patterns cannot scope to pytest-asyncio.
Test hardening:
- Pool tests assert the pool branch actually fires (_resolve_max_workers + a _get_pool spy) instead of silently degrading to the sequential branch on a config regression; the concurrent-read test re-opens the array each round so readers race a cold spec cache.
- New direct test pins the SyncByteSetter write gate in FusedCodecPipeline.write (verified to fail with the gate removed).
- New test pins the merge fast path's view-aliasing + source-unmutated contract end-to-end on both pipelines.
- read_missing_chunks=False sharded test asserts both halves of the asymmetry against the same partially-written array.
- v2 scenario with a numcodecs Delta filter covers the V2Codec filter branch; _chunk_keys recognizes v2 metadata keys.
Assisted-by: ClaudeCode:claude-fable-5
# Conflicts:
#   tests/test_store/test_local.py
#   tests/test_store/test_memory.py
Rename the fused-default placeholder to the PR-numbered 3885.feature.md and add a second feature entry for the new SyncByteGetter/SyncByteSetter protocols and Store.get_ranges_sync.
Assisted-by: ClaudeCode:claude-opus-4.8
Making FusedCodecPipeline the default left the async (Batched) mirror paths and the new sync-IO error paths exercised only narrowly, dropping project coverage. Add targeted tests for the reachable gaps:
- AsyncChunkTransform.decode_chunk/encode_chunk == ChunkTransform across aa/ab/bb codec combinations (the async per-chunk chain the default sync path never runs), plus FusedCodecPipeline.encode/decode None-chunk passthrough.
- ShardingCodec._decode_single/_encode_single whole-shard round-trip and all-empty branches. The codec advertises partial decode/encode, so the pipeline always picks the partial methods; these whole-shard async methods are reached only via the direct ArrayBytesCodec API.
- Async-only index codec fallback in _decode_shard_index/_encode_shard_index (zarr-developers#269), via a test-only async-only ArrayBytesCodec stub that is not SupportsSyncCodec.
- Store.get_ranges_sync happy path, missing-key BaseExceptionGroup, and the non-sync-store TypeError.
Local merged coverage on the touched files: codec_pipeline.py 85.4->92.2%, sharding.py 92.4->96.7%, abc/store.py 93.8->95.3%.
Assisted-by: ClaudeCode:claude-opus-4.8
ilan-gold left a comment
A few small comments, may do them myself.
```python
        """Encode a full shard synchronously.

        Sync counterpart to `_encode_single`. This is reached when a
        `ShardingCodec` is an *inner* codec of another sharding codec (nested
```
Is this true? I hit this path when I ran `test_create_array_with_data_num_gets`, which looks pretty un-nested.
```python
        Returns `None` for an all-empty shard (no chunks present).
        """
        layout = self._build_shard_layout(shard_dict, chunks_per_shard)
```
Instead of materializing layout as a list, does it make sense to have it be an iterator? This could help save on memory, but not sure.
As long as we are sure we aren't consuming the iterator and then passing the (dead, empty) iterator off to a consumer, then sure.
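The hazard mentioned in this reply is easy to reproduce in isolation. In this sketch `build_layout` is a hypothetical stand-in for a lazy layout builder; it is not the PR's `_build_shard_layout`.

```python
from collections.abc import Iterator


def build_layout(n: int) -> Iterator[tuple[int, int]]:
    """Lazily yield (slot, offset) pairs instead of building a list."""
    for slot in range(n):
        yield slot, slot * 4


layout = build_layout(3)
total = sum(1 for _ in layout)  # first pass consumes the iterator
leftover = list(layout)         # a second consumer now sees nothing

fresh = list(build_layout(3))   # a fresh iterator (or a list) is needed instead
```

An iterator saves memory only if the layout is traversed exactly once; any second pass needs either a materialized list or a new iterator.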
ilan-gold left a comment
In general, I don't know how I feel about all these added tests. I'm not a huge fan of testing non-public APIs unless absolutely necessary, so I wonder how much of this can be captured by codspeed.
That being said, they are all super clear, I just worry about other reviewers.
```python
@dataclass(frozen=True)
class BatchedCodecPipeline(CodecPipeline):
```
Should we deprecate this? I think so.
Since we are keeping it as the default for now, we should deprecate it in the next round of revisions, when we make the new codec pipeline the default.
```python
# SupportsSetRange import disabled with the byte-range-write tests below
# (removed from this PR pending a store-interface decision).
# from zarr.abc.store import SupportsSetRange
```
* perf: use `as_completed`
* refactor: unify branches
* refactor: `concurrent_iter` only handles running
* add note
* add note about blocking
* chore: latency distribution in store
* refactor: as_completed reading
* perf: as_completed writes
* chore: remove dead code
* fix: use concurrent_map
* fix: remove unnecessary list
* fix: revert use of concurrent_map
---------
Co-authored-by: ilan-gold <ilanbassgold@gmail.com>
`ShardingCodec._decode_full_shard_bulk_if_uncompressed` gated the vectorized whole-shard fast path on `indexer.shape == shard_spec.shape`. For a `CoordinateIndexer` (vindex / integer-array oindex), `.shape` is the flattened point count, which can equal the shard shape by coincidence (trivially in 1-D). Such a selection then passed the gate and the bulk path returned the shard in natural order, silently dropping the reordering: data corruption on uncompressed, crc-free shards.
Gate instead on `sel_shape`: a gather indexer exposes it, a contiguous full read (BasicIndexer, or a non-gathering OrthogonalIndexer from `arr[:]`) does not. `isinstance(indexer, BasicIndexer)` would be too strict: `arr[:]` produces an OrthogonalIndexer and must keep the fast path.
Regression guard: test_reordering_read_on_uncompressed_shard_honors_selection exercises the end-to-end read path (where the gate lives), which the BasicIndexer-only bulk-decode parity test could not reach. The `arrays()` hypothesis strategy now also samples the uncompressed single-BytesCodec sharding config (via `sharding_inner_codecs`) so the fast path is covered under randomized indexing going forward.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
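The 1-D coincidence is small enough to reproduce with plain lists. In this sketch `read_buggy`, `read_fixed`, and the `is_gather` flag are hypothetical; `is_gather` stands in for "the indexer exposes `sel_shape`".

```python
shard = [10, 20, 30, 40]  # decoded 1-D shard, natural order
gather = [3, 1, 0, 2]     # integer-array selection: 4 points, reordered


def read_buggy(selection: list[int]) -> list[int]:
    # Buggy gate: "selection size equals shard size" is taken to mean a full read.
    if len(selection) == len(shard):
        return list(shard)  # bulk path, natural order
    return [shard[i] for i in selection]


def read_fixed(selection: list[int], is_gather: bool) -> list[int]:
    # Fixed gate: gather selections never take the bulk path.
    if not is_gather and len(selection) == len(shard):
        return list(shard)
    return [shard[i] for i in selection]


buggy = read_buggy(gather)
fixed = read_fixed(gather, is_gather=True)
full = read_fixed([0, 1, 2, 3], is_gather=False)
```

The buggy gate returns the shard in natural order for the reordered selection, while the fixed gate still lets a contiguous full read use the bulk path.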
A `ChunkTransform` is shared across thread-pool workers (read_sync / write_sync with max_workers > 1). Its `_resolve_specs` cache stored the key and the resolved specs in three separate fields, written non-atomically: a worker could observe a freshly-set key paired with the previous (or None) specs, returning the wrong codec chain for a chunk_spec (silent corruption with mixed specs, or a tripped assert).
Collapse the cache into a single `(key, aa_specs, ab_spec)` tuple field replaced with one atomic attribute write. Under the GIL a reader now sees either the complete old entry or the complete new one, never a torn mix; worst case is a recompute, never a wrong result.
Regression guard: test_shared_transform_decode_alternating_specs pins the single-slot eviction/refill correctness that underpins the atomicity (it fails if specs go stale on eviction).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
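A minimal sketch of the single-slot, single-assignment cache described above. `SpecCache` and its `resolve` callback are hypothetical; the real cache stores a `(key, aa_specs, ab_spec)` tuple on the `ChunkTransform`.

```python
class SpecCache:
    """Single-slot memo; `resolve` stands in for the real spec resolution."""

    def __init__(self, resolve):
        self._resolve = resolve
        # One field holding (key, value), replaced with a single assignment.
        self._entry = None
        self.misses = 0  # demo counter only; not part of the technique

    def get(self, key):
        entry = self._entry  # one read: a complete old or new entry
        if entry is not None and entry[0] == key:
            return entry[1]
        self.misses += 1
        value = self._resolve(key)
        self._entry = (key, value)  # one write: key and value land together
        return value


cache = SpecCache(lambda key: f"specs-for-{key}")
results = [cache.get(k) for k in ["a", "a", "b", "a"]]
```

Two threads racing on a miss may both recompute, but neither can pair one key with another key's value, because the pair is only ever published as a whole.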
…t crash
`ShardingCodec._inner_codecs_fixed_size` reads `c.is_fixed_size` on every inner codec to gate the bulk-decode fast path. `is_fixed_size` was declared on the Codec ABC as a bare annotation with no default, so codecs that never set it (VLenUTF8Codec, VLenBytesCodec, the numcodecs wrappers) raised AttributeError, crashing every sharded read whose inner chain included such a codec under the default (Fused) pipeline.
Give the ABC a conservative default `is_fixed_size = False`. It stays a class attribute (not a dataclass field; fixed-size codecs still override with True), and treating an unknown codec as not-fixed-size only disables the size-dependent fast path, never correctness.
Regression guard: test_sharding_vlen_inner_codec_roundtrip (Fused + Batched). Uses StringDType to force the VLenUTF8 inner chain; a fixed-width <U dtype would use BytesCodec and not reproduce.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
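The failure mode is plain Python behavior and can be shown without zarr: a bare class-level annotation creates no attribute. All class names in this sketch are hypothetical stand-ins for the codecs named above.

```python
class CodecBare:
    is_fixed_size: bool  # annotation only: no attribute is created


class CodecWithDefault:
    is_fixed_size: bool = False  # conservative default


class VLenBare(CodecBare):  # never sets the flag
    pass


class VLenLike(CodecWithDefault):  # never sets the flag
    pass


class BytesLike(CodecWithDefault):
    is_fixed_size = True  # fixed-size codecs still override


def inner_codecs_fixed_size(codecs) -> bool:
    return all(c.is_fixed_size for c in codecs)


try:
    inner_codecs_fixed_size([VLenBare()])
    crashed = False
except AttributeError:
    crashed = True

mixed = inner_codecs_fixed_size([BytesLike(), VLenLike()])
all_fixed = inner_codecs_fixed_size([BytesLike()])
```

With the default in place, an unknown codec simply reports not-fixed-size, so the gate turns the fast path off instead of raising.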
`_merge_chunk_array` always returns a real NDBuffer, so the merged-chunk list never contains None and the `if chunk_array is None` branch (with its `# type: ignore[unreachable]`) was dead. Replace the loop with a comprehension that applies only the empty-chunk normalization. mypy confirms no unused-ignore error, so the branch really was the only unreachable code there.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… opt-in
Move slowly on the new pipeline: revert the default back to BatchedCodecPipeline so behavior is unchanged for existing users, and let early adopters opt into FusedCodecPipeline via codec_pipeline.path. Nothing else about the Fused pipeline changes.
- config: default codec_pipeline.path back to BatchedCodecPipeline. Keep the codec_pipeline.max_workers=1 entry (only read by Fused; harmless and inert under Batched) so opting in gets a sane sequential default.
- test_config: update the defaults snapshot and the stale "the default" comment.
- test_fastpath_equivalence: test_scalar_write_equals_broadcast_write relied on Fused being the default to exercise the scalar-broadcast memoization; set the pipeline explicitly so it still pins that path.
- changelog: describe FusedCodecPipeline as opt-in, not the default.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
# Conflicts:
#   src/zarr/codecs/sharding.py
#   tests/test_codec_pipeline.py
#   tests/test_codecs/test_sharding.py
#   tests/test_codecs/test_sharding_unit.py
The sharding codec's byte-range-write fast path (set_range_sync) was removed from this PR pending a store-interface decision, leaving four partial-shard-write tests commented out. Per review, drop the dead block rather than carry it: it references a removed code path and would need rewriting against whatever interface eventually lands. The tests live in git history if needed when that design returns.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
`_decode_full_shard_bulk_if_uncompressed` returns None for any gather indexer (it gates on `sel_shape is not None`), so when the total-shard bulk path gets a non-None result the indexer cannot have `sel_shape`, making the `hasattr(indexer, "sel_shape")` reshape branch unreachable. It was a leftover from before the bulk-decode gather gate (the vindex corruption fix). Surfaced by coverage. Behavior-neutral.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add a section to the experimental-features user guide explaining the opt-in synchronous codec pipeline: what it does (removes per-chunk async scheduling overhead), when it helps (low-latency stores like MemoryStore and LocalStore; transparent async fallback otherwise), how to opt in via codec_pipeline.path, and the codec_pipeline.max_workers threading knob. All code blocks are exec="true" so they run at docs build time. Verified with a strict mkdocs build (no broken cross-references).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@ilan-gold some changes for you to note:
perf: synchronous codec pipeline (`FusedCodecPipeline`), opt-in
Summary
This PR adds `FusedCodecPipeline`, an opt-in codec pipeline. It removes the per-chunk async scheduling overhead of the default `BatchedCodecPipeline` (roughly one coroutine per chunk, which on real workloads dominates the actual codec work) by running codec compute synchronously and in bulk.
The default pipeline is unchanged: existing users see no behavior change. We want to move slowly here and let early adopters exercise the new pipeline before considering it as a default.
What it does
- … `SupportsSyncCodec`, a `ChunkTransform` runs the codec chain synchronously (no event loop, no per-chunk coroutine), optionally across a thread pool for CPU-heavy decode/encode.
- … `Store.get_ranges_sync`, and dense, fixed-size, uncompressed shards take a vectorized whole-shard bulk decode. Sharded writes go through the codec's synchronous full-shard-rewrite path.
- … `ZipStore`), or a codec isn't sync-capable, the pipeline falls back to the async path, behavior-equivalent to `BatchedCodecPipeline`.
IO ownership is unchanged: the sharding codec still holds the byte getter/setter and reads/writes storage directly (the same model as before; this PR does not move toward storage-free codecs).
Performance
Large speedups for sharded arrays — up to ~24× writes / ~14× reads on many-chunks-per-shard layouts, more with compression — and no regressions on compute-bound workloads.
Opting in
`codec_pipeline.path` is unchanged (`BatchedCodecPipeline`). Enable the new pipeline by pointing `codec_pipeline.path` at `FusedCodecPipeline`.
`codec_pipeline.max_workers` defaults to `1` (sequential, no thread pool) and is only read by `FusedCodecPipeline`. Threading is opt-in via `max_workers > 1` (or `None` for `cpu_count`). Threading-by-default is intentionally deferred: it carries downstream thread-safety and many-core oversubscription concerns, and slows small chunks.
These changes are non-breaking: nothing changes for existing users until they opt in.
Public API additions
- `SyncByteGetter` / `SyncByteSetter`: runtime-checkable protocols letting custom byte getters/setters opt into the sync fast path for in-memory IO (used by the sharding codec for its inner chunks).
- `Store.get_ranges_sync`: synchronous, coalescing counterpart of `get_ranges`, with the same coalescing policy.
Correctness
Both pipelines are held to the same behavior, with new equivalence/parity suites asserting that each fast path produces results identical to the general path it bypasses:
- `tests/test_pipeline_parity.py`: Fused vs Batched parity across sharded/unsharded, compressed/uncompressed, read/write/overwrite scenarios.
- `tests/test_codec_pipeline_suite.py`: a shared suite both pipelines must pass (incl. zarr v2 + nested sharding).
- `tests/test_fastpath_equivalence.py`: property tests pinning each fast path (complete-chunk merge view, bulk shard decode, scalar-broadcast write, byte-range coalescing) equal to the general path.
- `tests/test_fused_pipeline.py`: Fused-specific behavior incl. thread-pool and async-fallback paths.
Three fast-path correctness issues were found, reproduced, and fixed with regression tests: bulk shard decode could serve a reordering `vindex`/`oindex` selection in natural order on uncompressed crc-free shards (now gated on `sel_shape`); the `ChunkTransform` spec cache was not torn-read-safe under `max_workers > 1` (now a single atomically-assigned entry); and sharded reads with variable-length / numcodecs inner codecs crashed because `Codec.is_fixed_size` had no default (now defaults to `False`).
Notable internal changes
- … `src/zarr/core/chunk_utils.py` (merge/encode, decode/scatter, empty-chunk normalization, spec evolution) used by both pipelines and the sharding codec, so the sync and async paths can't drift.
- `V2Codec` and the numcodecs wrappers gained `_decode_sync` / `_encode_sync` cores; the async methods are now thin `asyncio.to_thread` wrappers.
- … `codec_pipeline.path` (restoring #2179 behavior) and is evolved/memoized per spec.
Lineage
Builds on prior work in #3719. Depends on changes from #3907 and #3908. Mostly authored with Claude.
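For readers unfamiliar with the byte-range coalescing that the description attributes to `Store.get_ranges_sync`, a generic sketch follows. The `coalesce` helper and its `max_gap` parameter are hypothetical; the PR text does not spell out the actual policy, so this shows only the general technique of merging nearby ranges into fewer fetches.

```python
def coalesce(ranges: list[tuple[int, int]], max_gap: int) -> list[tuple[int, int]]:
    """Merge (start, stop) byte ranges whose gap is at most `max_gap` bytes."""
    merged: list[tuple[int, int]] = []
    for start, stop in sorted(ranges):
        if merged and start - merged[-1][1] <= max_gap:
            # Close enough to (or overlapping) the previous range: extend it.
            prev_start, prev_stop = merged[-1]
            merged[-1] = (prev_start, max(prev_stop, stop))
        else:
            merged.append((start, stop))
    return merged


requests = [(0, 10), (12, 20), (100, 110), (105, 120)]
fetches = coalesce(requests, max_gap=4)
```

Each merged range becomes one store request, and the caller slices the individual chunks back out of the fetched bytes.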