Skip to content

Commit c13b592

Browse files
gh-116738: use PyMutex in lzma module (python#140711)
Co-authored-by: Kumar Aditya <[email protected]>
1 parent 2befce8 commit c13b592

File tree

2 files changed

+69
-33
lines changed

2 files changed

+69
-33
lines changed
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import unittest
2+
3+
from test.support import import_helper, threading_helper
4+
from test.support.threading_helper import run_concurrently
5+
6+
lzma = import_helper.import_module("lzma")
7+
from lzma import LZMACompressor, LZMADecompressor
8+
9+
from test.test_lzma import INPUT
10+
11+
12+
NTHREADS = 10
13+
14+
15+
@threading_helper.requires_working_threading()
16+
class TestLZMA(unittest.TestCase):
17+
def test_compressor(self):
18+
lzc = LZMACompressor()
19+
20+
# First compress() outputs LZMA header
21+
header = lzc.compress(INPUT)
22+
self.assertGreater(len(header), 0)
23+
24+
def worker():
25+
# it should return empty bytes as it buffers data internally
26+
data = lzc.compress(INPUT)
27+
self.assertEqual(data, b"")
28+
29+
run_concurrently(worker_func=worker, nthreads=NTHREADS - 1)
30+
full_compressed = header + lzc.flush()
31+
decompressed = lzma.decompress(full_compressed)
32+
# The decompressed data should be INPUT repeated NTHREADS times
33+
self.assertEqual(decompressed, INPUT * NTHREADS)
34+
35+
def test_decompressor(self):
36+
chunk_size = 128
37+
chunks = [bytes([ord("a") + i]) * chunk_size for i in range(NTHREADS)]
38+
input_data = b"".join(chunks)
39+
compressed = lzma.compress(input_data)
40+
41+
lzd = LZMADecompressor()
42+
output = []
43+
44+
def worker():
45+
data = lzd.decompress(compressed, chunk_size)
46+
self.assertEqual(len(data), chunk_size)
47+
output.append(data)
48+
49+
run_concurrently(worker_func=worker, nthreads=NTHREADS)
50+
self.assertEqual(len(output), NTHREADS)
51+
# Verify the expected chunks (order doesn't matter due to append race)
52+
self.assertSetEqual(set(output), set(chunks))
53+
54+
55+
if __name__ == "__main__":
56+
unittest.main()

Modules/_lzmamodule.c

Lines changed: 13 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,6 @@ OutputBuffer_OnError(_BlocksOutputBuffer *buffer)
7272
}
7373

7474

75-
#define ACQUIRE_LOCK(obj) do { \
76-
if (!PyThread_acquire_lock((obj)->lock, 0)) { \
77-
Py_BEGIN_ALLOW_THREADS \
78-
PyThread_acquire_lock((obj)->lock, 1); \
79-
Py_END_ALLOW_THREADS \
80-
} } while (0)
81-
#define RELEASE_LOCK(obj) PyThread_release_lock((obj)->lock)
8275

8376
typedef struct {
8477
PyTypeObject *lzma_compressor_type;
@@ -111,7 +104,7 @@ typedef struct {
111104
lzma_allocator alloc;
112105
lzma_stream lzs;
113106
int flushed;
114-
PyThread_type_lock lock;
107+
PyMutex mutex;
115108
} Compressor;
116109

117110
typedef struct {
@@ -124,7 +117,7 @@ typedef struct {
124117
char needs_input;
125118
uint8_t *input_buffer;
126119
size_t input_buffer_size;
127-
PyThread_type_lock lock;
120+
PyMutex mutex;
128121
} Decompressor;
129122

130123
#define Compressor_CAST(op) ((Compressor *)(op))
@@ -617,14 +610,14 @@ _lzma_LZMACompressor_compress_impl(Compressor *self, Py_buffer *data)
617610
{
618611
PyObject *result = NULL;
619612

620-
ACQUIRE_LOCK(self);
613+
PyMutex_Lock(&self->mutex);
621614
if (self->flushed) {
622615
PyErr_SetString(PyExc_ValueError, "Compressor has been flushed");
623616
}
624617
else {
625618
result = compress(self, data->buf, data->len, LZMA_RUN);
626619
}
627-
RELEASE_LOCK(self);
620+
PyMutex_Unlock(&self->mutex);
628621
return result;
629622
}
630623

@@ -644,14 +637,14 @@ _lzma_LZMACompressor_flush_impl(Compressor *self)
644637
{
645638
PyObject *result = NULL;
646639

647-
ACQUIRE_LOCK(self);
640+
PyMutex_Lock(&self->mutex);
648641
if (self->flushed) {
649642
PyErr_SetString(PyExc_ValueError, "Repeated call to flush()");
650643
} else {
651644
self->flushed = 1;
652645
result = compress(self, NULL, 0, LZMA_FINISH);
653646
}
654-
RELEASE_LOCK(self);
647+
PyMutex_Unlock(&self->mutex);
655648
return result;
656649
}
657650

@@ -820,12 +813,7 @@ Compressor_new(PyTypeObject *type, PyObject *args, PyObject *kwargs)
820813
self->alloc.free = PyLzma_Free;
821814
self->lzs.allocator = &self->alloc;
822815

823-
self->lock = PyThread_allocate_lock();
824-
if (self->lock == NULL) {
825-
Py_DECREF(self);
826-
PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
827-
return NULL;
828-
}
816+
self->mutex = (PyMutex){0};
829817

830818
self->flushed = 0;
831819
switch (format) {
@@ -867,10 +855,8 @@ static void
867855
Compressor_dealloc(PyObject *op)
868856
{
869857
Compressor *self = Compressor_CAST(op);
858+
assert(!PyMutex_IsLocked(&self->mutex));
870859
lzma_end(&self->lzs);
871-
if (self->lock != NULL) {
872-
PyThread_free_lock(self->lock);
873-
}
874860
PyTypeObject *tp = Py_TYPE(self);
875861
tp->tp_free(self);
876862
Py_DECREF(tp);
@@ -1146,12 +1132,12 @@ _lzma_LZMADecompressor_decompress_impl(Decompressor *self, Py_buffer *data,
11461132
{
11471133
PyObject *result = NULL;
11481134

1149-
ACQUIRE_LOCK(self);
1135+
PyMutex_Lock(&self->mutex);
11501136
if (self->eof)
11511137
PyErr_SetString(PyExc_EOFError, "Already at end of stream");
11521138
else
11531139
result = decompress(self, data->buf, data->len, max_length);
1154-
RELEASE_LOCK(self);
1140+
PyMutex_Unlock(&self->mutex);
11551141
return result;
11561142
}
11571143

@@ -1244,12 +1230,7 @@ _lzma_LZMADecompressor_impl(PyTypeObject *type, int format,
12441230
self->lzs.allocator = &self->alloc;
12451231
self->lzs.next_in = NULL;
12461232

1247-
self->lock = PyThread_allocate_lock();
1248-
if (self->lock == NULL) {
1249-
Py_DECREF(self);
1250-
PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
1251-
return NULL;
1252-
}
1233+
self->mutex = (PyMutex){0};
12531234

12541235
self->check = LZMA_CHECK_UNKNOWN;
12551236
self->needs_input = 1;
@@ -1304,14 +1285,13 @@ static void
13041285
Decompressor_dealloc(PyObject *op)
13051286
{
13061287
Decompressor *self = Decompressor_CAST(op);
1288+
assert(!PyMutex_IsLocked(&self->mutex));
1289+
13071290
if(self->input_buffer != NULL)
13081291
PyMem_Free(self->input_buffer);
13091292

13101293
lzma_end(&self->lzs);
13111294
Py_CLEAR(self->unused_data);
1312-
if (self->lock != NULL) {
1313-
PyThread_free_lock(self->lock);
1314-
}
13151295
PyTypeObject *tp = Py_TYPE(self);
13161296
tp->tp_free(self);
13171297
Py_DECREF(tp);

0 commit comments

Comments
 (0)