Skip to content

Commit 29a3c0b

Browse files
committed
appease linter
1 parent 8f65ef0 commit 29a3c0b

File tree

2 files changed: +181 additions, −163 deletions

src/brotlicffi/_api.py

Lines changed: 70 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -433,75 +433,79 @@ def decompress(self, data, output_buffer_limit=None):
433433
raise error(
434434
"Concurrently sharing Decompressor instances is not allowed")
435435
try:
436-
# Use unconsumed data if available, use new data otherwise.
437-
if self._unconsumed_data:
438-
input_data = self._unconsumed_data
439-
self._unconsumed_data = b''
440-
else:
441-
input_data = data
436+
chunks = self._decompress(data, output_buffer_limit)
437+
finally:
438+
self.lock.release()
439+
return b''.join(chunks)
442440

443-
chunks = []
444-
chunks_len = 0
441+
def _decompress(self, data, output_buffer_limit):
    """Core decompression loop; returns a list of decompressed byte chunks.

    Prefers previously unconsumed input over *data*: if an earlier call
    stopped early because of ``output_buffer_limit``, the leftover bytes
    are drained first and the new *data* is ignored for this call.

    :param data: new compressed bytes to feed the decoder.
    :param output_buffer_limit: optional cap (in bytes) on how much
        output to produce before returning early, or ``None`` for no cap.
    :raises error: if the Brotli decoder reports a stream error.
    """
    # Use unconsumed data if available, use new data otherwise.
    if self._unconsumed_data:
        src = self._unconsumed_data
        self._unconsumed_data = b''
    else:
        src = data

    pieces = []      # decompressed chunks collected so far
    produced = 0     # total number of bytes across `pieces`

    available_in = ffi.new("size_t *", len(src))
    in_buffer = ffi.new("uint8_t[]", src)
    next_in = ffi.new("uint8_t **", in_buffer)

    while True:
        # Size the next output buffer based on progress so far.
        buffer_size = self._calculate_buffer_size(
            input_data_len=len(src),
            output_buffer_limit=output_buffer_limit,
            chunks_len=produced,
            chunks_num=len(pieces),
        )

        available_out = ffi.new("size_t *", buffer_size)
        out_buffer = ffi.new("uint8_t[]", buffer_size)
        next_out = ffi.new("uint8_t **", out_buffer)

        rc = lib.BrotliDecoderDecompressStream(self._decoder,
                                               available_in,
                                               next_in,
                                               available_out,
                                               next_out,
                                               ffi.NULL)

        # First, check for errors.
        if rc == lib.BROTLI_DECODER_RESULT_ERROR:
            error_code = lib.BrotliDecoderGetErrorCode(self._decoder)
            error_message = lib.BrotliDecoderErrorString(error_code)
            raise error(
                b"Decompression error: %s" % ffi.string(error_message)
            )

        # Next, copy the result out.
        chunk = ffi.buffer(out_buffer, buffer_size - available_out[0])[:]
        pieces.append(chunk)
        produced += len(chunk)

        # Save any unconsumed input for the next call.
        if available_in[0] > 0:
            self._unconsumed_data = ffi.buffer(
                next_in[0], available_in[0])[:]

        # Stop early once the caller-supplied output limit is reached.
        if (output_buffer_limit is not None
                and produced >= output_buffer_limit):
            break

        if rc == lib.BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT:
            assert available_in[0] == 0
            break
        elif rc == lib.BROTLI_DECODER_RESULT_SUCCESS:
            break
        else:
            # It's cool if we need more output, we just loop again.
            assert rc == lib.BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT
    return pieces
505509

506510
process = decompress
507511

@@ -578,7 +582,8 @@ def can_accept_more_data(self):
578582
ret = True
579583
if len(self._unconsumed_data) > 0:
580584
ret = False
581-
if lib.BrotliDecoderHasMoreOutput(self._decoder) == lib.BROTLI_TRUE:
585+
if ((lib.BrotliDecoderHasMoreOutput(self._decoder) ==
586+
lib.BROTLI_TRUE)):
582587
ret = False
583588
finally:
584589
self.lock.release()

test/test_multithreaded_sharing.py

Lines changed: 111 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -12,65 +12,70 @@
1212
import brotlicffi
1313

1414

15-
def test_compress_concurrency():
16-
def make_input(size):
17-
abc = [bytes([b]) for b in b"abcdefghijklmnopqrstuvwxyz"]
18-
abc_cap = [bytes([b]) for b in b"ABCDEFGHIJKLMNOPQRSTUVWXYZ"]
19-
num_words_by_len = [0, 25, 100, 175, 1700, 1000, 1000, 1000]
20-
word_set = set()
21-
rng = random.Random()
22-
rng.seed(0x4d3d3d3)
23-
words_by_len = [[]]
24-
for word_len in range(1, len(num_words_by_len)):
25-
num_words = num_words_by_len[word_len]
26-
words = []
27-
for _ in range(num_words):
28-
while True:
29-
word = b"".join(
30-
[rng.choice(abc_cap)]
31-
+ [rng.choice(abc) for _ in range(word_len - 1)]
32-
)
33-
if word not in word_set:
34-
word_set.add(word)
35-
words.append(word)
36-
break
37-
words_by_len.append(words)
38-
total_size = 0
39-
out = []
40-
while total_size < size:
41-
word_len = rng.choice(range(1, len(num_words_by_len)))
42-
word = rng.choice(words_by_len[word_len])
43-
total_size += len(word)
44-
out.append(word)
45-
return b"".join(out)
46-
47-
def _thread_compress(original, compressor, results):
48-
compressed = compressor.process(original)
49-
compressed += compressor.finish()
50-
results.put(1)
15+
def make_compress_input(size):
    """Generate at least *size* bytes of deterministic word-like ASCII text.

    Builds a fixed-seed vocabulary of unique capitalized words (one
    uppercase letter followed by lowercase letters) of lengths 1-7, then
    concatenates randomly chosen words until the requested size is
    reached or exceeded.
    """
    lowers = [bytes([c]) for c in b"abcdefghijklmnopqrstuvwxyz"]
    uppers = [bytes([c]) for c in b"ABCDEFGHIJKLMNOPQRSTUVWXYZ"]
    # How many distinct words to generate per word length (index = length).
    counts = [0, 25, 100, 175, 1700, 1000, 1000, 1000]
    seen = set()
    rnd = random.Random()
    rnd.seed(0x4d3d3d3)
    vocab = [[]]
    for length in range(1, len(counts)):
        bucket = []
        while len(bucket) < counts[length]:
            # One uppercase letter, then length-1 lowercase letters.
            parts = [rnd.choice(uppers)]
            for _ in range(length - 1):
                parts.append(rnd.choice(lowers))
            candidate = b"".join(parts)
            if candidate not in seen:
                seen.add(candidate)
                bucket.append(candidate)
        vocab.append(bucket)
    emitted = []
    total = 0
    while total < size:
        length = rnd.choice(range(1, len(counts)))
        word = rnd.choice(vocab[length])
        total += len(word)
        emitted.append(word)
    return b"".join(emitted)
45+
46+
47+
def _thread_compress(original, compressor, results):
48+
compressed = compressor.process(original)
49+
compressed += compressor.finish()
50+
results.put(1)
51+
52+
53+
def _thread_concurrent_process_compress(compressor, results):
54+
time.sleep(0.001)
55+
try:
56+
_ = compressor.process(b"whatever")
57+
except brotlicffi.error:
58+
results.put(2)
59+
60+
61+
def _thread_concurrent_flush_compress(compressor, results):
62+
time.sleep(0.002)
63+
try:
64+
_ = compressor.flush()
65+
except brotlicffi.error:
66+
results.put(3)
67+
68+
69+
def _thread_concurrent_finish_compress(compressor, results):
70+
time.sleep(0.003)
71+
try:
72+
_ = compressor.finish()
73+
except brotlicffi.error:
74+
results.put(4)
5175

52-
def _thread_concurrent_process(compressor, results):
53-
time.sleep(0.001)
54-
try:
55-
_ = compressor.process(b"whatever")
56-
except brotlicffi.error:
57-
results.put(2)
58-
59-
def _thread_concurrent_flush(compressor, results):
60-
time.sleep(0.002)
61-
try:
62-
_ = compressor.flush()
63-
except brotlicffi.error:
64-
results.put(3)
65-
66-
def _thread_concurrent_finish(compressor, results):
67-
time.sleep(0.003)
68-
try:
69-
_ = compressor.finish()
70-
except brotlicffi.error:
71-
results.put(4)
72-
73-
original = make_input(2 * 1024 * 1024)
76+
77+
def test_compress_concurrency():
78+
original = make_compress_input(2 * 1024 * 1024)
7479
compressor = brotlicffi.Compressor(quality=9)
7580
results = queue.Queue()
7681
threads = []
@@ -81,17 +86,20 @@ def _thread_concurrent_finish(compressor, results):
8186
)
8287
threads.append(
8388
threading.Thread(
84-
target=_thread_concurrent_process, args=(compressor, results)
89+
target=_thread_concurrent_process_compress,
90+
args=(compressor, results)
8591
)
8692
)
8793
threads.append(
8894
threading.Thread(
89-
target=_thread_concurrent_flush, args=(compressor, results)
95+
target=_thread_concurrent_flush_compress,
96+
args=(compressor, results)
9097
)
9198
)
9299
threads.append(
93100
threading.Thread(
94-
target=_thread_concurrent_finish, args=(compressor, results)
101+
target=_thread_concurrent_finish_compress,
102+
args=(compressor, results)
95103
)
96104
)
97105
for thread in threads:
@@ -101,44 +109,49 @@ def _thread_concurrent_finish(compressor, results):
101109
assert sorted(list(results.queue)) == [1, 2, 3, 4]
102110

103111

112+
def make_decompress_input(size):
    """Build a single brotli stream roughly *size* compressed bytes long.

    Compresses a one-byte prologue, then repeats a flushed one-byte
    filler block enough times to reach *size*, and closes the stream
    with the encoder's finish() epilogue.
    """
    comp = brotlicffi.Compressor(quality=1)
    head = comp.process(b'b') + comp.flush()
    body = comp.process(b'c') + comp.flush()
    tail = comp.finish()
    repeats = size // len(body)
    return b''.join([head] + [body] * repeats + [tail])
121+
122+
123+
def _thread_decompress(compressed, decompressor, results):
124+
_ = decompressor.process(compressed)
125+
if decompressor.is_finished():
126+
results.put(1)
127+
128+
129+
def _thread_concurrent_process(decompressor, results):
130+
time.sleep(0.001)
131+
try:
132+
_ = decompressor.process(b'')
133+
except brotlicffi.error:
134+
results.put(2)
135+
136+
137+
def _thread_concurrent_can_accept_more_data(decompressor, results):
138+
time.sleep(0.002)
139+
try:
140+
_ = decompressor.can_accept_more_data()
141+
except brotlicffi.error:
142+
results.put(3)
143+
144+
145+
def _thread_concurrent_is_finished(decompressor, results):
146+
time.sleep(0.03)
147+
try:
148+
_ = decompressor.is_finished()
149+
except brotlicffi.error:
150+
results.put(4)
151+
152+
104153
def test_decompressor_concurrency():
105-
def make_input(size):
106-
compressor = brotlicffi.Compressor(quality=1)
107-
prologue = compressor.process(b'b')
108-
prologue += compressor.flush()
109-
filler = compressor.process(b'c')
110-
filler += compressor.flush()
111-
epilogue = compressor.finish()
112-
return b''.join(
113-
[prologue] + [filler] * (size // len(filler)) + [epilogue])
114-
115-
def _thread_decompress(compressed, decompressor, results):
116-
_ = decompressor.process(compressed)
117-
if decompressor.is_finished():
118-
results.put(1)
119-
120-
def _thread_concurrent_process(decompressor, results):
121-
time.sleep(0.001)
122-
try:
123-
_ = decompressor.process(b'')
124-
except brotlicffi.error:
125-
results.put(2)
126-
127-
def _thread_concurrent_can_accept_more_data(decompressor, results):
128-
time.sleep(0.002)
129-
try:
130-
_ = decompressor.can_accept_more_data()
131-
except brotlicffi.error:
132-
results.put(3)
133-
134-
def _thread_concurrent_is_finished(decompressor, results):
135-
time.sleep(0.03)
136-
try:
137-
_ = decompressor.is_finished()
138-
except brotlicffi.error:
139-
results.put(4)
140-
141-
compressed = make_input(16 * 1024 * 1024)
154+
compressed = make_decompress_input(16 * 1024 * 1024)
142155
decompressor = brotlicffi.Decompressor()
143156
results = queue.Queue()
144157
threads = []

0 commit comments

Comments
 (0)