Skip to content

Commit c336a4b

Browse files
committed
prevent multithreaded sharing of Compressor and Decompressor instances
1 parent 7fb4b99 commit c336a4b

File tree

1 file changed

+106
-78
lines changed

1 file changed

+106
-78
lines changed

src/brotlicffi/_api.py

Lines changed: 106 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# -*- coding: utf-8 -*-
22
import math
33
import enum
4+
import threading
45

56
from ._brotlicffi import ffi, lib
67

@@ -249,6 +250,7 @@ def __init__(self,
249250
quality=lib.BROTLI_DEFAULT_QUALITY,
250251
lgwin=lib.BROTLI_DEFAULT_WINDOW,
251252
lgblock=0):
253+
self.lock = threading.Lock()
252254
enc = lib.BrotliEncoderCreateInstance(
253255
ffi.NULL, ffi.NULL, ffi.NULL
254256
)
@@ -284,15 +286,21 @@ def _compress(self, data, operation):
284286
input_buffer = ffi.new("uint8_t []", data)
285287
ptr_to_input_buffer = ffi.new("uint8_t **", input_buffer)
286288

287-
rc = lib.BrotliEncoderCompressStream(
288-
self._encoder,
289-
operation,
290-
input_size,
291-
ptr_to_input_buffer,
292-
available_out,
293-
ptr_to_output_buffer,
294-
ffi.NULL
295-
)
289+
if not self.lock.acquire(blocking=False):
290+
raise error(
291+
"Concurrently sharing Compressor objects is not allowed")
292+
try:
293+
rc = lib.BrotliEncoderCompressStream(
294+
self._encoder,
295+
operation,
296+
input_size,
297+
ptr_to_input_buffer,
298+
available_out,
299+
ptr_to_output_buffer,
300+
ffi.NULL
301+
)
302+
finally:
303+
self.lock.release()
296304
if rc != lib.BROTLI_TRUE: # pragma: no cover
297305
raise error("Error encountered compressing data.")
298306

@@ -362,6 +370,7 @@ class Decompressor(object):
362370
_unconsumed_data = None
363371

364372
def __init__(self, dictionary=b''):
373+
self.lock = threading.Lock()
365374
dec = lib.BrotliDecoderCreateInstance(ffi.NULL, ffi.NULL, ffi.NULL)
366375
self._decoder = ffi.gc(dec, lib.BrotliDecoderDestroyInstance)
367376
self._unconsumed_data = b''
@@ -420,73 +429,78 @@ def decompress(self, data, output_buffer_limit=None):
420429
if output_buffer_limit is not None and output_buffer_limit <= 0:
421430
return b''
422431

423-
# Use unconsumed data if available, use new data otherwise.
424-
if self._unconsumed_data:
425-
input_data = self._unconsumed_data
426-
self._unconsumed_data = b''
427-
else:
428-
input_data = data
432+
if not self.lock.acquire(blocking=False):
433+
raise error(
434+
"Concurrently sharing Decompressor instances is not allowed")
435+
try:
436+
# Use unconsumed data if available, use new data otherwise.
437+
if self._unconsumed_data:
438+
input_data = self._unconsumed_data
439+
self._unconsumed_data = b''
440+
else:
441+
input_data = data
429442

430-
chunks = []
431-
chunks_len = 0
432-
433-
available_in = ffi.new("size_t *", len(input_data))
434-
in_buffer = ffi.new("uint8_t[]", input_data)
435-
next_in = ffi.new("uint8_t **", in_buffer)
436-
437-
while True:
438-
buffer_size = self._calculate_buffer_size(
439-
input_data_len=len(input_data),
440-
output_buffer_limit=output_buffer_limit,
441-
chunks_len=chunks_len,
442-
chunks_num=len(chunks),
443-
)
443+
chunks = []
444+
chunks_len = 0
444445

445-
available_out = ffi.new("size_t *", buffer_size)
446-
out_buffer = ffi.new("uint8_t[]", buffer_size)
447-
next_out = ffi.new("uint8_t **", out_buffer)
448-
449-
rc = lib.BrotliDecoderDecompressStream(self._decoder,
450-
available_in,
451-
next_in,
452-
available_out,
453-
next_out,
454-
ffi.NULL)
455-
456-
# First, check for errors.
457-
if rc == lib.BROTLI_DECODER_RESULT_ERROR:
458-
error_code = lib.BrotliDecoderGetErrorCode(self._decoder)
459-
error_message = lib.BrotliDecoderErrorString(error_code)
460-
raise error(
461-
b"Decompression error: %s" % ffi.string(error_message)
462-
)
446+
available_in = ffi.new("size_t *", len(input_data))
447+
in_buffer = ffi.new("uint8_t[]", input_data)
448+
next_in = ffi.new("uint8_t **", in_buffer)
463449

464-
# Next, copy the result out.
465-
chunk = ffi.buffer(out_buffer, buffer_size - available_out[0])[:]
466-
chunks.append(chunk)
467-
chunks_len += len(chunk)
468-
469-
# Save any unconsumed input for the next call.
470-
if available_in[0] > 0:
471-
remaining_input = ffi.buffer(next_in[0], available_in[0])[:]
472-
self._unconsumed_data = remaining_input
473-
474-
# Check if we've reached the output limit.
475-
if (
476-
output_buffer_limit is not None
477-
and chunks_len >= output_buffer_limit
478-
):
479-
break
480-
481-
if rc == lib.BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT:
482-
assert available_in[0] == 0
483-
break
484-
elif rc == lib.BROTLI_DECODER_RESULT_SUCCESS:
485-
break
486-
else:
487-
# It's cool if we need more output, we just loop again.
488-
assert rc == lib.BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT
450+
while True:
451+
buffer_size = self._calculate_buffer_size(
452+
input_data_len=len(input_data),
453+
output_buffer_limit=output_buffer_limit,
454+
chunks_len=chunks_len,
455+
chunks_num=len(chunks),
456+
)
489457

458+
available_out = ffi.new("size_t *", buffer_size)
459+
out_buffer = ffi.new("uint8_t[]", buffer_size)
460+
next_out = ffi.new("uint8_t **", out_buffer)
461+
462+
rc = lib.BrotliDecoderDecompressStream(self._decoder,
463+
available_in,
464+
next_in,
465+
available_out,
466+
next_out,
467+
ffi.NULL)
468+
469+
# First, check for errors.
470+
if rc == lib.BROTLI_DECODER_RESULT_ERROR:
471+
error_code = lib.BrotliDecoderGetErrorCode(self._decoder)
472+
error_message = lib.BrotliDecoderErrorString(error_code)
473+
raise error(
474+
b"Decompression error: %s" % ffi.string(error_message)
475+
)
476+
477+
# Next, copy the result out.
478+
chunk = ffi.buffer(out_buffer, buffer_size - available_out[0])[:]
479+
chunks.append(chunk)
480+
chunks_len += len(chunk)
481+
482+
# Save any unconsumed input for the next call.
483+
if available_in[0] > 0:
484+
remaining_input = ffi.buffer(next_in[0], available_in[0])[:]
485+
self._unconsumed_data = remaining_input
486+
487+
# Check if we've reached the output limit.
488+
if (
489+
output_buffer_limit is not None
490+
and chunks_len >= output_buffer_limit
491+
):
492+
break
493+
494+
if rc == lib.BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT:
495+
assert available_in[0] == 0
496+
break
497+
elif rc == lib.BROTLI_DECODER_RESULT_SUCCESS:
498+
break
499+
else:
500+
# It's cool if we need more output, we just loop again.
501+
assert rc == lib.BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT
502+
finally:
503+
self.lock.release()
490504
return b''.join(chunks)
491505

492506
process = decompress
@@ -527,7 +541,14 @@ def is_finished(self):
527541
Returns ``True`` if the decompression stream
528542
is complete, ``False`` otherwise
529543
"""
530-
return lib.BrotliDecoderIsFinished(self._decoder) == lib.BROTLI_TRUE
544+
if not self.lock.acquire(blocking=False):
545+
raise error(
546+
"Concurrently sharing Decompressor instances is not allowed")
547+
try:
548+
return (
549+
lib.BrotliDecoderIsFinished(self._decoder) == lib.BROTLI_TRUE)
550+
finally:
551+
self.lock.release()
531552

532553
def can_accept_more_data(self):
533554
"""
@@ -550,8 +571,15 @@ def can_accept_more_data(self):
550571
more compressed data.
551572
:rtype: ``bool``
552573
"""
553-
if len(self._unconsumed_data) > 0:
554-
return False
555-
if lib.BrotliDecoderHasMoreOutput(self._decoder) == lib.BROTLI_TRUE:
556-
return False
557-
return True
574+
if not self.lock.acquire(blocking=False):
575+
raise error(
576+
"Concurrently sharing Decompressor instances is not allowed")
577+
try:
578+
ret = True
579+
if len(self._unconsumed_data) > 0:
580+
ret = False
581+
if lib.BrotliDecoderHasMoreOutput(self._decoder) == lib.BROTLI_TRUE:
582+
ret = False
583+
finally:
584+
self.lock.release()
585+
return ret

0 commit comments

Comments
 (0)