@@ -29,33 +29,22 @@ def has_lz4() -> bool:
2929 return cramjam is not None
3030
3131
32- def gzip_encode (payload : Buffer , compresslevel : int | None = None ) -> bytes :
33- if not compresslevel :
34- compresslevel = 9
32+ def gzip_encode (payload : Buffer , level : int | None = None ) -> bytes :
33+ if not level :
34+ # using the fastest compression,
35+ # as higher levels are just 1-2% better in size
36+ level = 1
3537
3638 buf = io .BytesIO ()
37-
38- # Gzip context manager introduced in python 2.7
39- # so old-fashioned way until we decide to not support 2.6
40- gzipper = gzip .GzipFile (fileobj = buf , mode = "w" , compresslevel = compresslevel )
41- try :
39+ with gzip .GzipFile (fileobj = buf , mode = "w" , compresslevel = level ) as gzipper :
4240 gzipper .write (payload )
43- finally :
44- gzipper .close ()
45-
4641 return buf .getvalue ()
4742
4843
4944def gzip_decode (payload : Buffer ) -> bytes :
5045 buf = io .BytesIO (payload )
51-
52- # Gzip context manager introduced in python 2.7
53- # so old-fashioned way until we decide to not support 2.6
54- gzipper = gzip .GzipFile (fileobj = buf , mode = "r" )
55- try :
46+ with gzip .GzipFile (fileobj = buf , mode = "r" ) as gzipper :
5647 return gzipper .read ()
57- finally :
58- gzipper .close ()
5948
6049
6150def snappy_encode (
@@ -163,12 +152,17 @@ def snappy_decode(payload: Buffer) -> bytes:
163152 return bytes (cramjam .snappy .decompress_raw (payload ))
164153
165154
166- def lz4_encode (payload : Buffer , level : int = 9 ) -> bytes :
155+ def lz4_encode (payload : Buffer , level : int | None = None ) -> bytes :
167156 # level=9 is used by default by broker itself
168157 # https://cwiki.apache.org/confluence/display/KAFKA/KIP-390%3A+Support+Compression+Level
169158 if not has_lz4 ():
170159 raise NotImplementedError ("LZ4 codec is not available" )
171160
161+ if not level :
162+ # using the fastest compression,
163+ # as higher levels are just 1-2% better in size
164+ level = 1
165+
172166 # Kafka broker doesn't support linked-block compression
173167 # https://cwiki.apache.org/confluence/display/KAFKA/KIP-57+-+Interoperable+LZ4+Framing
174168 compressor = cramjam .lz4 .Compressor (
@@ -190,9 +184,9 @@ def zstd_encode(payload: Buffer, level: int | None = None) -> bytes:
190184 raise NotImplementedError ("Zstd codec is not available" )
191185
192186 if level is None :
193- # Default for kafka broker
194- # https://cwiki.apache.org/confluence/display/KAFKA/KIP-390%3A+Support+Compression+Level
195- level = 3
187+ # using the fastest compression,
188+ # as higher levels are just 1-2% better in size
189+ level = 1
196190
197191 return bytes (cramjam .zstd .compress (payload , level = level ))
198192
0 commit comments