Skip to content

Commit 949a0c7

Browse files
Fix zstd multi-frame decompression failure (#2717)
1 parent 3ee9ef3 commit 949a0c7

File tree

2 files changed

+13
-4
lines changed

2 files changed

+13
-4
lines changed

kafka/codec.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,5 @@ def zstd_encode(payload):
319319
def zstd_decode(payload):
320320
if not zstd:
321321
raise NotImplementedError("Zstd codec is not available")
322-
try:
323-
return zstd.ZstdDecompressor().decompress(payload)
324-
except zstd.ZstdError:
325-
return zstd.ZstdDecompressor().decompress(payload, max_output_size=ZSTD_MAX_OUTPUT_SIZE)
322+
with zstd.ZstdDecompressor().stream_reader(io.BytesIO(payload), read_across_frames=True) as reader:
323+
return reader.read()

test/test_codec.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,3 +121,14 @@ def test_zstd():
121121
b1 = random_string(100).encode('utf-8')
122122
b2 = zstd_decode(zstd_encode(b1))
123123
assert b1 == b2
124+
125+
126+
@pytest.mark.skipif(not has_zstd(), reason="Zstd not available")
127+
def test_zstd_multi_frame():
128+
"""Test that zstd_decode handles multiple concatenated zstd frames."""
129+
frame1_data = b'some payload data ' * 100
130+
frame2_data = b'another frame of data ' * 100
131+
# Concatenate two independently compressed zstd frames
132+
multi_frame_payload = zstd_encode(frame1_data) + zstd_encode(frame2_data)
133+
result = zstd_decode(multi_frame_payload)
134+
assert result == frame1_data + frame2_data

0 commit comments

Comments
 (0)