Skip to content

Commit 656b734

Browse files
xingbowang authored and meta-codesync[bot] committed
Support aborting background compaction jobs. (#14227)
Summary: This adds a new public API that lets applications abort all running compactions and prevent new ones from starting. Unlike DisableManualCompaction(), which only pauses manual compactions and waits for them to finish naturally, AbortAllCompactions() actively signals running compactions (both automatic and manual) to terminate early and waits for them to complete before returning. The abort signal is checked once before key processing starts and then periodically during compaction (every 1024 input records), so ongoing compactions abort quickly. Any output files from aborted compactions are automatically cleaned up to prevent partial results from being installed. This is useful for scenarios where applications need to quickly stop all compaction activity, such as during graceful shutdown or when performing maintenance operations.

Pull Request resolved: #14227

Test Plan:
- Unit tests in db_compaction_abort_test.cc cover various abort scenarios, including: abort before/during compaction, abort with multiple subcompactions, nested abort/resume calls, abort with the CompactFiles API, abort across multiple column families, and timing guarantees
- Updated compaction_job_test.cc to include the new parameter

Reviewed By: anand1976
Differential Revision: D91480994
Pulled By: xingbowang
fbshipit-source-id: 36837971d8a540cd34d3ec28a78bc94b582625b0
1 parent 21a8b5f commit 656b734

34 files changed

+1471
-61
lines changed

BUCK

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -4823,6 +4823,12 @@ cpp_unittest_wrapper(name="db_clip_test",
48234823
extra_compiler_flags=[])
48244824

48254825

4826+
cpp_unittest_wrapper(name="db_compaction_abort_test",
4827+
srcs=["db/db_compaction_abort_test.cc"],
4828+
deps=[":rocksdb_test_lib"],
4829+
extra_compiler_flags=[])
4830+
4831+
48264832
cpp_unittest_wrapper(name="db_compaction_filter_test",
48274833
srcs=["db/db_compaction_filter_test.cc"],
48284834
deps=[":rocksdb_test_lib"],

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1381,6 +1381,7 @@ if(WITH_TESTS)
13811381
db/db_bloom_filter_test.cc
13821382
db/db_compaction_filter_test.cc
13831383
db/db_compaction_test.cc
1384+
db/db_compaction_abort_test.cc
13841385
db/db_clip_test.cc
13851386
db/db_dynamic_level_test.cc
13861387
db/db_encryption_test.cc

Makefile

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1442,6 +1442,9 @@ db_compaction_filter_test: $(OBJ_DIR)/db/db_compaction_filter_test.o $(TEST_LIBR
14421442
db_compaction_test: $(OBJ_DIR)/db/db_compaction_test.o $(TEST_LIBRARY) $(LIBRARY)
14431443
$(AM_LINK)
14441444

1445+
db_compaction_abort_test: $(OBJ_DIR)/db/db_compaction_abort_test.o $(TEST_LIBRARY) $(LIBRARY)
1446+
$(AM_LINK)
1447+
14451448
db_clip_test: $(OBJ_DIR)/db/db_clip_test.o $(TEST_LIBRARY) $(LIBRARY)
14461449
$(AM_LINK)
14471450

db/c.cc

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -8570,6 +8570,14 @@ void rocksdb_enable_manual_compaction(rocksdb_t* db) {
85708570
db->rep->EnableManualCompaction();
85718571
}
85728572

8573+
void rocksdb_abort_all_compactions(rocksdb_t* db) {
8574+
db->rep->AbortAllCompactions();
8575+
}
8576+
8577+
void rocksdb_resume_all_compactions(rocksdb_t* db) {
8578+
db->rep->ResumeAllCompactions();
8579+
}
8580+
85738581
rocksdb_statistics_histogram_data_t*
85748582
rocksdb_statistics_histogram_data_create() {
85758583
return new rocksdb_statistics_histogram_data_t{};

db/c_test.c

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -4447,7 +4447,7 @@ int main(int argc, char** argv) {
44474447

44484448
StartPhase("statistics");
44494449
{
4450-
const uint32_t BYTES_WRITTEN_TICKER = 60;
4450+
const uint32_t BYTES_WRITTEN_TICKER = 61;
44514451
const uint32_t DB_WRITE_HIST = 1;
44524452

44534453
rocksdb_statistics_histogram_data_t* hist =

db/compaction/compaction_job.cc

Lines changed: 122 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -128,6 +128,10 @@ const char* GetCompactionProximalOutputRangeTypeString(
128128
}
129129
}
130130

131+
// Static constant for compaction abort flag - always false, used for
132+
// compaction service jobs that don't support abort signaling
133+
const std::atomic<int> CompactionJob::kCompactionAbortedFalse{0};
134+
131135
CompactionJob::CompactionJob(
132136
int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
133137
const MutableDBOptions& mutable_db_options, const FileOptions& file_options,
@@ -141,10 +145,10 @@ CompactionJob::CompactionJob(
141145
CompactionJobStats* compaction_job_stats, Env::Priority thread_pri,
142146
const std::shared_ptr<IOTracer>& io_tracer,
143147
const std::atomic<bool>& manual_compaction_canceled,
144-
const std::string& db_id, const std::string& db_session_id,
145-
std::string full_history_ts_low, std::string trim_ts,
146-
BlobFileCompletionCallback* blob_callback, int* bg_compaction_scheduled,
147-
int* bg_bottom_compaction_scheduled)
148+
const std::atomic<int>& compaction_aborted, const std::string& db_id,
149+
const std::string& db_session_id, std::string full_history_ts_low,
150+
std::string trim_ts, BlobFileCompletionCallback* blob_callback,
151+
int* bg_compaction_scheduled, int* bg_bottom_compaction_scheduled)
148152
: compact_(new CompactionState(compaction)),
149153
internal_stats_(compaction->compaction_reason(), 1),
150154
db_options_(db_options),
@@ -168,6 +172,7 @@ CompactionJob::CompactionJob(
168172
versions_(versions),
169173
shutting_down_(shutting_down),
170174
manual_compaction_canceled_(manual_compaction_canceled),
175+
compaction_aborted_(compaction_aborted),
171176
db_directory_(db_directory),
172177
blob_output_directory_(blob_output_directory),
173178
db_mutex_(db_mutex),
@@ -708,6 +713,7 @@ void CompactionJob::InitializeCompactionRun() {
708713
}
709714

710715
void CompactionJob::RunSubcompactions() {
716+
TEST_SYNC_POINT("CompactionJob::RunSubcompactions:BeforeStart");
711717
const size_t num_threads = compact_->sub_compact_states.size();
712718
assert(num_threads > 0);
713719
compact_->compaction->GetOrInitInputTableProperties();
@@ -753,6 +759,71 @@ void CompactionJob::RemoveEmptyOutputs() {
753759
}
754760
}
755761

762+
void CompactionJob::CleanupAbortedSubcompactions() {
763+
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
764+
765+
uint64_t total_sst_files_deleted = 0;
766+
uint64_t total_blob_files_deleted = 0;
767+
768+
// Track the first file deletion error to report at the end
769+
Status first_error;
770+
int deletion_errors = 0;
771+
772+
// Mark all subcompactions as aborted and delete their output files
773+
for (auto& sub_compact : compact_->sub_compact_states) {
774+
// Mark this subcompaction as aborted
775+
sub_compact.status =
776+
Status::Incomplete(Status::SubCode::kCompactionAborted);
777+
778+
// Delete all files (SST and blob) tracked during compaction.
779+
// GetOutputFilePaths() contains ALL file paths created, including
780+
// in-progress files that may have been removed from outputs_ or
781+
// blob_file_additions_.
782+
for (const bool is_proximal_level : {false, true}) {
783+
if (is_proximal_level &&
784+
!compact_->compaction->SupportsPerKeyPlacement()) {
785+
continue;
786+
}
787+
for (const std::string& file_path :
788+
sub_compact.Outputs(is_proximal_level)->GetOutputFilePaths()) {
789+
Status s = env_->DeleteFile(file_path);
790+
if (s.ok()) {
791+
// Count SST vs blob files by checking extension
792+
if (file_path.find(".sst") != std::string::npos) {
793+
total_sst_files_deleted++;
794+
} else if (file_path.find(".blob") != std::string::npos) {
795+
total_blob_files_deleted++;
796+
}
797+
} else if (!s.IsNotFound()) {
798+
if (first_error.ok()) {
799+
first_error = s;
800+
}
801+
deletion_errors++;
802+
}
803+
}
804+
}
805+
sub_compact.CleanupOutputs();
806+
}
807+
808+
if (stats_) {
809+
RecordTick(stats_, COMPACTION_ABORTED);
810+
}
811+
812+
ROCKS_LOG_INFO(db_options_.info_log,
813+
"[%s] [JOB %d] Compaction aborted: deleted %" PRIu64
814+
" SST files and %" PRIu64 " blob files",
815+
cfd->GetName().c_str(), job_id_, total_sst_files_deleted,
816+
total_blob_files_deleted);
817+
818+
if (!first_error.ok()) {
819+
ROCKS_LOG_ERROR(db_options_.info_log,
820+
"[%s] [JOB %d] Cleanup completed with %d file deletion "
821+
"errors. First error: %s",
822+
cfd->GetName().c_str(), job_id_, deletion_errors,
823+
first_error.ToString().c_str());
824+
}
825+
}
826+
756827
bool CompactionJob::HasNewBlobFiles() const {
757828
for (const auto& state : compact_->sub_compact_states) {
758829
if (state.Current().HasBlobFileAdditions()) {
@@ -1004,6 +1075,15 @@ Status CompactionJob::Run() {
10041075

10051076
Status status = CollectSubcompactionErrors();
10061077

1078+
// If compaction was aborted or manually paused, clean up any output files
1079+
// from completed subcompactions to prevent orphaned files on disk.
1080+
// Skip cleanup for resumable compaction (when progress writer is set)
1081+
// because the output files are needed for resumption.
1082+
if ((status.IsCompactionAborted() || status.IsManualCompactionPaused()) &&
1083+
compaction_progress_writer_ == nullptr) {
1084+
CleanupAbortedSubcompactions();
1085+
}
1086+
10071087
if (status.ok()) {
10081088
status = SyncOutputDirectories();
10091089
}
@@ -1415,10 +1495,10 @@ InternalIterator* CompactionJob::CreateInputIterator(
14151495
return input;
14161496
}
14171497

1418-
void CompactionJob::CreateBlobFileBuilder(SubcompactionState* sub_compact,
1419-
ColumnFamilyData* cfd,
1420-
BlobFileResources& blob_resources,
1421-
const WriteOptions& write_options) {
1498+
void CompactionJob::CreateBlobFileBuilder(
1499+
SubcompactionState* sub_compact, ColumnFamilyData* cfd,
1500+
std::unique_ptr<BlobFileBuilder>& blob_file_builder,
1501+
const WriteOptions& write_options) {
14221502
const auto& mutable_cf_options =
14231503
sub_compact->compaction->mutable_cf_options();
14241504

@@ -1427,24 +1507,24 @@ void CompactionJob::CreateBlobFileBuilder(SubcompactionState* sub_compact,
14271507
if (mutable_cf_options.enable_blob_files &&
14281508
sub_compact->compaction->output_level() >=
14291509
mutable_cf_options.blob_file_starting_level) {
1430-
blob_resources.blob_file_builder = std::make_unique<BlobFileBuilder>(
1510+
blob_file_builder = std::make_unique<BlobFileBuilder>(
14311511
versions_, fs_.get(), &sub_compact->compaction->immutable_options(),
14321512
&mutable_cf_options, &file_options_, &write_options, db_id_,
14331513
db_session_id_, job_id_, cfd->GetID(), cfd->GetName(), write_hint_,
14341514
io_tracer_, blob_callback_, BlobFileCreationReason::kCompaction,
1435-
&blob_resources.blob_file_paths,
1515+
sub_compact->Current().GetOutputFilePathsPtr(),
14361516
sub_compact->Current().GetBlobFileAdditionsPtr());
14371517
} else {
1438-
blob_resources.blob_file_builder = nullptr;
1518+
blob_file_builder = nullptr;
14391519
}
14401520
}
14411521

14421522
std::unique_ptr<CompactionIterator> CompactionJob::CreateCompactionIterator(
14431523
SubcompactionState* sub_compact, ColumnFamilyData* cfd,
14441524
InternalIterator* input, const CompactionFilter* compaction_filter,
1445-
MergeHelper& merge, BlobFileResources& blob_resources,
1525+
MergeHelper& merge, std::unique_ptr<BlobFileBuilder>& blob_file_builder,
14461526
const WriteOptions& write_options) {
1447-
CreateBlobFileBuilder(sub_compact, cfd, blob_resources, write_options);
1527+
CreateBlobFileBuilder(sub_compact, cfd, blob_file_builder, write_options);
14481528

14491529
const std::string* const full_history_ts_low =
14501530
full_history_ts_low_.empty() ? nullptr : &full_history_ts_low_;
@@ -1456,7 +1536,7 @@ std::unique_ptr<CompactionIterator> CompactionJob::CreateCompactionIterator(
14561536
job_context_->earliest_write_conflict_snapshot,
14571537
job_context_->GetJobSnapshotSequence(), job_context_->snapshot_checker,
14581538
env_, ShouldReportDetailedTime(env_, stats_), sub_compact->RangeDelAgg(),
1459-
blob_resources.blob_file_builder.get(), db_options_.allow_data_in_errors,
1539+
blob_file_builder.get(), db_options_.allow_data_in_errors,
14601540
db_options_.enforce_single_del_contracts, manual_compaction_canceled_,
14611541
sub_compact->compaction
14621542
->DoesInputReferenceBlobFiles() /* must_count_input_entries */,
@@ -1495,10 +1575,17 @@ Status CompactionJob::ProcessKeyValue(
14951575
SubcompactionState* sub_compact, ColumnFamilyData* cfd,
14961576
CompactionIterator* c_iter, const CompactionFileOpenFunc& open_file_func,
14971577
const CompactionFileCloseFunc& close_file_func, uint64_t& prev_cpu_micros) {
1498-
Status status;
1499-
const uint64_t kRecordStatsEvery = 1000;
1578+
// Cron interval for periodic operations: stats update, abort check,
1579+
// and sync points. Uses 1024 (power of 2) for efficient bitwise check.
1580+
const uint64_t kCronEveryMask = (1 << 10) - 1;
15001581
[[maybe_unused]] const std::optional<const Slice> end = sub_compact->end;
15011582

1583+
// Check for abort signal before starting key processing
1584+
if (compaction_aborted_.load(std::memory_order_acquire) > 0) {
1585+
return Status::Incomplete(Status::SubCode::kCompactionAborted);
1586+
}
1587+
1588+
Status status;
15021589
IterKey prev_iter_output_key;
15031590
ParsedInternalKey prev_iter_output_internal_key;
15041591

@@ -1511,8 +1598,16 @@ Status CompactionJob::ProcessKeyValue(
15111598
assert(!end.has_value() ||
15121599
cfd->user_comparator()->Compare(c_iter->user_key(), *end) < 0);
15131600

1514-
if (c_iter->iter_stats().num_input_records % kRecordStatsEvery ==
1515-
kRecordStatsEvery - 1) {
1601+
const uint64_t num_records = c_iter->iter_stats().num_input_records;
1602+
1603+
// Periodic cron operations: stats update, abort check.
1604+
if ((num_records & kCronEveryMask) == kCronEveryMask) {
1605+
// Check for abort signal periodically
1606+
if (compaction_aborted_.load(std::memory_order_acquire) > 0) {
1607+
status = Status::Incomplete(Status::SubCode::kCompactionAborted);
1608+
break;
1609+
}
1610+
15161611
UpdateSubcompactionJobStatsIncrementally(
15171612
c_iter, &sub_compact->compaction_job_stats,
15181613
db_options_.clock->CPUMicros(), prev_cpu_micros);
@@ -1719,6 +1814,7 @@ Status CompactionJob::FinalizeBlobFiles(SubcompactionState* sub_compact,
17191814
}
17201815

17211816
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
1817+
TEST_SYNC_POINT("CompactionJob::ProcessKeyValueCompaction:Start");
17221818
assert(sub_compact);
17231819
assert(sub_compact->compaction);
17241820

@@ -1772,11 +1868,11 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
17721868
false /* internal key corruption is expected */,
17731869
job_context_->GetLatestSnapshotSequence(), job_context_->snapshot_checker,
17741870
compact_->compaction->level(), db_options_.stats);
1775-
BlobFileResources blob_resources;
1871+
std::unique_ptr<BlobFileBuilder> blob_file_builder;
17761872

17771873
auto c_iter =
17781874
CreateCompactionIterator(sub_compact, cfd, input_iter, compaction_filter,
1779-
merge, blob_resources, write_options);
1875+
merge, blob_file_builder, write_options);
17801876
assert(c_iter);
17811877
c_iter->SeekToFirst();
17821878

@@ -1794,9 +1890,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
17941890
status = FinalizeProcessKeyValueStatus(cfd, input_iter, c_iter.get(), status);
17951891

17961892
FinalizeSubcompaction(sub_compact, status, open_file_func, close_file_func,
1797-
blob_resources.blob_file_builder.get(), c_iter.get(),
1798-
input_iter, start_cpu_micros, prev_cpu_micros,
1799-
io_stats);
1893+
blob_file_builder.get(), c_iter.get(), input_iter,
1894+
start_cpu_micros, prev_cpu_micros, io_stats);
18001895

18011896
NotifyOnSubcompactionCompleted(sub_compact);
18021897
}
@@ -2295,6 +2390,10 @@ Status CompactionJob::OpenCompactionOutputFile(SubcompactionState* sub_compact,
22952390
Status s;
22962391
IOStatus io_s = NewWritableFile(fs_.get(), fname, &writable_file, fo_copy);
22972392
s = io_s;
2393+
if (io_s.ok()) {
2394+
// Track the SST file path for cleanup on abort.
2395+
outputs.AddOutputFilePath(fname);
2396+
}
22982397
if (sub_compact->io_status.ok()) {
22992398
sub_compact->io_status = io_s;
23002399
// Since this error is really a copy of the io_s that is checked below as s,

db/compaction/compaction_job.h

Lines changed: 11 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -142,6 +142,9 @@ class SubcompactionState;
142142

143143
class CompactionJob {
144144
public:
145+
// Constant false aborted flag, used for compaction service jobs
146+
static const std::atomic<int> kCompactionAbortedFalse;
147+
145148
CompactionJob(int job_id, Compaction* compaction,
146149
const ImmutableDBOptions& db_options,
147150
const MutableDBOptions& mutable_db_options,
@@ -157,6 +160,7 @@ class CompactionJob {
157160
Env::Priority thread_pri,
158161
const std::shared_ptr<IOTracer>& io_tracer,
159162
const std::atomic<bool>& manual_compaction_canceled,
163+
const std::atomic<int>& compaction_aborted,
160164
const std::string& db_id = "",
161165
const std::string& db_session_id = "",
162166
std::string full_history_ts_low = "", std::string trim_ts = "",
@@ -299,6 +303,7 @@ class CompactionJob {
299303
void RunSubcompactions();
300304
void UpdateTimingStats(uint64_t start_micros);
301305
void RemoveEmptyOutputs();
306+
void CleanupAbortedSubcompactions();
302307
bool HasNewBlobFiles() const;
303308
Status CollectSubcompactionErrors();
304309
Status SyncOutputDirectories();
@@ -363,11 +368,6 @@ class CompactionJob {
363368
std::unique_ptr<InternalIterator> trim_history_iter;
364369
};
365370

366-
struct BlobFileResources {
367-
std::vector<std::string> blob_file_paths;
368-
std::unique_ptr<BlobFileBuilder> blob_file_builder;
369-
};
370-
371371
bool ShouldUseLocalCompaction(SubcompactionState* sub_compact);
372372
CompactionIOStatsSnapshot InitializeIOStats();
373373
Status SetupAndValidateCompactionFilter(
@@ -382,14 +382,14 @@ class CompactionJob {
382382
SubcompactionState* sub_compact, ColumnFamilyData* cfd,
383383
SubcompactionInternalIterators& iterators,
384384
SubcompactionKeyBoundaries& boundaries, ReadOptions& read_options);
385-
void CreateBlobFileBuilder(SubcompactionState* sub_compact,
386-
ColumnFamilyData* cfd,
387-
BlobFileResources& blob_resources,
388-
const WriteOptions& write_options);
385+
void CreateBlobFileBuilder(
386+
SubcompactionState* sub_compact, ColumnFamilyData* cfd,
387+
std::unique_ptr<BlobFileBuilder>& blob_file_builder,
388+
const WriteOptions& write_options);
389389
std::unique_ptr<CompactionIterator> CreateCompactionIterator(
390390
SubcompactionState* sub_compact, ColumnFamilyData* cfd,
391391
InternalIterator* input_iter, const CompactionFilter* compaction_filter,
392-
MergeHelper& merge, BlobFileResources& blob_resources,
392+
MergeHelper& merge, std::unique_ptr<BlobFileBuilder>& blob_file_builder,
393393
const WriteOptions& write_options);
394394
std::pair<CompactionFileOpenFunc, CompactionFileCloseFunc> CreateFileHandlers(
395395
SubcompactionState* sub_compact, SubcompactionKeyBoundaries& boundaries);
@@ -461,6 +461,7 @@ class CompactionJob {
461461
VersionSet* versions_;
462462
const std::atomic<bool>* shutting_down_;
463463
const std::atomic<bool>& manual_compaction_canceled_;
464+
const std::atomic<int>& compaction_aborted_;
464465
FSDirectory* db_directory_;
465466
FSDirectory* blob_output_directory_;
466467
InstrumentedMutex* db_mutex_;

0 commit comments

Comments
 (0)