Skip to content

Commit 583d633

Browse files
committed
add decompressor test
1 parent c336a4b commit 583d633

File tree

1 file changed

+127
-63
lines changed

1 file changed

+127
-63
lines changed

test/test_multithreaded_sharing.py

Lines changed: 127 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -12,69 +12,64 @@
1212
import brotlicffi
1313

1414

15-
def make_input(size):
16-
abc = [bytes([b]) for b in b"abcdefghijklmnopqrstuvwxyz"]
17-
abc_cap = [bytes([b]) for b in b"ABCDEFGHIJKLMNOPQRSTUVWXYZ"]
18-
num_words_by_len = [0, 25, 100, 175, 1700, 1000, 1000, 1000]
19-
word_set = set()
20-
rng = random.Random()
21-
rng.seed(2025)
22-
words_by_len = [[]]
23-
for word_len in range(1, len(num_words_by_len)):
24-
num_words = num_words_by_len[word_len]
25-
words = []
26-
for _ in range(num_words):
27-
while True:
28-
word = b"".join(
29-
[rng.choice(abc_cap)]
30-
+ [rng.choice(abc) for _ in range(word_len - 1)]
31-
)
32-
if word not in word_set:
33-
word_set.add(word)
34-
words.append(word)
35-
break
36-
words_by_len.append(words)
37-
total_size = 0
38-
out = []
39-
while total_size < size:
40-
word_len = rng.choice(range(1, len(num_words_by_len)))
41-
word = rng.choice(words_by_len[word_len])
42-
total_size += len(word)
43-
out.append(word)
44-
return b"".join(out)
45-
46-
47-
def _thread_compress(original, compressor, results):
48-
compressed = compressor.process(original)
49-
compressed += compressor.finish()
50-
results.put(1)
51-
52-
53-
def _thread_concurrent_process(compressor, results):
54-
time.sleep(0.01)
55-
try:
56-
_ = compressor.process(b"whatever")
57-
except brotlicffi.error:
58-
results.put(2)
59-
60-
61-
def _thread_concurrent_flush(compressor, results):
62-
time.sleep(0.02)
63-
try:
64-
_ = compressor.flush()
65-
except brotlicffi.error:
66-
results.put(3)
67-
68-
69-
def _thread_concurrent_finish(compressor, results):
70-
time.sleep(0.03)
71-
try:
72-
_ = compressor.finish()
73-
except brotlicffi.error:
74-
results.put(4)
75-
76-
77-
def test_concurrency():
15+
def test_compress_concurrency():
16+
def make_input(size):
17+
abc = [bytes([b]) for b in b"abcdefghijklmnopqrstuvwxyz"]
18+
abc_cap = [bytes([b]) for b in b"ABCDEFGHIJKLMNOPQRSTUVWXYZ"]
19+
num_words_by_len = [0, 25, 100, 175, 1700, 1000, 1000, 1000]
20+
word_set = set()
21+
rng = random.Random()
22+
rng.seed(0x4d3d3d3)
23+
words_by_len = [[]]
24+
for word_len in range(1, len(num_words_by_len)):
25+
num_words = num_words_by_len[word_len]
26+
words = []
27+
for _ in range(num_words):
28+
while True:
29+
word = b"".join(
30+
[rng.choice(abc_cap)]
31+
+ [rng.choice(abc) for _ in range(word_len - 1)]
32+
)
33+
if word not in word_set:
34+
word_set.add(word)
35+
words.append(word)
36+
break
37+
words_by_len.append(words)
38+
total_size = 0
39+
out = []
40+
while total_size < size:
41+
word_len = rng.choice(range(1, len(num_words_by_len)))
42+
word = rng.choice(words_by_len[word_len])
43+
total_size += len(word)
44+
out.append(word)
45+
return b"".join(out)
46+
47+
def _thread_compress(original, compressor, results):
48+
compressed = compressor.process(original)
49+
compressed += compressor.finish()
50+
results.put(1)
51+
52+
def _thread_concurrent_process(compressor, results):
53+
time.sleep(0.001)
54+
try:
55+
_ = compressor.process(b"whatever")
56+
except brotlicffi.error:
57+
results.put(2)
58+
59+
def _thread_concurrent_flush(compressor, results):
60+
time.sleep(0.002)
61+
try:
62+
_ = compressor.flush()
63+
except brotlicffi.error:
64+
results.put(3)
65+
66+
def _thread_concurrent_finish(compressor, results):
67+
time.sleep(0.003)
68+
try:
69+
_ = compressor.finish()
70+
except brotlicffi.error:
71+
results.put(4)
72+
7873
original = make_input(2 * 1024 * 1024)
7974
compressor = brotlicffi.Compressor(quality=9)
8075
results = queue.Queue()
@@ -104,3 +99,72 @@ def test_concurrency():
10499
for thread in threads:
105100
thread.join()
106101
assert sorted(list(results.queue)) == [1, 2, 3, 4]
102+
103+
104+
def test_decompressor_concurrency():
105+
def make_input(size):
106+
compressor = brotlicffi.Compressor(quality=1)
107+
prologue = compressor.process(b'b')
108+
prologue += compressor.flush()
109+
filler = compressor.process(b'c')
110+
filler += compressor.flush()
111+
epilogue = compressor.finish()
112+
return b''.join(
113+
[prologue] + [filler] * (size // len(filler)) + [epilogue])
114+
115+
def _thread_decompress(compressed, decompressor, results):
116+
_ = decompressor.process(compressed)
117+
if decompressor.is_finished():
118+
results.put(1)
119+
120+
def _thread_concurrent_process(decompressor, results):
121+
time.sleep(0.001)
122+
try:
123+
_ = decompressor.process(b'')
124+
except brotlicffi.error:
125+
results.put(2)
126+
127+
def _thread_concurrent_can_accept_more_data(decompressor, results):
128+
time.sleep(0.002)
129+
try:
130+
_ = decompressor.can_accept_more_data()
131+
except brotlicffi.error:
132+
results.put(3)
133+
134+
def _thread_concurrent_is_finished(decompressor, results):
135+
time.sleep(0.03)
136+
try:
137+
_ = decompressor.is_finished()
138+
except brotlicffi.error:
139+
results.put(4)
140+
141+
compressed = make_input(16 * 1024 * 1024)
142+
decompressor = brotlicffi.Decompressor()
143+
results = queue.Queue()
144+
threads = []
145+
threads.append(
146+
threading.Thread(
147+
target=_thread_decompress, args=(compressed, decompressor, results)
148+
)
149+
)
150+
threads.append(
151+
threading.Thread(
152+
target=_thread_concurrent_process, args=(decompressor, results)
153+
)
154+
)
155+
threads.append(
156+
threading.Thread(
157+
target=_thread_concurrent_can_accept_more_data,
158+
args=(decompressor, results),
159+
)
160+
)
161+
threads.append(
162+
threading.Thread(
163+
target=_thread_concurrent_is_finished, args=(decompressor, results)
164+
)
165+
)
166+
for thread in threads:
167+
thread.start()
168+
for thread in threads:
169+
thread.join()
170+
assert sorted(list(results.queue)) == [1, 2, 3, 4]

0 commit comments

Comments
 (0)