77import queue
88import random
99import threading
10+ import time
1011
1112import brotlicffi
1213
@@ -43,34 +44,34 @@ def make_compress_input(size):
4344 return b"" .join (out )
4445
4546
46- compress_started = threading .Event ()
47-
48-
49- def _thread_compress (original , compressor , results ):
50- compress_started .set ()
47+ def _thread_compress (original , compressor , barrier , results ):
48+ barrier .wait ()
5149 compressed = compressor .process (original )
5250 compressed += compressor .finish ()
5351 results .put (1 )
5452
5553
56- def _thread_concurrent_process_compress (compressor , results ):
57- compress_started .wait ()
54+ def _thread_concurrent_process_compress (compressor , barrier , results ):
55+ barrier .wait ()
56+ time .sleep (.001 )
5857 try :
5958 _ = compressor .process (b"whatever" )
6059 except brotlicffi .error :
6160 results .put (2 )
6261
6362
64- def _thread_concurrent_flush_compress (compressor , results ):
65- compress_started .wait ()
63+ def _thread_concurrent_flush_compress (compressor , barrier , results ):
64+ barrier .wait ()
65+ time .sleep (.001 )
6666 try :
6767 _ = compressor .flush ()
6868 except brotlicffi .error :
6969 results .put (3 )
7070
7171
72- def _thread_concurrent_finish_compress (compressor , results ):
73- compress_started .wait ()
72+ def _thread_concurrent_finish_compress (compressor , barrier , results ):
73+ barrier .wait ()
74+ time .sleep (.001 )
7475 try :
7576 _ = compressor .finish ()
7677 except brotlicffi .error :
@@ -81,28 +82,30 @@ def test_compress_concurrency():
8182 original = make_compress_input (2 * 1024 * 1024 )
8283 compressor = brotlicffi .Compressor (quality = 9 )
8384 results = queue .Queue ()
85+ barrier = threading .Barrier (4 )
8486 threads = []
8587 threads .append (
8688 threading .Thread (
87- target = _thread_compress , args = (original , compressor , results )
89+ target = _thread_compress ,
90+ args = (original , compressor , barrier , results )
8891 )
8992 )
9093 threads .append (
9194 threading .Thread (
9295 target = _thread_concurrent_process_compress ,
93- args = (compressor , results )
96+ args = (compressor , barrier , results )
9497 )
9598 )
9699 threads .append (
97100 threading .Thread (
98101 target = _thread_concurrent_flush_compress ,
99- args = (compressor , results )
102+ args = (compressor , barrier , results )
100103 )
101104 )
102105 threads .append (
103106 threading .Thread (
104107 target = _thread_concurrent_finish_compress ,
105- args = (compressor , results )
108+ args = (compressor , barrier , results )
106109 )
107110 )
108111 for thread in threads :
@@ -123,34 +126,34 @@ def make_decompress_input(size):
123126 [prologue ] + [filler ] * (size // len (filler )) + [epilogue ])
124127
125128
126- decompress_started = threading .Event ()
127-
128-
129- def _thread_decompress (compressed , decompressor , results ):
130- decompress_started .set ()
129+ def _thread_decompress (compressed , decompressor , barrier , results ):
130+ barrier .wait ()
131131 _ = decompressor .process (compressed )
132132 if decompressor .is_finished ():
133133 results .put (1 )
134134
135135
136- def _thread_concurrent_process (decompressor , results ):
137- decompress_started .wait ()
136+ def _thread_concurrent_process (decompressor , barrier , results ):
137+ barrier .wait ()
138+ time .sleep (.001 )
138139 try :
139140 _ = decompressor .process (b'' )
140141 except brotlicffi .error :
141142 results .put (2 )
142143
143144
144- def _thread_concurrent_can_accept_more_data (decompressor , results ):
145- decompress_started .wait ()
145+ def _thread_concurrent_can_accept_more_data (decompressor , barrier , results ):
146+ barrier .wait ()
147+ time .sleep (.001 )
146148 try :
147149 _ = decompressor .can_accept_more_data ()
148150 except brotlicffi .error :
149151 results .put (3 )
150152
151153
152- def _thread_concurrent_is_finished (decompressor , results ):
153- decompress_started .wait ()
154+ def _thread_concurrent_is_finished (decompressor , barrier , results ):
155+ barrier .wait ()
156+ time .sleep (.001 )
154157 try :
155158 _ = decompressor .is_finished ()
156159 except brotlicffi .error :
@@ -161,26 +164,30 @@ def test_decompressor_concurrency():
161164 compressed = make_decompress_input (16 * 1024 * 1024 )
162165 decompressor = brotlicffi .Decompressor ()
163166 results = queue .Queue ()
167+ barrier = threading .Barrier (4 )
164168 threads = []
165169 threads .append (
166170 threading .Thread (
167- target = _thread_decompress , args = (compressed , decompressor , results )
171+ target = _thread_decompress ,
172+ args = (compressed , decompressor , barrier , results )
168173 )
169174 )
170175 threads .append (
171176 threading .Thread (
172- target = _thread_concurrent_process , args = (decompressor , results )
177+ target = _thread_concurrent_process ,
178+ args = (decompressor , barrier , results )
173179 )
174180 )
175181 threads .append (
176182 threading .Thread (
177183 target = _thread_concurrent_can_accept_more_data ,
178- args = (decompressor , results ),
184+ args = (decompressor , barrier , results ),
179185 )
180186 )
181187 threads .append (
182188 threading .Thread (
183- target = _thread_concurrent_is_finished , args = (decompressor , results )
189+ target = _thread_concurrent_is_finished ,
190+ args = (decompressor , barrier , results )
184191 )
185192 )
186193 for thread in threads :
0 commit comments