Skip to content

Commit dd3b3ef

Browse files
luwei16liaoxin01
authored andcommitted
[fix](packed-file) fix issues in packed file recycler checker (apache#59153)
1 parent 4899dc7 commit dd3b3ef

File tree

14 files changed

+109
-52
lines changed

14 files changed

+109
-52
lines changed

be/src/cloud/config.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,7 @@ DEFINE_mInt64(packed_file_time_threshold_ms, "100"); // 100ms
140140
DEFINE_mInt64(packed_file_try_lock_timeout_ms, "5"); // 5ms
141141
DEFINE_mInt64(packed_file_small_file_count_threshold, "100");
142142
DEFINE_mInt64(small_file_threshold_bytes, "1048576"); // 1MB
143-
DEFINE_mInt64(uploaded_file_retention_seconds, "60"); // 1 minute
144-
DEFINE_mInt64(index_retention_seconds, "60"); // 1 minute
143+
DEFINE_mInt64(uploaded_file_retention_seconds, "1800"); // 1 minute
145144
DEFINE_mInt64(packed_file_cleanup_interval_seconds, "60"); // 1 minute
146145

147146
DEFINE_mBool(enable_standby_passive_compaction, "true");

be/src/cloud/config.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,6 @@ DECLARE_mInt64(packed_file_try_lock_timeout_ms);
184184
DECLARE_mInt64(packed_file_small_file_count_threshold);
185185
DECLARE_mInt64(small_file_threshold_bytes);
186186
DECLARE_mInt64(uploaded_file_retention_seconds);
187-
DECLARE_mInt64(index_retention_seconds);
188187
DECLARE_mInt64(packed_file_cleanup_interval_seconds);
189188

190189
DECLARE_mBool(enable_standby_passive_compaction);

be/src/io/fs/packed_file_manager.cpp

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -85,24 +85,24 @@ Status append_packed_info_trailer(FileWriter* writer, const std::string& packed_
8585
packed_file_path);
8686
}
8787

88-
cloud::PackedFileDebugInfoPB debug_pb;
89-
debug_pb.mutable_packed_file_info()->CopyFrom(packed_file_info);
88+
cloud::PackedFileFooterPB footer_pb;
89+
footer_pb.mutable_packed_file_info()->CopyFrom(packed_file_info);
9090

91-
std::string serialized_debug_info;
92-
if (!debug_pb.SerializeToString(&serialized_debug_info)) {
93-
return Status::InternalError("Failed to serialize packed file debug info for {}",
91+
std::string serialized_footer;
92+
if (!footer_pb.SerializeToString(&serialized_footer)) {
93+
return Status::InternalError("Failed to serialize packed file footer info for {}",
9494
packed_file_path);
9595
}
9696

97-
if (serialized_debug_info.size() >
97+
if (serialized_footer.size() >
9898
std::numeric_limits<uint32_t>::max() - kPackedFileTrailerSuffixSize) {
99-
return Status::InternalError("PackedFileDebugInfoPB too large for {}", packed_file_path);
99+
return Status::InternalError("PackedFileFooterPB too large for {}", packed_file_path);
100100
}
101101

102102
std::string trailer;
103-
trailer.reserve(serialized_debug_info.size() + kPackedFileTrailerSuffixSize);
104-
trailer.append(serialized_debug_info);
105-
put_fixed32_le(&trailer, static_cast<uint32_t>(serialized_debug_info.size()));
103+
trailer.reserve(serialized_footer.size() + kPackedFileTrailerSuffixSize);
104+
trailer.append(serialized_footer);
105+
put_fixed32_le(&trailer, static_cast<uint32_t>(serialized_footer.size()));
106106
put_fixed32_le(&trailer, kPackedFileTrailerVersion);
107107

108108
return writer->append(Slice(trailer));

be/src/io/fs/packed_file_trailer.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
namespace doris::io {
2727

28-
Status parse_packed_file_trailer(std::string_view data, cloud::PackedFileDebugInfoPB* debug_pb,
28+
Status parse_packed_file_trailer(std::string_view data, cloud::PackedFileFooterPB* debug_pb,
2929
uint32_t* version) {
3030
if (debug_pb == nullptr || version == nullptr) {
3131
return Status::InvalidArgument("Output parameters must not be null");
@@ -39,14 +39,14 @@ Status parse_packed_file_trailer(std::string_view data, cloud::PackedFileDebugIn
3939
const uint32_t trailer_size = decode_fixed32_le(suffix_ptr);
4040
const uint32_t trailer_version = decode_fixed32_le(suffix_ptr + sizeof(uint32_t));
4141

42-
// Preferred format: [PackedFileDebugInfoPB][length][version]
42+
// Preferred format: [PackedFileFooterPB][length][version]
4343
if (trailer_size > 0 && trailer_size <= data.size() - kPackedFileTrailerSuffixSize) {
4444
const size_t payload_offset = data.size() - kPackedFileTrailerSuffixSize - trailer_size;
4545
std::string_view payload(data.data() + payload_offset, trailer_size);
4646
if (payload.size() > static_cast<size_t>(std::numeric_limits<int>::max())) {
4747
return Status::InternalError("Packed file trailer payload too large");
4848
}
49-
cloud::PackedFileDebugInfoPB parsed_pb;
49+
cloud::PackedFileFooterPB parsed_pb;
5050
if (parsed_pb.ParseFromArray(payload.data(), static_cast<int>(payload.size()))) {
5151
debug_pb->Swap(&parsed_pb);
5252
*version = trailer_version;
@@ -80,8 +80,8 @@ Status parse_packed_file_trailer(std::string_view data, cloud::PackedFileDebugIn
8080
return Status::OK();
8181
}
8282

83-
Status read_packed_file_trailer(const std::string& file_path,
84-
cloud::PackedFileDebugInfoPB* debug_pb, uint32_t* version) {
83+
Status read_packed_file_trailer(const std::string& file_path, cloud::PackedFileFooterPB* debug_pb,
84+
uint32_t* version) {
8585
if (debug_pb == nullptr || version == nullptr) {
8686
return Status::InvalidArgument("Output parameters must not be null");
8787
}

be/src/io/fs/packed_file_trailer.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ namespace doris::io {
2929
constexpr uint32_t kPackedFileTrailerVersion = 1;
3030
constexpr size_t kPackedFileTrailerSuffixSize = sizeof(uint32_t) * 2;
3131

32-
Status parse_packed_file_trailer(std::string_view data, cloud::PackedFileDebugInfoPB* debug_pb,
32+
Status parse_packed_file_trailer(std::string_view data, cloud::PackedFileFooterPB* debug_pb,
3333
uint32_t* version);
3434

35-
Status read_packed_file_trailer(const std::string& file_path,
36-
cloud::PackedFileDebugInfoPB* debug_pb, uint32_t* version);
35+
Status read_packed_file_trailer(const std::string& file_path, cloud::PackedFileFooterPB* debug_pb,
36+
uint32_t* version);
3737

3838
} // namespace doris::io

be/src/tools/packed_file_tool.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ int main(int argc, char** argv) {
3838
return -1;
3939
}
4040

41-
doris::cloud::PackedFileDebugInfoPB debug_info;
41+
doris::cloud::PackedFileFooterPB debug_info;
4242
uint32_t version = 0;
4343
doris::Status st = doris::io::read_packed_file_trailer(FLAGS_file, &debug_info, &version);
4444
if (!st.ok()) {

be/test/io/fs/packed_file_manager_test.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -608,14 +608,14 @@ TEST_F(PackedFileManagerTest, AppendPackedFileInfoToFileTail) {
608608
ASSERT_NE(writer, nullptr);
609609

610610
const auto& data = writer->written_data();
611-
cloud::PackedFileDebugInfoPB parsed_debug;
611+
cloud::PackedFileFooterPB parsed_footer;
612612
uint32_t version = 0;
613-
auto st = parse_packed_file_trailer(data, &parsed_debug, &version);
613+
auto st = parse_packed_file_trailer(data, &parsed_footer, &version);
614614
ASSERT_TRUE(st.ok()) << st;
615615
ASSERT_EQ(version, kPackedFileTrailerVersion);
616-
ASSERT_TRUE(parsed_debug.has_packed_file_info());
616+
ASSERT_TRUE(parsed_footer.has_packed_file_info());
617617

618-
const auto& parsed_info = parsed_debug.packed_file_info();
618+
const auto& parsed_info = parsed_footer.packed_file_info();
619619
ASSERT_EQ(parsed_info.slices_size(), 1);
620620
EXPECT_EQ(parsed_info.slices(0).path(), "trailer_path");
621621
EXPECT_EQ(parsed_info.slices(0).offset(), 0);

be/test/io/packed_file_trailer_test.cpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,21 +57,21 @@ TEST(PackedFileTrailerTest, ReadNewFormatTrailer) {
5757
slice->set_offset(10);
5858
slice->set_size(20);
5959

60-
cloud::PackedFileDebugInfoPB debug_pb;
61-
debug_pb.mutable_packed_file_info()->CopyFrom(info);
60+
cloud::PackedFileFooterPB footer_pb;
61+
footer_pb.mutable_packed_file_info()->CopyFrom(info);
6262

63-
std::string serialized_debug;
64-
ASSERT_TRUE(debug_pb.SerializeToString(&serialized_debug));
63+
std::string serialized_footer;
64+
ASSERT_TRUE(footer_pb.SerializeToString(&serialized_footer));
6565

6666
std::string file_content = "data";
67-
file_content.append(serialized_debug);
68-
put_fixed32_le(&file_content, static_cast<uint32_t>(serialized_debug.size()));
67+
file_content.append(serialized_footer);
68+
put_fixed32_le(&file_content, static_cast<uint32_t>(serialized_footer.size()));
6969
put_fixed32_le(&file_content, kPackedFileTrailerVersion);
7070

7171
auto path = unique_temp_file();
7272
write_file(path, file_content);
7373

74-
cloud::PackedFileDebugInfoPB parsed;
74+
cloud::PackedFileFooterPB parsed;
7575
uint32_t version = 0;
7676
Status st = read_packed_file_trailer(path, &parsed, &version);
7777
ASSERT_TRUE(st.ok()) << st;
@@ -101,7 +101,7 @@ TEST(PackedFileTrailerTest, ReadLegacyTrailer) {
101101
auto path = unique_temp_file();
102102
write_file(path, file_content);
103103

104-
cloud::PackedFileDebugInfoPB parsed;
104+
cloud::PackedFileFooterPB parsed;
105105
uint32_t version = 0;
106106
Status st = read_packed_file_trailer(path, &parsed, &version);
107107
ASSERT_TRUE(st.ok()) << st;

cloud/src/common/config.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ CONF_mInt32(packed_file_txn_retry_times, "10");
120120
// randomized interval to reduce conflict storms in FoundationDB, default 5-50ms
121121
CONF_mInt64(packed_file_txn_retry_sleep_min_ms, "5");
122122
CONF_mInt64(packed_file_txn_retry_sleep_max_ms, "50");
123+
CONF_mInt32(recycle_txn_delete_max_retry_times, "10");
123124

124125
// force recycler to recycle all useless object.
125126
// **just for TEST**

cloud/src/recycler/checker.cpp

Lines changed: 70 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -991,21 +991,25 @@ int InstanceChecker::do_inverted_check() {
991991
}
992992

993993
for (auto file = list_iter->next(); file.has_value(); file = list_iter->next()) {
994+
const auto& path = file->path;
995+
if (path == "data/packed_file" || path.starts_with("data/packed_file/")) {
996+
continue; // packed_file has dedicated check logic
997+
}
994998
++num_scanned;
995-
int ret = check_segment_file(file->path);
999+
int ret = check_segment_file(path);
9961000
if (ret != 0) {
9971001
LOG(WARNING) << "failed to check segment file, uri=" << accessor->uri()
998-
<< " path=" << file->path;
1002+
<< " path=" << path;
9991003
if (ret == 1) {
10001004
++num_file_leak;
10011005
} else {
10021006
check_ret = -1;
10031007
}
10041008
}
1005-
ret = check_inverted_index_file(file->path);
1009+
ret = check_inverted_index_file(path);
10061010
if (ret != 0) {
10071011
LOG(WARNING) << "failed to check index file, uri=" << accessor->uri()
1008-
<< " path=" << file->path;
1012+
<< " path=" << path;
10091013
if (ret == 1) {
10101014
++num_file_leak;
10111015
} else {
@@ -2749,6 +2753,18 @@ int InstanceChecker::do_packed_file_check() {
27492753

27502754
// Step 1: Scan all rowset metas to collect packed_slice_locations references
27512755
// Use efficient range scan instead of iterating through each tablet_id
2756+
auto collect_packed_refs = [&](const doris::RowsetMetaCloudPB& rs_meta) {
2757+
const auto& index_map = rs_meta.packed_slice_locations();
2758+
for (const auto& [small_file_path, index_pb] : index_map) {
2759+
if (!index_pb.has_packed_file_path() || index_pb.packed_file_path().empty()) {
2760+
continue;
2761+
}
2762+
const std::string& packed_file_path = index_pb.packed_file_path();
2763+
expected_ref_counts[packed_file_path]++;
2764+
packed_file_small_files[packed_file_path].insert(small_file_path);
2765+
}
2766+
};
2767+
27522768
{
27532769
std::string start_key = meta_rowset_key({instance_id_, 0, 0});
27542770
std::string end_key = meta_rowset_key({instance_id_, INT64_MAX, 0});
@@ -2788,16 +2804,57 @@ int InstanceChecker::do_packed_file_check() {
27882804

27892805
num_scanned_rowsets++;
27902806

2791-
// Check packed_slice_locations in rowset meta
2792-
const auto& index_map = rs_meta.packed_slice_locations();
2793-
for (const auto& [small_file_path, index_pb] : index_map) {
2794-
if (!index_pb.has_packed_file_path() || index_pb.packed_file_path().empty()) {
2795-
continue;
2796-
}
2797-
const std::string& packed_file_path = index_pb.packed_file_path();
2798-
expected_ref_counts[packed_file_path]++;
2799-
packed_file_small_files[packed_file_path].insert(small_file_path);
2807+
collect_packed_refs(rs_meta);
2808+
}
2809+
start_key.push_back('\x00'); // Update to next smallest key for iteration
2810+
} while (it->more() && !stopped());
2811+
}
2812+
2813+
// Rowsets in recycle keys may still hold packed file references while ref count
2814+
// updates are pending, so include them when calculating expected references.
2815+
{
2816+
std::string start_key = recycle_rowset_key({instance_id_, 0, ""});
2817+
std::string end_key = recycle_rowset_key({instance_id_, INT64_MAX, "\xff"});
2818+
2819+
std::unique_ptr<RangeGetIterator> it;
2820+
do {
2821+
if (stopped()) {
2822+
return -1;
2823+
}
2824+
2825+
std::unique_ptr<Transaction> txn;
2826+
TxnErrorCode err = txn_kv_->create_txn(&txn);
2827+
if (err != TxnErrorCode::TXN_OK) {
2828+
LOG(WARNING) << "failed to create txn for recycle rowset scan in packed file check";
2829+
return -1;
2830+
}
2831+
2832+
err = txn->get(start_key, end_key, &it);
2833+
if (err != TxnErrorCode::TXN_OK) {
2834+
LOG(WARNING) << "failed to scan recycle rowset metas, err=" << err;
2835+
check_ret = -1;
2836+
break;
2837+
}
2838+
2839+
while (it->has_next() && !stopped()) {
2840+
auto [k, v] = it->next();
2841+
if (!it->has_next()) {
2842+
start_key = k;
2843+
}
2844+
2845+
RecycleRowsetPB recycle_rowset;
2846+
if (!recycle_rowset.ParseFromArray(v.data(), v.size())) {
2847+
LOG(WARNING) << "malformed recycle rowset, key=" << hex(k);
2848+
check_ret = -1;
2849+
continue;
28002850
}
2851+
2852+
if (!recycle_rowset.has_rowset_meta()) {
2853+
continue;
2854+
}
2855+
2856+
num_scanned_rowsets++;
2857+
collect_packed_refs(recycle_rowset.rowset_meta());
28012858
}
28022859
start_key.push_back('\x00'); // Update to next smallest key for iteration
28032860
} while (it->more() && !stopped());

0 commit comments

Comments
 (0)