Skip to content

Commit 07584be

Browse files
committed
Create a separate function that calculates the decompressed block and the crc outside the GIL
1 parent 8c5980d commit 07584be

File tree

4 files changed

+133
-8
lines changed

4 files changed

+133
-8
lines changed

src/isal/igzip_threaded.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -297,11 +297,8 @@ def _compress(self, index: int):
297297
return
298298
continue
299299
try:
300-
compressor = isal_zlib.compressobj(
301-
self.level, wbits=-15, zdict=zdict)
302-
compressed = compressor.compress(data) + compressor.flush(
303-
isal_zlib.Z_SYNC_FLUSH)
304-
crc = isal_zlib.crc32(data)
300+
compressed, crc = isal_zlib._parallel_deflate_and_crc(
301+
data, zdict, self.level)
305302
except Exception as e:
306303
with self.lock:
307304
self.exception = e

src/isal/isal_zlib.pyi

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ def adler32(__data, __value: int = 1) -> int: ...
3838
def crc32(__data, __value: int = 0) -> int: ...
3939
def crc32_combine(__crc1: int, __crc2: int, __crc2_length: int) -> int: ...
4040

41+
def _parallel_deflate_and_crc(__data, __zdict, level
42+
) -> typing.Tuple[bytes, int]: ...
43+
44+
4145
def compress(__data,
4246
level: int = ISAL_DEFAULT_COMPRESSION,
4347
wbits: int = MAX_WBITS) -> bytes: ...

src/isal/isal_zlibmodule.c

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,128 @@ isal_zlib_crc32_combine(PyObject *module, PyObject *args) {
288288
crc32_comb(crc1, crc2, crc2_length) & 0xFFFFFFFF);
289289
}
290290

291+
PyDoc_STRVAR(isal_zlib_parallel_deflate_and_crc__doc__,
292+
"parallel_deflate_and_crc($module, data, zdict, /, level)\n"
293+
"--\n"
294+
"\n"
295+
"Function specifically designed for use in parallel compression. Data is \n"
296+
"compressed using deflate and Z_SYNC_FLUSH is used to ensure the block aligns\n"
297+
"to a byte boundary. Also the CRC is calculated. This function is designed to \n"
298+
"maximize the time spent outside the GIL\n"
299+
"\n"
300+
" data\n"
301+
" bytes-like object containing the to be compressed data\n"
302+
" zdict\n"
303+
" last 32 bytes of the previous block\n"
304+
" level\n"
305+
" the compression level to use.\n"
306+
);
307+
#define ISAL_ZLIB_PARALLEL_DEFLATE_AND_CRC_METHODDEF \
308+
{ \
309+
"_parallel_deflate_and_crc", (PyCFunction)(void (*)(void))isal_zlib_parallel_deflate_and_crc, \
310+
METH_VARARGS | METH_KEYWORDS, isal_zlib_parallel_deflate_and_crc__doc__ \
311+
}
312+
313+
static PyObject *
314+
isal_zlib_parallel_deflate_and_crc(PyObject *module, PyObject *args, PyObject *kwargs)
315+
{
316+
Py_buffer data;
317+
Py_buffer zdict;
318+
int level = ISAL_DEFAULT_COMPRESSION;
319+
static char *keywords[] = {"", "", "level"};
320+
static char *format = "y*y*|i:isal_zlib.parallel_deflate_and_crc";
321+
PyObject *out_bytes = NULL;
322+
uint8_t *level_buf = NULL;
323+
324+
if (PyArg_ParseTupleAndKeywords(
325+
args, kwargs, format, keywords, &data, &zdict, &level) < 0) {
326+
return NULL;
327+
}
328+
329+
if (data.len > UINT32_MAX) {
330+
PyErr_Format(PyExc_OverflowError,
331+
"Can only compress %d bytes of data", UINT32_MAX);
332+
goto error;
333+
}
334+
335+
uint32_t level_buf_size;
336+
if (mem_level_to_bufsize(level, MEM_LEVEL_DEFAULT, &level_buf_size) != 0) {
337+
PyErr_SetString(IsalError, "Invalid compression level");
338+
goto error;
339+
}
340+
341+
level_buf = (uint8_t *)PyMem_Malloc(level_buf_size);
342+
if (level_buf == NULL) {
343+
PyErr_NoMemory();
344+
goto error;
345+
}
346+
// Assume output size < input_size. But just to be sure add 350 safety
347+
// bytes per 64K of input.
348+
Py_ssize_t output_size = data.len + (((data.len >> 16) + 1) * 350);
349+
if (output_size > UINT32_MAX) {
350+
PyErr_SetNone(PyExc_OverflowError);
351+
goto error;
352+
}
353+
out_bytes = PyBytes_FromStringAndSize(NULL, output_size);
354+
if (out_bytes == NULL) {
355+
PyErr_NoMemory();
356+
goto error;
357+
}
358+
uint8_t *out_ptr = (uint8_t *)PyBytes_AS_STRING(out_bytes);
359+
int err;
360+
struct isal_zstream zst;
361+
isal_deflate_init(&zst);
362+
zst.level = (uint32_t)level;
363+
zst.level_buf = level_buf;
364+
zst.level_buf_size = level_buf_size;
365+
zst.hist_bits = ISAL_DEF_MAX_HIST_BITS;
366+
zst.gzip_flag = IGZIP_DEFLATE;
367+
zst.avail_in = data.len;
368+
zst.next_in = data.buf;
369+
zst.next_out = out_ptr;
370+
zst.avail_out = output_size;
371+
zst.flush = SYNC_FLUSH;
372+
err = isal_deflate_set_dict(&zst, zdict.buf, zdict.len);
373+
if (err != 0){
374+
isal_deflate_error(err);
375+
return NULL;
376+
}
377+
uint32_t crc;
378+
Py_BEGIN_ALLOW_THREADS
379+
err = isal_deflate(&zst);
380+
crc = crc32_gzip_refl(0, data.buf, data.len);
381+
Py_END_ALLOW_THREADS
382+
383+
if (err != COMP_OK) {
384+
isal_deflate_error(err);
385+
goto error;
386+
}
387+
if (zst.avail_in != 0) {
388+
PyErr_Format(
389+
PyExc_RuntimeError,
390+
"Developer error input bytes are still available: %u. "
391+
"Please contact the developers by creating an issue at "
392+
"https://github.com/pycompression/python-isal/issues",
393+
zst.avail_in);
394+
goto error;
395+
}
396+
397+
if (_PyBytes_Resize(&out_bytes, zst.next_out - out_ptr) < 0) {
398+
goto error;
399+
}
400+
PyBuffer_Release(&data);
401+
PyBuffer_Release(&zdict);
402+
PyMem_Free(level_buf);
403+
return Py_BuildValue("(OI)", out_bytes, crc);
404+
error:
405+
PyMem_Free(level_buf);
406+
Py_XDECREF(out_bytes);
407+
PyBuffer_Release(&data);
408+
PyBuffer_Release(&zdict);
409+
return NULL;
410+
}
411+
412+
291413
PyDoc_STRVAR(zlib_compress__doc__,
292414
"compress($module, data, /, level=ISAL_DEFAULT_COMPRESSION, wbits=MAX_WBITS)\n"
293415
"--\n"
@@ -1972,6 +2094,7 @@ static PyMethodDef IsalZlibMethods[] = {
19722094
ISAL_ZLIB_ADLER32_METHODDEF,
19732095
ISAL_ZLIB_CRC32_METHODDEF,
19742096
ISAL_ZLIB_CRC32_COMBINE_METHODDEF,
2097+
ISAL_ZLIB_PARALLEL_DEFLATE_AND_CRC_METHODDEF,
19752098
ISAL_ZLIB_COMPRESS_METHODDEF,
19762099
ISAL_ZLIB_DECOMPRESS_METHODDEF,
19772100
ISAL_ZLIB_COMPRESSOBJ_METHODDEF,

tests/test_igzip_threaded.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,13 @@ def test_threaded_read_error():
7575
@pytest.mark.timeout(5)
7676
def test_threaded_write_error(monkeypatch):
7777
tmp = tempfile.mktemp()
78-
# Compressobj method is called in a worker thread.
79-
monkeypatch.delattr(igzip_threaded.isal_zlib, "compressobj")
78+
# parallel_deflate_and_crc method is called in a worker thread.
79+
monkeypatch.delattr(igzip_threaded.isal_zlib,
80+
"_parallel_deflate_and_crc")
8081
with pytest.raises(AttributeError) as error:
8182
with igzip_threaded.open(tmp, "wb", compresslevel=3) as writer:
8283
writer.write(b"x")
83-
error.match("no attribute 'compressobj'")
84+
error.match("no attribute '_parallel_deflate_and_crc'")
8485

8586

8687
def test_close_reader():

0 commit comments

Comments
 (0)