Skip to content

Commit 8f7bff0

Browse files
authored
Use cramjam for LZ4 (#960)
1 parent 425ce26 commit 8f7bff0

File tree

6 files changed

+38
-83
lines changed

6 files changed

+38
-83
lines changed

aiokafka/codec.py

Lines changed: 22 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -11,28 +11,6 @@
1111
except ImportError:
1212
cramjam = None
1313

14-
try:
15-
import lz4.frame as lz4
16-
17-
def _lz4_compress(payload, **kwargs):
18-
# Kafka does not support LZ4 dependent blocks
19-
# https://cwiki.apache.org/confluence/display/KAFKA/KIP-57+-+Interoperable+LZ4+Framing
20-
kwargs.pop("block_linked", None)
21-
return lz4.compress(payload, block_linked=False, **kwargs)
22-
23-
except ImportError:
24-
lz4 = None
25-
26-
try:
27-
import lz4f
28-
except ImportError:
29-
lz4f = None
30-
31-
try:
32-
import lz4framed
33-
except ImportError:
34-
lz4framed = None
35-
3614

3715
def has_gzip():
3816
return True
@@ -47,13 +25,7 @@ def has_zstd():
4725

4826

4927
def has_lz4():
50-
if lz4 is not None:
51-
return True
52-
if lz4f is not None:
53-
return True
54-
if lz4framed is not None:
55-
return True
56-
return False
28+
return cramjam is not None
5729

5830

5931
def gzip_encode(payload, compresslevel=None):
@@ -161,7 +133,7 @@ def _detect_xerial_stream(payload):
161133
"""
162134

163135
if len(payload) > 16:
164-
header = struct.unpack("!" + _XERIAL_V1_FORMAT, bytes(payload)[:16])
136+
header = struct.unpack("!" + _XERIAL_V1_FORMAT, memoryview(payload)[:16])
165137
return header == _XERIAL_V1_HEADER
166138
return False
167139

@@ -191,38 +163,26 @@ def snappy_decode(payload):
191163
return bytes(cramjam.snappy.decompress_raw(payload))
192164

193165

194-
if lz4:
195-
lz4_encode = _lz4_compress # pylint: disable-msg=no-member
196-
elif lz4f:
197-
lz4_encode = lz4f.compressFrame # pylint: disable-msg=no-member
198-
elif lz4framed:
199-
lz4_encode = lz4framed.compress # pylint: disable-msg=no-member
200-
else:
201-
lz4_encode = None
202-
203-
204-
def lz4f_decode(payload):
205-
"""Decode payload using interoperable LZ4 framing. Requires Kafka >= 0.10"""
206-
# pylint: disable-msg=no-member
207-
ctx = lz4f.createDecompContext()
208-
data = lz4f.decompressFrame(payload, ctx)
209-
lz4f.freeDecompContext(ctx)
210-
211-
# lz4f python module does not expose how much of the payload was
212-
# actually read if the decompression was only partial.
213-
if data["next"] != 0:
214-
raise RuntimeError("lz4f unable to decompress full payload")
215-
return data["decomp"]
216-
217-
218-
if lz4:
219-
lz4_decode = lz4.decompress # pylint: disable-msg=no-member
220-
elif lz4f:
221-
lz4_decode = lz4f_decode
222-
elif lz4framed:
223-
lz4_decode = lz4framed.decompress # pylint: disable-msg=no-member
224-
else:
225-
lz4_decode = None
166+
def lz4_encode(payload, level=9):
167+
# level=9 is used by default by broker itself
168+
# https://cwiki.apache.org/confluence/display/KAFKA/KIP-390%3A+Support+Compression+Level
169+
if not has_lz4():
170+
raise NotImplementedError("LZ4 codec is not available")
171+
172+
# Kafka broker doesn't support linked-block compression
173+
# https://cwiki.apache.org/confluence/display/KAFKA/KIP-57+-+Interoperable+LZ4+Framing
174+
compressor = cramjam.lz4.Compressor(
175+
level=level, content_checksum=False, block_linked=False
176+
)
177+
compressor.compress(payload)
178+
return bytes(compressor.finish())
179+
180+
181+
def lz4_decode(payload):
182+
if not has_lz4():
183+
raise NotImplementedError("LZ4 codec is not available")
184+
185+
return bytes(cramjam.lz4.decompress(payload))
226186

227187

228188
def zstd_encode(payload, level=None):

pyproject.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,11 @@ dependencies = [
3535

3636
[project.optional-dependencies]
3737
snappy = ["cramjam"]
38-
lz4 = ["lz4 >=3.1.3"]
38+
# v2.8.0 adds support for independent-block mode
39+
lz4 = ["cramjam >=2.8.0"]
3940
zstd = ["cramjam"]
4041
gssapi = ["gssapi"]
41-
all = ["cramjam", "lz4 >=3.1.3", "gssapi"]
42+
all = ["cramjam >=2.8.0", "gssapi"]
4243

4344
[tool.setuptools.dynamic]
4445
version = { attr = "aiokafka.__version__" }

requirements-ci.txt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,8 @@ pytest-cov==4.1.0
88
pytest-asyncio==0.21.1
99
pytest-mock==3.12.0
1010
docker==6.1.3
11-
lz4==3.1.3
12-
docutils==0.17.1
11+
docutils==0.20.1
1312
Pygments==2.15.0
1413
gssapi==1.8.3
1514
async-timeout==4.0.1
16-
cramjam==2.7.0
15+
cramjam==2.8.0

requirements-win-test.txt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,4 @@ pytest-cov==4.1.0
88
pytest-asyncio==0.21.1
99
pytest-mock==3.12.0
1010
docker==6.1.3
11-
lz4==3.1.3
12-
cramjam==2.7.0
11+
cramjam==2.8.0

tests/record/test_default_records.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,15 @@
1010

1111

1212
@pytest.mark.parametrize("compression_type,crc", [
13-
(DefaultRecordBatch.CODEC_NONE, 3950153926),
13+
pytest.param(DefaultRecordBatch.CODEC_NONE, 3950153926, id="none"),
1414
# Gzip header includes timestamp, so checksum varies
15-
(DefaultRecordBatch.CODEC_GZIP, None),
16-
(DefaultRecordBatch.CODEC_SNAPPY, 2171068483),
17-
(DefaultRecordBatch.CODEC_LZ4, 462121143),
18-
(DefaultRecordBatch.CODEC_ZSTD, 1714138923),
15+
pytest.param(DefaultRecordBatch.CODEC_GZIP, None, id="gzip"),
16+
pytest.param(DefaultRecordBatch.CODEC_SNAPPY, 2171068483, id="snappy"),
17+
# Checksum is
18+
# 462121143 with content size (header = 01101000)
19+
# 1260758266 without content size (header = 01100000)
20+
pytest.param(DefaultRecordBatch.CODEC_LZ4, 1260758266, id="lz4"),
21+
pytest.param(DefaultRecordBatch.CODEC_ZSTD, 1714138923, id="zstd"),
1922
])
2023
def test_read_write_serde_v2(compression_type, crc):
2124
builder = DefaultRecordBatchBuilder(

tests/test_codec.py

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import platform
21
import struct
32

43
import pytest
@@ -88,10 +87,7 @@ def test_snappy_encode_xerial():
8887
assert compressed == to_ensure
8988

9089

91-
@pytest.mark.skipif(
92-
not has_lz4() or platform.python_implementation() == "PyPy",
93-
reason="python-lz4 crashes on old versions of pypy",
94-
)
90+
@pytest.mark.skipif(not has_lz4(), reason="LZ4 not available")
9591
def test_lz4():
9692
for i in range(1000):
9793
b1 = random_string(100)
@@ -100,10 +96,7 @@ def test_lz4():
10096
assert b1 == b2
10197

10298

103-
@pytest.mark.skipif(
104-
not has_lz4() or platform.python_implementation() == "PyPy",
105-
reason="python-lz4 crashes on old versions of pypy",
106-
)
99+
@pytest.mark.skipif(not has_lz4(), reason="LZ4 not available")
107100
def test_lz4_incremental():
108101
for i in range(1000):
109102
# lz4 max single block size is 4MB

0 commit comments

Comments
 (0)