Skip to content

Commit 683e829

Browse files
committed
Merge pull request #16 from alimanfoo/blosc_use_context
Implement support for using global blosc functions, resolves #15.
2 parents 235e4dc + 658d431 commit 683e829

File tree

5 files changed

+131
-27
lines changed

5 files changed

+131
-27
lines changed

zarr/__init__.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,19 @@
22
from __future__ import absolute_import, print_function, division
33

44

5+
import atexit
6+
7+
58
from zarr.core import empty, zeros, ones, full, array, open
6-
from zarr.ext import blosc_version
9+
from zarr.ext import blosc_version, init as _init, destroy as _destroy, \
10+
set_blosc_options
711
from zarr import defaults
812
from zarr import constants
913
from zarr.version import version as __version__
14+
15+
16+
import multiprocessing
17+
ncores = multiprocessing.cpu_count()
18+
_init()
19+
set_blosc_options(use_context=False, nthreads=ncores)
20+
atexit.register(_destroy)

zarr/ext.pyx

Lines changed: 89 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import shutil
1717
import tempfile
1818
from collections import namedtuple
1919
from glob import glob
20+
import multiprocessing
2021
import fasteners
2122

2223

@@ -52,6 +53,14 @@ cdef extern from "blosc.h":
5253
BLOSC_VERSION_STRING,
5354
BLOSC_VERSION_DATE
5455

56+
void blosc_init()
57+
void blosc_destroy()
58+
int blosc_set_nthreads(int nthreads)
59+
int blosc_set_compressor(const char *compname)
60+
int blosc_compress(int clevel, int doshuffle, size_t typesize,
61+
size_t nbytes, void *src, void *dest,
62+
size_t destsize) nogil
63+
int blosc_decompress(void *src, void *dest, size_t destsize) nogil
5564
int blosc_compname_to_compcode(const char *compname)
5665
int blosc_compress_ctx(int clevel, int doshuffle, size_t typesize,
5766
size_t nbytes, const void* src, void* dest,
@@ -64,7 +73,7 @@ cdef extern from "blosc.h":
6473

6574

6675
def blosc_version():
67-
"""Return the version of c-blosc that zarr was compiled with."""
76+
"""Return the version of blosc that zarr was compiled with."""
6877

6978
# all the 'decode' contortions are for Python 3 returning actual strings
7079
ver_str = <char *> BLOSC_VERSION_STRING
@@ -76,6 +85,43 @@ def blosc_version():
7685
return ver_str, ver_date
7786

7887

88+
def init():
89+
blosc_init()
90+
91+
92+
def destroy():
93+
blosc_destroy()
94+
95+
96+
_blosc_use_context = False
97+
98+
99+
def set_blosc_options(use_context=False, nthreads=None):
100+
"""Set options for how the blosc compressor is used.
101+
102+
Parameters
103+
----------
104+
use_context : bool, optional
105+
If False, blosc will be used in non-contextual mode, which is best
106+
when using zarr in a single-threaded environment because it allows
107+
blosc to use multiple threads internally. If True, blosc will be used
108+
in contextual mode, which is better when using zarr in a
109+
multi-threaded environment like dask.array because it avoids the blosc
110+
global lock and so multiple blosc operations can be running
111+
concurrently.
112+
nthreads : int, optional
113+
Number of internal threads to use when running blosc in non-contextual
114+
mode.
115+
116+
"""
117+
global _blosc_use_context
118+
_blosc_use_context = use_context
119+
if not use_context:
120+
if nthreads is None:
121+
nthreads = multiprocessing.cpu_count()
122+
blosc_set_nthreads(nthreads)
123+
124+
79125
###############################################################################
80126
# DEBUG LOGGING #
81127
###############################################################################
@@ -122,7 +168,7 @@ def _normalize_cparams(cname=None, clevel=None, shuffle=None):
122168
cname = cname.encode()
123169
# check compressor is available
124170
if blosc_compname_to_compcode(cname) < 0:
125-
raise ValueError("compressor not available: %s" % cname)
171+
raise ValueError('compressor not available: %s' % cname)
126172

127173
# determine compression level
128174
clevel = clevel if clevel is not None else defaults.clevel
@@ -389,8 +435,12 @@ cdef class Chunk(BaseChunk):
389435
cdef int ret
390436

391437
# do decompression
392-
with nogil:
393-
ret = blosc_decompress_ctx(self._data, dest, self._nbytes, 1)
438+
if _blosc_use_context:
439+
with nogil:
440+
ret = blosc_decompress_ctx(self._data, dest, self._nbytes, 1)
441+
else:
442+
with nogil:
443+
ret = blosc_decompress(self._data, dest, self._nbytes)
394444

395445
# handle errors
396446
if ret <= 0:
@@ -408,12 +458,20 @@ cdef class Chunk(BaseChunk):
408458
dest = <char *> malloc(self._nbytes + BLOSC_MAX_OVERHEAD)
409459

410460
# perform compression
411-
with nogil:
412-
cbytes = blosc_compress_ctx(self._clevel, self._shuffle,
413-
self._itemsize, self._nbytes,
414-
source, dest,
415-
self._nbytes + BLOSC_MAX_OVERHEAD,
416-
self._cname, 0, 1)
461+
if _blosc_use_context:
462+
with nogil:
463+
cbytes = blosc_compress_ctx(self._clevel, self._shuffle,
464+
self._itemsize, self._nbytes,
465+
source, dest,
466+
self._nbytes + BLOSC_MAX_OVERHEAD,
467+
self._cname, 0, 1)
468+
else:
469+
# compressor should have been checked already
470+
assert blosc_set_compressor(self._cname) >= 0
471+
with nogil:
472+
cbytes = blosc_compress(self._clevel, self._shuffle,
473+
self._itemsize, self._nbytes, source,
474+
dest, self._nbytes + BLOSC_MAX_OVERHEAD)
417475

418476
# check compression was successful
419477
if cbytes <= 0:
@@ -505,8 +563,12 @@ cdef class PersistentChunk(BaseChunk):
505563
source = PyBytes_AsString(data)
506564

507565
# do decompression
508-
with nogil:
509-
ret = blosc_decompress_ctx(source, dest, self._nbytes, 1)
566+
if _blosc_use_context:
567+
with nogil:
568+
ret = blosc_decompress_ctx(source, dest, self._nbytes, 1)
569+
else:
570+
with nogil:
571+
ret = blosc_decompress(source, dest, self._nbytes)
510572

511573
# handle errors
512574
if ret <= 0:
@@ -542,12 +604,20 @@ cdef class PersistentChunk(BaseChunk):
542604
dest = <char *> malloc(self._nbytes + BLOSC_MAX_OVERHEAD)
543605

544606
# perform compression
545-
with nogil:
546-
cbytes = blosc_compress_ctx(self._clevel, self._shuffle,
547-
self._itemsize, self._nbytes,
548-
source, dest,
549-
self._nbytes + BLOSC_MAX_OVERHEAD,
550-
self._cname, 0, 1)
607+
if _blosc_use_context:
608+
with nogil:
609+
cbytes = blosc_compress_ctx(self._clevel, self._shuffle,
610+
self._itemsize, self._nbytes,
611+
source, dest,
612+
self._nbytes + BLOSC_MAX_OVERHEAD,
613+
self._cname, 0, 1)
614+
else:
615+
# compressor should have been checked already
616+
assert blosc_set_compressor(self._cname) >= 0
617+
with nogil:
618+
cbytes = blosc_compress(self._clevel, self._shuffle,
619+
self._itemsize, self._nbytes, source,
620+
dest, self._nbytes + BLOSC_MAX_OVERHEAD)
551621

552622
# check compression was successful
553623
if cbytes <= 0:
@@ -1321,6 +1391,7 @@ cdef class SynchronizedPersistentArray(PersistentArray):
13211391
###############################################################################
13221392

13231393

1394+
# noinspection PyUnresolvedReferences,PyProtectedMember
13241395
cdef _lazy_get_chunk(BaseArray array, tuple cidx):
13251396
try:
13261397
chunk = array._cdata[cidx]

zarr/tests/test_array.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from zarr.ext import Array, SynchronizedArray, PersistentArray, \
1414
SynchronizedPersistentArray, LazyArray, SynchronizedLazyArray, \
1515
LazyPersistentArray, SynchronizedLazyPersistentArray
16-
from zarr import defaults
16+
from zarr import defaults, set_blosc_options
1717

1818

1919
class ArrayTests(object):
@@ -479,3 +479,14 @@ def create_array(self, **kwargs):
479479
lambda: shutil.rmtree(path) if os.path.exists(path) else None
480480
)
481481
return SynchronizedLazyPersistentArray(**kwargs)
482+
483+
484+
class TestArrayUsingContext(TestArray):
485+
486+
@classmethod
487+
def setUpClass(cls):
488+
set_blosc_options(use_context=True)
489+
490+
@classmethod
491+
def tearDownClass(cls):
492+
set_blosc_options(use_context=False)

zarr/tests/test_chunk.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from numpy.testing import assert_array_equal
1212
from zarr.ext import Chunk, PersistentChunk, SynchronizedChunk, \
1313
SynchronizedPersistentChunk
14-
from zarr import defaults
14+
from zarr import defaults, set_blosc_options
1515

1616

1717
class ChunkTests(object):
@@ -252,3 +252,14 @@ def test_shuffles():
252252

253253
# expect improvement from bitshuffle
254254
assert c2.cbytes < c1.cbytes
255+
256+
257+
class TestChunkUsingContext(TestChunk):
258+
259+
@classmethod
260+
def setUpClass(cls):
261+
set_blosc_options(use_context=True)
262+
263+
@classmethod
264+
def tearDownClass(cls):
265+
set_blosc_options(use_context=False)

zarr/util.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@
44

55
def human_readable_size(size):
66
if size < 2**10:
7-
return "%s" % size
7+
return '%s' % size
88
elif size < 2**20:
9-
return "%.1fK" % (size / float(2**10))
9+
return '%.1fK' % (size / float(2**10))
1010
elif size < 2**30:
11-
return "%.1fM" % (size / float(2**20))
11+
return '%.1fM' % (size / float(2**20))
1212
elif size < 2**40:
13-
return "%.1fG" % (size / float(2**30))
13+
return '%.1fG' % (size / float(2**30))
1414
elif size < 2**50:
15-
return "%.1fT" % (size / float(2**40))
15+
return '%.1fT' % (size / float(2**40))
1616
else:
17-
return "%.1fP" % (size / float(2**50))
17+
return '%.1fP' % (size / float(2**50))

0 commit comments

Comments
 (0)