20
20
21
21
22
22
def open (filename , mode = "rb" , compresslevel = igzip ._COMPRESS_LEVEL_TRADEOFF ,
23
- encoding = None , errors = None , newline = None , * , threads = 1 ):
23
+ encoding = None , errors = None , newline = None , * , threads = 1 ,
24
+ block_size = 1024 * 1024 ):
24
25
"""
25
26
Utilize threads to read and write gzip objects and escape the GIL.
26
27
Comparable to gzip.open. This method is only usable for streamed reading
@@ -39,6 +40,8 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF,
39
40
:param threads: If 0 will defer to igzip.open, if < 0 will use all threads
40
41
available to the system. Reading gzip can only
41
42
use one thread.
43
+ :param block_size: Determines how large the blocks in the read/write
44
+ queues are for threaded reading and writing.
42
45
:return: An io.BufferedReader, io.BufferedWriter, or io.TextIOWrapper,
43
46
depending on the mode.
44
47
"""
@@ -61,20 +64,21 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF,
61
64
else :
62
65
raise TypeError ("filename must be a str or bytes object, or a file" )
63
66
if "r" in mode :
64
- gzip_file = io .BufferedReader (_ThreadedGzipReader (binary_file ))
67
+ gzip_file = io .BufferedReader (
68
+ _ThreadedGzipReader (binary_file , block_size = block_size ))
65
69
else :
66
- write_buffer_size = 1024 * 1024
67
70
# Deflating random data results in an output a little larger than the
68
71
# input. Making the output buffer 10% larger is sufficient overkill.
69
- compress_buffer_size = write_buffer_size + max (
70
- write_buffer_size // 10 , 500 )
72
+ compress_buffer_size = block_size + max (
73
+ block_size // 10 , 500 )
71
74
gzip_file = io .BufferedWriter (
72
75
_ThreadedGzipWriter (
73
76
fp = binary_file ,
74
77
buffer_size = compress_buffer_size ,
75
78
level = compresslevel ,
76
- threads = threads ),
77
- buffer_size = write_buffer_size
79
+ threads = threads
80
+ ),
81
+ buffer_size = block_size
78
82
)
79
83
if "t" in mode :
80
84
return io .TextIOWrapper (gzip_file , encoding , errors , newline )
@@ -84,7 +88,7 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF,
84
88
class _ThreadedGzipReader (io .RawIOBase ):
85
89
def __init__ (self , fp , queue_size = 2 , block_size = 1024 * 1024 ):
86
90
self .raw = fp
87
- self .fileobj = igzip ._IGzipReader (fp , buffersize = 8 * 1024 * 1024 )
91
+ self .fileobj = igzip ._IGzipReader (fp , buffersize = 8 * block_size )
88
92
self .pos = 0
89
93
self .read_file = False
90
94
self .queue = queue .Queue (queue_size )
0 commit comments