Skip to content

Commit 69aa274

Browse files
committed
Making encode and decode async
1 parent a8c0db3 commit 69aa274

File tree

2 files changed

+52
-4
lines changed

2 files changed

+52
-4
lines changed

src/zarr/codecs/gpu.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def __await__(self) -> Generator[Any, None, None]:
5555
async def _wait(self) -> None:
5656
"""Polls the CUDA event asynchronously with exponential backoff until it completes."""
5757
delay = self.initial_delay
58-
while not self.event.query(): # `query()` returns True if the event is complete
58+
while not self.event.done: # `done` returns True if the event is complete
5959
await asyncio.sleep(delay) # Yield control to other async tasks
6060
delay = min(delay * 2, self.max_delay) # Exponential backoff
6161

@@ -127,9 +127,13 @@ async def _convert_from_nvcomp_arrays(
127127
self,
128128
arrays: Iterable[nvcomp.Array],
129129
chunks_and_specs: Iterable[tuple[Buffer | None, ArraySpec]],
130+
awaitable: AsyncCUDAEvent,
130131
) -> Iterable[Buffer | None]:
132+
await awaitable # Wait for array computation to complete before accessing
131133
return [
132-
spec.prototype.buffer.from_array_like(cp.asarray(a, dtype=np.dtype("b"))) if a else None
134+
spec.prototype.buffer.from_array_like(cp.array(a, dtype=np.dtype("b"), copy=False))
135+
if a
136+
else None
133137
for a, (_, spec) in zip(arrays, chunks_and_specs, strict=True)
134138
]
135139

@@ -155,10 +159,15 @@ async def decode(
155159
filtered_inputs, none_indices = await self._convert_to_nvcomp_arrays(chunks_and_specs)
156160

157161
outputs = self._zstd_codec.decode(filtered_inputs) if len(filtered_inputs) > 0 else []
162+
163+
# Record event for synchronization
164+
event = cp.cuda.Event()
165+
awaitable = AsyncCUDAEvent(event) # Convert CUDA event to awaitable object
166+
158167
for index in none_indices:
159168
outputs.insert(index, None)
160169

161-
return await self._convert_from_nvcomp_arrays(outputs, chunks_and_specs)
170+
return await self._convert_from_nvcomp_arrays(outputs, chunks_and_specs, awaitable)
162171

163172
async def encode(
164173
self,
@@ -183,10 +192,15 @@ async def encode(
183192
filtered_inputs, none_indices = await self._convert_to_nvcomp_arrays(chunks_and_specs)
184193

185194
outputs = self._zstd_codec.encode(filtered_inputs) if len(filtered_inputs) > 0 else []
195+
196+
# Record event for synchronization
197+
event = cp.cuda.Event()
198+
awaitable = AsyncCUDAEvent(event) # Convert CUDA event to awaitable object
199+
186200
for index in none_indices:
187201
outputs.insert(index, None)
188202

189-
return await self._convert_from_nvcomp_arrays(outputs, chunks_and_specs)
203+
return await self._convert_from_nvcomp_arrays(outputs, chunks_and_specs, awaitable)
190204

191205
def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int:
192206
raise NotImplementedError

tests/test_codecs/test_nvcomp.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import pytest
2+
3+
import zarr
4+
from zarr.abc.store import Store
5+
from zarr.codecs import NvcompZstdCodec
6+
from zarr.storage import StorePath
7+
from zarr.testing.utils import gpu_test
8+
9+
10+
@gpu_test
11+
@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"])
12+
@pytest.mark.parametrize(
13+
"checksum",
14+
[
15+
False,
16+
],
17+
)
18+
def test_nvcomp_zstd(store: Store, checksum: bool) -> None:
19+
import cupy as cp
20+
21+
with zarr.config.enable_gpu():
22+
data = cp.arange(0, 256, dtype="uint16").reshape((16, 16))
23+
24+
a = zarr.create_array(
25+
StorePath(store, path="nvcomp_zstd"),
26+
shape=data.shape,
27+
chunks=(16, 16),
28+
dtype=data.dtype,
29+
fill_value=0,
30+
compressors=NvcompZstdCodec(level=0, checksum=checksum),
31+
)
32+
33+
a[:, :] = data
34+
cp.testing.assert_array_equal(data, a[:, :])

0 commit comments

Comments
 (0)