Commit 72a6c12

[Improvement](shuffle) add Crc32CHashPartitioner (#59052)
add Crc32CHashPartitioner

This pull request standardizes CRC32C usage by replacing the custom `util/crc32c.h` header and its functions with the upstream `crc32c` library (`<crc32c/crc32c.h>`) and its API. It also updates call sites to pass the data types the new library expects, so that checksums are computed consistently across the file I/O, compression, and storage modules.

**Migration to Upstream CRC32C Library**

* Replaced all includes of `"util/crc32c.h"` with `<crc32c/crc32c.h>` and removed the custom header from all relevant files.
* Updated all function calls from `crc32c::Value(...)` to `crc32c::Crc32c(...)` for computing CRC32C checksums.
* Updated all calls to `crc32c::Extend(...)` to the new function signature, casting data pointers to `const uint8_t*` as required by the upstream library.

**Checksum Calculation Logic**

* Modified checksum calculation for multi-slice data to call `crc32c::Extend` on each slice in turn, so the result is the cumulative checksum over all slices.
* Updated checksum verification logic to use the new API and data types.

**Code Clean-up and Consistency**

* Removed all redundant or obsolete includes of the custom `crc32c.h` header.
* Ensured all modules that require CRC32C now depend directly on the upstream library, reducing maintenance overhead and the potential for bugs.

Together, these changes improve maintainability and align CRC32C checksum operations with the upstream library.
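The multi-slice point above can be illustrated with a minimal, self-contained sketch. It does not use the real `<crc32c/crc32c.h>`; the namespace `crc32c_sketch`, the helper `ChecksumSlices`, and `SelfCheck` are hypothetical names, and the bitwise loop is a slow reference implementation of CRC32C written only so the example compiles on its own. The assumption it demonstrates is that `Extend(crc, data, count)` continues a checksum from the value computed so far, with `0` meaning no bytes processed yet.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-in namespace; NOT the upstream <crc32c/crc32c.h>.
namespace crc32c_sketch {

// Bitwise CRC32C (Castagnoli, reflected polynomial 0x82F63B78).
// `crc` is the checksum of all bytes processed so far (0 for none).
inline uint32_t Extend(uint32_t crc, const uint8_t* data, size_t count) {
    crc ^= 0xFFFFFFFFu;  // undo the final inversion of the previous call
    for (size_t i = 0; i < count; ++i) {
        crc ^= data[i];
        for (int bit = 0; bit < 8; ++bit) {
            crc = (crc >> 1) ^ (0x82F63B78u & (0u - (crc & 1u)));
        }
    }
    return crc ^ 0xFFFFFFFFu;
}

// One-shot checksum: the same as extending from an empty prefix.
inline uint32_t Crc32c(const std::string& s) {
    return Extend(0, reinterpret_cast<const uint8_t*>(s.data()), s.size());
}

// Cumulative checksum over several slices, folded in order.
inline uint32_t ChecksumSlices(const std::vector<std::string>& slices) {
    uint32_t crc = 0;
    for (const auto& s : slices) {
        crc = Extend(crc, reinterpret_cast<const uint8_t*>(s.data()), s.size());
    }
    return crc;
}

// Slice-by-slice and one-shot checksums agree on the same bytes.
inline void SelfCheck() {
    assert(ChecksumSlices({"1234", "", "56789"}) == Crc32c("123456789"));
}

}  // namespace crc32c_sketch
```

Because each call resumes from the previous result, the slice boundaries do not affect the final value, only the byte order does.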
1 parent 235d8d9 commit 72a6c12

54 files changed: 550 additions, 194 deletions

be/src/cloud/delete_bitmap_file_reader.cpp

Lines changed: 1 addition & 2 deletions
@@ -21,7 +21,6 @@
 #include "common/status.h"
 #include "io/fs/file_reader.h"
 #include "util/coding.h"
-#include "util/crc32c.h"

 namespace doris {
 #include "common/compile_check_begin.h"
@@ -117,7 +116,7 @@ Status DeleteBitmapFileReader::read(DeleteBitmapPB& delete_bitmap) {
             offset, {checksum_len_buf, DeleteBitmapFileWriter::CHECKSUM_SIZE}, &bytes_read));
     offset += DeleteBitmapFileWriter::CHECKSUM_SIZE;
     uint32_t checksum = decode_fixed32_le(checksum_len_buf);
-    uint32_t computed_checksum = crc32c::Value(delete_bitmap_buf.data(), delete_bitmap_len);
+    uint32_t computed_checksum = crc32c::Crc32c(delete_bitmap_buf.data(), delete_bitmap_len);
     if (computed_checksum != checksum) {
         return Status::InternalError("delete bitmap checksum failed from file=" + _path +
                                      ", computed checksum=" + std::to_string(computed_checksum) +

be/src/cloud/delete_bitmap_file_writer.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@
1717

1818
#include "cloud/delete_bitmap_file_writer.h"
1919

20+
#include <crc32c/crc32c.h>
21+
2022
#include "io/fs/file_writer.h"
21-
#include "util/crc32c.h"
2223

2324
namespace doris {
2425
#include "common/compile_check_begin.h"
@@ -86,7 +87,7 @@ Status DeleteBitmapFileWriter::write(const DeleteBitmapPB& delete_bitmap) {
8687

8788
// 3. write checksum
8889
uint8_t checksum_buf[CHECKSUM_SIZE];
89-
uint32_t checksum = crc32c::Value(content.data(), delete_bitmap_len);
90+
uint32_t checksum = crc32c::Crc32c(content.data(), delete_bitmap_len);
9091
encode_fixed32_le(checksum_buf, checksum);
9192
RETURN_IF_ERROR(_file_writer->append({checksum_buf, CHECKSUM_SIZE}));
9293
return Status::OK();
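The writer and reader above follow a common pattern: append a fixed-width little-endian CRC32C after the payload, then recompute and compare it on read. The following is a simplified sketch of that pattern under stated assumptions, not the actual delete bitmap file layout. The namespace `checksum_sketch` and the helpers `EncodeFixed32LE`, `DecodeFixed32LE`, `AppendChecksum`, and `VerifyAndStrip` are hypothetical, and the bitwise CRC32C is a slow reference implementation used in place of the upstream library so the example is self-contained.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical helpers for illustration; not Doris or upstream crc32c code.
namespace checksum_sketch {

constexpr size_t kChecksumSize = 4;

// Bitwise CRC32C (Castagnoli, reflected polynomial 0x82F63B78).
inline uint32_t Crc32c(const std::string& s) {
    uint32_t crc = 0xFFFFFFFFu;
    for (unsigned char byte : s) {
        crc ^= byte;
        for (int bit = 0; bit < 8; ++bit) {
            crc = (crc >> 1) ^ (0x82F63B78u & (0u - (crc & 1u)));
        }
    }
    return crc ^ 0xFFFFFFFFu;
}

// Store a 32-bit value least-significant byte first.
inline void EncodeFixed32LE(uint8_t* buf, uint32_t v) {
    for (int i = 0; i < 4; ++i) {
        buf[i] = static_cast<uint8_t>((v >> (8 * i)) & 0xFFu);
    }
}

inline uint32_t DecodeFixed32LE(const uint8_t* buf) {
    uint32_t v = 0;
    for (int i = 0; i < 4; ++i) {
        v |= static_cast<uint32_t>(buf[i]) << (8 * i);
    }
    return v;
}

// Writer side: payload followed by its 4-byte checksum.
inline std::string AppendChecksum(const std::string& payload) {
    uint8_t buf[kChecksumSize];
    EncodeFixed32LE(buf, Crc32c(payload));
    std::string framed = payload;
    framed.append(reinterpret_cast<const char*>(buf), kChecksumSize);
    return framed;
}

// Reader side: returns true and fills *payload only if the trailer matches.
inline bool VerifyAndStrip(const std::string& framed, std::string* payload) {
    if (framed.size() < kChecksumSize) return false;
    const size_t n = framed.size() - kChecksumSize;
    const uint32_t stored =
            DecodeFixed32LE(reinterpret_cast<const uint8_t*>(framed.data() + n));
    std::string body = framed.substr(0, n);
    if (Crc32c(body) != stored) return false;
    *payload = body;
    return true;
}

// Round trip: what the writer frames, the reader accepts unchanged.
inline void SelfCheck() {
    std::string out;
    assert(VerifyAndStrip(AppendChecksum("payload"), &out) && out == "payload");
}

}  // namespace checksum_sketch
```

Fixing the byte order of the stored checksum keeps the file readable across machines with different endianness, which is presumably why the diff uses explicit `encode_fixed32_le` and `decode_fixed32_le` calls.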

be/src/exec/lzo_decompressor.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
#include <crc32c/crc32c.h>
19+
1820
#include "common/cast_set.h"
1921
#include "common/logging.h"
2022
#include "exec/decompressor.h"
2123
#include "olap/utils.h"
2224
#include "orc/Exceptions.hh"
23-
#include "util/crc32c.h"
2425

2526
namespace orc {
2627
/**
@@ -317,7 +318,7 @@ Status LzopDecompressor::parse_header_info(uint8_t* input, size_t input_len,
317318
uint32_t computed_checksum;
318319
if (_header_info.header_checksum_type == CHECK_CRC32) {
319320
computed_checksum = CRC32_INIT_VALUE;
320-
computed_checksum = crc32c::Extend(computed_checksum, (const char*)header, cur - header);
321+
computed_checksum = crc32c::Extend(computed_checksum, (const uint8_t*)header, cur - header);
321322
} else {
322323
computed_checksum = ADLER32_INIT_VALUE;
323324
computed_checksum = olap_adler32(computed_checksum, (const char*)header, cur - header);
@@ -366,7 +367,7 @@ Status LzopDecompressor::checksum(LzoChecksum type, const std::string& source, u
366367
case CHECK_NONE:
367368
return Status::OK();
368369
case CHECK_CRC32:
369-
computed_checksum = crc32c::Extend(CRC32_INIT_VALUE, (const char*)ptr, len);
370+
computed_checksum = crc32c::Extend(CRC32_INIT_VALUE, (const uint8_t*)ptr, len);
370371
break;
371372
case CHECK_ADLER:
372373
computed_checksum = olap_adler32(ADLER32_INIT_VALUE, (const char*)ptr, len);

be/src/exprs/block_bloom_filter.hpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
#pragma once
2222

23+
#include <crc32c/crc32c.h>
24+
2325
#include "vec/common/string_ref.h"
2426
#ifdef __AVX2__
2527
#include <immintrin.h>
@@ -28,7 +30,6 @@
2830
#endif
2931

3032
#include "common/status.h"
31-
#include "util/hash_util.hpp"
3233
#include "util/slice.h"
3334

3435
namespace butil {
@@ -76,7 +77,7 @@ class BlockBloomFilter {
7677
// Same as above with convenience of hashing the key.
7778
void insert(const StringRef& key) noexcept {
7879
if (key.data) {
79-
insert(HashUtil::crc32c_hash(key.data, uint32_t(key.size), _hash_seed));
80+
insert(crc32c::Extend(_hash_seed, (const uint8_t*)key.data, uint32_t(key.size)));
8081
}
8182
}
8283

@@ -105,7 +106,7 @@ class BlockBloomFilter {
105106
// Same as above with convenience of hashing the key.
106107
bool find(const StringRef& key) const noexcept {
107108
if (key.data) {
108-
return find(HashUtil::crc32c_hash(key.data, uint32_t(key.size), _hash_seed));
109+
return find(crc32c::Extend(_hash_seed, (const uint8_t*)key.data, uint32_t(key.size)));
109110
}
110111
return false;
111112
}
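In the bloom filter change above, the seed moves from the last argument of `HashUtil::crc32c_hash` to the first argument of `crc32c::Extend`, where it acts as the starting checksum. A minimal sketch of that idea follows, assuming only that `Extend` resumes from the value it is given. The namespace `bloom_sketch` and the function `SeededHash` are hypothetical, and the bitwise CRC32C stands in for the upstream library. Whether the new call yields the same hash values as the removed helper is not shown in this diff, so the sketch makes no claim about that.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical names for illustration; not Doris or upstream crc32c code.
namespace bloom_sketch {

// Bitwise CRC32C (Castagnoli, reflected polynomial 0x82F63B78),
// continuing from a previously computed checksum `crc`.
inline uint32_t Extend(uint32_t crc, const uint8_t* data, size_t count) {
    crc ^= 0xFFFFFFFFu;
    for (size_t i = 0; i < count; ++i) {
        crc ^= data[i];
        for (int bit = 0; bit < 8; ++bit) {
            crc = (crc >> 1) ^ (0x82F63B78u & (0u - (crc & 1u)));
        }
    }
    return crc ^ 0xFFFFFFFFu;
}

// Hash a key with a seed by using the seed as the initial checksum.
inline uint32_t SeededHash(const std::string& key, uint32_t seed) {
    return Extend(seed, reinterpret_cast<const uint8_t*>(key.data()), key.size());
}

// The same key and seed always produce the same hash.
inline void SelfCheck() {
    assert(SeededHash("key", 7) == SeededHash("key", 7));
}

}  // namespace bloom_sketch
```

With a seed of `0` this reduces to the plain CRC32C of the key, and for a fixed key two different seeds always give two different hashes, because the CRC register update is invertible for fixed input.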

be/src/io/cache/cache_lru_dumper.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@
1717

1818
#include "io/cache/cache_lru_dumper.h"
1919

20+
#include <crc32c/crc32c.h>
21+
2022
#include "io/cache/block_file_cache.h"
2123
#include "io/cache/lru_queue_recorder.h"
2224
#include "util/coding.h"
23-
#include "util/crc32c.h"
2425
#include "vec/common/endian.h"
2526

2627
namespace doris::io {
@@ -186,7 +187,7 @@ Status CacheLRUDumper::flush_current_group(std::ofstream& out, std::string& file
186187
::doris::io::cache::EntryGroupOffsetSizePb* group_info = _dump_meta.add_group_offset_size();
187188
group_info->set_offset(group_start);
188189
group_info->set_size(serialized.size());
189-
uint32_t checksum = crc32c::Value(serialized.data(), serialized.size());
190+
uint32_t checksum = crc32c::Crc32c(serialized.data(), serialized.size());
190191
group_info->set_checksum(checksum);
191192

192193
// Reset for next group
@@ -417,7 +418,7 @@ Status CacheLRUDumper::parse_one_lru_entry(std::ifstream& in, std::string& filen
417418
std::string group_serialized(group_info.size(), '\0');
418419
in.read(&group_serialized[0], group_serialized.size());
419420
RETURN_IF_ERROR(check_ifstream_status(in, filename));
420-
uint32_t checksum = crc32c::Value(group_serialized.data(), group_serialized.size());
421+
uint32_t checksum = crc32c::Crc32c(group_serialized.data(), group_serialized.size());
421422
if (checksum != group_info.checksum()) {
422423
std::string warn_msg =
423424
fmt::format("restore lru failed as checksum not match, file={}", filename);

be/src/io/cache/file_cache_lru_tool.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
#include <crc32c/crc32c.h>
1819
#include <gflags/gflags.h>
1920

2021
#include <fstream>
@@ -27,7 +28,6 @@
2728
#include "io/cache/file_cache_common.h"
2829
#include "io/cache/lru_queue_recorder.h"
2930
#include "util/coding.h"
30-
#include "util/crc32c.h"
3131

3232
using namespace doris;
3333

@@ -177,7 +177,7 @@ Status parse_one_lru_entry(std::ifstream& in, std::string& filename, io::UInt128
177177
std::string group_serialized(group_info.size(), '\0');
178178
in.read(&group_serialized[0], group_serialized.size());
179179
RETURN_IF_ERROR(check_ifstream_status(in, filename));
180-
uint32_t checksum = crc32c::Value(group_serialized.data(), group_serialized.size());
180+
uint32_t checksum = crc32c::Crc32c(group_serialized.data(), group_serialized.size());
181181
if (checksum != group_info.checksum()) {
182182
std::string warn_msg =
183183
fmt::format("restore lru failed as checksum not match, file={}", filename);

be/src/io/fs/s3_file_bufferpool.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "s3_file_bufferpool.h"
1919

2020
#include <bvar/bvar.h>
21+
#include <crc32c/crc32c.h>
2122

2223
#include <chrono>
2324
#include <memory>
@@ -100,7 +101,7 @@ Status UploadFileBuffer::append_data(const Slice& data) {
100101
data.get_size());
101102
std::memcpy((void*)(_inner_data->data().get_data() + _size), data.get_data(), data.get_size());
102103
_size += data.get_size();
103-
_crc_value = crc32c::Extend(_crc_value, data.get_data(), data.get_size());
104+
_crc_value = crc32c::Extend(_crc_value, (const uint8_t*)data.get_data(), data.get_size());
104105
return Status::OK();
105106
}
106107

@@ -146,7 +147,7 @@ std::string_view FileBuffer::get_string_view_data() const {
146147

147148
void UploadFileBuffer::on_upload() {
148149
_stream_ptr = std::make_shared<StringViewStream>(_inner_data->data().get_data(), _size);
149-
if (_crc_value != crc32c::Value(_inner_data->data().get_data(), _size)) {
150+
if (_crc_value != crc32c::Crc32c(_inner_data->data().get_data(), _size)) {
150151
DCHECK(false);
151152
set_status(Status::IOError("Buffer checksum not match"));
152153
return;

be/src/io/fs/s3_file_bufferpool.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
#pragma once
1919

20+
#include <crc32c/crc32c.h>
21+
2022
#include <condition_variable>
2123
#include <cstdint>
2224
#include <fstream>
@@ -27,7 +29,6 @@
2729

2830
#include "common/status.h"
2931
#include "io/cache/file_block.h"
30-
#include "util/crc32c.h"
3132
#include "util/slice.h"
3233
#include "util/threadpool.h"
3334

be/src/olap/base_tablet.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "olap/base_tablet.h"
1919

2020
#include <bthread/mutex.h>
21+
#include <crc32c/crc32c.h>
2122
#include <fmt/format.h>
2223
#include <rapidjson/prettywriter.h>
2324

@@ -48,7 +49,6 @@
4849
#include "olap/txn_manager.h"
4950
#include "service/point_query_executor.h"
5051
#include "util/bvar_helper.h"
51-
#include "util/crc32c.h"
5252
#include "util/debug_points.h"
5353
#include "util/doris_metrics.h"
5454
#include "util/key_util.h"
@@ -2024,7 +2024,7 @@ Status BaseTablet::calc_file_crc(uint32_t* crc_value, int64_t start_version, int
20242024
return st;
20252025
}
20262026
// crc_value is calculated based on the crc_value of each rowset.
2027-
*crc_value = crc32c::Extend(*crc_value, reinterpret_cast<const char*>(&rs_crc_value),
2027+
*crc_value = crc32c::Extend(*crc_value, reinterpret_cast<const uint8_t*>(&rs_crc_value),
20282028
sizeof(rs_crc_value));
20292029
*file_count += rs_file_count;
20302030
}

be/src/olap/rowset/beta_rowset.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include "olap/rowset/beta_rowset.h"
1919

20+
#include <crc32c/crc32c.h>
2021
#include <ctype.h>
2122
#include <errno.h>
2223
#include <fmt/format.h>
@@ -46,7 +47,6 @@
4647
#include "olap/segment_loader.h"
4748
#include "olap/tablet_schema.h"
4849
#include "olap/utils.h"
49-
#include "util/crc32c.h"
5050
#include "util/debug_points.h"
5151
#include "util/doris_metrics.h"
5252

@@ -731,7 +731,7 @@ Status BetaRowset::calc_file_crc(uint32_t* crc_value, int64_t* file_count) {
731731
// 3. calculate the crc_value based on all_file_md5
732732
DCHECK(file_paths.size() == all_file_md5.size());
733733
for (auto& i : all_file_md5) {
734-
*crc_value = crc32c::Extend(*crc_value, i.data(), i.size());
734+
*crc_value = crc32c::Extend(*crc_value, (const uint8_t*)i.data(), i.size());
735735
}
736736

737737
return Status::OK();
