Skip to content

Commit 855abeb

Browse files
authored
Merge pull request #25112 from WillemKauf/manual-backport-25092-v24.2.x-458
[v24.2.x] `compression`: correct endianness in `snappy_java_compressor` (Manual backport) Force-merging per Willem request 5:09 PM EST Slack channel private-team-code-storage
2 parents 8b86e1d + 2ca5874 commit 855abeb

File tree

10 files changed

+367
-23
lines changed

10 files changed

+367
-23
lines changed

src/v/compression/include/compression/internal/snappy_java_compressor.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,21 @@
1515

1616
namespace compression::internal {
1717
struct snappy_java_compressor {
18+
struct snappy_magic {
19+
static const constexpr std::array<uint8_t, 8> java_magic = {
20+
0x82, 'S', 'N', 'A', 'P', 'P', 'Y', 0};
21+
// Previously, these version fields were erroneously written with
22+
// little-endian encoding. They are now corrected to be written and
23+
// decoded using big-endian, but we must retain backwards compatibility
24+
// with the existing, improperly encoded batches.
25+
static const constexpr int32_t default_version = 1;
26+
static const constexpr int32_t min_compatible_version = 1;
27+
static const constexpr size_t header_len = java_magic.size()
28+
+ sizeof(default_version)
29+
+ sizeof(
30+
min_compatible_version);
31+
};
32+
1833
static iobuf compress(const iobuf&);
1934
static iobuf uncompress(const iobuf&);
2035
};

src/v/compression/internal/snappy_java_compressor.cc

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,6 @@
2222
#include <snappy.h>
2323

2424
namespace compression::internal {
25-
struct snappy_magic {
26-
static const constexpr std::array<uint8_t, 8> java_magic = {
27-
0x82, 'S', 'N', 'A', 'P', 'P', 'Y', 0};
28-
static const constexpr int32_t default_version = 1;
29-
static const constexpr int32_t min_compatible_version = 1;
30-
static const constexpr size_t header_len = java_magic.size()
31-
+ sizeof(default_version)
32-
+ sizeof(min_compatible_version);
33-
};
3425

3526
size_t find_max_size_in_frags(const iobuf& x) {
3627
size_t ret = 0;
@@ -58,8 +49,10 @@ iobuf snappy_java_compressor::compress(const iobuf& x) {
5849
iobuf ret;
5950
ret.append(
6051
snappy_magic::java_magic.data(), snappy_magic::java_magic.size());
61-
append_le(ret, snappy_magic::default_version);
62-
append_le(ret, snappy_magic::min_compatible_version);
52+
// versions in header are big-endian. See:
53+
// https://github.com/xerial/snappy-java/blob/65e1ec3de1a0d447b137c6dd6393629aa3d75b8b/src/main/java/org/xerial/snappy/SnappyCodec.java#L78-L81
54+
append_be(ret, snappy_magic::default_version);
55+
append_be(ret, snappy_magic::min_compatible_version);
6356
// staging buffer
6457
ss::temporary_buffer<char> obuf(find_max_size_in_frags(x));
6558
for (const auto& f : x) {
@@ -82,17 +75,13 @@ iobuf snappy_java_compressor::uncompress(const iobuf& x) {
8275
if (unlikely(snappy_magic::java_magic != magic_compare)) {
8376
return snappy_standard_compressor::uncompress(x);
8477
}
85-
// NOTE: version and min_version are LITTLE_ENDIAN!
86-
const auto version = iter.consume_type<int32_t>();
87-
const auto min_version = iter.consume_type<int32_t>();
88-
if (unlikely(min_version < snappy_magic::min_compatible_version)) {
89-
throw std::runtime_error(fmt_with_ctx(
90-
fmt::format,
91-
"version missmatch. iobuf: {} - version:{}, min_version:{}",
92-
x,
93-
version,
94-
min_version));
95-
}
78+
// Previously, these version fields were erroneously written with
79+
// little-endian encoding. They are now corrected to be written and decoded
80+
// using big-endian. Additionally, there was previously a version check
81+
// here. It has been removed due to incorrect implementation, and because
82+
// most other snappy clients do not perform checks around these fields.
83+
[[maybe_unused]] const auto version = iter.consume_be_type<int32_t>();
84+
[[maybe_unused]] const auto min_version = iter.consume_be_type<int32_t>();
9685
// stream decoder next
9786
iobuf ret;
9887
const size_t input_bytes = x.size_bytes();

src/v/compression/tests/BUILD

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,32 @@ redpanda_cc_gtest(
1919
],
2020
)
2121

22+
redpanda_cc_gtest(
23+
name = "snappy_test",
24+
timeout = "short",
25+
srcs = [
26+
"snappy_tests.cc",
27+
],
28+
cpu = 1,
29+
data = [
30+
"//src/v/compression/tests/snappy_payload:little_endian_compressed_data.snappy",
31+
"//src/v/compression/tests/snappy_payload:uncompressed_data",
32+
],
33+
env = {"SNAPPY_PAYLOAD_PATH": "src/v/compression/tests/snappy_payload"},
34+
deps = [
35+
"//src/v/base",
36+
"//src/v/bytes:iobuf",
37+
"//src/v/bytes:iostream",
38+
"//src/v/compression",
39+
"//src/v/random:generators",
40+
"//src/v/test_utils:gtest",
41+
"//src/v/utils:file_io",
42+
"@googletest//:gtest",
43+
"@seastar",
44+
"@snappy",
45+
],
46+
)
47+
2248
redpanda_cc_btest(
2349
name = "zstd_test",
2450
timeout = "short",

src/v/compression/tests/CMakeLists.txt

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,19 @@ rp_test(
3434
LABELS compression
3535
ARGS "-- -c 1"
3636
)
37+
38+
set(SNAPPY_PAYLOAD_PATH "${CMAKE_CURRENT_SOURCE_DIR}/snappy_payload")
39+
40+
rp_test(
41+
UNIT_TEST
42+
GTEST
43+
BINARY_NAME snappy_tests
44+
SOURCES snappy_tests.cc
45+
LIBRARIES v::compression v::random v::gtest_main v::utils
46+
LABELS compression
47+
ARGS "-- -c 1"
48+
INPUT_FILES
49+
"${SNAPPY_PAYLOAD_PATH}/uncompressed_data"
50+
"${SNAPPY_PAYLOAD_PATH}/little_endian_compressed_data.snappy"
51+
ENV "SNAPPY_PAYLOAD_PATH="
52+
)
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
exports_files([
2+
"uncompressed_data",
3+
"little_endian_compressed_data.snappy",
4+
])
Binary file not shown.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
iXXf8ieqH0vIe65zxnKMN4578QHDxqFJgo38wPtJzp0AD1MvsLHUIggX2MGazSEznDhpUnUMVgVVs6ObsUKUWpwj4QNcPQUYLqYUWbDjvSv5SBPMeH2HtYNcvr4urkkfDh6qm1CA0lcOyUziWiIpMh5qLylBYDbPA44e1TlpKF3qy6psrtqgsEDS2gsGcZVluPUnfpKATW6aZ3BvUwYWeybCSfQrZRH0vVzoPuAgdmeWqXDNrkfZoLxxpAbgVcejU5AgG8n2wxrOjrMfi20AmnuaWIT1LECnMWFzbKkyGYI4v7Xhv4URTBv033EYKnVdqthPWZHLwQgVVyTxNjCT0drSS5W6KkcyYykyJxsH2CexbsDj7ZmVOYQ5FW4ShXBluHKD8eqhBzeYHTSFoNUqxfOl3IE48PfJyqb5t2fqIkZvcw4OSrhupxFFEEoAt8aaypTleU7Dg50sjMRVt1n7Wewiu1RlCignU74yKcyMSPwDYSeWwOHSfYxZaij8Pk7D
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
// Copyright 2024 Redpanda Data, Inc.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the file licenses/BSL.md
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0
9+
10+
#include "bytes/iobuf.h"
11+
#include "bytes/iostream.h"
12+
#include "compression/internal/snappy_java_compressor.h"
13+
#include "compression/snappy_standard_compressor.h"
14+
#include "random/generators.h"
15+
#include "utils/file_io.h"
16+
17+
#include <seastar/core/byteorder.hh>
18+
#include <seastar/core/seastar.hh>
19+
#include <seastar/core/temporary_buffer.hh>
20+
#include <seastar/util/short_streams.hh>
21+
22+
#include <gtest/gtest.h>
23+
24+
#include <cstdlib>
25+
#include <snappy-sinksource.h>
26+
#include <snappy.h>
27+
28+
TEST(SnappyTest, CompressAndDecompressSnappyStandardTest) {
29+
const auto data = random_generators::gen_alphanum_string(512);
30+
31+
iobuf buf;
32+
buf.append(data.data(), data.size());
33+
auto compressed_buf = compression::snappy_standard_compressor::compress(
34+
buf);
35+
auto decompressed_buf = compression::snappy_standard_compressor::uncompress(
36+
compressed_buf);
37+
EXPECT_EQ(buf, decompressed_buf);
38+
}
39+
40+
TEST(SnappyTest, CompressAndDecompressSnappyJavaTest) {
41+
const auto data = random_generators::gen_alphanum_string(512);
42+
43+
iobuf buf;
44+
buf.append(data.data(), data.size());
45+
auto compressed_buf
46+
= compression::internal::snappy_java_compressor::compress(buf);
47+
auto decompressed_buf
48+
= compression::internal::snappy_java_compressor::uncompress(
49+
compressed_buf);
50+
EXPECT_EQ(buf, decompressed_buf);
51+
}
52+
53+
TEST(SnappyTest, SnappyStandardIsValidCompressedTest) {
54+
const auto data = random_generators::gen_alphanum_string(512);
55+
56+
iobuf buf;
57+
buf.append(data.data(), data.size());
58+
auto compressed_buf = compression::snappy_standard_compressor::compress(
59+
buf);
60+
61+
EXPECT_EQ(
62+
snappy::IsValidCompressedBuffer(
63+
compressed_buf.begin()->get(), compressed_buf.size_bytes()),
64+
true);
65+
}
66+
67+
TEST(SnappyTest, CompressedVersionHeadersSnappyJavaTest) {
68+
// snappy-java uses big-endian format to encode headers. See:
69+
// https://github.com/xerial/snappy-java/blob/65e1ec3de1a0d447b137c6dd6393629aa3d75b8b/src/main/java/org/xerial/snappy/SnappyOutputStream.java#L343-L349
70+
// https://github.com/xerial/snappy-java/blob/65e1ec3de1a0d447b137c6dd6393629aa3d75b8b/src/main/java/org/xerial/snappy/SnappyCodec.java#L78-L81
71+
const auto data = random_generators::gen_alphanum_string(512);
72+
73+
iobuf buf;
74+
buf.append(data.data(), data.size());
75+
auto compressed_buf
76+
= compression::internal::snappy_java_compressor::compress(buf);
77+
auto magic_buf = ss::temporary_buffer<char>(
78+
compression::internal::snappy_java_compressor::snappy_magic::java_magic
79+
.size());
80+
auto compressed_frag = compressed_buf.begin();
81+
82+
// Check the magic header
83+
EXPECT_EQ(
84+
std::memcmp(
85+
compressed_frag->get(),
86+
compression::internal::snappy_java_compressor::snappy_magic::java_magic
87+
.begin(),
88+
compression::internal::snappy_java_compressor::snappy_magic::java_magic
89+
.size()),
90+
0);
91+
compressed_frag->trim_front(
92+
compression::internal::snappy_java_compressor::snappy_magic::java_magic
93+
.size());
94+
95+
// Check the default version
96+
auto be_default_version = ss::cpu_to_be(
97+
compression::internal::snappy_java_compressor::snappy_magic::
98+
default_version);
99+
EXPECT_EQ(
100+
std::memcmp(
101+
compressed_frag->get(),
102+
reinterpret_cast<const char*>(&be_default_version),
103+
sizeof(compression::internal::snappy_java_compressor::snappy_magic::
104+
default_version)),
105+
0);
106+
107+
compressed_frag->trim_front(
108+
sizeof(compression::internal::snappy_java_compressor::snappy_magic::
109+
default_version));
110+
111+
// Check the compat version
112+
auto be_compat_version = ss::cpu_to_be(
113+
compression::internal::snappy_java_compressor::snappy_magic::
114+
min_compatible_version);
115+
EXPECT_EQ(
116+
std::memcmp(
117+
compressed_frag->get(),
118+
reinterpret_cast<const char*>(&be_compat_version),
119+
sizeof(compression::internal::snappy_java_compressor::snappy_magic::
120+
min_compatible_version)),
121+
0);
122+
123+
compressed_frag->trim_front(
124+
sizeof(compression::internal::snappy_java_compressor::snappy_magic::
125+
min_compatible_version));
126+
127+
// Check the size of the compressed payload
128+
int32_t be_compressed_size{};
129+
std::memcpy(
130+
&be_compressed_size, compressed_frag->get(), sizeof(be_compressed_size));
131+
int32_t compressed_size = ss::be_to_cpu(be_compressed_size);
132+
compressed_frag->trim_front(sizeof(compressed_size));
133+
EXPECT_EQ(compressed_size, compressed_frag->size());
134+
135+
// Get the size of the decompressed payload
136+
snappy::ByteArraySource compressed_source(
137+
compressed_frag->get(), compressed_size);
138+
uint32_t decompressed_size;
139+
snappy::GetUncompressedLength(&compressed_source, &decompressed_size);
140+
EXPECT_EQ(decompressed_size, data.size());
141+
compressed_frag->trim_front(sizeof(decompressed_size));
142+
}
143+
144+
TEST(SnappyTest, LittleEndianHeadersBackwardsCompatibilitySnappyJavaTest) {
145+
// Previously, version fields were erroneously written with
146+
// little-endian encoding. They are now corrected to be written and decoded
147+
// using big-endian, but we must retain backwards compatibility here with
148+
// the existing, improperly encoded batches (as version, min_version fields
149+
// with value 1 will decode to the value 16777216).
150+
// See: https://github.com/redpanda-data/redpanda/issues/25091
151+
auto snappy_payload_path = std::getenv("SNAPPY_PAYLOAD_PATH");
152+
vassert(snappy_payload_path, "expected value for payload path");
153+
auto root = std::filesystem::path(snappy_payload_path);
154+
155+
// The original, uncompressed data.
156+
auto expected_decompressed_file = root / "uncompressed_data";
157+
EXPECT_TRUE(ss::file_exists(expected_decompressed_file.c_str()).get());
158+
auto expected_decompressed_buffer
159+
= read_fully(expected_decompressed_file.native()).get();
160+
161+
// A payload that was previously compressed by redpanda, with version
162+
// headers in little-endian encoding.
163+
auto le_compressed_file = root / "little_endian_compressed_data.snappy";
164+
EXPECT_TRUE(ss::file_exists(le_compressed_file.c_str()).get());
165+
auto le_compressed_buffer = read_fully(le_compressed_file.native()).get();
166+
167+
auto decompressed_buffer
168+
= compression::internal::snappy_java_compressor::uncompress(
169+
le_compressed_buffer);
170+
EXPECT_EQ(decompressed_buffer, expected_decompressed_buffer);
171+
}

0 commit comments

Comments
 (0)