Skip to content

Commit b2abb89

Browse files
committed
init
1 parent da2c3c0 commit b2abb89

23 files changed

+530
-8
lines changed

db/compaction/compaction_job.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ const char* GetCompactionReasonString(CompactionReason compaction_reason) {
103103
return "RoundRobinTtl";
104104
case CompactionReason::kRefitLevel:
105105
return "RefitLevel";
106+
case CompactionReason::kReadTriggered:
107+
return "ReadTriggered";
106108
case CompactionReason::kNumOfReasons:
107109
// fall through
108110
default:

db/compaction/compaction_picker_level.cc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ bool LevelCompactionPicker::NeedsCompaction(
3636
if (!vstorage->FilesMarkedForForcedBlobGC().empty()) {
3737
return true;
3838
}
39+
if (!vstorage->ReadTriggeredCompactionFiles().empty()) {
40+
return true;
41+
}
3942
for (int i = 0; i <= vstorage->MaxInputLevel(); i++) {
4043
if (vstorage->CompactionScore(i) >= 1) {
4144
return true;
@@ -325,6 +328,14 @@ void LevelCompactionBuilder::SetupInitialFiles() {
325328
compaction_reason_ = CompactionReason::kForcedBlobGC;
326329
return;
327330
}
331+
332+
// Read-triggered compaction
333+
PickFileToCompact(vstorage_->ReadTriggeredCompactionFiles(),
334+
CompactToNextLevel::kYes);
335+
if (!start_level_inputs_.empty()) {
336+
compaction_reason_ = CompactionReason::kReadTriggered;
337+
return;
338+
}
328339
}
329340

330341
bool LevelCompactionBuilder::SetupOtherL0FilesIfNeeded() {

db/compaction/compaction_picker_test.cc

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -982,6 +982,150 @@ TEST_F(CompactionPickerTest, UniversalIncrementalSpace1) {
982982
ASSERT_EQ(14U, compaction->input(1, 3)->fd.GetNumber());
983983
}
984984

985+
TEST_F(CompactionPickerTest, ReadTriggeredCompactionDisabled) {
986+
NewVersionStorage(6, kCompactionStyleLevel);
987+
// threshold=0 means disabled, no files should be marked
988+
mutable_cf_options_.read_triggered_compaction_threshold = 0.0;
989+
Add(0, 1U, "100", "200", 1000U);
990+
file_map_[1U].first->stats.num_reads_sampled.store(999999);
991+
UpdateVersionStorageInfo();
992+
ASSERT_TRUE(vstorage_->ReadTriggeredCompactionFiles().empty());
993+
}
994+
995+
TEST_F(CompactionPickerTest, ReadTriggeredCompactionBelowThreshold) {
996+
NewVersionStorage(6, kCompactionStyleLevel);
997+
mutable_cf_options_.read_triggered_compaction_threshold = 1.0;
998+
// file_size=1000, reads=500 => reads_per_byte=0.5 < 1.0
999+
Add(0, 1U, "100", "200", 1000U);
1000+
file_map_[1U].first->stats.num_reads_sampled.store(500);
1001+
UpdateVersionStorageInfo();
1002+
ASSERT_TRUE(vstorage_->ReadTriggeredCompactionFiles().empty());
1003+
}
1004+
1005+
TEST_F(CompactionPickerTest, ReadTriggeredCompactionAboveThreshold) {
1006+
NewVersionStorage(6, kCompactionStyleLevel);
1007+
mutable_cf_options_.read_triggered_compaction_threshold = 0.5;
1008+
// file_size=1000, reads=600 => reads_per_byte=0.6 > 0.5
1009+
Add(0, 1U, "100", "200", 1000U);
1010+
file_map_[1U].first->stats.num_reads_sampled.store(600);
1011+
// file_size=1000, reads=300 => reads_per_byte=0.3 < 0.5 (not marked)
1012+
Add(1, 2U, "300", "400", 1000U);
1013+
file_map_[2U].first->stats.num_reads_sampled.store(300);
1014+
// file_size=1000, reads=800 => reads_per_byte=0.8 > 0.5 (hottest)
1015+
Add(2, 3U, "500", "600", 1000U);
1016+
file_map_[3U].first->stats.num_reads_sampled.store(800);
1017+
// Add a file at the bottom so L2 is not the last non-empty level
1018+
Add(4, 4U, "700", "800", 1000U);
1019+
UpdateVersionStorageInfo();
1020+
1021+
const auto& marked = vstorage_->ReadTriggeredCompactionFiles();
1022+
ASSERT_EQ(marked.size(), 2);
1023+
// Sorted by reads_per_byte descending: file 3 (0.8) then file 1 (0.6)
1024+
ASSERT_EQ(marked[0].second->fd.GetNumber(), 3U);
1025+
ASSERT_EQ(marked[1].second->fd.GetNumber(), 1U);
1026+
}
1027+
1028+
TEST_F(CompactionPickerTest, NeedsCompactionReadTriggered) {
1029+
NewVersionStorage(6, kCompactionStyleLevel);
1030+
mutable_cf_options_.read_triggered_compaction_threshold = 0.1;
1031+
Add(1, 1U, "100", "200", 1000U);
1032+
file_map_[1U].first->stats.num_reads_sampled.store(500);
1033+
Add(3, 2U, "300", "400", 1000U);
1034+
UpdateVersionStorageInfo();
1035+
1036+
ASSERT_FALSE(vstorage_->ReadTriggeredCompactionFiles().empty());
1037+
ASSERT_TRUE(level_compaction_picker.NeedsCompaction(vstorage_.get()));
1038+
}
1039+
1040+
TEST_F(CompactionPickerTest, ReadTriggeredPicksFile) {
1041+
NewVersionStorage(6, kCompactionStyleLevel);
1042+
mutable_cf_options_.read_triggered_compaction_threshold = 0.1;
1043+
Add(1, 1U, "100", "200", 1000U);
1044+
file_map_[1U].first->stats.num_reads_sampled.store(500);
1045+
Add(3, 2U, "300", "400", 1000U);
1046+
UpdateVersionStorageInfo();
1047+
1048+
std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
1049+
cf_name_, mutable_cf_options_, mutable_db_options_,
1050+
/*existing_snapshots=*/{}, /*snapshot_checker=*/nullptr, vstorage_.get(),
1051+
&log_buffer_, /*full_history_ts_low=*/""));
1052+
ASSERT_TRUE(compaction.get() != nullptr);
1053+
ASSERT_EQ(compaction->compaction_reason(), CompactionReason::kReadTriggered);
1054+
}
1055+
1056+
TEST_F(CompactionPickerTest, UniversalReadTriggeredCompaction) {
1057+
const uint64_t kFileSize = 100000;
1058+
1059+
mutable_cf_options_.read_triggered_compaction_threshold = 0.001;
1060+
// Set trigger high so size amp / sorted run pickers don't fire
1061+
mutable_cf_options_.level0_file_num_compaction_trigger = 10;
1062+
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
1063+
1064+
NewVersionStorage(5, kCompactionStyleUniversal);
1065+
1066+
// Hot file at L2 with data at L4 below it
1067+
Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
1068+
Add(2, 2U, "301", "350", kFileSize, 0, 201, 250);
1069+
Add(4, 3U, "301", "400", kFileSize, 0, 101, 150);
1070+
1071+
// Mark file 2 (L2) as having high reads
1072+
file_map_[2U].first->stats.num_reads_sampled.store(kFileSize);
1073+
UpdateVersionStorageInfo();
1074+
1075+
ASSERT_TRUE(universal_compaction_picker.NeedsCompaction(vstorage_.get()));
1076+
1077+
std::unique_ptr<Compaction> compaction(
1078+
universal_compaction_picker.PickCompaction(
1079+
cf_name_, mutable_cf_options_, mutable_db_options_,
1080+
/*existing_snapshots=*/{}, /*snapshot_checker=*/nullptr,
1081+
vstorage_.get(), &log_buffer_, /*full_history_ts_low=*/""));
1082+
1083+
ASSERT_TRUE(compaction);
1084+
ASSERT_EQ(compaction->compaction_reason(), CompactionReason::kReadTriggered);
1085+
ASSERT_EQ(compaction->start_level(), 2);
1086+
ASSERT_EQ(compaction->output_level(), 4);
1087+
}
1088+
1089+
TEST_F(CompactionPickerTest, ReadTriggeredSkipsLastLevel) {
1090+
const uint64_t kFileSize = 100000;
1091+
1092+
mutable_cf_options_.read_triggered_compaction_threshold = 0.001;
1093+
mutable_cf_options_.level0_file_num_compaction_trigger = 10;
1094+
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
1095+
1096+
NewVersionStorage(5, kCompactionStyleUniversal);
1097+
1098+
Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
1099+
Add(4, 3U, "301", "350", kFileSize, 0, 101, 150);
1100+
1101+
// File 3 is at the last non-empty level — should NOT be marked for
1102+
// read-triggered compaction. Bottommost file cleanup is handled
1103+
// separately by ComputeBottommostFilesMarkedForCompaction().
1104+
file_map_[3U].first->stats.num_reads_sampled.store(kFileSize);
1105+
UpdateVersionStorageInfo();
1106+
1107+
ASSERT_TRUE(vstorage_->ReadTriggeredCompactionFiles().empty());
1108+
}
1109+
1110+
TEST_F(CompactionPickerTest, UniversalReadTriggeredNoPickWhenNotMarked) {
1111+
const uint64_t kFileSize = 100000;
1112+
1113+
mutable_cf_options_.read_triggered_compaction_threshold = 0.001;
1114+
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
1115+
1116+
NewVersionStorage(5, kCompactionStyleUniversal);
1117+
1118+
Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
1119+
Add(4, 3U, "301", "350", kFileSize, 0, 101, 150);
1120+
1121+
// No reads on any file
1122+
UpdateVersionStorageInfo();
1123+
1124+
ASSERT_TRUE(vstorage_->ReadTriggeredCompactionFiles().empty());
1125+
// Not enough sorted runs to trigger compaction either
1126+
ASSERT_FALSE(universal_compaction_picker.NeedsCompaction(vstorage_.get()));
1127+
}
1128+
9851129
TEST_F(CompactionPickerTest, UniversalIncrementalSpace2) {
9861130
const uint64_t kFileSize = 100000;
9871131

db/compaction/compaction_picker_universal.cc

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,21 @@ class UniversalCompactionBuilder {
275275
return c;
276276
}
277277

278+
Compaction* MaybePickReadTriggeredCompaction(
279+
Compaction* const prev_picked_c) {
280+
if (prev_picked_c != nullptr ||
281+
vstorage_->ReadTriggeredCompactionFiles().empty()) {
282+
return prev_picked_c;
283+
}
284+
Compaction* c = PickReadTriggeredCompaction();
285+
if (c != nullptr) {
286+
ROCKS_LOG_BUFFER(log_buffer_,
287+
"[%s] Universal: picked for read triggered compaction\n",
288+
cf_name_.c_str());
289+
}
290+
return c;
291+
}
292+
278293
// Pick Universal compaction to limit read amplification
279294
Compaction* PickCompactionToReduceSortedRuns(
280295
unsigned int ratio, unsigned int max_number_of_files_to_compact);
@@ -292,6 +307,8 @@ class UniversalCompactionBuilder {
292307

293308
Compaction* PickDeleteTriggeredCompaction();
294309

310+
Compaction* PickReadTriggeredCompaction();
311+
295312
// Returns true if this given file (that is marked be compaction) should be
296313
// skipped from being picked for now. We do this to best use standalone range
297314
// tombstone files.
@@ -594,6 +611,9 @@ bool UniversalCompactionPicker::NeedsCompaction(
594611
if (!vstorage->FilesMarkedForCompaction().empty()) {
595612
return true;
596613
}
614+
if (!vstorage->ReadTriggeredCompactionFiles().empty()) {
615+
return true;
616+
}
597617
return false;
598618
}
599619

@@ -759,6 +779,7 @@ Compaction* UniversalCompactionBuilder::PickCompaction() {
759779
if (sorted_runs_.size() == 0 ||
760780
(vstorage_->FilesMarkedForPeriodicCompaction().empty() &&
761781
vstorage_->FilesMarkedForCompaction().empty() &&
782+
vstorage_->ReadTriggeredCompactionFiles().empty() &&
762783
sorted_runs_.size() < (unsigned int)file_num_compaction_trigger)) {
763784
ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: nothing to do\n",
764785
cf_name_.c_str());
@@ -782,6 +803,7 @@ Compaction* UniversalCompactionBuilder::PickCompaction() {
782803
c = MaybePickCompactionToReduceSortedRuns(c, file_num_compaction_trigger,
783804
ratio);
784805
c = MaybePickDeleteTriggeredCompaction(c);
806+
c = MaybePickReadTriggeredCompaction(c);
785807

786808
if (c == nullptr) {
787809
TEST_SYNC_POINT_CALLBACK(
@@ -1782,6 +1804,112 @@ Compaction* UniversalCompactionBuilder::PickPeriodicCompaction() {
17821804
return c;
17831805
}
17841806

1807+
Compaction* UniversalCompactionBuilder::PickReadTriggeredCompaction() {
1808+
ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: Read Triggered Compaction",
1809+
cf_name_.c_str());
1810+
1811+
// Directly pick the hot file and compact it with the overlapping next level,
1812+
// similar to PickDeleteTriggeredCompaction for multi-level universal.
1813+
CompactionInputFiles start_level_inputs;
1814+
int start_level = -1;
1815+
int output_level;
1816+
std::vector<CompactionInputFiles> inputs;
1817+
std::vector<FileMetaData*> grandparents;
1818+
1819+
// Find the first non-being-compacted read-triggered file.
1820+
for (const auto& level_file : vstorage_->ReadTriggeredCompactionFiles()) {
1821+
assert(!level_file.second->being_compacted);
1822+
start_level = level_file.first;
1823+
1824+
if (start_level == 0 &&
1825+
!picker_->level0_compactions_in_progress()->empty()) {
1826+
continue;
1827+
}
1828+
1829+
start_level_inputs.files = {level_file.second};
1830+
start_level_inputs.level = start_level;
1831+
if (picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
1832+
&start_level_inputs)) {
1833+
break;
1834+
}
1835+
start_level_inputs.files.clear();
1836+
}
1837+
1838+
if (start_level_inputs.empty()) {
1839+
return nullptr;
1840+
}
1841+
1842+
int max_output_level = vstorage_->MaxOutputLevel(allow_ingest_behind_);
1843+
1844+
// Pick the first non-empty level after start_level as output.
1845+
for (output_level = start_level + 1; output_level <= max_output_level;
1846+
output_level++) {
1847+
if (vstorage_->NumLevelFiles(output_level) != 0) {
1848+
break;
1849+
}
1850+
}
1851+
1852+
// Only non-last level files can be picked as inputs
1853+
assert(output_level > start_level);
1854+
1855+
if (!MeetsOutputLevelRequirements(output_level)) {
1856+
return nullptr;
1857+
}
1858+
1859+
if (start_level == 0) {
1860+
if (!picker_->GetOverlappingL0Files(vstorage_, &start_level_inputs,
1861+
output_level, nullptr)) {
1862+
return nullptr;
1863+
}
1864+
}
1865+
1866+
CompactionInputFiles output_level_inputs;
1867+
int parent_index = -1;
1868+
1869+
output_level_inputs.level = output_level;
1870+
if (!picker_->SetupOtherInputs(cf_name_, mutable_cf_options_, vstorage_,
1871+
&start_level_inputs, &output_level_inputs,
1872+
&parent_index, -1, false)) {
1873+
return nullptr;
1874+
}
1875+
1876+
inputs.push_back(start_level_inputs);
1877+
if (!output_level_inputs.empty()) {
1878+
inputs.push_back(output_level_inputs);
1879+
}
1880+
1881+
if (picker_->FilesRangeOverlapWithCompaction(
1882+
inputs, output_level,
1883+
Compaction::EvaluateProximalLevel(vstorage_, mutable_cf_options_,
1884+
ioptions_, start_level,
1885+
output_level))) {
1886+
return nullptr;
1887+
}
1888+
1889+
picker_->GetGrandparents(vstorage_, start_level_inputs, output_level_inputs,
1890+
&grandparents);
1891+
1892+
uint64_t estimated_total_size = 0;
1893+
for (FileMetaData* f : vstorage_->LevelFiles(output_level)) {
1894+
estimated_total_size += f->fd.GetFileSize();
1895+
}
1896+
uint32_t path_id =
1897+
GetPathId(ioptions_, mutable_cf_options_, estimated_total_size);
1898+
return new Compaction(
1899+
vstorage_, ioptions_, mutable_cf_options_, mutable_db_options_,
1900+
std::move(inputs), output_level,
1901+
MaxFileSizeForLevel(mutable_cf_options_, output_level,
1902+
kCompactionStyleUniversal),
1903+
GetMaxOverlappingBytes(), path_id,
1904+
GetCompressionType(vstorage_, mutable_cf_options_, output_level, 1),
1905+
GetCompressionOptions(mutable_cf_options_, vstorage_, output_level),
1906+
Temperature::kUnknown,
1907+
/* max_subcompactions */ 0, grandparents, earliest_snapshot_,
1908+
snapshot_checker_, CompactionReason::kReadTriggered,
1909+
/* trim_ts */ "", score_,
1910+
/* l0_files_might_overlap */ true);
1911+
}
1912+
17851913
uint64_t UniversalCompactionBuilder::GetMaxOverlappingBytes() const {
17861914
if (!mutable_cf_options_.compaction_options_universal.incremental) {
17871915
return std::numeric_limits<uint64_t>::max();

0 commit comments

Comments
 (0)