Skip to content

Commit 9479a62

Browse files
authored
gh-116738: Use PyMutex for bz2 module (gh-140555)
The methods are already wrapped with a lock, which makes them thread-safe in free-threaded build. This replaces `PyThread_acquire_lock` with `PyMutex` and removes some macros and allocation handling code. Also add a test for free-threading to ensure we aren't getting data races and that the locking is working.
1 parent e8b5cb8 commit 9479a62

File tree

2 files changed

+65
-37
lines changed

2 files changed

+65
-37
lines changed
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import unittest
2+
3+
from test.support import import_helper, threading_helper
4+
from test.support.threading_helper import run_concurrently
5+
6+
bz2 = import_helper.import_module("bz2")
7+
from bz2 import BZ2Compressor, BZ2Decompressor
8+
9+
from test.test_bz2 import ext_decompress, BaseTest
10+
11+
12+
NTHREADS = 10
13+
TEXT = BaseTest.TEXT
14+
15+
16+
@threading_helper.requires_working_threading()
17+
class TestBZ2(unittest.TestCase):
18+
def test_compressor(self):
19+
bz2c = BZ2Compressor()
20+
21+
def worker():
22+
# it should return empty bytes as it buffers data internally
23+
data = bz2c.compress(TEXT)
24+
self.assertEqual(data, b"")
25+
26+
run_concurrently(worker_func=worker, nthreads=NTHREADS)
27+
data = bz2c.flush()
28+
# The decompressed data should be TEXT repeated NTHREADS times
29+
decompressed = ext_decompress(data)
30+
self.assertEqual(decompressed, TEXT * NTHREADS)
31+
32+
def test_decompressor(self):
33+
chunk_size = 128
34+
chunks = [bytes([ord("a") + i]) * chunk_size for i in range(NTHREADS)]
35+
input_data = b"".join(chunks)
36+
compressed = bz2.compress(input_data)
37+
38+
bz2d = BZ2Decompressor()
39+
output = []
40+
41+
def worker():
42+
data = bz2d.decompress(compressed, chunk_size)
43+
self.assertEqual(len(data), chunk_size)
44+
output.append(data)
45+
46+
run_concurrently(worker_func=worker, nthreads=NTHREADS)
47+
self.assertEqual(len(output), NTHREADS)
48+
# Verify the expected chunks (order doesn't matter due to append race)
49+
self.assertEqual(set(output), set(chunks))
50+
51+
52+
if __name__ == "__main__":
53+
unittest.main()

Modules/_bz2module.c

Lines changed: 12 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -97,20 +97,11 @@ OutputBuffer_OnError(_BlocksOutputBuffer *buffer)
9797
#endif /* ! BZ_CONFIG_ERROR */
9898

9999

100-
#define ACQUIRE_LOCK(obj) do { \
101-
if (!PyThread_acquire_lock((obj)->lock, 0)) { \
102-
Py_BEGIN_ALLOW_THREADS \
103-
PyThread_acquire_lock((obj)->lock, 1); \
104-
Py_END_ALLOW_THREADS \
105-
} } while (0)
106-
#define RELEASE_LOCK(obj) PyThread_release_lock((obj)->lock)
107-
108-
109100
typedef struct {
110101
PyObject_HEAD
111102
bz_stream bzs;
112103
int flushed;
113-
PyThread_type_lock lock;
104+
PyMutex mutex;
114105
} BZ2Compressor;
115106

116107
typedef struct {
@@ -126,7 +117,7 @@ typedef struct {
126117
separately. Conversion and looping is encapsulated in
127118
decompress_buf() */
128119
size_t bzs_avail_in_real;
129-
PyThread_type_lock lock;
120+
PyMutex mutex;
130121
} BZ2Decompressor;
131122

132123
#define _BZ2Compressor_CAST(op) ((BZ2Compressor *)(op))
@@ -271,12 +262,12 @@ _bz2_BZ2Compressor_compress_impl(BZ2Compressor *self, Py_buffer *data)
271262
{
272263
PyObject *result = NULL;
273264

274-
ACQUIRE_LOCK(self);
265+
PyMutex_Lock(&self->mutex);
275266
if (self->flushed)
276267
PyErr_SetString(PyExc_ValueError, "Compressor has been flushed");
277268
else
278269
result = compress(self, data->buf, data->len, BZ_RUN);
279-
RELEASE_LOCK(self);
270+
PyMutex_Unlock(&self->mutex);
280271
return result;
281272
}
282273

@@ -296,14 +287,14 @@ _bz2_BZ2Compressor_flush_impl(BZ2Compressor *self)
296287
{
297288
PyObject *result = NULL;
298289

299-
ACQUIRE_LOCK(self);
290+
PyMutex_Lock(&self->mutex);
300291
if (self->flushed)
301292
PyErr_SetString(PyExc_ValueError, "Repeated call to flush()");
302293
else {
303294
self->flushed = 1;
304295
result = compress(self, NULL, 0, BZ_FINISH);
305296
}
306-
RELEASE_LOCK(self);
297+
PyMutex_Unlock(&self->mutex);
307298
return result;
308299
}
309300

@@ -357,13 +348,7 @@ _bz2_BZ2Compressor_impl(PyTypeObject *type, int compresslevel)
357348
return NULL;
358349
}
359350

360-
self->lock = PyThread_allocate_lock();
361-
if (self->lock == NULL) {
362-
Py_DECREF(self);
363-
PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
364-
return NULL;
365-
}
366-
351+
self->mutex = (PyMutex){0};
367352
self->bzs.opaque = NULL;
368353
self->bzs.bzalloc = BZ2_Malloc;
369354
self->bzs.bzfree = BZ2_Free;
@@ -382,10 +367,8 @@ static void
382367
BZ2Compressor_dealloc(PyObject *op)
383368
{
384369
BZ2Compressor *self = _BZ2Compressor_CAST(op);
370+
assert(!PyMutex_IsLocked(&self->mutex));
385371
BZ2_bzCompressEnd(&self->bzs);
386-
if (self->lock != NULL) {
387-
PyThread_free_lock(self->lock);
388-
}
389372
PyTypeObject *tp = Py_TYPE(self);
390373
tp->tp_free((PyObject *)self);
391374
Py_DECREF(tp);
@@ -619,12 +602,12 @@ _bz2_BZ2Decompressor_decompress_impl(BZ2Decompressor *self, Py_buffer *data,
619602
{
620603
PyObject *result = NULL;
621604

622-
ACQUIRE_LOCK(self);
605+
PyMutex_Lock(&self->mutex);
623606
if (self->eof)
624607
PyErr_SetString(PyExc_EOFError, "End of stream already reached");
625608
else
626609
result = decompress(self, data->buf, data->len, max_length);
627-
RELEASE_LOCK(self);
610+
PyMutex_Unlock(&self->mutex);
628611
return result;
629612
}
630613

@@ -650,13 +633,7 @@ _bz2_BZ2Decompressor_impl(PyTypeObject *type)
650633
return NULL;
651634
}
652635

653-
self->lock = PyThread_allocate_lock();
654-
if (self->lock == NULL) {
655-
Py_DECREF(self);
656-
PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
657-
return NULL;
658-
}
659-
636+
self->mutex = (PyMutex){0};
660637
self->needs_input = 1;
661638
self->bzs_avail_in_real = 0;
662639
self->input_buffer = NULL;
@@ -678,15 +655,13 @@ static void
678655
BZ2Decompressor_dealloc(PyObject *op)
679656
{
680657
BZ2Decompressor *self = _BZ2Decompressor_CAST(op);
658+
assert(!PyMutex_IsLocked(&self->mutex));
681659

682660
if(self->input_buffer != NULL) {
683661
PyMem_Free(self->input_buffer);
684662
}
685663
BZ2_bzDecompressEnd(&self->bzs);
686664
Py_CLEAR(self->unused_data);
687-
if (self->lock != NULL) {
688-
PyThread_free_lock(self->lock);
689-
}
690665

691666
PyTypeObject *tp = Py_TYPE(self);
692667
tp->tp_free((PyObject *)self);

0 commit comments

Comments
 (0)