
Commit e52c799

Yukang-Lian authored and Your Name committed
[Fix](Compaction) Fix cumulative compaction pick rowsets to trim by max score after filtering (#59268)
1 parent 7f5ba43 commit e52c799

4 files changed, +1967 -12 lines


be/src/cloud/cloud_cumulative_compaction_policy.cpp

Lines changed: 26 additions & 5 deletions
@@ -29,6 +29,7 @@
 #include "olap/olap_common.h"
 #include "olap/tablet.h"
 #include "olap/tablet_meta.h"
+#include "util/defer_op.h"
 
 namespace doris {
 #include "common/compile_check_begin.h"
@@ -119,6 +120,26 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
     int transient_size = 0;
     *compaction_score = 0;
     int64_t total_size = 0;
+    bool skip_trim = false; // Skip trim for Empty Rowset Compaction
+
+    // DEFER: trim input_rowsets from back if score > max_compaction_score
+    // This ensures we don't return more rowsets than allowed by max_compaction_score,
+    // while still collecting enough rowsets to pass min_compaction_score check after level_size removal.
+    // Must be placed after variable initialization and before collection loop.
+    DEFER({
+        if (skip_trim) {
+            return;
+        }
+        // Keep at least 1 rowset to avoid removing the only rowset (consistent with fallback branch)
+        while (input_rowsets->size() > 1 &&
+               *compaction_score > static_cast<size_t>(max_compaction_score)) {
+            auto& last_rowset = input_rowsets->back();
+            *compaction_score -= last_rowset->rowset_meta()->get_compaction_score();
+            total_size -= last_rowset->rowset_meta()->total_disk_size();
+            input_rowsets->pop_back();
+        }
+    });
+
     for (auto& rowset : candidate_rowsets) {
         // check whether this rowset is delete version
         if (!allow_delete && rowset->rowset_meta()->has_delete_predicate()) {
@@ -142,10 +163,8 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
                 continue;
             }
         }
-        if (*compaction_score >= max_compaction_score) {
-            // got enough segments
-            break;
-        }
+        // Removed: max_compaction_score check here
+        // We now collect all candidate rowsets and trim from back at return time via DEFER
         *compaction_score += rowset->rowset_meta()->get_compaction_score();
         total_size += rowset->rowset_meta()->total_disk_size();
 
@@ -183,8 +202,10 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
             static_cast<double>(input_rowsets->size()) >=
                     config::empty_rowset_compaction_min_ratio) {
             // Prioritize consecutive empty rowset compaction
+            // Skip trim: empty rowset compaction has very low cost and the goal is to reduce rowset count
             *input_rowsets = consecutive_empty_rowsets;
             *compaction_score = consecutive_empty_rowsets.size();
+            skip_trim = true;
             return consecutive_empty_rowsets.size();
         }
     }
@@ -228,7 +249,7 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
             *compaction_score = max_score;
            return transient_size;
        }
-        // Exceeding max compaction score, do compaction on all candidate rowsets anyway
+        // no rowset is OVERLAPPING, return all input rowsets (DEFER will trim to max_compaction_score)
        return transient_size;
    }
 }
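The comments in the DEFER block above describe the core idea of the fix: collect every candidate first, then trim from the back only at return time, so the min_compaction_score check (after level_size removal) still sees the full set. Below is a minimal, self-contained sketch of that collect-then-trim pattern. It is an illustration, not the Doris implementation: SimpleRowset and pick_and_trim are hypothetical names, rowsets are reduced to plain score/size pairs, and the scope guard is replaced by an explicit trim step before returning.

```cpp
#include <cstdint>
#include <cstdio>
#include <vector>

// Hypothetical stand-in for a rowset: only the two fields the trim logic needs.
struct SimpleRowset {
    int64_t compaction_score;
    int64_t disk_size;
};

// Collect every candidate first, then trim from the back until the total score
// fits under max_score, always keeping at least one rowset. This mirrors the
// intent of the DEFER block in the patch above.
std::vector<SimpleRowset> pick_and_trim(const std::vector<SimpleRowset>& candidates,
                                        int64_t max_score, int64_t* total_score,
                                        int64_t* total_size) {
    std::vector<SimpleRowset> picked;
    *total_score = 0;
    *total_size = 0;
    // No early break on max_score: in the real policy, filtering (delete
    // predicates, level_size, min_compaction_score) runs on the full set first.
    for (const auto& rs : candidates) {
        picked.push_back(rs);
        *total_score += rs.compaction_score;
        *total_size += rs.disk_size;
    }
    // Trim from the back, keeping at least one rowset.
    while (picked.size() > 1 && *total_score > max_score) {
        const SimpleRowset& last = picked.back();
        *total_score -= last.compaction_score;
        *total_size -= last.disk_size;
        picked.pop_back();
    }
    return picked;
}

int main() {
    // Scores 40 + 35 + 30 + 20 = 125 > max_score 100:
    // trim 20 -> 105, still > 100, trim 30 -> 75, done.
    std::vector<SimpleRowset> candidates = {{40, 400}, {35, 350}, {30, 300}, {20, 200}};
    int64_t score = 0;
    int64_t size = 0;
    auto picked = pick_and_trim(candidates, /*max_score=*/100, &score, &size);
    std::printf("picked=%zu score=%ld size=%ld\n", picked.size(), (long)score, (long)size);
    // Expected output: picked=2 score=75 size=750
    return 0;
}
```

With candidates scoring 40, 35, 30, and 20 against a max score of 100, the sketch trims the last two and returns two rowsets with a combined score of 75, which is the same back-to-front trimming order used by the DEFER block.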

be/src/olap/cumulative_compaction_policy.cpp

Lines changed: 20 additions & 5 deletions
@@ -29,6 +29,7 @@
 #include "olap/tablet.h"
 #include "olap/tablet_meta.h"
 #include "util/debug_points.h"
+#include "util/defer_op.h"
 
 namespace doris {
 
@@ -267,6 +268,22 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
     int transient_size = 0;
     *compaction_score = 0;
     int64_t total_size = 0;
+
+    // DEFER: trim input_rowsets from back if score > max_compaction_score
+    // This ensures we don't return more rowsets than allowed by max_compaction_score,
+    // while still collecting enough rowsets to pass min_compaction_score check after level_size removal.
+    // Must be placed after variable initialization and before collection loop.
+    DEFER({
+        // Keep at least 1 rowset to avoid removing the only rowset (consistent with fallback branch)
+        while (input_rowsets->size() > 1 &&
+               *compaction_score > static_cast<size_t>(max_compaction_score)) {
+            auto& last_rowset = input_rowsets->back();
+            *compaction_score -= last_rowset->rowset_meta()->get_compaction_score();
+            total_size -= last_rowset->rowset_meta()->total_disk_size();
+            input_rowsets->pop_back();
+        }
+    });
+
     for (auto& rowset : candidate_rowsets) {
         // check whether this rowset is delete version
         if (!allow_delete && rowset->rowset_meta()->has_delete_predicate()) {
@@ -290,10 +307,8 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
                 continue;
             }
         }
-        if (*compaction_score >= max_compaction_score) {
-            // got enough segments
-            break;
-        }
+        // Removed: max_compaction_score check here
+        // We now collect all candidate rowsets and trim from back at return time via DEFER
         *compaction_score += rowset->rowset_meta()->get_compaction_score();
         total_size += rowset->rowset_meta()->total_disk_size();
 
@@ -354,7 +369,7 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
            *compaction_score = max_score;
            return transient_size;
        }
-        // no rowset is OVERLAPPING, execute compaction on all input rowsets
+        // no rowset is OVERLAPPING, return all input rowsets (DEFER will trim to max_compaction_score)
        return transient_size;
    }
    input_rowsets->erase(input_rowsets->begin(), rs_begin);
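Both policies wrap the trim in DEFER from util/defer_op.h so that it runs on every return path of pick_input_rowsets, including the early returns visible in the diff (the max_score branch and, in the cloud policy, the empty-rowset branch guarded by skip_trim). The sketch below shows one common way such a scope-guard macro can be built; it illustrates the idiom only. ScopeGuard and MY_DEFER are hypothetical names, and the actual definition in util/defer_op.h may differ.

```cpp
#include <cstdio>
#include <utility>

// A minimal scope guard: stores a callable and runs it when the guard goes out
// of scope. Illustrative only; Doris's real DEFER comes from util/defer_op.h.
template <typename Fn>
class ScopeGuard {
public:
    explicit ScopeGuard(Fn fn) : _fn(std::move(fn)) {}
    ~ScopeGuard() { _fn(); }
    ScopeGuard(const ScopeGuard&) = delete;
    ScopeGuard& operator=(const ScopeGuard&) = delete;

private:
    Fn _fn;
};

// Hypothetical macro in the spirit of DEFER({ ... }): the block runs at scope exit.
#define MY_DEFER_CONCAT_INNER(a, b) a##b
#define MY_DEFER_CONCAT(a, b) MY_DEFER_CONCAT_INNER(a, b)
#define MY_DEFER(block) ScopeGuard MY_DEFER_CONCAT(_defer_, __LINE__)([&]() block)

int pick(bool early_return) {
    int score = 0;
    MY_DEFER({
        // Runs on every return path, which is why the trim in the patch above
        // also covers the early-return branches of pick_input_rowsets.
        std::printf("deferred block runs, score=%d\n", score);
    });
    score = 42;
    if (early_return) {
        return score; // the deferred block still runs after this return
    }
    score += 1;
    return score;
}

int main() {
    pick(true);  // prints: deferred block runs, score=42
    pick(false); // prints: deferred block runs, score=43
    return 0;
}
```

Because the deferred lambda captures by reference, it can adjust the out-parameters (*compaction_score, *input_rowsets) just before control leaves the function, regardless of which branch returns first.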

be/test/cloud/cloud_cumulative_compaction_policy_test.cpp

Lines changed: 122 additions & 0 deletions
@@ -15,13 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "cloud/cloud_cumulative_compaction_policy.h"
+
 #include <gen_cpp/AgentService_types.h>
 #include <gen_cpp/olap_file.pb.h>
 #include <gtest/gtest-message.h>
 #include <gtest/gtest-test-part.h>
 #include <gtest/gtest.h>
 
 #include "cloud/cloud_storage_engine.h"
+#include "cloud/config.h"
+#include "common/config.h"
 #include "gtest/gtest_pred_impl.h"
 #include "json2pb/json_to_pb.h"
 #include "olap/olap_common.h"
@@ -145,4 +149,122 @@ TEST_F(TestCloudSizeBasedCumulativeCompactionPolicy, new_cumulative_point) {
     Version version(1, 1);
     EXPECT_EQ(policy.new_cumulative_point(&_tablet, output_rowset, version, 2), 6);
 }
+
+// Test case: Empty rowset compaction with skip_trim
+TEST_F(TestCloudSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_empty_rowset_compaction) {
+    // Save original config values
+    bool orig_enable_empty_rowset_compaction = config::enable_empty_rowset_compaction;
+    int32_t orig_empty_rowset_compaction_min_count = config::empty_rowset_compaction_min_count;
+    double orig_empty_rowset_compaction_min_ratio = config::empty_rowset_compaction_min_ratio;
+
+    // Enable empty rowset compaction
+    config::enable_empty_rowset_compaction = true;
+    config::empty_rowset_compaction_min_count = 5;
+    config::empty_rowset_compaction_min_ratio = 0.5;
+
+    CloudTablet _tablet(_engine, _tablet_meta);
+    _tablet._base_size = 1024L * 1024 * 1024; // 1GB base
+
+    // Create candidate rowsets: 2 normal + 150 empty rowsets
+    // This tests that skip_trim = true for empty rowset compaction
+    std::vector<RowsetSharedPtr> candidate_rowsets;
+
+    // 2 normal rowsets
+    for (int i = 0; i < 2; i++) {
+        auto rowset = create_rowset(Version(i + 2, i + 2), 1, true, 1024 * 1024); // 1MB
+        candidate_rowsets.push_back(rowset);
+    }
+
+    // 150 empty rowsets (consecutive)
+    for (int i = 0; i < 150; i++) {
+        auto rowset = create_rowset(Version(i + 4, i + 4), 0, false, 0); // empty
+        candidate_rowsets.push_back(rowset);
+    }
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    CloudSizeBasedCumulativeCompactionPolicy policy;
+    // max=100, but empty rowset compaction should return 150 (skip_trim = true)
+    policy.pick_input_rowsets(&_tablet, candidate_rowsets, 100, 50, &input_rowsets,
+                              &last_delete_version, &compaction_score, true);
+
+    // Empty rowset compaction should return all 150 empty rowsets:
+    // skip_trim = true, so no trimming happens even though score > max
+    EXPECT_EQ(150, input_rowsets.size());
+    EXPECT_EQ(150, compaction_score);
+
+    // Verify all returned rowsets are empty
+    for (const auto& rs : input_rowsets) {
+        EXPECT_EQ(0, rs->num_segments());
+    }
+
+    // Restore original config values
+    config::enable_empty_rowset_compaction = orig_enable_empty_rowset_compaction;
+    config::empty_rowset_compaction_min_count = orig_empty_rowset_compaction_min_count;
+    config::empty_rowset_compaction_min_ratio = orig_empty_rowset_compaction_min_ratio;
+}
+
+// Test case: prioritize_query_perf_in_compaction for a non-DUP_KEYS table
+// This tests the branch: rs_begin == end && prioritize_query_perf && keys_type != DUP_KEYS
+TEST_F(TestCloudSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_prioritize_query_perf) {
+    // Save original config value
+    bool orig_prioritize_query_perf = config::prioritize_query_perf_in_compaction;
+
+    // Enable prioritize_query_perf_in_compaction
+    config::prioritize_query_perf_in_compaction = true;
+
+    // Create a tablet with UNIQUE keys (not DUP_KEYS)
+    TTabletSchema schema;
+    schema.keys_type = TKeysType::UNIQUE_KEYS;
+    TabletMetaSharedPtr tablet_meta(new TabletMeta(1, 2, 15673, 15674, 4, 5, schema, 6, {{7, 8}},
+                                                   UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
+                                                   TCompressionType::LZ4F));
+
+    CloudTablet _tablet(_engine, tablet_meta);
+    // Use a large base_size to get a large promotion_size, ensuring total_size < promotion_size
+    // so we don't trigger the promotion_size early return and can reach the level_size logic
+    _tablet._base_size = 20L * 1024 * 1024 * 1024; // 20GB base, promotion_size ~= 1GB
+
+    // Create candidate rowsets that will ALL be removed by level_size.
+    // Key: each rowset's level > remain_level after removal.
+    std::vector<RowsetSharedPtr> candidate_rowsets;
+
+    // 3 rowsets with decreasing sizes, all removed by level_size
+    // (a 40MB rowset would not work: level(40)=32, remain=35, level(35)=32, and 32>32 is false).
+    // Sizes chosen so that every rowset is removed:
+    // - 50MB: level(50)=32, after remove remain=25, level(25)=16, 32>16 -> remove
+    // - 20MB: level(20)=16, after remove remain=5, level(5)=4, 16>4 -> remove
+    // - 5MB: level(5)=4, after remove remain=0, level(0)=0, 4>0 -> remove
+    auto rowset1 = create_rowset(Version(2, 2), 30, true, 50L * 1024 * 1024); // 50MB, score=30
+    auto rowset2 = create_rowset(Version(3, 3), 20, true, 20L * 1024 * 1024); // 20MB, score=20
+    auto rowset3 = create_rowset(Version(4, 4), 10, true, 5L * 1024 * 1024); // 5MB, score=10
+    candidate_rowsets.push_back(rowset1);
+    candidate_rowsets.push_back(rowset2);
+    candidate_rowsets.push_back(rowset3);
+
+    // total_size = 75MB < promotion_size (~1GB), so we enter the level_size logic.
+    // All 3 rowsets will be removed by level_size -> rs_begin == end.
+    // With prioritize_query_perf=true and UNIQUE_KEYS, all candidates should be returned.
+
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version last_delete_version {-1, -1};
+    size_t compaction_score = 0;
+
+    CloudSizeBasedCumulativeCompactionPolicy policy;
+    policy.pick_input_rowsets(&_tablet, candidate_rowsets, 100, 5, &input_rowsets,
+                              &last_delete_version, &compaction_score, true);
+
+    // With prioritize_query_perf enabled for a non-DUP_KEYS table,
+    // when all rowsets are removed by level_size, all candidates are returned
+    // (before the DEFER trim).
+    // Total score = 60, max = 100, so no trimming is needed.
+    EXPECT_EQ(3, input_rowsets.size());
+    EXPECT_EQ(60, compaction_score);
+
+    // Restore original config value
+    config::prioritize_query_perf_in_compaction = orig_prioritize_query_perf;
+}
+
 } // namespace doris
