Skip to content

Commit 343fc82

Browse files
First take on initialize compress2 context only once
1 parent 214de7a commit 343fc82

File tree

4 files changed

+162
-2
lines changed

4 files changed

+162
-2
lines changed

src/python-zstd.c

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,136 @@ static PyObject *py_zstd_compress_mt(PyObject* self, PyObject *args)
159159
return result;
160160
}
161161

162+
void init_cContext( int32_t threads, int32_t level)
163+
{
164+
m_cctx = ZSTD_createCCtx();
165+
ZSTD_CCtx_setParameter(m_cctx, ZSTD_c_compressionLevel, level);
166+
ZSTD_CCtx_setParameter(m_cctx, ZSTD_c_nbWorkers, threads);
167+
}
168+
169+
void free_cContext()
170+
{
171+
ZSTD_freeCCtx(m_cctx);
172+
}
173+
174+
void reset_cContext(int32_t threads, int32_t level)
175+
{
176+
free_cContext();
177+
init_cContext(threads, level);
178+
}
179+
180+
/**
181+
* New function for multi-threaded compression.
182+
* Uses origin zstd header, nothing more.
183+
* Simple version: not for streaming, no dict support, full block compression.
184+
* Uses new API with context object, initialize only once.
185+
*/
186+
static PyObject *py_zstd_compress_mt2(PyObject* self, PyObject *args)
187+
{
188+
UNUSED(self);
189+
190+
PyObject *result;
191+
// PyObject *resultT;
192+
const char *source;
193+
Py_ssize_t source_size;
194+
// Py_ssize_t chunk_size;
195+
char *dest;
196+
// char *destT;
197+
Py_ssize_t dest_size;
198+
size_t cSize;
199+
// size_t sum=0;
200+
int32_t level = ZSTD_CLEVEL_DEFAULT;
201+
static int32_t lastLevel =0;
202+
int32_t threads = 0;
203+
int32_t strict = 0;
204+
ZSTD_CCtx* cctx = NULL;
205+
206+
#if PY_MAJOR_VERSION >= 3
207+
if (!PyArg_ParseTuple(args, "y#|iii", &source, &source_size, &level, &threads, &strict))
208+
return NULL;
209+
#else
210+
if (!PyArg_ParseTuple(args, "s#|iii", &source, &source_size, &level, &threads, &strict))
211+
return NULL;
212+
#endif
213+
214+
printdn("got Compression level:%d\n",level);
215+
if (0 == level) level=ZSTD_defaultCLevel();
216+
/* Fast levels (zstd >= 1.3.4) - [-1..-100] */
217+
/* Usual levels - [ 1..22] */
218+
/* If level less than -100 or 1 - raise Error, level 0 handled before. */
219+
printdn("Compression min level:%d\n",ZSTD_MIN_CLEVEL);
220+
if (level < ZSTD_MIN_CLEVEL) {
221+
printd2e("Bad compression level - less than %d: %d\n", ZSTD_MIN_CLEVEL, level);
222+
if (strict) {
223+
PyErr_Format(ZstdError, "Bad compression level - less than %d: %d", ZSTD_MIN_CLEVEL, level);
224+
return NULL;
225+
} else {
226+
level = ZSTD_MIN_CLEVEL;
227+
}
228+
}
229+
/* If level more than 22 - raise Error. */
230+
printdn("Compression max level:%d\n",ZSTD_maxCLevel());
231+
if (level > ZSTD_maxCLevel()) {
232+
printd2e("Bad compression level - more than %d: %d\n", ZSTD_maxCLevel(), level);
233+
if (strict) {
234+
PyErr_Format(ZstdError, "Bad compression level - more than %d: %d", ZSTD_MAX_CLEVEL, level);
235+
return NULL;
236+
} else {
237+
level = ZSTD_maxCLevel();
238+
}
239+
}
240+
printdn("Compression level will be:%d\n",level);
241+
242+
printdn("got Compression threads:%d\n",threads);
243+
if (threads < 0) {
244+
printd2e("Bad threads count - less than %d: %d\n", 0, threads);
245+
if (strict) {
246+
PyErr_Format(ZstdError, "Bad threads count - less than %d: %d", 0, threads);
247+
return NULL;
248+
} else threads = 1;
249+
}
250+
if (0 == threads) threads = UTIL_countAvailableCores();
251+
printdn("got CPU cores:%d\n",threads);
252+
/* If threads more than 200 - raise Error. */
253+
if (threads > ZSTDMT_NBWORKERS_MAX) {
254+
printd2e("Bad threads count - more than %d: %d\n", ZSTDMT_NBWORKERS_MAX, threads);
255+
threads = ZSTDMT_NBWORKERS_MAX;
256+
// do not fail here, due auto thread counter
257+
//PyErr_Format(ZstdError, "Bad threads count - more than %d: %d", ZSTDMT_NBWORKERS_MAX, threads);
258+
//return NULL;
259+
}
260+
printdn("Compression will use:%d threads\n",threads);
261+
262+
dest_size = (Py_ssize_t)ZSTD_compressBound(source_size);
263+
result = PyBytes_FromStringAndSize(NULL, dest_size);
264+
if (result == NULL) {
265+
return NULL;
266+
}
267+
268+
if (source_size >= 0) {
269+
dest = PyBytes_AS_STRING(result);
270+
271+
if(level != lastLevel) {
272+
reset_cContext(threads, level);
273+
}
274+
275+
Py_BEGIN_ALLOW_THREADS
276+
cSize = ZSTD_compress2(m_cctx, dest, (size_t)dest_size, source, (size_t)source_size);
277+
Py_END_ALLOW_THREADS
278+
lastLevel = level;
279+
280+
printdn("Compression result: %d\n", cSize);
281+
if (ZSTD_isError(cSize)) {
282+
printdes("debug INFO: Compression error: %s", ZSTD_getErrorName(cSize));
283+
PyErr_Format(ZstdError, "Compression error: %s", ZSTD_getErrorName(cSize));
284+
Py_CLEAR(result);
285+
return NULL;
286+
}
287+
Py_SET_SIZE(result, cSize);
288+
}
289+
return result;
290+
}
291+
162292

163293
/**
164294
* New more interoperable function
@@ -538,6 +668,7 @@ static PyMethodDef ZstdMethods[] = {
538668
{"verify", py_zstd_check, METH_VARARGS, CHECK_DOCSTRING},
539669
{"compress", py_zstd_compress_mt, METH_VARARGS, COMPRESS_DOCSTRING},
540670
{"compress_real_mt", py_zstd_compress_mt, METH_VARARGS, COMPRESS_DOCSTRING},
671+
{"compress2", py_zstd_compress_mt2, METH_VARARGS, COMPRESS_DOCSTRING},
541672
{"uncompress", py_zstd_uncompress, METH_VARARGS, UNCOMPRESS_DOCSTRING},
542673
{"encode", py_zstd_compress_mt, METH_VARARGS, COMPRESS_DOCSTRING},
543674
{"decode", py_zstd_uncompress, METH_VARARGS, UNCOMPRESS_DOCSTRING},
@@ -586,6 +717,7 @@ static int init_py_zstd(PyObject *module) {
586717
Py_INCREF(ZstdError);
587718
PyModule_AddObject(module, "Error", ZstdError);
588719

720+
init_cContext(1, 3);
589721
return 0;
590722
}
591723

@@ -611,6 +743,7 @@ static int myextension_clear(PyObject *self) {
611743

612744
static void myextension_free(void *self) {
613745
Py_CLEAR(GETSTATE((PyObject *)self)->error);
746+
free_cContext();
614747
printdi("ZSTD module->free\n",0);
615748
return;
616749
}

src/python-zstd.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,10 @@
9292

9393
static PyObject *ZstdError;
9494

95+
static ZSTD_CCtx* m_cctx;
96+
9597
static PyObject *py_zstd_compress_mt(PyObject* self, PyObject *args);
98+
static PyObject *py_zstd_compress_mt2(PyObject* self, PyObject *args);
9699
static PyObject *py_zstd_uncompress(PyObject* self, PyObject *args);
97100
static PyObject *py_zstd_check(PyObject* self, PyObject *args);
98101
static PyObject *py_zstd_module_version(PyObject* self, PyObject *args);

tests/test_compress.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,17 +54,22 @@ def test_compression_MT_and_not(self):
5454
cdata2 = zstd.compress_real_mt(tDATA, 14)
5555
self.assertEqual(cdata1, cdata2)
5656

57+
def test_compression_oldMT_and_new2(self):
58+
cdata1 = zstd.compress(tDATA, 14)
59+
cdata2 = zstd.compress_real_mt(tDATA, 14)
60+
self.assertEqual(cdata1, cdata2)
61+
5762
def test_compression_MT_and_raw(self):
5863
cdata2 = zstd.compress_real_mt(tDATA, 14)
5964
data2 = zstd.decompress(cdata2)
6065
self.assertEqual(tDATA, data2)
6166

62-
def test_compression_equal_low_level(self):
67+
def test_compression_equal_too_low_level(self):
6368
cdata1 = zstd.compress(tDATA, -100)
6469
cdata2 = zstd.compress(tDATA, -1000)
6570
self.assertEqual(cdata1, cdata2)
6671

67-
def test_compression_equal_high_threads(self):
72+
def test_compression_equal_too_high_threads(self):
6873
cdata1 = zstd.compress(tDATA, 22, 256)
6974
cdata2 = zstd.compress(tDATA, 22, 2560)
7075
self.assertEqual(cdata1, cdata2)

tests/test_speed.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,25 @@ def test_compression_speed(self):
3030
log.info("Compression speed average = %6.2f Mb/sec" % (1.0*sum/1024/1024/wait,))
3131
log.info("diff Compression memory usage = %6.2f kb" % (1.0*(endMemoryUsage-beginMemoryUsage)/1024,))
3232

33+
def test_compression2_speed_init_context_once(self):
34+
wait = 10
35+
if "ZSTD_FULLTIME_TESTS" in os.environ:
36+
wait = 30
37+
log.info("\nWait %d seconds..." % wait)
38+
sum = 0
39+
l=len(tDATA)
40+
tbegin = time()
41+
beginMemoryUsage=get_real_memory_usage()
42+
log.info("begin Compression2 memory usage = %6.2f kb" % (1.0*beginMemoryUsage/1024,))
43+
while time()-tbegin<wait:
44+
cdata = zstd.compress2(tDATA,3,0)
45+
sum+=l
46+
47+
endMemoryUsage=get_real_memory_usage()
48+
log.info("end Compression2 memory usage = %6.2f kb" % (1.0*endMemoryUsage/1024,))
49+
log.info("Compression2 speed average = %6.2f Mb/sec" % (1.0*sum/1024/1024/wait,))
50+
log.info("diff Compression2 memory usage = %6.2f kb" % (1.0*(endMemoryUsage-beginMemoryUsage)/1024,))
51+
3352
def test_compression_speed_no_cpu_cores_cache(self):
3453
wait = 10
3554
if "ZSTD_FULLTIME_TESTS" in os.environ:

0 commit comments

Comments
 (0)