Commit fed1d1d

Use a separate object with reusable buffers
1 parent 07584be commit fed1d1d

4 files changed, +151 -90 lines changed

src/isal/igzip_threaded.py

Lines changed: 17 additions & 7 deletions
@@ -63,8 +63,17 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF,
     if "r" in mode:
         gzip_file = io.BufferedReader(_ThreadedGzipReader(binary_file))
     else:
+        write_buffer_size = 1024 * 1024
+        # Deflating random data results in an output a little larger than the
+        # input. Making the output buffer 10% larger is sufficient overkill.
+        compress_buffer_size = write_buffer_size + max(
+            write_buffer_size // 10, 500)
         gzip_file = io.BufferedWriter(
-            _ThreadedGzipWriter(binary_file, compresslevel, threads),
+            _ThreadedGzipWriter(
+                fp=binary_file,
+                buffer_size=compress_buffer_size,
+                level=compresslevel,
+                threads=threads),
             buffer_size=1024 * 1024
         )
     if "t" in mode:
@@ -182,18 +191,19 @@ class _ThreadedGzipWriter(io.RawIOBase):
     """
     def __init__(self,
                  fp: BinaryIO,
+                 buffer_size: int = 1024 * 1024,
                  level: int = isal_zlib.ISAL_DEFAULT_COMPRESSION,
                  threads: int = 1,
                  queue_size: int = 2):
-        if level < 0 or level > 3:
-            raise ValueError(
-                f"Invalid compression level, "
-                f"level should be between 0 and 3: {level}")
         self.lock = threading.Lock()
         self.exception: Optional[Exception] = None
         self.raw = fp
         self.level = level
         self.previous_block = b""
+        self.compressors: List[isal_zlib._ParallelCompress] = [
+            isal_zlib._ParallelCompress(buffersize=buffer_size,
+                                        level=level) for _ in range(threads)
+        ]
         self.input_queues: List[queue.Queue[Tuple[bytes, memoryview]]] = [
             queue.Queue(queue_size) for _ in range(threads)]
         self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [
@@ -289,6 +299,7 @@ def closed(self) -> bool:
     def _compress(self, index: int):
         in_queue = self.input_queues[index]
         out_queue = self.output_queues[index]
+        compressor: isal_zlib._ParallelCompress = self.compressors[index]
         while True:
             try:
                 data, zdict = in_queue.get(timeout=0.05)
@@ -297,8 +308,7 @@ def _compress(self, index: int):
                     return
                 continue
             try:
-                compressed, crc = isal_zlib._parallel_deflate_and_crc(
-                    data, zdict, self.level)
+                compressed, crc = compressor.compress_and_crc(data, zdict)
             except Exception as e:
                 with self.lock:
                     self.exception = e
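
Note (not part of the commit): the buffer rewiring above is internal to igzip_threaded.open(); callers keep using the same public API. A minimal usage sketch, with a made-up file name and payload:

    # Sketch only: writing through the threaded writer; the 1 MiB write buffer and
    # the ~10% larger compress buffer are chosen inside open() as shown above.
    from isal import igzip_threaded

    with igzip_threaded.open("example.txt.gz", "wb", compresslevel=2, threads=4) as f:
        f.write(b"some text\n" * 100_000)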

src/isal/isal_zlib.pyi

Lines changed: 3 additions & 3 deletions
@@ -38,9 +38,9 @@ def adler32(__data, __value: int = 1) -> int: ...
 def crc32(__data, __value: int = 0) -> int: ...
 def crc32_combine(__crc1: int, __crc2: int, __crc2_length: int) -> int: ...
 
-def _parallel_deflate_and_crc(__data, __zdict, level
-                              ) -> typing.Tuple[bytes, int]: ...
-
+class _ParallelCompress:
+    def __init__(self, buffersize: int, level: int): ...
+    def compress_and_crc(self, __data, __zdict) -> typing.Tuple[bytes, int]: ...
 
 def compress(__data,
              level: int = ISAL_DEFAULT_COMPRESSION,
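
Note (not part of the commit): the stub above describes the new private helper. A sketch of driving it directly; the buffersize value is illustrative, and passing b"" as the zdict of a first block is an assumption mirroring _ThreadedGzipWriter's initial previous_block = b"":

    # Sketch only: using the private _ParallelCompress helper declared in the stub above.
    from isal import isal_zlib

    compressor = isal_zlib._ParallelCompress(buffersize=1024 * 1024, level=2)
    data = b"hello world\n" * 4096
    compressed, crc = compressor.compress_and_crc(data, b"")  # b"" zdict: assumed OK for a first block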

src/isal/isal_zlibmodule.c

Lines changed: 124 additions & 72 deletions
@@ -288,8 +288,75 @@ isal_zlib_crc32_combine(PyObject *module, PyObject *args) {
         crc32_comb(crc1, crc2, crc2_length) & 0xFFFFFFFF);
 }
 
-PyDoc_STRVAR(isal_zlib_parallel_deflate_and_crc__doc__,
-"parallel_deflate_and_crc($module, data, zdict, /, level)\n"
+
+typedef struct {
+    PyObject_HEAD
+    uint8_t *buffer;
+    uint32_t buffer_size;
+    struct isal_zstream zst;
+} ParallelCompress;
+
+static void
+ParallelCompress_dealloc(ParallelCompress *self)
+{
+    PyMem_Free(self->buffer);
+    PyMem_Free(self->zst.level_buf);
+    Py_TYPE(self)->tp_free((PyObject *)self);
+}
+
+static PyObject *
+ParallelCompress__new__(PyTypeObject *type, PyObject *args, PyObject *kwargs)
+{
+    Py_ssize_t buffer_size = 0;
+    int level = ISAL_DEFAULT_COMPRESSION;
+    static char *format = "n|i:ParallelCompress__new__";
+    static char *kwarg_names[] = {"buffersize", "level", NULL};
+    if (PyArg_ParseTupleAndKeywords(args, kwargs, format, kwarg_names,
+                                    &buffer_size, &level) < 0) {
+        return NULL;
+    }
+    uint32_t level_buf_size;
+    if (mem_level_to_bufsize(level, MEM_LEVEL_DEFAULT, &level_buf_size) < 0) {
+        PyErr_Format(PyExc_ValueError, "Invalid compression level %d", level);
+        return NULL;
+    }
+    if (buffer_size > UINT32_MAX) {
+        PyErr_Format(PyExc_ValueError,
+                     "buffersize must be at most %zd, got %zd",
+                     (Py_ssize_t)UINT32_MAX, buffer_size);
+    }
+    ParallelCompress *self = PyObject_New(ParallelCompress, type);
+    if (self == NULL) {
+        return PyErr_NoMemory();
+    }
+    self->buffer = NULL;
+    self->zst.level_buf = NULL;
+    isal_deflate_init(&self->zst);
+    uint8_t *level_buf = PyMem_Malloc(level_buf_size);
+    if (level_buf == NULL) {
+        Py_DECREF(self);
+        return PyErr_NoMemory();
+    }
+    uint8_t *buffer = PyMem_Malloc(buffer_size);
+    if (buffer == NULL) {
+        Py_DECREF(self);
+        PyMem_Free(level_buf);
+        return PyErr_NoMemory();
+    }
+    self->buffer = buffer;
+    self->buffer_size = buffer_size;
+    self->zst.level_buf = level_buf;
+    self->zst.level_buf_size = level_buf_size;
+    self->zst.gzip_flag = IGZIP_DEFLATE;
+    self->zst.hist_bits = ISAL_DEF_MAX_HIST_BITS;
+    self->zst.level = (uint32_t)level;
+    self->zst.flush = SYNC_FLUSH;
+    return (PyObject *)self;
+}
+
+
+PyDoc_STRVAR(ParallelCompress_compress_and_crc__doc__,
+"compress_and_crc($self, data, zdict, /)\n"
 "--\n"
 "\n"
 "Function specifically designed for use in parallel compression. Data is \n"
@@ -301,28 +368,20 @@ PyDoc_STRVAR(isal_zlib_parallel_deflate_and_crc__doc__,
 "    bytes-like object containing the to be compressed data\n"
 "  zdict\n"
 "    last 32 bytes of the previous block\n"
-"  level\n"
-"    the compression level to use.\n"
 );
-#define ISAL_ZLIB_PARALLEL_DEFLATE_AND_CRC_METHODDEF \
-    { \
-        "_parallel_deflate_and_crc", (PyCFunction)(void (*)(void))isal_zlib_parallel_deflate_and_crc, \
-        METH_VARARGS | METH_KEYWORDS, isal_zlib_parallel_deflate_and_crc__doc__ \
-    }
+#define PARALLELCOMPRESS_COMPRESS_AND_CRC_METHODDEF \
+    { \
+        "compress_and_crc", (PyCFunction)ParallelCompress_compress_and_crc, \
+        METH_VARARGS, ParallelCompress_compress_and_crc__doc__}
 
 static PyObject *
-isal_zlib_parallel_deflate_and_crc(PyObject *module, PyObject *args, PyObject *kwargs)
+ParallelCompress_compress_and_crc(ParallelCompress *self, PyObject *args)
 {
     Py_buffer data;
     Py_buffer zdict;
-    int level = ISAL_DEFAULT_COMPRESSION;
-    static char *keywords[] = {"", "", "level"};
-    static char *format = "y*y*|i:isal_zlib.parallel_deflate_and_crc";
-    PyObject *out_bytes = NULL;
-    uint8_t *level_buf = NULL;
+    static char *format = "y*y*:compress_and_crc";
 
-    if (PyArg_ParseTupleAndKeywords(
-            args, kwargs, format, keywords, &data, &zdict, &level) < 0) {
+    if (PyArg_ParseTuple(args, format, &data, &zdict) < 0) {
         return NULL;
     }
 
@@ -331,84 +390,69 @@ isal_zlib_parallel_deflate_and_crc(PyObject *module, PyObject *args, PyObject *k
                      "Can only compress %d bytes of data", UINT32_MAX);
         goto error;
     }
-
-    uint32_t level_buf_size;
-    if (mem_level_to_bufsize(level, MEM_LEVEL_DEFAULT, &level_buf_size) != 0) {
-        PyErr_SetString(IsalError, "Invalid compression level");
-        goto error;
-    }
-
-    level_buf = (uint8_t *)PyMem_Malloc(level_buf_size);
-    if (level_buf == NULL) {
-        PyErr_NoMemory();
-        goto error;
-    }
-    // Assume output size < input_size. But just to be sure add 350 safety
-    // bytes per 64K of input.
-    Py_ssize_t output_size = data.len + (((data.len >> 16) + 1) * 350);
-    if (output_size > UINT32_MAX) {
-        PyErr_SetNone(PyExc_OverflowError);
-        goto error;
-    }
-    out_bytes = PyBytes_FromStringAndSize(NULL, output_size);
-    if (out_bytes == NULL) {
-        PyErr_NoMemory();
-        goto error;
-    }
-    uint8_t *out_ptr = (uint8_t *)PyBytes_AS_STRING(out_bytes);
-    int err;
-    struct isal_zstream zst;
-    isal_deflate_init(&zst);
-    zst.level = (uint32_t)level;
-    zst.level_buf = level_buf;
-    zst.level_buf_size = level_buf_size;
-    zst.hist_bits = ISAL_DEF_MAX_HIST_BITS;
-    zst.gzip_flag = IGZIP_DEFLATE;
-    zst.avail_in = data.len;
-    zst.next_in = data.buf;
-    zst.next_out = out_ptr;
-    zst.avail_out = output_size;
-    zst.flush = SYNC_FLUSH;
-    err = isal_deflate_set_dict(&zst, zdict.buf, zdict.len);
+    isal_deflate_reset(&self->zst);
+    self->zst.avail_in = data.len;
+    self->zst.next_in = data.buf;
+    self->zst.next_out = self->buffer;
+    self->zst.avail_out = self->buffer_size;
+    PyThreadState *_save;
+    Py_UNBLOCK_THREADS
+    int err = isal_deflate_set_dict(&self->zst, zdict.buf, zdict.len);
     if (err != 0){
+        Py_BLOCK_THREADS;
         isal_deflate_error(err);
-        return NULL;
+        goto error;
     }
-    uint32_t crc;
-    Py_BEGIN_ALLOW_THREADS
-    err = isal_deflate(&zst);
-    crc = crc32_gzip_refl(0, data.buf, data.len);
-    Py_END_ALLOW_THREADS
+    err = isal_deflate(&self->zst);
+    uint32_t crc = crc32_gzip_refl(0, data.buf, data.len);
+    Py_BLOCK_THREADS;
 
     if (err != COMP_OK) {
         isal_deflate_error(err);
         goto error;
     }
-    if (zst.avail_in != 0) {
+    if (self->zst.avail_out == 0) {
+        PyErr_Format(
+            PyExc_OverflowError,
+            "Compressed output exceeds buffer size of %u", self->buffer_size
+        );
+        goto error;
+    }
+    if (self->zst.avail_in != 0) {
         PyErr_Format(
             PyExc_RuntimeError,
             "Developer error input bytes are still available: %u. "
             "Please contact the developers by creating an issue at "
             "https://github.com/pycompression/python-isal/issues",
-            zst.avail_in);
+            self->zst.avail_in);
         goto error;
     }
-
-    if (_PyBytes_Resize(&out_bytes, zst.next_out - out_ptr) < 0) {
+    PyObject *out_bytes = PyBytes_FromStringAndSize(
+        (char *)self->buffer, self->zst.next_out - self->buffer);
+    if (out_bytes == NULL) {
         goto error;
     }
-    PyBuffer_Release(&data);
-    PyBuffer_Release(&zdict);
-    PyMem_Free(level_buf);
     return Py_BuildValue("(OI)", out_bytes, crc);
 error:
-    PyMem_Free(level_buf);
-    Py_XDECREF(out_bytes);
     PyBuffer_Release(&data);
     PyBuffer_Release(&zdict);
     return NULL;
 }
 
+static PyMethodDef ParallelCompress_methods[] = {
+    PARALLELCOMPRESS_COMPRESS_AND_CRC_METHODDEF,
+    {NULL},
+};
+
+static PyTypeObject ParallelCompress_Type = {
+    .tp_name = "isal_zlib._ParallelCompress",
+    .tp_basicsize = sizeof(ParallelCompress),
+    .tp_doc = PyDoc_STR(
+        "A reusable zstream and buffer fast parallel compression."),
+    .tp_dealloc = (destructor)ParallelCompress_dealloc,
+    .tp_new = ParallelCompress__new__,
+    .tp_methods = ParallelCompress_methods,
+};
 
 PyDoc_STRVAR(zlib_compress__doc__,
 "compress($module, data, /, level=ISAL_DEFAULT_COMPRESSION, wbits=MAX_WBITS)\n"
@@ -2094,7 +2138,6 @@ static PyMethodDef IsalZlibMethods[] = {
     ISAL_ZLIB_ADLER32_METHODDEF,
     ISAL_ZLIB_CRC32_METHODDEF,
     ISAL_ZLIB_CRC32_COMBINE_METHODDEF,
-    ISAL_ZLIB_PARALLEL_DEFLATE_AND_CRC_METHODDEF,
     ISAL_ZLIB_COMPRESS_METHODDEF,
     ISAL_ZLIB_DECOMPRESS_METHODDEF,
     ISAL_ZLIB_COMPRESSOBJ_METHODDEF,
@@ -2178,6 +2221,15 @@ PyInit_isal_zlib(void)
         return NULL;
     }
 
+    if (PyType_Ready(&ParallelCompress_Type) != 0) {
+        return NULL;
+    }
+    Py_INCREF(&ParallelCompress_Type);
+    if (PyModule_AddObject(m, "_ParallelCompress",
+                           (PyObject *)&ParallelCompress_Type) < 0) {
+        return NULL;
+    }
+
     PyModule_AddIntConstant(m, "MAX_WBITS", ISAL_DEF_MAX_HIST_BITS);
     PyModule_AddIntConstant(m, "DEFLATED", Z_DEFLATED);
     PyModule_AddIntMacro(m, DEF_MEM_LEVEL);
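
Note (not part of the commit): compress_and_crc returns a per-block CRC; on the Python side these can be folded into a whole-stream CRC with the crc32_combine function already exposed by isal_zlib (see the .pyi context above). A sketch with made-up blocks, using crc32() as a stand-in for the per-block CRCs:

    # Sketch only: folding per-block CRCs into one stream CRC.
    from isal import isal_zlib

    blocks = [b"first block", b"second block"]
    total_crc = 0
    for block in blocks:
        block_crc = isal_zlib.crc32(block)  # stand-in for the crc from compress_and_crc
        total_crc = isal_zlib.crc32_combine(total_crc, block_crc, len(block))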

tests/test_igzip_threaded.py

Lines changed: 7 additions & 8 deletions
@@ -7,6 +7,7 @@
 
 import gzip
 import io
+import random
 import tempfile
 from pathlib import Path
 
@@ -73,15 +74,13 @@ def test_threaded_read_error():
 
 
 @pytest.mark.timeout(5)
-def test_threaded_write_error(monkeypatch):
-    tmp = tempfile.mktemp()
+def test_threaded_write_error():
     # parallel_deflate_and_crc method is called in a worker thread.
-    monkeypatch.delattr(igzip_threaded.isal_zlib,
-                        "_parallel_deflate_and_crc")
-    with pytest.raises(AttributeError) as error:
-        with igzip_threaded.open(tmp, "wb", compresslevel=3) as writer:
-            writer.write(b"x")
-    error.match("no attribute '_parallel_deflate_and_crc'")
+    with pytest.raises(OverflowError) as error:
+        with igzip_threaded.open(
+                io.BytesIO(), "wb", compresslevel=3) as writer:
+            writer.write(random.randbytes(1024 * 1024 * 50))
+    error.match("Compressed output exceeds buffer size")
 
 
 def test_close_reader():
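
Note (not part of the commit): the rewritten test above exercises the overflow path by feeding 50 MiB of incompressible random data in one write. A happy-path counterpart sketch, where compressible data stays within the enlarged buffer and round-trips; the test name and use of pytest's tmp_path fixture are hypothetical:

    # Sketch only: hypothetical round-trip test, not part of this commit.
    def test_threaded_write_roundtrip_sketch(tmp_path):
        path = tmp_path / "roundtrip.gz"
        payload = b"compressible line\n" * 100_000
        with igzip_threaded.open(path, "wb", compresslevel=3) as writer:
            writer.write(payload)
        with igzip_threaded.open(path, "rb") as reader:
            assert reader.read() == payload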
