1
+ # Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
2
+ # 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022
3
+ # Python Software Foundation; All Rights Reserved
4
+
5
+ # This file is part of python-isal which is distributed under the
6
+ # PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2.
7
+
1
8
import io
2
9
import multiprocessing
3
10
import os
4
11
import queue
5
12
import struct
6
13
import threading
7
- from typing import BinaryIO , List , Tuple
14
+ from typing import BinaryIO , List , Optional , Tuple
8
15
9
16
from . import igzip , isal_zlib
10
17
11
18
DEFLATE_WINDOW_SIZE = 2 ** 15
12
19
13
20
14
21
def open (filename , mode = "rb" , compresslevel = igzip ._COMPRESS_LEVEL_TRADEOFF ,
15
- encoding = None , errors = None , newline = None , * , threads = - 1 ):
22
+ encoding = None , errors = None , newline = None , * , threads = 1 ):
23
+ """
24
+ Utilize threads to read and write gzip objects and escape the GIL.
25
+ Comparable to gzip.open. This method is only usable for streamed reading
26
+ and writing of objects. Seeking is not supported.
27
+
28
+ threads == 0 will defer to igzip.open. A threads < 0 will attempt to use
29
+ the number of threads in the system.
30
+
31
+ :param filename: str, bytes or file-like object (supporting read or write
32
+ method)
33
+ :param mode: the mode with which the file should be opened.
34
+ :param compresslevel: Compression level, only used for gzip writers.
35
+ :param encoding: Passed through to the io.TextIOWrapper, if applicable.
36
+ :param errors: Passed through to the io.TextIOWrapper, if applicable.
37
+ :param newline: Passed through to the io.TextIOWrapper, if applicable.
38
+ :param threads: If 0 will defer to igzip.open, if < 0 will use all threads
39
+ available to the system. Reading gzip can only
40
+ use one thread.
41
+ :return: An io.BufferedReader, io.BufferedWriter, or io.TextIOWrapper,
42
+ depending on the mode.
43
+ """
16
44
if threads == 0 :
17
45
return igzip .open (filename , mode , compresslevel , encoding , errors ,
18
46
newline )
@@ -32,18 +60,18 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF,
32
60
else :
33
61
raise TypeError ("filename must be a str or bytes object, or a file" )
34
62
if "r" in mode :
35
- gzip_file = io .BufferedReader (ThreadedGzipReader (binary_file ))
63
+ gzip_file = io .BufferedReader (_ThreadedGzipReader (binary_file ))
36
64
else :
37
65
gzip_file = io .BufferedWriter (
38
- ThreadedGzipWriter (binary_file , compresslevel , threads ),
66
+ _ThreadedGzipWriter (binary_file , compresslevel , threads ),
39
67
buffer_size = 1024 * 1024
40
68
)
41
69
if "t" in mode :
42
70
return io .TextIOWrapper (gzip_file , encoding , errors , newline )
43
71
return gzip_file
44
72
45
73
46
- class ThreadedGzipReader (io .RawIOBase ):
74
+ class _ThreadedGzipReader (io .RawIOBase ):
47
75
def __init__ (self , fp , queue_size = 4 , block_size = 8 * 1024 * 1024 ):
48
76
self .raw = fp
49
77
self .fileobj = igzip ._IGzipReader (fp , buffersize = 8 * 1024 * 1024 )
@@ -109,7 +137,7 @@ def close(self) -> None:
109
137
self .fileobj .close ()
110
138
111
139
112
- class ThreadedGzipWriter (io .RawIOBase ):
140
+ class _ThreadedGzipWriter (io .RawIOBase ):
113
141
"""
114
142
Write a gzip file using multiple threads.
115
143
@@ -145,7 +173,7 @@ def __init__(self,
145
173
threads : int = 1 ,
146
174
queue_size : int = 2 ):
147
175
self .lock = threading .Lock ()
148
- self .exception = None
176
+ self .exception : Optional [ Exception ] = None
149
177
self .raw = fp
150
178
self .level = level
151
179
self .previous_block = b""
0 commit comments