Skip to content

Commit 325df56

Browse files
committed
rework the blosc extension to use the buffer protocol and avoid an unnecessary memory copy during compress()
1 parent dedc5de commit 325df56

File tree

4 files changed

+71
-52
lines changed

4 files changed

+71
-52
lines changed

notebooks/dask_copy.ipynb

Lines changed: 31 additions & 31 deletions
Large diffs are not rendered by default.

zarr/blosc.pyx

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,13 @@ import threading
88

99

1010
from numpy cimport ndarray
11-
from cpython.bytes cimport PyBytes_AsString, PyBytes_FromStringAndSize
12-
from cpython.mem cimport PyMem_Malloc, PyMem_Free
11+
from cpython.ref cimport PyObject
12+
from cpython.buffer cimport PyObject_GetBuffer, PyBuffer_Release, \
13+
PyBUF_ANY_CONTIGUOUS
14+
15+
16+
cdef extern from "Python.h":
17+
int PyByteArray_Resize(PyObject *bytearray, Py_ssize_t len)
1318

1419

1520
from zarr.compat import PY2, text_type
@@ -70,12 +75,12 @@ def set_nthreads(int nthreads):
7075
return blosc_set_nthreads(nthreads)
7176

7277

73-
def decompress(bytes cdata, ndarray array):
78+
def decompress(cdata, ndarray array):
7479
"""Decompress data into a numpy array.
7580
7681
Parameters
7782
----------
78-
cdata : bytes
83+
cdata : bytes-like
7984
Compressed data, including blosc header.
8085
array : ndarray
8186
Numpy array to decompress into.
@@ -90,11 +95,15 @@ def decompress(bytes cdata, ndarray array):
9095
int ret
9196
char *source
9297
char *dest
93-
size_t nbytes
98+
Py_buffer source_buffer
99+
Py_buffer dest_buffer
100+
Py_ssize_t nbytes
94101

95102
# setup
96-
source = PyBytes_AsString(cdata)
97-
dest = array.data
103+
PyObject_GetBuffer(cdata, &source_buffer, PyBUF_ANY_CONTIGUOUS)
104+
source = <char *> source_buffer.buf
105+
PyObject_GetBuffer(array, &dest_buffer, PyBUF_ANY_CONTIGUOUS)
106+
dest = <char *> dest_buffer.buf
98107
nbytes = array.nbytes
99108

100109
# perform decompression
@@ -106,6 +115,10 @@ def decompress(bytes cdata, ndarray array):
106115
with nogil:
107116
ret = blosc_decompress_ctx(source, dest, nbytes, 1)
108117

118+
# release buffers
119+
PyBuffer_Release(&source_buffer)
120+
PyBuffer_Release(&dest_buffer)
121+
109122
# handle errors
110123
if ret <= 0:
111124
raise RuntimeError('error during blosc decompression: %d' % ret)
@@ -135,16 +148,21 @@ def compress(ndarray array, char* cname, int clevel, int shuffle):
135148
cdef:
136149
char *source
137150
char *dest
138-
size_t nbytes, cbytes, itemsize
139-
bytes cdata
151+
bytearray cdata
152+
Py_buffer source_buffer
153+
Py_buffer dest_buffer
154+
Py_ssize_t nbytes, cbytes, itemsize
140155

141-
# obtain reference to underlying buffer
142-
source = array.data
156+
# setup source
157+
PyObject_GetBuffer(array, &source_buffer, PyBUF_ANY_CONTIGUOUS)
158+
source = <char *> source_buffer.buf
143159

144-
# allocate memory for compressed data
160+
# setup destination
145161
nbytes = array.nbytes
146162
itemsize = array.dtype.itemsize
147-
dest = <char *> PyMem_Malloc(nbytes + BLOSC_MAX_OVERHEAD)
163+
cdata = bytearray(nbytes + BLOSC_MAX_OVERHEAD)
164+
PyObject_GetBuffer(cdata, &dest_buffer, PyBUF_ANY_CONTIGUOUS)
165+
dest = <char *> dest_buffer.buf
148166

149167
# perform compression
150168
if _get_use_threads():
@@ -163,15 +181,16 @@ def compress(ndarray array, char* cname, int clevel, int shuffle):
163181
nbytes + BLOSC_MAX_OVERHEAD, cname,
164182
0, 1)
165183

184+
# release buffers
185+
PyBuffer_Release(&source_buffer)
186+
PyBuffer_Release(&dest_buffer)
187+
166188
# check compression was successful
167189
if cbytes <= 0:
168190
raise RuntimeError('error during blosc compression: %d' % cbytes)
169191

170-
# store as bytes
171-
cdata = PyBytes_FromStringAndSize(dest, cbytes)
172-
173-
# release memory
174-
PyMem_Free(dest)
192+
# resize after compression
193+
PyByteArray_Resize(<PyObject *> cdata, cbytes)
175194

176195
return cdata
177196

zarr/storage.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,8 +194,8 @@ def __getitem__(self, key):
194194
def __setitem__(self, key, value):
195195

196196
# guard conditions
197-
if not isinstance(value, bytes):
198-
raise ValueError('value must be of type bytes')
197+
if not isinstance(value, (bytes, bytearray)):
198+
raise ValueError('value must be of type bytes or bytearray')
199199

200200
# write to temporary file
201201
with tempfile.NamedTemporaryFile(mode='wb', delete=False,

zarr/tests/test_compression.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def _test_compress_decompress(self, compression_opts=None):
2727
comp = self.init_compressor(compression_opts)
2828
a = np.arange(1000, dtype='i4')
2929
cdata = comp.compress(a)
30-
assert isinstance(cdata, bytes)
30+
assert isinstance(cdata, (bytes, bytearray))
3131
assert len(cdata) <= a.nbytes
3232

3333
b = np.empty_like(a)

0 commit comments

Comments (0)