Skip to content

Commit f4e7140

Browse files
committed
implement support for using global blosc functions
1 parent db9241f commit f4e7140

File tree

5 files changed

+113
-26
lines changed

5 files changed

+113
-26
lines changed

zarr/__init__.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,19 @@
22
from __future__ import absolute_import, print_function, division
33

44

5+
import atexit
6+
7+
58
from zarr.core import empty, zeros, ones, full, array, open
6-
from zarr.ext import blosc_version
9+
from zarr.ext import blosc_version, init as _init, destroy as _destroy, \
10+
set_blosc_options
711
from zarr import defaults
812
from zarr import constants
913
from zarr.version import version as __version__
14+
15+
16+
import multiprocessing
17+
_cpu_count = multiprocessing.cpu_count()
18+
_init()
19+
set_blosc_options(use_context=False, nthreads=_cpu_count)
20+
atexit.register(_destroy)

zarr/ext.pyx

Lines changed: 71 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import shutil
1818
import tempfile
1919
from collections import namedtuple
2020
from glob import glob
21+
import multiprocessing
2122
import fasteners
2223

2324

@@ -53,6 +54,14 @@ cdef extern from "blosc.h":
5354
BLOSC_VERSION_STRING,
5455
BLOSC_VERSION_DATE
5556

57+
void blosc_init()
58+
void blosc_destroy()
59+
int blosc_set_nthreads(int nthreads)
60+
int blosc_set_compressor(const char *compname)
61+
int blosc_compress(int clevel, int doshuffle, size_t typesize,
62+
size_t nbytes, void *src, void *dest,
63+
size_t destsize) nogil
64+
int blosc_decompress(void *src, void *dest, size_t destsize) nogil
5665
int blosc_compname_to_compcode(const char *compname)
5766
int blosc_compress_ctx(int clevel, int doshuffle, size_t typesize,
5867
size_t nbytes, const void* src, void* dest,
@@ -77,6 +86,26 @@ def blosc_version():
7786
return ver_str, ver_date
7887

7988

89+
def init():
90+
blosc_init()
91+
92+
93+
def destroy():
94+
blosc_destroy()
95+
96+
97+
_blosc_use_context = False
98+
99+
100+
def set_blosc_options(use_context, nthreads=None):
101+
global _blosc_use_context
102+
_blosc_use_context = use_context
103+
if not use_context:
104+
if nthreads is None:
105+
nthreads = multiprocessing.cpu_count()
106+
blosc_set_nthreads(nthreads)
107+
108+
80109
###############################################################################
81110
# DEBUG LOGGING #
82111
###############################################################################
@@ -123,7 +152,7 @@ def _normalize_cparams(cname=None, clevel=None, shuffle=None):
123152
cname = cname.encode()
124153
# check compressor is available
125154
if blosc_compname_to_compcode(cname) < 0:
126-
raise ValueError("compressor not available: %s" % cname)
155+
raise ValueError('compressor not available: %s' % cname)
127156

128157
# determine compression level
129158
clevel = clevel if clevel is not None else defaults.clevel
@@ -390,8 +419,12 @@ cdef class Chunk(BaseChunk):
390419
cdef int ret
391420

392421
# do decompression
393-
with nogil:
394-
ret = blosc_decompress_ctx(self._data, dest, self._nbytes, 1)
422+
if _blosc_use_context:
423+
with nogil:
424+
ret = blosc_decompress_ctx(self._data, dest, self._nbytes, 1)
425+
else:
426+
with nogil:
427+
ret = blosc_decompress(self._data, dest, self._nbytes)
395428

396429
# handle errors
397430
if ret <= 0:
@@ -409,12 +442,20 @@ cdef class Chunk(BaseChunk):
409442
dest = <char *> malloc(self._nbytes + BLOSC_MAX_OVERHEAD)
410443

411444
# perform compression
412-
with nogil:
413-
cbytes = blosc_compress_ctx(self._clevel, self._shuffle,
414-
self._itemsize, self._nbytes,
415-
source, dest,
416-
self._nbytes + BLOSC_MAX_OVERHEAD,
417-
self._cname, 0, 1)
445+
if _blosc_use_context:
446+
with nogil:
447+
cbytes = blosc_compress_ctx(self._clevel, self._shuffle,
448+
self._itemsize, self._nbytes,
449+
source, dest,
450+
self._nbytes + BLOSC_MAX_OVERHEAD,
451+
self._cname, 0, 1)
452+
else:
453+
# compressor should have been checked already
454+
assert blosc_set_compressor(self._cname) >= 0
455+
with nogil:
456+
cbytes = blosc_compress(self._clevel, self._shuffle,
457+
self._itemsize, self._nbytes, source,
458+
dest, self._nbytes + BLOSC_MAX_OVERHEAD)
418459

419460
# check compression was successful
420461
if cbytes <= 0:
@@ -506,8 +547,12 @@ cdef class PersistentChunk(BaseChunk):
506547
source = PyBytes_AsString(data)
507548

508549
# do decompression
509-
with nogil:
510-
ret = blosc_decompress_ctx(source, dest, self._nbytes, 1)
550+
if _blosc_use_context:
551+
with nogil:
552+
ret = blosc_decompress_ctx(source, dest, self._nbytes, 1)
553+
else:
554+
with nogil:
555+
ret = blosc_decompress(source, dest, self._nbytes)
511556

512557
# handle errors
513558
if ret <= 0:
@@ -543,12 +588,20 @@ cdef class PersistentChunk(BaseChunk):
543588
dest = <char *> malloc(self._nbytes + BLOSC_MAX_OVERHEAD)
544589

545590
# perform compression
546-
with nogil:
547-
cbytes = blosc_compress_ctx(self._clevel, self._shuffle,
548-
self._itemsize, self._nbytes,
549-
source, dest,
550-
self._nbytes + BLOSC_MAX_OVERHEAD,
551-
self._cname, 0, 1)
591+
if _blosc_use_context:
592+
with nogil:
593+
cbytes = blosc_compress_ctx(self._clevel, self._shuffle,
594+
self._itemsize, self._nbytes,
595+
source, dest,
596+
self._nbytes + BLOSC_MAX_OVERHEAD,
597+
self._cname, 0, 1)
598+
else:
599+
# compressor should have been checked already
600+
assert blosc_set_compressor(self._cname) >= 0
601+
with nogil:
602+
cbytes = blosc_compress(self._clevel, self._shuffle,
603+
self._itemsize, self._nbytes, source,
604+
dest, self._nbytes + BLOSC_MAX_OVERHEAD)
552605

553606
# check compression was successful
554607
if cbytes <= 0:
@@ -1322,6 +1375,7 @@ cdef class SynchronizedPersistentArray(PersistentArray):
13221375
###############################################################################
13231376

13241377

1378+
# noinspection PyUnresolvedReferences,PyProtectedMember
13251379
cdef _lazy_get_chunk(BaseArray array, tuple cidx):
13261380
try:
13271381
chunk = array._cdata[cidx]

zarr/tests/test_array.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from zarr.ext import Array, SynchronizedArray, PersistentArray, \
1414
SynchronizedPersistentArray, LazyArray, SynchronizedLazyArray, \
1515
LazyPersistentArray, SynchronizedLazyPersistentArray
16-
from zarr import defaults
16+
from zarr import defaults, set_blosc_options
1717

1818

1919
class ArrayTests(object):
@@ -479,3 +479,14 @@ def create_array(self, **kwargs):
479479
lambda: shutil.rmtree(path) if os.path.exists(path) else None
480480
)
481481
return SynchronizedLazyPersistentArray(**kwargs)
482+
483+
484+
class TestArrayUsingContext(TestArray):
485+
486+
@classmethod
487+
def setUpClass(cls):
488+
set_blosc_options(use_context=True)
489+
490+
@classmethod
491+
def tearDownClass(cls):
492+
set_blosc_options(use_context=False)

zarr/tests/test_chunk.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from numpy.testing import assert_array_equal
1212
from zarr.ext import Chunk, PersistentChunk, SynchronizedChunk, \
1313
SynchronizedPersistentChunk
14-
from zarr import defaults
14+
from zarr import defaults, set_blosc_options
1515

1616

1717
class ChunkTests(object):
@@ -252,3 +252,14 @@ def test_shuffles():
252252

253253
# expect improvement from bitshuffle
254254
assert c2.cbytes < c1.cbytes
255+
256+
257+
class TestChunkUsingContext(TestChunk):
258+
259+
@classmethod
260+
def setUpClass(cls):
261+
set_blosc_options(use_context=True)
262+
263+
@classmethod
264+
def tearDownClass(cls):
265+
set_blosc_options(use_context=False)

zarr/util.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@
44

55
def human_readable_size(size):
66
if size < 2**10:
7-
return "%s" % size
7+
return '%s' % size
88
elif size < 2**20:
9-
return "%.1fK" % (size / float(2**10))
9+
return '%.1fK' % (size / float(2**10))
1010
elif size < 2**30:
11-
return "%.1fM" % (size / float(2**20))
11+
return '%.1fM' % (size / float(2**20))
1212
elif size < 2**40:
13-
return "%.1fG" % (size / float(2**30))
13+
return '%.1fG' % (size / float(2**30))
1414
elif size < 2**50:
15-
return "%.1fT" % (size / float(2**40))
15+
return '%.1fT' % (size / float(2**40))
1616
else:
17-
return "%.1fP" % (size / float(2**50))
17+
return '%.1fP' % (size / float(2**50))

0 commit comments

Comments
 (0)