From 92ef881c8f5822d1032f40152776c7bd8e27b630 Mon Sep 17 00:00:00 2001
From: Xingbo Wang
Date: Mon, 29 Dec 2025 13:39:01 -0800
Subject: [PATCH 1/7] Support aborting and resuming background compactions

Summary:
This adds a new public API to allow applications to abort all running
compactions and prevent new ones from starting. Unlike
DisableManualCompaction(), which only pauses manual compactions and waits for
them to finish naturally, AbortAllCompactions() actively signals running
compactions (both automatic and manual) to terminate early and waits for them
to complete before returning.

The abort signal is checked periodically during compaction (every 1,000 keys),
so ongoing compactions abort quickly. Any output files from aborted
compactions are automatically cleaned up to prevent partial results from
being installed.

This is useful for scenarios where applications need to quickly stop all
compaction activity, such as during graceful shutdown or when performing
maintenance operations.

This also adds a new public API to resume compactions after the abort call.

Limitation: the compaction service is not supported.

Test Plan:
- Unit tests in db_compaction_abort_test.cc cover various abort scenarios
  including: abort before/during compaction, abort with multiple
  subcompactions, nested abort/resume calls, abort with the CompactFiles API,
  abort across multiple column families, and timing guarantees
- Updated compaction_job_test.cc to include the new parameter
- Stress test
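For illustration, a minimal usage sketch of the new pair of APIs (not part of
this patch; assumes an open `DB* db`, error handling elided, and
DoMaintenanceWork() is a hypothetical application callback):

  db->AbortAllCompactions();   // blocks until all running compactions exit
  // While aborted, any new compaction job fails with
  // Status::Incomplete(Status::SubCode::kCompactionAborted).
  DoMaintenanceWork();         // hypothetical; e.g. backup or file cleanup
  db->ResumeAllCompactions();  // lifts the abort; compactions can run again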
---
 BUCK                                        |   6 +
 CMakeLists.txt                              |   1 +
 Makefile                                    |   3 +
 db/c.cc                                     |   8 +
 db/c_test.c                                 |   2 +-
 db/compaction/compaction_job.cc             | 145 ++-
 db/compaction/compaction_job.h              |  21 +-
 db/compaction/compaction_job_test.cc        |   7 +-
 db/compaction/compaction_outputs.h          |  19 +
 db/compaction/compaction_service_job.cc     |  29 +-
 db/compaction/subcompaction_state.h         |   8 +
 db/db_compaction_abort_test.cc              | 998 ++++++++++++++++++
 db/db_impl/db_impl.h                        |  10 +
 db/db_impl/db_impl_compaction_flush.cc      | 111 +-
 db/db_test.cc                               |   2 +
 db/internal_stats.cc                        |  13 +
 db/internal_stats.h                         |   2 +
 db_stress_tool/db_stress_common.h           |   1 +
 db_stress_tool/db_stress_gflags.cc          |   4 +
 db_stress_tool/db_stress_test_base.cc       |  19 +
 db_stress_tool/db_stress_test_base.h        |   2 +
 include/rocksdb/db.h                        |  44 +
 include/rocksdb/listener.h                  |   3 +
 include/rocksdb/statistics.h                |   2 +
 include/rocksdb/status.h                    |   8 +
 include/rocksdb/utilities/stackable_db.h    |   2 +
 java/rocksjni/rocksjni.cc                   |  22 +
 java/src/main/java/org/rocksdb/RocksDB.java |  19 +
 monitoring/statistics.cc                    |   1 +
 monitoring/stats_history_test.cc            |   2 +-
 src.mk                                      |   1 +
 .../abort_compaction_apis.md                |   1 +
 util/status.cc                              |   4 +-
 33 files changed, 1463 insertions(+), 57 deletions(-)
 create mode 100644 db/db_compaction_abort_test.cc
 create mode 100644 unreleased_history/public_api_changes/abort_compaction_apis.md

diff --git a/BUCK b/BUCK
index 8fa8f35d3d9e..7037c44e778f 100644
--- a/BUCK
+++ b/BUCK
@@ -4823,6 +4823,12 @@ cpp_unittest_wrapper(name="db_clip_test",
                      extra_compiler_flags=[])
 
 
+cpp_unittest_wrapper(name="db_compaction_abort_test",
+                     srcs=["db/db_compaction_abort_test.cc"],
+                     deps=[":rocksdb_test_lib"],
+                     extra_compiler_flags=[])
+
+
 cpp_unittest_wrapper(name="db_compaction_filter_test",
                      srcs=["db/db_compaction_filter_test.cc"],
                      deps=[":rocksdb_test_lib"],

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 03837b672ac4..518b1205b169 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1377,6 +1377,7 @@ if(WITH_TESTS)
         db/db_bloom_filter_test.cc
         db/db_compaction_filter_test.cc
         db/db_compaction_test.cc
+        db/db_compaction_abort_test.cc
         db/db_clip_test.cc
         db/db_dynamic_level_test.cc
         db/db_encryption_test.cc

diff --git a/Makefile b/Makefile
index 4f62ad5b576e..b2c3a8f6b741 100644
--- a/Makefile
+++ b/Makefile
@@ -1442,6 +1442,9 @@ db_compaction_filter_test: $(OBJ_DIR)/db/db_compaction_filter_test.o $(TEST_LIBR
 db_compaction_test: $(OBJ_DIR)/db/db_compaction_test.o $(TEST_LIBRARY) $(LIBRARY)
 	$(AM_LINK)
 
+db_compaction_abort_test: $(OBJ_DIR)/db/db_compaction_abort_test.o $(TEST_LIBRARY) $(LIBRARY)
+	$(AM_LINK)
+
 db_clip_test: $(OBJ_DIR)/db/db_clip_test.o $(TEST_LIBRARY) $(LIBRARY)
 	$(AM_LINK)

diff --git a/db/c.cc b/db/c.cc
index 7abab13a6fda..dae0d0ebb569 100644
--- a/db/c.cc
+++ b/db/c.cc
@@ -8570,6 +8570,14 @@ void rocksdb_enable_manual_compaction(rocksdb_t* db) {
   db->rep->EnableManualCompaction();
 }
 
+void rocksdb_abort_all_compactions(rocksdb_t* db) {
+  db->rep->AbortAllCompactions();
+}
+
+void rocksdb_resume_all_compactions(rocksdb_t* db) {
+  db->rep->ResumeAllCompactions();
+}
+
 rocksdb_statistics_histogram_data_t*
 rocksdb_statistics_histogram_data_create() {
   return new rocksdb_statistics_histogram_data_t{};

diff --git a/db/c_test.c b/db/c_test.c
index 7f05dd2ab4b2..6811fe4ae8cb 100644
--- a/db/c_test.c
+++ b/db/c_test.c
@@ -4447,7 +4447,7 @@ int main(int argc, char** argv) {
 
   StartPhase("statistics");
   {
-    const uint32_t BYTES_WRITTEN_TICKER = 60;
+    const uint32_t BYTES_WRITTEN_TICKER = 61;
     const uint32_t DB_WRITE_HIST = 1;
 
     rocksdb_statistics_histogram_data_t* hist =

diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc
index 3d51f8fd5410..f76e9c8de244 100644
--- a/db/compaction/compaction_job.cc
+++ b/db/compaction/compaction_job.cc
@@ -128,6 +128,10 @@ const char* GetCompactionProximalOutputRangeTypeString(
   }
 }
 
+// Static constant for compaction abort flag - always false, used for
+// compaction service jobs that don't support abort signaling
+const std::atomic<int> CompactionJob::kCompactionAbortedFalse{0};
+
 CompactionJob::CompactionJob(
     int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
     const MutableDBOptions& mutable_db_options, const FileOptions& file_options,
@@ -141,10 +145,10 @@ CompactionJob::CompactionJob(
     CompactionJobStats* compaction_job_stats, Env::Priority thread_pri,
     const std::shared_ptr<IOTracer>& io_tracer,
     const std::atomic<bool>& manual_compaction_canceled,
-    const std::string& db_id, const std::string& db_session_id,
-    std::string full_history_ts_low, std::string trim_ts,
-    BlobFileCompletionCallback* blob_callback, int* bg_compaction_scheduled,
-    int* bg_bottom_compaction_scheduled)
+    const std::atomic<int>& compaction_aborted, const std::string& db_id,
+    const std::string& db_session_id, std::string full_history_ts_low,
+    std::string trim_ts, BlobFileCompletionCallback* blob_callback,
+    int* bg_compaction_scheduled, int* bg_bottom_compaction_scheduled)
     : compact_(new CompactionState(compaction)),
       internal_stats_(compaction->compaction_reason(), 1),
       db_options_(db_options),
@@ -168,6 +172,7 @@ CompactionJob::CompactionJob(
       versions_(versions),
       shutting_down_(shutting_down),
       manual_compaction_canceled_(manual_compaction_canceled),
+      compaction_aborted_(compaction_aborted),
      db_directory_(db_directory),
      blob_output_directory_(blob_output_directory),
      db_mutex_(db_mutex),
@@ -708,6 +713,7 @@ void CompactionJob::InitializeCompactionRun() {
 }
 
 void CompactionJob::RunSubcompactions() {
+  TEST_SYNC_POINT("CompactionJob::RunSubcompactions:BeforeStart");
   const size_t num_threads = compact_->sub_compact_states.size();
   assert(num_threads > 0);
   compact_->compaction->GetOrInitInputTableProperties();
@@ -753,6 +759,71 @@ void CompactionJob::RemoveEmptyOutputs() {
   }
 }
 
+void CompactionJob::CleanupAbortedSubcompactions() {
+  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
+
+  uint64_t total_sst_files_deleted = 0;
+  uint64_t total_blob_files_deleted = 0;
+
+  // Track the first file deletion error to report at the end
+  Status first_error;
+  int deletion_errors = 0;
+
+  // Mark all subcompactions as aborted and delete their output files
+  for (auto& sub_compact : compact_->sub_compact_states) {
+    // Mark this subcompaction as aborted
+    sub_compact.status =
+        Status::Incomplete(Status::SubCode::kCompactionAborted);
+
+    // Delete all files (SST and blob) tracked during compaction.
+    // GetOutputFilePaths() contains ALL file paths created, including
+    // in-progress files that may have been removed from outputs_ or
+    // blob_file_additions_.
+    for (const bool is_proximal_level : {false, true}) {
+      if (is_proximal_level &&
+          !compact_->compaction->SupportsPerKeyPlacement()) {
+        continue;
+      }
+      for (const std::string& file_path :
+           sub_compact.Outputs(is_proximal_level)->GetOutputFilePaths()) {
+        Status s = env_->DeleteFile(file_path);
+        if (s.ok()) {
+          // Count SST vs blob files by checking extension
+          if (file_path.find(".sst") != std::string::npos) {
+            total_sst_files_deleted++;
+          } else if (file_path.find(".blob") != std::string::npos) {
+            total_blob_files_deleted++;
+          }
+        } else if (!s.IsNotFound()) {
+          if (first_error.ok()) {
+            first_error = s;
+          }
+          deletion_errors++;
+        }
+      }
+    }
+    sub_compact.CleanupOutputs();
+  }
+
+  if (stats_) {
+    RecordTick(stats_, COMPACTION_ABORTED);
+  }
+
+  ROCKS_LOG_INFO(db_options_.info_log,
+                 "[%s] [JOB %d] Compaction aborted: deleted %" PRIu64
+                 " SST files and %" PRIu64 " blob files",
+                 cfd->GetName().c_str(), job_id_, total_sst_files_deleted,
+                 total_blob_files_deleted);
+
+  if (!first_error.ok()) {
+    ROCKS_LOG_ERROR(db_options_.info_log,
+                    "[%s] [JOB %d] Cleanup completed with %d file deletion "
+                    "errors. First error: %s",
+                    cfd->GetName().c_str(), job_id_, deletion_errors,
+                    first_error.ToString().c_str());
+  }
+}
+
 bool CompactionJob::HasNewBlobFiles() const {
   for (const auto& state : compact_->sub_compact_states) {
     if (state.Current().HasBlobFileAdditions()) {
@@ -1004,6 +1075,15 @@ Status CompactionJob::Run() {
 
   Status status = CollectSubcompactionErrors();
 
+  // If compaction was aborted or manually paused, clean up any output files
+  // from completed subcompactions to prevent orphaned files on disk.
+  // Skip cleanup for resumable compaction (when progress writer is set)
+  // because the output files are needed for resumption.
+  if ((status.IsCompactionAborted() || status.IsManualCompactionPaused()) &&
+      compaction_progress_writer_ == nullptr) {
+    CleanupAbortedSubcompactions();
+  }
+
   if (status.ok()) {
     status = SyncOutputDirectories();
   }
@@ -1415,10 +1495,10 @@ InternalIterator* CompactionJob::CreateInputIterator(
   return input;
 }
 
-void CompactionJob::CreateBlobFileBuilder(SubcompactionState* sub_compact,
-                                          ColumnFamilyData* cfd,
-                                          BlobFileResources& blob_resources,
-                                          const WriteOptions& write_options) {
+void CompactionJob::CreateBlobFileBuilder(
+    SubcompactionState* sub_compact, ColumnFamilyData* cfd,
+    std::unique_ptr<BlobFileBuilder>& blob_file_builder,
+    const WriteOptions& write_options) {
   const auto& mutable_cf_options =
       sub_compact->compaction->mutable_cf_options();
 
@@ -1427,24 +1507,24 @@ void CompactionJob::CreateBlobFileBuilder(SubcompactionState* sub_compact,
   if (mutable_cf_options.enable_blob_files &&
       sub_compact->compaction->output_level() >=
           mutable_cf_options.blob_file_starting_level) {
-    blob_resources.blob_file_builder = std::make_unique<BlobFileBuilder>(
+    blob_file_builder = std::make_unique<BlobFileBuilder>(
        versions_, fs_.get(), &sub_compact->compaction->immutable_options(),
        &mutable_cf_options, &file_options_, &write_options, db_id_,
        db_session_id_, job_id_, cfd->GetID(), cfd->GetName(), write_hint_,
        io_tracer_, blob_callback_, BlobFileCreationReason::kCompaction,
-        &blob_resources.blob_file_paths,
+        sub_compact->Current().GetOutputFilePathsPtr(),
        sub_compact->Current().GetBlobFileAdditionsPtr());
   } else {
-    blob_resources.blob_file_builder = nullptr;
+    blob_file_builder = nullptr;
   }
 }
 
 std::unique_ptr<CompactionIterator> CompactionJob::CreateCompactionIterator(
     SubcompactionState* sub_compact, ColumnFamilyData* cfd,
     InternalIterator* input, const CompactionFilter* compaction_filter,
-    MergeHelper& merge, BlobFileResources& blob_resources,
+    MergeHelper& merge, std::unique_ptr<BlobFileBuilder>& blob_file_builder,
     const WriteOptions& write_options) {
-  CreateBlobFileBuilder(sub_compact, cfd, blob_resources, write_options);
+  CreateBlobFileBuilder(sub_compact, cfd, blob_file_builder, write_options);
 
   const std::string* const full_history_ts_low =
       full_history_ts_low_.empty() ? nullptr : &full_history_ts_low_;
@@ -1456,7 +1536,7 @@ std::unique_ptr<CompactionIterator> CompactionJob::CreateCompactionIterator(
      job_context_->earliest_write_conflict_snapshot,
      job_context_->GetJobSnapshotSequence(), job_context_->snapshot_checker,
      env_, ShouldReportDetailedTime(env_, stats_), sub_compact->RangeDelAgg(),
-      blob_resources.blob_file_builder.get(), db_options_.allow_data_in_errors,
+      blob_file_builder.get(), db_options_.allow_data_in_errors,
      db_options_.enforce_single_del_contracts, manual_compaction_canceled_,
      sub_compact->compaction
          ->DoesInputReferenceBlobFiles() /* must_count_input_entries */,
@@ -1495,10 +1575,17 @@ Status CompactionJob::ProcessKeyValue(
    SubcompactionState* sub_compact, ColumnFamilyData* cfd,
    CompactionIterator* c_iter, const CompactionFileOpenFunc& open_file_func,
    const CompactionFileCloseFunc& close_file_func, uint64_t& prev_cpu_micros) {
-  Status status;
-  const uint64_t kRecordStatsEvery = 1000;
+  // Cron interval for periodic operations: stats update, abort check,
+  // and sync points. Uses 1000 to maintain responsive abort checking.
+  const uint64_t kCronEvery = 1000;
   [[maybe_unused]] const std::optional<Slice> end = sub_compact->end;
 
+  // Check for abort signal before starting key processing
+  if (compaction_aborted_.load(std::memory_order_acquire) > 0) {
+    return Status::Incomplete(Status::SubCode::kCompactionAborted);
+  }
+
+  Status status;
   IterKey prev_iter_output_key;
   ParsedInternalKey prev_iter_output_internal_key;
 
@@ -1511,8 +1598,16 @@
     assert(!end.has_value() ||
            cfd->user_comparator()->Compare(c_iter->user_key(), *end) < 0);
 
-    if (c_iter->iter_stats().num_input_records % kRecordStatsEvery ==
-        kRecordStatsEvery - 1) {
+    const uint64_t num_records = c_iter->iter_stats().num_input_records;
+
+    // Periodic cron operations: stats update, abort check, and sync points
+    if (num_records % kCronEvery == kCronEvery - 1) {
+      // Check for abort signal periodically
+      if (compaction_aborted_.load(std::memory_order_acquire) > 0) {
+        status = Status::Incomplete(Status::SubCode::kCompactionAborted);
+        break;
+      }
+
       UpdateSubcompactionJobStatsIncrementally(
           c_iter, &sub_compact->compaction_job_stats,
           db_options_.clock->CPUMicros(), prev_cpu_micros);
@@ -1719,6 +1814,7 @@ Status CompactionJob::FinalizeBlobFiles(SubcompactionState* sub_compact,
 }
 
 void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
+  TEST_SYNC_POINT("CompactionJob::ProcessKeyValueCompaction:Start");
   assert(sub_compact);
   assert(sub_compact->compaction);
@@ -1772,11 +1868,11 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
      false /* internal key corruption is expected */,
      job_context_->GetLatestSnapshotSequence(), job_context_->snapshot_checker,
      compact_->compaction->level(), db_options_.stats);
-  BlobFileResources blob_resources;
+  std::unique_ptr<BlobFileBuilder> blob_file_builder;
 
   auto c_iter =
       CreateCompactionIterator(sub_compact, cfd, input_iter, compaction_filter,
-                               merge, blob_resources, write_options);
+                               merge, blob_file_builder, write_options);
   assert(c_iter);
   c_iter->SeekToFirst();
@@ -1794,9 +1890,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   status = FinalizeProcessKeyValueStatus(cfd, input_iter, c_iter.get(), status);
 
   FinalizeSubcompaction(sub_compact, status, open_file_func, close_file_func,
-                        blob_resources.blob_file_builder.get(), c_iter.get(),
-                        input_iter, start_cpu_micros, prev_cpu_micros,
-                        io_stats);
+                        blob_file_builder.get(), c_iter.get(), input_iter,
+                        start_cpu_micros, prev_cpu_micros, io_stats);
 
   NotifyOnSubcompactionCompleted(sub_compact);
 }
@@ -2295,6 +2390,10 @@ Status CompactionJob::OpenCompactionOutputFile(SubcompactionState* sub_compact,
   Status s;
   IOStatus io_s = NewWritableFile(fs_.get(), fname, &writable_file, fo_copy);
   s = io_s;
+  if (io_s.ok()) {
+    // Track the SST file path for cleanup on abort.
+    outputs.AddOutputFilePath(fname);
+  }
   if (sub_compact->io_status.ok()) {
     sub_compact->io_status = io_s;
     // Since this error is really a copy of the io_s that is checked below as s,

diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h
index c9dac611cd6f..8b942c6fe64d 100644
--- a/db/compaction/compaction_job.h
+++ b/db/compaction/compaction_job.h
@@ -142,6 +142,9 @@ class SubcompactionState;
 
 class CompactionJob {
  public:
+  // Constant false aborted flag, used for compaction service jobs
+  static const std::atomic<int> kCompactionAbortedFalse;
+
   CompactionJob(int job_id, Compaction* compaction,
                 const ImmutableDBOptions& db_options,
                 const MutableDBOptions& mutable_db_options,
@@ -157,6 +160,7 @@ class CompactionJob {
                 Env::Priority thread_pri,
                 const std::shared_ptr<IOTracer>& io_tracer,
                 const std::atomic<bool>& manual_compaction_canceled,
+                const std::atomic<int>& compaction_aborted,
                 const std::string& db_id = "",
                 const std::string& db_session_id = "",
                 std::string full_history_ts_low = "", std::string trim_ts = "",
@@ -299,6 +303,7 @@ class CompactionJob {
   void RunSubcompactions();
   void UpdateTimingStats(uint64_t start_micros);
   void RemoveEmptyOutputs();
+  void CleanupAbortedSubcompactions();
   bool HasNewBlobFiles() const;
   Status CollectSubcompactionErrors();
   Status SyncOutputDirectories();
@@ -363,11 +368,6 @@ class CompactionJob {
     std::unique_ptr<InternalIterator> trim_history_iter;
   };
 
-  struct BlobFileResources {
-    std::vector<std::string> blob_file_paths;
-    std::unique_ptr<BlobFileBuilder> blob_file_builder;
-  };
-
   bool ShouldUseLocalCompaction(SubcompactionState* sub_compact);
   CompactionIOStatsSnapshot InitializeIOStats();
   Status SetupAndValidateCompactionFilter(
@@ -382,14 +382,14 @@ class CompactionJob {
      SubcompactionState* sub_compact, ColumnFamilyData* cfd,
      SubcompactionInternalIterators& iterators,
      SubcompactionKeyBoundaries& boundaries, ReadOptions& read_options);
-  void CreateBlobFileBuilder(SubcompactionState* sub_compact,
-                             ColumnFamilyData* cfd,
-                             BlobFileResources& blob_resources,
-                             const WriteOptions& write_options);
+  void CreateBlobFileBuilder(
+      SubcompactionState* sub_compact, ColumnFamilyData* cfd,
+      std::unique_ptr<BlobFileBuilder>& blob_file_builder,
+      const WriteOptions& write_options);
   std::unique_ptr<CompactionIterator> CreateCompactionIterator(
      SubcompactionState* sub_compact, ColumnFamilyData* cfd,
      InternalIterator* input_iter, const CompactionFilter* compaction_filter,
-      MergeHelper& merge, BlobFileResources& blob_resources,
+      MergeHelper& merge, std::unique_ptr<BlobFileBuilder>& blob_file_builder,
      const WriteOptions& write_options);
   std::pair<CompactionFileOpenFunc, CompactionFileCloseFunc> CreateFileHandlers(
      SubcompactionState* sub_compact, SubcompactionKeyBoundaries& boundaries);
@@ -461,6 +461,7 @@ class CompactionJob {
   VersionSet* versions_;
   const std::atomic<bool>* shutting_down_;
   const std::atomic<bool>& manual_compaction_canceled_;
+  const std::atomic<int>& compaction_aborted_;
   FSDirectory* db_directory_;
   FSDirectory* blob_output_directory_;
   InstrumentedMutex* db_mutex_;

diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc
index 95d74be4d485..ce55dfe4f8ee 100644
--- a/db/compaction/compaction_job_test.cc
+++ b/db/compaction/compaction_job_test.cc
@@ -676,8 +676,8 @@ class CompactionJobTestBase : public testing::Test {
         &event_logger, false, false, dbname_, &compaction_job_stats_,
         Env::Priority::USER, nullptr /* IOTracer */,
         /*manual_compaction_canceled=*/kManualCompactionCanceledFalse,
-        env_->GenerateUniqueId(), DBImpl::GenerateDbSessionId(nullptr),
-        full_history_ts_low_);
+        CompactionJob::kCompactionAbortedFalse, env_->GenerateUniqueId(),
+        DBImpl::GenerateDbSessionId(nullptr),
+        full_history_ts_low_);
 
     VerifyInitializationOfCompactionJobStats(compaction_job_stats_);
 
     compaction_job.Prepare(std::nullopt /*subcompact to be computed*/);
@@ -2545,7 +2545,8 @@ class ResumableCompactionJobTest : public CompactionJobTestBase {
        versions_.get(), &shutting_down_, &log_buffer, nullptr, nullptr,
        nullptr, stats.get(), &mutex_, &error_handler_, &job_context,
        table_cache_, &event_logger, false, false, dbname_, &job_stats,
-        Env::Priority::USER, nullptr, cancel_, env_->GenerateUniqueId(),
+        Env::Priority::USER, nullptr, cancel_,
+        CompactionJob::kCompactionAbortedFalse, env_->GenerateUniqueId(),
        DBImpl::GenerateDbSessionId(nullptr), "");
 
     compaction_job.Prepare(std::nullopt, compaction_progress,

diff --git a/db/compaction/compaction_outputs.h b/db/compaction/compaction_outputs.h
index 6f9de28efcfd..757e1b6b85ed 100644
--- a/db/compaction/compaction_outputs.h
+++ b/db/compaction/compaction_outputs.h
@@ -84,6 +84,19 @@ class CompactionOutputs {
 
   bool HasBlobFileAdditions() const { return !blob_file_additions_.empty(); }
 
+  // Get all file paths (SST and blob) created during compaction.
+  const std::vector<std::string>& GetOutputFilePaths() const {
+    return output_file_paths_;
+  }
+
+  std::vector<std::string>* GetOutputFilePathsPtr() {
+    return &output_file_paths_;
+  }
+
+  void AddOutputFilePath(const std::string& path) {
+    output_file_paths_.push_back(path);
+  }
+
   BlobGarbageMeter* CreateBlobGarbageMeter() {
     assert(!is_proximal_level_);
     blob_garbage_meter_ = std::make_unique<BlobGarbageMeter>();
@@ -321,6 +334,12 @@ class CompactionOutputs {
   std::vector<BlobFileAddition> blob_file_additions_;
   std::unique_ptr<BlobGarbageMeter> blob_garbage_meter_;
 
+  // All file paths (SST and blob) created during compaction.
+  // Used for cleanup on abort - ensures orphan files are deleted even if
+  // they were removed from outputs_ or blob_file_additions_ (e.g., by
+  // RemoveLastEmptyOutput when file_size is 0 because builder was abandoned).
+  std::vector<std::string> output_file_paths_;
+
   // Per level's output stat
   InternalStats::CompactionStats stats_;

diff --git a/db/compaction/compaction_service_job.cc b/db/compaction/compaction_service_job.cc
index d9eea538193f..cb88c53d8f8d 100644
--- a/db/compaction/compaction_service_job.cc
+++ b/db/compaction/compaction_service_job.cc
@@ -117,6 +117,14 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
   std::string debug_str_before_wait =
       compaction->input_version()->DebugString(/*hex=*/true);
 
+  // TODO: Update CompactionService API to support abort and resume
+  // functionality. Currently, remote compaction jobs cannot be aborted via
+  // AbortAllCompactions() because the CompactionService interface lacks methods
+  // to signal abort to remote workers and to properly resume after an abort.
+  // The API needs to be extended with:
+  // - A method to signal abort to running remote compaction jobs
+  // - A method to resume/re-enable scheduling after an abort is lifted
+
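For illustration only, one hypothetical shape such an extension could take
(neither method below exists in the CompactionService interface today):

  class CompactionService {
   public:
    // ... existing interface ...
    // Hypothetical: signal running remote compaction jobs to stop early.
    virtual void AbortRunningCompactions() {}
    // Hypothetical: re-enable scheduling of remote jobs once the abort is
    // lifted on the primary side.
    virtual void ResumeCompactions() {}
  };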
   ROCKS_LOG_INFO(db_options_.info_log,
                  "[%s] [JOB %d] Waiting for remote compaction...",
                  compaction->column_family_data()->GetName().c_str(), job_id_);
@@ -312,16 +320,17 @@ CompactionServiceCompactionJob::CompactionServiceCompactionJob(
     std::string output_path,
     const CompactionServiceInput& compaction_service_input,
     CompactionServiceResult* compaction_service_result)
-    : CompactionJob(job_id, compaction, db_options, mutable_db_options,
-                    file_options, versions, shutting_down, log_buffer, nullptr,
-                    output_directory, nullptr, stats, db_mutex,
-                    db_error_handler, job_context, std::move(table_cache),
-                    event_logger,
-                    compaction->mutable_cf_options().paranoid_file_checks,
-                    compaction->mutable_cf_options().report_bg_io_stats, dbname,
-                    &(compaction_service_result->stats), Env::Priority::USER,
-                    io_tracer, manual_compaction_canceled, db_id, db_session_id,
-                    compaction->column_family_data()->GetFullHistoryTsLow()),
+    : CompactionJob(
+          job_id, compaction, db_options, mutable_db_options, file_options,
+          versions, shutting_down, log_buffer, nullptr, output_directory,
+          nullptr, stats, db_mutex, db_error_handler, job_context,
+          std::move(table_cache), event_logger,
+          compaction->mutable_cf_options().paranoid_file_checks,
+          compaction->mutable_cf_options().report_bg_io_stats, dbname,
+          &(compaction_service_result->stats), Env::Priority::USER, io_tracer,
+          manual_compaction_canceled, CompactionJob::kCompactionAbortedFalse,
+          db_id, db_session_id,
+          compaction->column_family_data()->GetFullHistoryTsLow()),
       output_path_(std::move(output_path)),
       compaction_input_(compaction_service_input),
       compaction_result_(compaction_service_result) {}

diff --git a/db/compaction/subcompaction_state.h b/db/compaction/subcompaction_state.h
index 09af46540ca9..38785f9ae085 100644
--- a/db/compaction/subcompaction_state.h
+++ b/db/compaction/subcompaction_state.h
@@ -95,6 +95,14 @@ class SubcompactionState {
     proximal_level_outputs_.RemoveLastEmptyOutput();
   }
 
+  // Cleanup output builders for abandoning in-progress files.
+  void CleanupOutputs() {
+    compaction_outputs_.Cleanup();
+    if (compaction->SupportsPerKeyPlacement()) {
+      proximal_level_outputs_.Cleanup();
+    }
+  }
+
   void BuildSubcompactionJobInfo(
       SubcompactionJobInfo& subcompaction_job_info) const {
     const Compaction* c = compaction;

diff --git a/db/db_compaction_abort_test.cc b/db/db_compaction_abort_test.cc
new file mode 100644
index 000000000000..d8b7a8511620
--- /dev/null
+++ b/db/db_compaction_abort_test.cc
@@ -0,0 +1,998 @@
+// Copyright (c) Meta Platforms, Inc. and affiliates.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include <atomic>
+#include <string>
+#include <thread>
+#include <unordered_map>
+
+#include "db/compaction/compaction_job.h"
+#include "db/db_impl/db_impl_secondary.h"
+#include "db/db_test_util.h"
+#include "options/options_helper.h"
+#include "port/stack_trace.h"
+#include "rocksdb/db.h"
+#include "rocksdb/sst_file_writer.h"
+#include "test_util/sync_point.h"
+#include "test_util/testharness.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Helper class to manage abort synchronization in tests.
+//
+// Compaction abort can happen at various stages of compaction.
+// To test this, we need to trigger the abort at different stages. This requires
+// precise control over the timing of the abort API invocation. To achieve this
+// in a consistent way across the various tests, we invoke AbortAllCompactions()
+// within a sync point callback that is added at various stages of compaction.
+// However, as the abort API is a blocking call, calling it within the sync
+// point callback on the compaction thread would cause a deadlock. This test
+// helper class is designed to solve this challenge.
+//
+// 1. Abort must happen from a different thread:
+//    AbortAllCompactions() is typically called from the compaction thread
+//    via a sync point callback, so that we can precisely control the time of
+//    the API invocation to simulate an abort at different stages of
+//    compaction. However, we can't block the compaction thread waiting for
+//    the abort to complete - the compaction needs to continue executing to
+//    actually check the abort flag and exit. So we spawn a separate thread to
+//    call AbortAllCompactions().
+//
+// 2. We need to know when abort completes:
+//    After compaction returns (with aborted status), we often need to:
+//    - Verify state (e.g., no output files created)
+//    - Call ResumeAllCompactions()
+//    - Run compaction again to verify it succeeds
+//    We must wait for the abort thread to finish before proceeding, otherwise
+//    we might call Resume before Abort completes, causing race conditions.
+//
+// 3. Sync point callbacks may fire multiple times:
+//    With multiple subcompactions, a callback like
+//    "CompactionJob::ProcessKeyValueCompaction:Start" fires once per
+//    subcompaction. We only want to trigger abort once, so we use
+//    abort_triggered_ as a guard.
+//
+// 4. Tests may need multiple abort cycles:
+//    Some tests (e.g., MultipleAbortResumeSequence) do abort->resume->abort
+//    multiple times. The class supports this by auto-resetting when a
+//    previous abort has completed.
+class AbortSynchronizer {
+ public:
+  AbortSynchronizer() : abort_cv_(&abort_mutex_) {}
+
+  ~AbortSynchronizer() {
+    // Join the thread if it was started - ensures clean shutdown
+    if (abort_thread_.joinable()) {
+      abort_thread_.join();
+    }
+  }
+
+  // Non-copyable, non-movable due to thread member
+  AbortSynchronizer(const AbortSynchronizer&) = delete;
+  AbortSynchronizer& operator=(const AbortSynchronizer&) = delete;
+
+  // Trigger abort from a separate thread.
+  // - Safe to call multiple times; only first call in each cycle spawns thread
+  // - If a previous abort has completed, automatically resets state first
+  // - The spawned thread calls AbortAllCompactions() and signals completion
+  void TriggerAbort(DBImpl* db) {
+    // If previous abort completed, reset state to allow new abort
+    if (abort_triggered_.load() && abort_completed_.load()) {
+      Reset();
+    }
+
+    if (!abort_triggered_.exchange(true)) {
+      abort_thread_ = std::thread([this, db]() {
+        db->AbortAllCompactions();
+        SignalAbortCompleted();
+      });
+    }
+  }
+
+  // Wait for the abort thread to complete.
+  // Call this AFTER compaction returns to ensure the abort thread has finished
+  // before proceeding with Resume or other operations.
+  void WaitForAbortCompletion() {
+    MutexLock l(&abort_mutex_);
+    while (!abort_completed_.load()) {
+      abort_cv_.Wait();
+    }
+  }
+
+  // Reset state for reuse. Joins any previous thread first.
+  // Called automatically by TriggerAbort() if previous abort completed,
+  // but can also be called explicitly for clarity.
+  void Reset() {
+    if (abort_thread_.joinable()) {
+      abort_thread_.join();
+    }
+    abort_triggered_.store(false);
+    abort_completed_.store(false);
+  }
+
+  bool IsAbortTriggered() const { return abort_triggered_.load(); }
+
+ private:
+  void SignalAbortCompleted() {
+    MutexLock l(&abort_mutex_);
+    abort_completed_.store(true);
+    abort_cv_.SignalAll();
+  }
+
+  std::atomic<bool> abort_triggered_{false};  // Guards against multiple spawns
+  std::atomic<bool> abort_completed_{false};  // Signals thread completion
+  port::Mutex abort_mutex_;
+  port::CondVar abort_cv_;
+  std::thread abort_thread_;  // The thread that calls AbortAllCompactions()
+};
+
+// Helper to clean up SyncPoint state after tests
+inline void CleanupSyncPoints() {
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+// Helper class that combines AbortSynchronizer with sync point setup for
+// deterministic abort triggering. This adds sync point coordination on top
+// of AbortSynchronizer:
+//
+// This is useful when you need deterministic timing - the callback won't
+// return until AbortAllCompactions() has actually set the abort flag,
+// guaranteeing the compaction will see it on the next check.
+class SyncPointAbortHelper {
+ public:
+  explicit SyncPointAbortHelper(const std::string& trigger_point)
+      : trigger_point_(trigger_point) {}
+
+  // Set up sync points and callbacks. Call this before starting compaction.
+  void Setup(DBImpl* db_impl) {
+    db_impl_ = db_impl;
+
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
+        {"DBImpl::AbortAllCompactions:FlagSet", kWaitPointName},
+    });
+
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+        trigger_point_, [this](void* /*arg*/) {
+          // Use AbortSynchronizer to handle the abort in a separate thread
+          abort_sync_.TriggerAbort(db_impl_);
+
+          // Wait for abort flag to be set via sync point dependency
+          // This ensures deterministic timing - compaction will see the flag
+          TEST_SYNC_POINT_CALLBACK(kWaitPointName, nullptr);
+        });
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  }
+
+  // Wait for the abort to complete. Call this after compaction returns.
+  void WaitForAbortCompletion() { abort_sync_.WaitForAbortCompletion(); }
+
+  // Clean up sync points and wait for abort completion in one call
+  void CleanupAndWait() {
+    CleanupSyncPoints();
+    WaitForAbortCompletion();
+  }
+
+ private:
+  static constexpr const char* kWaitPointName =
+      "SyncPointAbortHelper::WaitForAbort";
+  std::string trigger_point_;
+  DBImpl* db_impl_{nullptr};
+  AbortSynchronizer abort_sync_;
+};
+
+class DBCompactionAbortTest : public DBTestBase {
+ public:
+  DBCompactionAbortTest()
+      : DBTestBase("db_compaction_abort_test", /*env_do_fsync=*/false) {}
+
+ protected:
+  // Map to track the latest value of each key for verification
+  std::unordered_map<std::string, std::string> expected_values_;
+
+  // Statistics object for verifying compaction metrics
+  std::shared_ptr<Statistics> stats_;
+
+  // Get current options with statistics enabled
+  Options GetOptionsWithStats() {
+    Options options = CurrentOptions();
+    stats_ = CreateDBStatistics();
+    options.statistics = stats_;
+    return options;
+  }
+
+  // Populate database with test data.
+  // If overlapping=true, uses the same key range (0 to keys_per_file-1) in each
+  // file to ensure compaction has work to do.
+  // If overlapping=false, uses non-overlapping keys across files.
+  void PopulateData(int num_files, int keys_per_file, int value_size,
+                    bool overlapping = true, int seed = 301) {
+    Random rnd(seed);
+    for (int i = 0; i < num_files; ++i) {
+      for (int j = 0; j < keys_per_file; ++j) {
+        int key_index = overlapping ? j : (j + i * keys_per_file);
+        std::string key = Key(key_index);
+        std::string value = rnd.RandomString(value_size);
+        ASSERT_OK(Put(key, value));
+        expected_values_[key] = value;
+      }
+      ASSERT_OK(Flush());
+    }
+  }
+
+  // Verify data integrity by reading all keys and comparing with expected
+  // values
+  void VerifyDataIntegrity(int num_keys, int start_key = 0) {
+    std::string val;
+    for (int j = start_key; j < start_key + num_keys; ++j) {
+      std::string key = Key(j);
+      ASSERT_OK(dbfull()->Get(ReadOptions(), key, &val));
+      auto it = expected_values_.find(key);
+      if (it != expected_values_.end()) {
+        ASSERT_EQ(it->second, val) << "Value mismatch for key: " << key;
+      }
+    }
+  }
+
+  // Clear expected values (useful when reopening DB or between tests)
+  void ClearExpectedValues() { expected_values_.clear(); }
+
+  // Run the common abort test pattern with SyncPointAbortHelper:
+  // 1. Set up sync point abort helper
+  // 2. Run compaction and verify it's aborted
+  // 3. Verify COMPACTION_ABORTED stat increased (if stats enabled)
+  // 4. Clean up, resume, and verify compaction succeeds
+  // 5. Verify COMPACT_WRITE_BYTES increased (if stats enabled)
+  void RunSyncPointAbortTest(const std::string& trigger_point,
+                             CompactRangeOptions cro = CompactRangeOptions()) {
+    // Capture stats and file counts before abort
+    uint64_t aborted_before = 0;
+    uint64_t write_bytes_before = 0;
+    if (stats_) {
+      aborted_before = stats_->getTickerCount(COMPACTION_ABORTED);
+      write_bytes_before = stats_->getTickerCount(COMPACT_WRITE_BYTES);
+    }
+
+    SyncPointAbortHelper helper(trigger_point);
+    helper.Setup(dbfull());
+
+    Status s = dbfull()->CompactRange(cro, nullptr, nullptr);
+    ASSERT_TRUE(s.IsIncomplete());
+    ASSERT_TRUE(s.IsCompactionAborted());
+
+    // Verify abort was counted
+    if (stats_) {
+      uint64_t aborted_after = stats_->getTickerCount(COMPACTION_ABORTED);
+      ASSERT_GT(aborted_after, aborted_before)
+          << "COMPACTION_ABORTED stat should increase after abort";
+    }
+
+    helper.CleanupAndWait();
+    dbfull()->ResumeAllCompactions();
+
+    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+
+    // Verify compaction completed and wrote bytes
+    if (stats_) {
+      uint64_t write_bytes_after = stats_->getTickerCount(COMPACT_WRITE_BYTES);
+      ASSERT_GT(write_bytes_after, write_bytes_before)
+          << "COMPACT_WRITE_BYTES should increase after successful compaction";
+    }
+  }
+};
+
+// Parameterized test for abort with different subcompaction configurations
+// This consolidates tests that were essentially duplicates with different
+// max_subcompactions values
+class DBCompactionAbortSubcompactionTest
+    : public DBCompactionAbortTest,
+      public ::testing::WithParamInterface<int> {};
+
+TEST_P(DBCompactionAbortSubcompactionTest, AbortWithVaryingSubcompactions) {
+  int max_subcompactions = GetParam();
+
+  Options options = GetOptionsWithStats();
+  options.level0_file_num_compaction_trigger = 4;
+  options.max_subcompactions = max_subcompactions;
+  options.disable_auto_compactions = true;
+  Reopen(options);
+
+  PopulateData(/*num_files=*/4, /*keys_per_file=*/100, /*value_size=*/100);
+
+  RunSyncPointAbortTest("CompactionJob::RunSubcompactions:BeforeStart");
+
+  VerifyDataIntegrity(/*num_keys=*/100);
+}
+
+INSTANTIATE_TEST_CASE_P(SubcompactionVariants,
+                        DBCompactionAbortSubcompactionTest,
+                        ::testing::Values(1, 2, 4),
+                        [](const ::testing::TestParamInfo<int>& param_info) {
+                          return "MaxSubcompactionCount_" +
+                                 std::to_string(param_info.param);
+                        });
+
+// Parameterized test for abort with different compaction styles
+// This consolidates tests for Level, Universal, and FIFO compaction styles
+class DBCompactionAbortStyleTest
+    : public DBCompactionAbortTest,
+      public ::testing::WithParamInterface<CompactionStyle> {
+ protected:
+  // Configure options based on compaction style
+  void ConfigureOptionsForStyle(Options& options, CompactionStyle style) {
+    options.compaction_style = style;
+    options.level0_file_num_compaction_trigger = 4;
+    options.disable_auto_compactions = true;
+
+    switch (style) {
+      case kCompactionStyleLevel:
+        // Level compaction uses default settings
+        break;
+      case kCompactionStyleUniversal:
+        options.compaction_options_universal.size_ratio = 10;
+        break;
+      case kCompactionStyleFIFO:
+        // Set a large max_table_files_size to avoid deletion compaction
+        options.compaction_options_fifo.max_table_files_size =
+            100 * 1024 * 1024;
+        // Enable intra-L0 compaction which goes through normal compaction path
+        options.compaction_options_fifo.allow_compaction = true;
+        options.max_open_files = -1;  // Required for FIFO compaction
+        break;
+      default:
+        break;
+    }
+  }
+};
+
+TEST_P(DBCompactionAbortStyleTest, AbortCompaction) {
+  CompactionStyle style = GetParam();
+
+  Options options = GetOptionsWithStats();
+  options.max_subcompactions = 1;
+  ConfigureOptionsForStyle(options, style);
+  Reopen(options);
+
+  // Use larger value size for Universal compaction to ensure compaction work
+  int value_size = (style == kCompactionStyleUniversal) ? 1000 : 100;
+  PopulateData(/*num_files=*/4, /*keys_per_file=*/100,
+               /*value_size=*/value_size);
+
+  RunSyncPointAbortTest((style == kCompactionStyleUniversal)
+                            ?
"CompactionJob::ProcessKeyValueCompaction:Start" + : "CompactionJob::RunSubcompactions:BeforeStart"); + + VerifyDataIntegrity(/*num_keys=*/100); +} + +INSTANTIATE_TEST_CASE_P( + CompactionStyleVariants, DBCompactionAbortStyleTest, + ::testing::Values(kCompactionStyleLevel, kCompactionStyleUniversal, + kCompactionStyleFIFO), + [](const ::testing::TestParamInfo& param_info) { + return OptionsHelper::compaction_style_to_string.at(param_info.param); + }); + +TEST_F(DBCompactionAbortTest, AbortManualCompaction) { + Options options = GetOptionsWithStats(); + options.level0_file_num_compaction_trigger = 10; + options.disable_auto_compactions = true; + Reopen(options); + + PopulateData(/*num_files=*/5, /*keys_per_file=*/100, /*value_size=*/1000); + + CompactRangeOptions cro; + cro.exclusive_manual_compaction = true; + RunSyncPointAbortTest("CompactionJob::ProcessKeyValueCompaction:Start", cro); + + VerifyDataIntegrity(/*num_keys=*/100); +} + +TEST_F(DBCompactionAbortTest, AbortAutomaticCompaction) { + Options options = CurrentOptions(); + options.level0_file_num_compaction_trigger = 4; + options.max_subcompactions = 2; + options.disable_auto_compactions = false; + Reopen(options); + + Random rnd(301); + AbortSynchronizer abort_sync; + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::ProcessKeyValueCompaction:Start", + [&](void* /*arg*/) { abort_sync.TriggerAbort(dbfull()); }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + for (int i = 0; i < 4; ++i) { + for (int j = 0; j < 100; ++j) { + ASSERT_OK(Put(Key(j), rnd.RandomString(1000))); + } + ASSERT_OK(Flush()); + } + + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + CleanupSyncPoints(); + + abort_sync.WaitForAbortCompletion(); + dbfull()->ResumeAllCompactions(); + + for (int j = 0; j < 100; ++j) { + ASSERT_OK(Put(Key(j), rnd.RandomString(1000))); + } + ASSERT_OK(Flush()); + + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + std::string val; + for (int j = 0; j < 100; ++j) { + ASSERT_OK(dbfull()->Get(ReadOptions(), Key(j), &val)); + } +} + +TEST_F(DBCompactionAbortTest, AbortAndVerifyNoOutputFiles) { + Options options = CurrentOptions(); + options.level0_file_num_compaction_trigger = 4; + options.max_subcompactions = 2; + options.disable_auto_compactions = true; + Reopen(options); + + PopulateData(/*num_files=*/4, /*keys_per_file=*/100, /*value_size=*/1000); + + int num_l0_files_before = NumTableFilesAtLevel(0); + int num_l1_files_before = NumTableFilesAtLevel(1); + + SyncPointAbortHelper helper("CompactionJob::ProcessKeyValueCompaction:Start"); + helper.Setup(dbfull()); + + CompactRangeOptions cro; + Status s = dbfull()->CompactRange(cro, nullptr, nullptr); + ASSERT_TRUE(s.IsIncomplete()); + ASSERT_TRUE(s.IsCompactionAborted()); + + CleanupSyncPoints(); + + int num_l0_files_after = NumTableFilesAtLevel(0); + int num_l1_files_after = NumTableFilesAtLevel(1); + + ASSERT_EQ(num_l0_files_before, num_l0_files_after); + ASSERT_EQ(num_l1_files_before, num_l1_files_after); + + helper.WaitForAbortCompletion(); + dbfull()->ResumeAllCompactions(); + + ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr)); + + int num_l0_files_final = NumTableFilesAtLevel(0); + int num_l1_files_final = NumTableFilesAtLevel(1); + + ASSERT_EQ(0, num_l0_files_final); + ASSERT_GT(num_l1_files_final, 0); + + VerifyDataIntegrity(/*num_keys=*/100); +} + +TEST_F(DBCompactionAbortTest, MultipleAbortResumeSequence) { + Options options = CurrentOptions(); + options.level0_file_num_compaction_trigger = 4; + options.max_subcompactions 
+  options.disable_auto_compactions = true;
+  Reopen(options);
+
+  PopulateData(/*num_files=*/4, /*keys_per_file=*/100, /*value_size=*/1000);
+
+  for (int round = 0; round < 3; ++round) {
+    // Use SyncPointAbortHelper for deterministic abort timing - it waits
+    // for the abort flag to be set via sync point dependency
+    SyncPointAbortHelper helper(
+        "CompactionJob::ProcessKeyValueCompaction:Start");
+    helper.Setup(dbfull());
+
+    CompactRangeOptions cro;
+    Status s = dbfull()->CompactRange(cro, nullptr, nullptr);
+    ASSERT_TRUE(s.IsIncomplete());
+    ASSERT_TRUE(s.IsCompactionAborted());
+
+    helper.CleanupAndWait();
+    dbfull()->ResumeAllCompactions();
+  }
+
+  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+
+  VerifyDataIntegrity(/*num_keys=*/100);
+}
+
+TEST_F(DBCompactionAbortTest, AbortWithOutputFilesCleanup) {
+  Options options = CurrentOptions();
+  options.num_levels = 2;  // Ensure compaction output goes to L1
+  options.level0_file_num_compaction_trigger = 4;
+  options.max_subcompactions = 2;
+  options.disable_auto_compactions = true;
+  options.target_file_size_base = 50 * 1024;
+  Reopen(options);
+
+  PopulateData(/*num_files=*/4, /*keys_per_file=*/100, /*value_size=*/100);
+
+  SyncPointAbortHelper helper("CompactionJob::RunSubcompactions:BeforeStart");
+  helper.Setup(dbfull());
+
+  CompactRangeOptions cro;
+  Status s = dbfull()->CompactRange(cro, nullptr, nullptr);
+  ASSERT_TRUE(s.IsIncomplete());
+  ASSERT_TRUE(s.IsCompactionAborted());
+
+  CleanupSyncPoints();
+
+  int num_l1_files_after_abort = NumTableFilesAtLevel(1);
+  ASSERT_EQ(0, num_l1_files_after_abort);
+
+  helper.WaitForAbortCompletion();
+  dbfull()->ResumeAllCompactions();
+
+  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+
+  // Verify L0 files are compacted and L1 has output files
+  int num_l0_files_final = NumTableFilesAtLevel(0);
+  int num_l1_files_final = NumTableFilesAtLevel(1);
+  ASSERT_EQ(0, num_l0_files_final)
+      << "L0 should be empty after successful compaction";
+  ASSERT_GT(num_l1_files_final, 0)
+      << "L1 should have files after successful compaction";
+
+  VerifyDataIntegrity(/*num_keys=*/100);
+}
+
+TEST_F(DBCompactionAbortTest, NestedAbortResumeCalls) {
+  // Test that nested AbortAllCompactions() calls work correctly with the
+  // counter
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = 4;
+  options.max_subcompactions = 2;
+  options.disable_auto_compactions = true;
+  Reopen(options);
+
+  PopulateData(/*num_files=*/4, /*keys_per_file=*/100, /*value_size=*/1000);
+
+  // First abort call
+  dbfull()->AbortAllCompactions();
+
+  // Nested abort call (counter should be 2)
+  dbfull()->AbortAllCompactions();
+
+  // Compaction should still be blocked after one resume
+  dbfull()->ResumeAllCompactions();
+
+  // Compaction should still return aborted because counter is still 1
+  CompactRangeOptions cro;
+  Status s = dbfull()->CompactRange(cro, nullptr, nullptr);
+  ASSERT_TRUE(s.IsIncomplete());
+  ASSERT_TRUE(s.IsCompactionAborted());
+
+  // Second resume - counter should be 0 now
+  dbfull()->ResumeAllCompactions();
+
+  // Compaction should succeed now
+  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+
+  VerifyDataIntegrity(/*num_keys=*/100);
+}
+
+TEST_F(DBCompactionAbortTest, AbortCompactFilesAPI) {
+  // Test that AbortAllCompactions works with CompactFiles API
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = 100;  // Disable auto compaction
+  options.disable_auto_compactions = true;
+  Reopen(options);
+
+  PopulateData(/*num_files=*/4, /*keys_per_file=*/100, /*value_size=*/1000);
+
+  // Get the L0 file names
+  std::vector<std::string> files_to_compact;
+  ColumnFamilyMetaData cf_meta;
+  dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta);
+  for (const auto& file : cf_meta.levels[0].files) {
+    files_to_compact.push_back(file.name);
+  }
+  ASSERT_GE(files_to_compact.size(), 2);
+
+  SyncPointAbortHelper helper("CompactionJob::ProcessKeyValueCompaction:Start");
+  helper.Setup(dbfull());
+
+  CompactionOptions compact_options;
+  Status s = dbfull()->CompactFiles(compact_options, files_to_compact, 1);
+  ASSERT_TRUE(s.IsIncomplete());
+  ASSERT_TRUE(s.IsCompactionAborted());
+
+  helper.CleanupAndWait();
+  dbfull()->ResumeAllCompactions();
+
+  // CompactFiles should work after resume
+  ASSERT_OK(dbfull()->CompactFiles(compact_options, files_to_compact, 1));
+
+  VerifyDataIntegrity(/*num_keys=*/100);
+}
+
+TEST_F(DBCompactionAbortTest, AbortDoesNotAffectFlush) {
+  // Test that AbortAllCompactions does not affect flush operations
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = 100;
+  options.disable_auto_compactions = true;
+  Reopen(options);
+
+  Random rnd(301);
+  for (int j = 0; j < 100; ++j) {
+    ASSERT_OK(Put(Key(j), rnd.RandomString(1000)));
+  }
+
+  // Abort compactions
+  dbfull()->AbortAllCompactions();
+
+  // Flush should still work
+  ASSERT_OK(Flush());
+
+  // Write more data
+  for (int j = 100; j < 200; ++j) {
+    ASSERT_OK(Put(Key(j), rnd.RandomString(1000)));
+  }
+
+  // Flush should still work
+  ASSERT_OK(Flush());
+
+  // Resume compactions
+  dbfull()->ResumeAllCompactions();
+
+  VerifyDataIntegrity(/*num_keys=*/200);
+}
+
+TEST_F(DBCompactionAbortTest, AbortBeforeCompactionStarts) {
+  // Test aborting before any compaction has started
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = 4;
+  options.disable_auto_compactions = true;
+  Reopen(options);
+
+  PopulateData(/*num_files=*/4, /*keys_per_file=*/100, /*value_size=*/1000);
+
+  // Abort before starting compaction
+  dbfull()->AbortAllCompactions();
+
+  // Compaction should immediately return aborted
+  CompactRangeOptions cro;
+  Status s = dbfull()->CompactRange(cro, nullptr, nullptr);
+  ASSERT_TRUE(s.IsIncomplete());
+  ASSERT_TRUE(s.IsCompactionAborted());
+
+  // Resume
+  dbfull()->ResumeAllCompactions();
+
+  // Now compaction should work
+  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+
+  // Verify L0 files are compacted
+  ASSERT_EQ(0, NumTableFilesAtLevel(0));
+}
+
+// Test that in-progress blob and SST files are properly cleaned up when
+// compaction is aborted. This specifically tests the case where abort happens
+// while files are being written (opened but not yet completed/closed).
+// This catches the bug where files exist on disk but are removed from the
+// outputs_ vector (e.g., by RemoveLastEmptyOutput when file_size is 0 because
+// the builder was abandoned), leaving orphan files.
+TEST_F(DBCompactionAbortTest, AbortWithInProgressFileCleanup) {
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = 4;
+  options.max_subcompactions = 1;  // Single subcompaction for deterministic
+                                   // behavior
+  options.disable_auto_compactions = true;
+  options.target_file_size_base = 32 * 1024;  // 32KB
+
+  // Enable BlobDB with garbage collection to force blob rewriting during
+  // compaction
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;  // All values go to blob files
+  options.blob_file_size =
+      1024 * 1024;  // 1MB - large enough to not close during test
+  // Enable blob garbage collection - this forces blob data to be rewritten
+  // during compaction, creating new blob files
+  options.enable_blob_garbage_collection = true;
+  options.blob_garbage_collection_age_cutoff = 1.0;  // Include all blob files
+  options.blob_garbage_collection_force_threshold = 0.0;  // Always force GC
+
+  Reopen(options);
+
+  // Write enough data to trigger the periodic abort check (every 1000
+  // records): 4 files x 2000 overlapping keys, i.e. 2000 unique keys are
+  // processed during compaction. The abort check fires at records 999,
+  // 1999, etc.
+  PopulateData(/*num_files=*/4, /*keys_per_file=*/2000, /*value_size=*/500);
+
+  // Helper function to get blob files on disk with their names
+  auto GetBlobFilesOnDisk = [this]() -> std::vector<std::string> {
+    std::vector<std::string> blob_files;
+    std::vector<std::string> files;
+    EXPECT_OK(env_->GetChildren(dbname_, &files));
+    for (const auto& f : files) {
+      if (f.find(".blob") != std::string::npos) {
+        blob_files.push_back(f);
+      }
+    }
+    std::sort(blob_files.begin(), blob_files.end());
+    return blob_files;
+  };
+
+  // Helper function to get blob file numbers in metadata
+  auto GetBlobFilesInMetadata = [this]() -> std::vector<uint64_t> {
+    std::vector<uint64_t> blob_file_numbers;
+    ColumnFamilyMetaData cf_meta;
+    dbfull()->GetColumnFamilyMetaData(db_->DefaultColumnFamily(), &cf_meta);
+    for (const auto& blob_meta : cf_meta.blob_files) {
+      blob_file_numbers.push_back(blob_meta.blob_file_number);
+    }
+    std::sort(blob_file_numbers.begin(), blob_file_numbers.end());
+    return blob_file_numbers;
+  };
+
+  // Helper function to get SST files on disk
+  auto GetSstFilesOnDisk = [this]() -> std::vector<std::string> {
+    std::vector<std::string> sst_files;
+    std::vector<std::string> files;
+    EXPECT_OK(env_->GetChildren(dbname_, &files));
+    for (const auto& f : files) {
+      if (f.find(".sst") != std::string::npos) {
+        sst_files.push_back(f);
+      }
+    }
+    std::sort(sst_files.begin(), sst_files.end());
+    return sst_files;
+  };
+
+  // Helper function to get SST file numbers in metadata
+  auto GetSstFilesInMetadata = [this]() -> std::vector<uint64_t> {
+    std::vector<uint64_t> sst_file_numbers;
+    ColumnFamilyMetaData cf_meta;
+    dbfull()->GetColumnFamilyMetaData(db_->DefaultColumnFamily(), &cf_meta);
+    for (const auto& level : cf_meta.levels) {
+      for (const auto& file : level.files) {
+        // Extract file number from the file name (e.g., "000010.sst" -> 10)
+        uint64_t file_num = 0;
+        std::string fname = file.name;
+        // Remove leading path separators if present
+        size_t pos = fname.rfind('/');
+        if (pos != std::string::npos) {
+          fname = fname.substr(pos + 1);
+        }
+        if (sscanf(fname.c_str(), "%" PRIu64, &file_num) == 1) {
+          sst_file_numbers.push_back(file_num);
+        }
+      }
+    }
+    std::sort(sst_file_numbers.begin(), sst_file_numbers.end());
+    return sst_file_numbers;
+  };
+
+  std::vector<std::string> initial_blob_files = GetBlobFilesOnDisk();
+  std::vector<uint64_t> initial_meta_blobs = GetBlobFilesInMetadata();
+  std::vector<std::string> initial_sst_files = GetSstFilesOnDisk();
+  std::vector<uint64_t> initial_meta_ssts = GetSstFilesInMetadata();
+
+  ASSERT_GT(initial_blob_files.size(), 0u) << "Expected initial blob files";
+  ASSERT_EQ(initial_blob_files.size(), initial_meta_blobs.size())
+      << "Initial blob files should match between disk and metadata";
+  ASSERT_GT(initial_sst_files.size(), 0u) << "Expected initial SST files";
+  ASSERT_EQ(initial_sst_files.size(), initial_meta_ssts.size())
+      << "Initial SST files should match between disk and metadata";
+
+  // Tracking variables for blob file lifecycle
+  std::atomic<int> blob_writes{0};
+  std::atomic<bool> abort_triggered{false};
+  AbortSynchronizer abort_sync;
+
+  // Set up dependency: the wait point will block until FlagSet is hit
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
+      {"DBImpl::AbortAllCompactions:FlagSet",
+       "DBCompactionAbortTest::InProgressBlob:WaitForAbort"},
+  });
+
+  // Trigger abort after some blob writes during compaction output.
+  // This ensures we have an in-progress blob file when abort happens.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "BlobFileBuilder::WriteBlobToFile:AddRecord", [&](void* /*arg*/) {
+        int count = blob_writes.fetch_add(1) + 1;
+
+        // Trigger abort after 100 blob writes - this ensures:
+        // 1. A blob file has been opened (for writing)
+        // 2. Some data has been written to it
+        // 3. But it's not yet completed (blob_file_size is 1MB)
+        if (count == 100 && !abort_triggered.exchange(true)) {
+          abort_sync.TriggerAbort(dbfull());
+          // Wait for abort flag to be set - this sync point blocks until
+          // FlagSet is processed
+          TEST_SYNC_POINT_CALLBACK(
+              "DBCompactionAbortTest::InProgressBlob:WaitForAbort", nullptr);
+        }
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  // Run compaction - it should be aborted while blob file is in-progress
+  CompactRangeOptions cro;
+  Status s = dbfull()->CompactRange(cro, nullptr, nullptr);
+
+  ASSERT_TRUE(s.IsIncomplete())
+      << "Expected compaction to be aborted, got: " << s.ToString();
+
+  CleanupSyncPoints();
+  abort_sync.WaitForAbortCompletion();
+
+  // Check state after abort
+  std::vector<std::string> post_abort_disk_blobs = GetBlobFilesOnDisk();
+  std::vector<uint64_t> post_abort_meta_blobs = GetBlobFilesInMetadata();
+  std::vector<std::string> post_abort_disk_ssts = GetSstFilesOnDisk();
+  std::vector<uint64_t> post_abort_meta_ssts = GetSstFilesInMetadata();
+
+  // This is the key assertion for blob files: files on disk should match
+  // metadata. If the in-progress blob file was NOT cleaned up, there will be an
+  // extra file on disk that's not in metadata (orphan).
+  ASSERT_EQ(post_abort_disk_blobs.size(), post_abort_meta_blobs.size())
+      << "Orphan blob file detected! In-progress blob file was not cleaned up "
+         "after abort. Files on disk: "
+      << post_abort_disk_blobs.size()
+      << ", Files in metadata: " << post_abort_meta_blobs.size()
+      << ". The difference indicates orphaned in-progress blob file(s).";
+
+  // This is the key assertion for SST files: files on disk should match
+  // metadata. If the in-progress SST file was NOT cleaned up, there will be an
+  // extra file on disk that's not in metadata (orphan).
+  ASSERT_EQ(post_abort_disk_ssts.size(), post_abort_meta_ssts.size())
+      << "Orphan SST file detected! In-progress SST file was not cleaned up "
+         "after abort. Files on disk: "
+      << post_abort_disk_ssts.size()
+      << ", Files in metadata: " << post_abort_meta_ssts.size()
+      << ". The difference indicates orphaned in-progress SST file(s).";
+
+  // Resume and complete compaction to verify DB is still functional
+  dbfull()->ResumeAllCompactions();
+
+  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+
+  // Verify data integrity - we wrote 4 files * 2000 keys with overlapping keys
+  VerifyDataIntegrity(/*num_keys=*/2000);
+}
+
+TEST_F(DBCompactionAbortTest, AbortBottommostLevelCompaction) {
+  Options options = CurrentOptions();
+  options.num_levels = 4;
+  options.level0_file_num_compaction_trigger = 2;
+  options.max_bytes_for_level_base = 1024 * 10;  // 10KB
+  options.max_bytes_for_level_multiplier = 2;
+  options.disable_auto_compactions = true;
+  Reopen(options);
+
+  // Write data to fill multiple levels (non-overlapping keys)
+  PopulateData(/*num_files=*/6, /*keys_per_file=*/100,
+               /*value_size=*/500, /*overlapping=*/false);
+
+  // First compact to push data to lower levels
+  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+
+  // Write more data to L0 (overlapping keys)
+  PopulateData(/*num_files=*/4, /*keys_per_file=*/100, /*value_size=*/500);
+
+  SyncPointAbortHelper helper("CompactionJob::ProcessKeyValueCompaction:Start");
+  helper.Setup(dbfull());
+
+  // Trigger bottommost level compaction
+  CompactRangeOptions cro;
+  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+  Status s = dbfull()->CompactRange(cro, nullptr, nullptr);
+  ASSERT_TRUE(s.IsIncomplete());
+  ASSERT_TRUE(s.IsCompactionAborted());
+
+  helper.CleanupAndWait();
+  dbfull()->ResumeAllCompactions();
+
+  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+
+  VerifyDataIntegrity(/*num_keys=*/600);
+}
+
+// Test that while compactions are aborted, atomic range replace
+// (IngestExternalFiles with atomic_replace_range) works correctly.
+// This verifies that the abort state doesn't block other write operations
+// like atomic range replace.
+TEST_F(DBCompactionAbortTest, AbortThenAtomicRangeReplace) {
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = 4;
+  options.max_subcompactions = 2;
+  options.disable_auto_compactions = true;
+  Reopen(options);
+
+  // Create a directory for SST files
+  std::string sst_files_dir = dbname_ + "_sst_files/";
+  ASSERT_OK(env_->CreateDirIfMissing(sst_files_dir));
+
+  // Populate initial data with overlapping keys
+  PopulateData(/*num_files=*/4, /*keys_per_file=*/100, /*value_size=*/500);
+
+  // Verify initial data
+  VerifyDataIntegrity(/*num_keys=*/100);
+
+  // Trigger compaction and abort it
+  SyncPointAbortHelper helper("CompactionJob::ProcessKeyValueCompaction:Start");
+  helper.Setup(dbfull());
+
+  CompactRangeOptions cro;
+  Status s = dbfull()->CompactRange(cro, nullptr, nullptr);
+  ASSERT_TRUE(s.IsIncomplete());
+  ASSERT_TRUE(s.IsCompactionAborted());
+
+  helper.CleanupAndWait();
+
+  // While compactions are still aborted, perform atomic range replace using
+  // IngestExternalFiles with atomic_replace_range. This verifies that the
+  // abort state doesn't block other write operations.
+  // Using RangeOpt() (empty range) means replace everything in the CF.
+ + // Create an SST file with new data for keys 0-49 (replacing keys 0-99) + std::string sst_file_path = sst_files_dir + "atomic_replace_1.sst"; + SstFileWriter sst_file_writer(EnvOptions(), options); + ASSERT_OK(sst_file_writer.Open(sst_file_path)); + + // Write new values for keys 0-49 + Random rnd(42); + std::unordered_map<std::string, std::string> new_values; + for (int j = 0; j < 50; ++j) { + std::string key = Key(j); + std::string value = "replaced_" + rnd.RandomString(100); + ASSERT_OK(sst_file_writer.Put(key, value)); + new_values[key] = value; + } + ASSERT_OK(sst_file_writer.Finish()); + + // Perform atomic range replace for the entire column family. + // Using RangeOpt() (default constructor) means replace everything in the CF. + IngestExternalFileArg arg; + arg.column_family = db_->DefaultColumnFamily(); + arg.external_files = {sst_file_path}; + arg.atomic_replace_range = RangeOpt(); + // snapshot_consistency must be false when using atomic_replace_range + arg.options.snapshot_consistency = false; + + // Atomic range replace should work even while compactions are aborted + ASSERT_OK(db_->IngestExternalFiles({arg})); + + // Now resume compactions after the atomic range replace + dbfull()->ResumeAllCompactions(); + + // Verify that the atomic range replace worked correctly: + // 1. Keys 0-49 should have new replaced values + std::string val; + for (int j = 0; j < 50; ++j) { + std::string key = Key(j); + ASSERT_OK(db_->Get(ReadOptions(), key, &val)); + auto it = new_values.find(key); + ASSERT_NE(it, new_values.end()); + ASSERT_EQ(it->second, val) << "Value mismatch for replaced key: " << key; + } + + // 2. Keys 50-99 should not exist (they were replaced/deleted by atomic + // replace) + for (int j = 50; j < 100; ++j) { + std::string key = Key(j); + Status get_status = db_->Get(ReadOptions(), key, &val); + ASSERT_TRUE(get_status.IsNotFound()) + << "Key " << key << " should not exist after full CF replace"; + } + + // Clean up SST files directory + ASSERT_OK(DestroyDir(env_, sst_files_dir)); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 451fbd41c70e..a3f25dd7788f 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -455,6 +455,8 @@ class DBImpl : public DB { void EnableManualCompaction() override; void DisableManualCompaction() override; + void AbortAllCompactions() override; + void ResumeAllCompactions() override; using DB::SetOptions; Status SetOptions( @@ -2789,6 +2791,14 @@ class DBImpl : public DB { // compaction code paths. std::atomic<int> manual_compaction_paused_ = false; + // If non-zero, all compaction jobs (background automatic compactions, + // manual compactions via CompactRange, and foreground CompactFiles calls) + // are being aborted. Compactions will be signaled to stop. Any new + // compaction job will fail immediately. The value indicates how many threads + // have called AbortAllCompactions(). It is accessed in read mode outside the + // DB mutex in compaction code paths.
+ std::atomic<int> compaction_aborted_ = 0; + // This condition variable is signaled on these conditions: // * whenever bg_compaction_scheduled_ goes down to 0 // * if AnyManualCompaction, whenever a compaction finishes, even if it hasn't diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 877b61007b99..77df3b922fd7 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -955,6 +955,10 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options, return Status::Incomplete(Status::SubCode::kManualCompactionPaused); } + if (compaction_aborted_.load(std::memory_order_acquire) > 0) { + return Status::Incomplete(Status::SubCode::kCompactionAborted); + } + if (options.canceled && options.canceled->load(std::memory_order_acquire)) { return Status::Incomplete(Status::SubCode::kManualCompactionPaused); } @@ -1487,6 +1491,11 @@ Status DBImpl::CompactFilesImpl( return Status::ShutdownInProgress(); } + // triggered by AbortAllCompactions + if (compaction_aborted_.load(std::memory_order_acquire) > 0) { + return Status::Incomplete(Status::SubCode::kCompactionAborted); + } + // triggered by DisableManualCompactions or by user-set canceled flag in // CompactionOptions if (manual_compaction_paused_.load(std::memory_order_acquire) > 0 || @@ -1637,9 +1646,9 @@ Status DBImpl::CompactFilesImpl( c->mutable_cf_options().paranoid_file_checks, c->mutable_cf_options().report_bg_io_stats, dbname_, &compaction_job_stats, Env::Priority::USER, io_tracer_, - kManualCompactionCanceledFalse_, db_id_, db_session_id_, - c->column_family_data()->GetFullHistoryTsLow(), c->trim_ts(), - &blob_callback_, &bg_compaction_scheduled_, + kManualCompactionCanceledFalse_, compaction_aborted_, db_id_, + db_session_id_, c->column_family_data()->GetFullHistoryTsLow(), + c->trim_ts(), &blob_callback_, &bg_compaction_scheduled_, &bg_bottom_compaction_scheduled_); // Creating a compaction influences the compaction score because the score @@ -2170,6 +2179,17 @@ Status DBImpl::RunManualCompaction( return manual.status; } + if (compaction_aborted_.load(std::memory_order_acquire) > 0) { + // All compactions are being aborted. Return immediately. + int counter = compaction_aborted_.load(std::memory_order_acquire); + ROCKS_LOG_INFO( + immutable_db_options_.info_log, + "RunManualCompaction: Aborting due to compaction_aborted_=%d", counter); + manual.status = Status::Incomplete(Status::SubCode::kCompactionAborted); + manual.done = true; + return manual.status; + } + // When a manual compaction arrives, temporarily disable scheduling of // non-manual compactions and wait until the number of scheduled compaction // jobs drops to zero. This used to be needed to ensure that this manual // and `CompactRangeOptions::canceled` might not work well together. while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0) { + if (compaction_aborted_.load(std::memory_order_acquire) > 0) { + // Pretend the error came from compaction so the below cleanup/error + // handling code can process it. + manual.done = true; + manual.status = Status::Incomplete(Status::SubCode::kCompactionAborted); + break; + } if (manual_compaction_paused_ > 0 || manual.canceled == true) { // Pretend the error came from compaction so the below cleanup/error // handling code can process it.
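A minimal caller-side sketch of the contract these checks implement (not part of the patch; assumes an already-open DB* named db): while the abort counter is non-zero, a manual compaction fails fast with the new subcode instead of queueing, and a matching ResumeAllCompactions() lifts the block.

#include <cassert>
#include "rocksdb/db.h"

void AbortResumeContractSketch(ROCKSDB_NAMESPACE::DB* db) {
  // Signal running compactions to stop; returns after they finish or abort.
  db->AbortAllCompactions();

  // While aborted, CompactRange() returns immediately with
  // Status::Incomplete / kCompactionAborted rather than waiting.
  ROCKSDB_NAMESPACE::Status s = db->CompactRange(
      ROCKSDB_NAMESPACE::CompactRangeOptions(), nullptr, nullptr);
  assert(s.IsIncomplete() && s.IsCompactionAborted());

  // One Resume per Abort; background scheduling restarts once the
  // counter drops back to zero.
  db->ResumeAllCompactions();
}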
@@ -2312,7 +2339,12 @@ Status DBImpl::RunManualCompaction( if (!scheduled) { // There is nothing scheduled to wait on, so any cancellation can end the // manual now. - if (manual_compaction_paused_ > 0 || manual.canceled == true) { + if (compaction_aborted_.load(std::memory_order_acquire) > 0) { + // Stop waiting since it was canceled. Pretend the error came from + // compaction so the below cleanup/error handling code can process it. + manual.done = true; + manual.status = Status::Incomplete(Status::SubCode::kCompactionAborted); + } else if (manual_compaction_paused_ > 0 || manual.canceled == true) { // Stop waiting since it was canceled. Pretend the error came from // compaction so the below cleanup/error handling code can process it. manual.done = true; @@ -2930,6 +2962,61 @@ void DBImpl::EnableManualCompaction() { manual_compaction_paused_.fetch_sub(1, std::memory_order_release); } +void DBImpl::AbortAllCompactions() { + InstrumentedMutexLock l(&mutex_); + + // Increment the abort counter to signal all compactions to abort + compaction_aborted_.fetch_add(1, std::memory_order_release); + + TEST_SYNC_POINT("DBImpl::AbortAllCompactions:FlagSet"); + + // Mark all manual compactions as canceled + for (const auto& manual_compaction : manual_compaction_dequeue_) { + manual_compaction->canceled = true; + } + + // Wake up any waiting compaction threads to check the abort signal + bg_cv_.SignalAll(); + + // Wait for all running compactions (both manual and automatic) to finish + // or abort before returning. + // Note: bg_cv_.Wait() releases the mutex while waiting, so other threads + // can make progress and signal when compactions complete. + while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0 || + HasPendingManualCompaction()) { + bg_cv_.Wait(); + } +} + +void DBImpl::ResumeAllCompactions() { + InstrumentedMutexLock l(&mutex_); + int before = compaction_aborted_.load(std::memory_order_acquire); + + // Guard against calling Resume without prior Abort + if (before <= 0) { + ROCKS_LOG_WARN(immutable_db_options_.info_log, + "ResumeAllCompactions called without prior " + "AbortAllCompactions (counter=%d)", + before); + return; + } + + // Decrement the abort counter + compaction_aborted_.fetch_sub(1, std::memory_order_release); + + // Since this runs under the DB mutex, the current value can be derived + // directly from the value loaded above. + int current = before - 1; + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "ResumeAllCompactions: counter %d -> %d", before, current); + + // If this is the last resume call (abort counter back to 0), schedule + // compactions that may have been waiting + if (current == 0) { + MaybeScheduleFlushOrCompaction(); + } +} + void DBImpl::MaybeScheduleFlushOrCompaction() { mutex_.AssertHeld(); TEST_SYNC_POINT("DBImpl::MaybeScheduleFlushOrCompaction:Start"); @@ -2994,6 +3081,9 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { if (bg_compaction_paused_ > 0) { // we paused the background compaction return; + } else if (compaction_aborted_.load(std::memory_order_acquire) > 0) { + // we are aborting all compactions + return; } else if (error_handler_.IsBGWorkStopped()) { // Compaction is not part of the recovery sequence from a hard error.
We // might get here because recovery might do a flush and install a new @@ -3531,7 +3621,8 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, 10000); // prevent hot loop mutex_.Lock(); } else if (!s.ok() && !s.IsShutdownInProgress() && - !s.IsManualCompactionPaused() && !s.IsColumnFamilyDropped()) { + !s.IsManualCompactionPaused() && !s.IsColumnFamilyDropped() && + !s.IsCompactionAborted()) { // Wait a little bit before retrying background compaction in // case this is an environmental problem and we do not want to // chew up resources for failed compactions for the duration of @@ -3667,6 +3758,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, if (!error_handler_.IsBGWorkStopped()) { if (shutting_down_.load(std::memory_order_acquire)) { status = Status::ShutdownInProgress(); + } else if (compaction_aborted_.load(std::memory_order_acquire) > 0) { + status = Status::Incomplete(Status::SubCode::kCompactionAborted); } else if (is_manual && manual_compaction->canceled.load(std::memory_order_acquire)) { status = Status::Incomplete(Status::SubCode::kManualCompactionPaused); @@ -4283,8 +4376,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, &compaction_job_stats, thread_pri, io_tracer_, is_manual ? manual_compaction->canceled : kManualCompactionCanceledFalse_, - db_id_, db_session_id_, c->column_family_data()->GetFullHistoryTsLow(), - c->trim_ts(), &blob_callback_, &bg_compaction_scheduled_, + compaction_aborted_, db_id_, db_session_id_, + c->column_family_data()->GetFullHistoryTsLow(), c->trim_ts(), + &blob_callback_, &bg_compaction_scheduled_, &bg_bottom_compaction_scheduled_); compaction_job.Prepare(std::nullopt /*subcompact to be computed*/); @@ -4367,7 +4461,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, } if (status.ok() || status.IsCompactionTooLarge() || - status.IsManualCompactionPaused()) { + status.IsManualCompactionPaused() || status.IsCompactionAborted()) { // Done } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) { // Ignore compaction errors found during shutting down @@ -4630,6 +4724,7 @@ void DBImpl::BuildCompactionJobInfo( compaction_job_info->cf_id = cfd->GetID(); compaction_job_info->cf_name = cfd->GetName(); compaction_job_info->status = st; + compaction_job_info->aborted = st.IsCompactionAborted(); compaction_job_info->thread_id = env_->GetThreadID(); compaction_job_info->job_id = job_id; compaction_job_info->base_input_level = c->start_level(); diff --git a/db/db_test.cc b/db/db_test.cc index 7456679a152a..062130d1d525 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3691,6 +3691,8 @@ class ModelDB : public DB { void EnableManualCompaction() override {} void DisableManualCompaction() override {} + void AbortAllCompactions() override {} + void ResumeAllCompactions() override {} Status WaitForCompact( const WaitForCompactOptions& /* wait_for_compact_options */) override { diff --git a/db/internal_stats.cc b/db/internal_stats.cc index c25f7c589b1f..6b2d75385ba4 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -310,6 +310,7 @@ static const std::string aggregated_table_properties_at_level = static const std::string num_running_compactions = "num-running-compactions"; static const std::string num_running_compaction_sorted_runs = "num-running-compaction-sorted-runs"; +static const std::string compaction_abort_count = "compaction-abort-count"; static const std::string num_running_flushes = "num-running-flushes"; static const std::string actual_delayed_write_rate = 
"actual-delayed-write-rate"; @@ -362,6 +363,8 @@ const std::string DB::Properties::kNumRunningCompactions = rocksdb_prefix + num_running_compactions; const std::string DB::Properties::kNumRunningCompactionSortedRuns = rocksdb_prefix + num_running_compaction_sorted_runs; +const std::string DB::Properties::kCompactionAbortCount = + rocksdb_prefix + compaction_abort_count; const std::string DB::Properties::kNumRunningFlushes = rocksdb_prefix + num_running_flushes; const std::string DB::Properties::kBackgroundErrors = @@ -594,6 +597,9 @@ const UnorderedMap {DB::Properties::kNumRunningCompactionSortedRuns, {false, nullptr, &InternalStats::HandleNumRunningCompactionSortedRuns, nullptr, nullptr}}, + {DB::Properties::kCompactionAbortCount, + {false, nullptr, &InternalStats::HandleCompactionAbortCount, nullptr, + nullptr}}, {DB::Properties::kActualDelayedWriteRate, {false, nullptr, &InternalStats::HandleActualDelayedWriteRate, nullptr, nullptr}}, @@ -1292,6 +1298,13 @@ bool InternalStats::HandleNumRunningCompactionSortedRuns(uint64_t* value, return true; } +bool InternalStats::HandleCompactionAbortCount(uint64_t* value, DBImpl* db, + Version* /*version*/) { + *value = static_cast( + db->compaction_aborted_.load(std::memory_order_acquire)); + return true; +} + bool InternalStats::HandleBackgroundErrors(uint64_t* value, DBImpl* /*db*/, Version* /*version*/) { // Accumulated number of errors in background flushes or compactions. diff --git a/db/internal_stats.h b/db/internal_stats.h index a1b4fbe6c555..347b3a617aae 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -852,6 +852,8 @@ class InternalStats { Version* version); bool HandleNumRunningCompactionSortedRuns(uint64_t* value, DBImpl* db, Version* version); + bool HandleCompactionAbortCount(uint64_t* value, DBImpl* db, + Version* version); bool HandleBackgroundErrors(uint64_t* value, DBImpl* db, Version* version); bool HandleCurSizeActiveMemTable(uint64_t* value, DBImpl* db, Version* version); diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index edf6d918aea3..a0e66c149308 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -218,6 +218,7 @@ DECLARE_int32(reset_stats_one_in); DECLARE_int32(pause_background_one_in); DECLARE_int32(disable_file_deletions_one_in); DECLARE_int32(disable_manual_compaction_one_in); +DECLARE_int32(abort_and_resume_compactions_one_in); DECLARE_int32(compact_range_width); DECLARE_int32(acquire_snapshot_one_in); DECLARE_bool(compare_full_db_state_snapshot); diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index f543a00bcbc8..be84c80f75d4 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -812,6 +812,10 @@ DEFINE_int32( "If non-zero, then DisableManualCompaction()+Enable will be called " "once for every N ops on average. 0 disables."); +DEFINE_int32(abort_and_resume_compactions_one_in, 0, + "If non-zero, then AbortAllCompactions()+Resume will be called " + "once for every N ops on average. 
0 disables."); + DEFINE_int32(compact_range_width, 10000, "The width of the ranges passed to CompactRange()."); diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 2dc952de2cb7..43cbd4229a37 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -1271,6 +1271,11 @@ void StressTest::OperateDb(ThreadState* thread) { ProcessStatus(shared, "TestDisableManualCompaction", status); } + if (thread->rand.OneInOpt(FLAGS_abort_and_resume_compactions_one_in)) { + Status status = TestAbortAndResumeCompactions(thread); + ProcessStatus(shared, "TestAbortAndResumeCompactions", status); + } + if (thread->rand.OneInOpt(FLAGS_verify_checksum_one_in)) { ThreadStatusUtil::SetEnableTracking(FLAGS_enable_thread_tracking); ThreadStatusUtil::SetThreadOperation( @@ -3141,6 +3146,20 @@ Status StressTest::TestDisableManualCompaction(ThreadState* thread) { return Status::OK(); } +Status StressTest::TestAbortAndResumeCompactions(ThreadState* thread) { + // Abort all running compactions and prevent new ones from starting + db_->AbortAllCompactions(); + // Sleep to allow other threads to attempt operations while aborted + // Uses same sleep pattern as TestPauseBackground and + // TestDisableManualCompaction + int pwr2_micros = + std::min(thread->rand.Uniform(25), thread->rand.Uniform(25)); + clock_->SleepForMicroseconds(1 << pwr2_micros); + // Resume compactions + db_->ResumeAllCompactions(); + return Status::OK(); +} + void StressTest::TestAcquireSnapshot(ThreadState* thread, int rand_column_family, const std::string& keystr, uint64_t i) { diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h index da1589be541a..3e8bc2af0def 100644 --- a/db_stress_tool/db_stress_test_base.h +++ b/db_stress_tool/db_stress_test_base.h @@ -332,6 +332,8 @@ class StressTest { Status TestDisableManualCompaction(ThreadState* thread); + Status TestAbortAndResumeCompactions(ThreadState* thread); + void TestAcquireSnapshot(ThreadState* thread, int rand_column_family, const std::string& keystr, uint64_t i); diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 2abb7eb02513..8b4be252cfd9 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -1253,6 +1253,10 @@ class DB { // sorted runs being processed by currently running compactions. static const std::string kNumRunningCompactionSortedRuns; + // "rocksdb.compaction-abort-count" - returns the current value of the + // compaction abort counter. + static const std::string kCompactionAbortCount; + // "rocksdb.background-errors" - returns accumulated number of background // errors. static const std::string kBackgroundErrors; @@ -1731,6 +1735,46 @@ class DB { // DisableManualCompaction() has been called. virtual void EnableManualCompaction() = 0; + // Abort all compaction work/jobs. This function will signal all + // running compactions (both automatic and manual, background and foreground) + // to abort and will wait for them to finish or abort before returning. After + // this function returns, new compaction work will be aborted immediately + // until ResumeAllCompactions() is called. + // + // The compaction abort is checked periodically (every 1000 keys processed), + // so ongoing compactions should abort as well within a reasonable time. + // This function blocks until all compactions have completed or aborted. 
+ + // Any output files from aborted compactions are automatically cleaned up, + // ensuring no partial compaction results are installed, except for resumable + // compaction. + // + // This function supports concurrent abort requests from multiple callers + // without coordination between them. The call count is tracked, and + // compactions only resume after the number of ResumeAllCompactions() calls + // matches the number of AbortAllCompactions() calls. + // + // Differences from other compaction control APIs: + // - DisableManualCompaction(): Only pauses manual compactions, waits for + // them to finish naturally. AbortAllCompactions() actively cancels both + // automatic and manual compactions. + // - PauseBackgroundWork(): Pauses all background work (flush + compaction), + // waits for work to finish naturally. AbortAllCompactions() only affects + // compactions and actively cancels them. + // + // Note: Compaction service (remote compaction) is not currently supported. + // Aborted compactions return Status::Incomplete with subcode + // kCompactionAborted. + virtual void AbortAllCompactions() = 0; + + // Resume all compactions that were aborted by AbortAllCompactions(). + // This function must be called as many times as AbortAllCompactions() + // has been called in order to resume compactions. This reference-counting + // behavior ensures that if multiple callers independently request an + // abort, compactions will not resume until all of them have called + // ResumeAllCompactions(). + virtual void ResumeAllCompactions() = 0; + // Wait for all flush and compactions jobs to finish. Jobs to wait include the // unscheduled (queued, but not scheduled yet). If the db is shutting down, // Status::ShutdownInProgress will be returned. diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index fe90a7b2ec94..1b41ca81f3d9 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -488,6 +488,9 @@ struct CompactionJobInfo { // Information about blob files deleted during compaction in Integrated // BlobDB. std::vector<BlobFileGarbageInfo> blob_file_garbage_infos; + + // Whether this compaction was aborted via AbortAllCompactions() + bool aborted = false; }; struct MemTableInfo { diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index fb75ebee3fca..4efc6cf82b16 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -162,6 +162,8 @@ enum Tickers : uint32_t { COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE, // If a compaction was canceled in sfm to prevent ENOSPC COMPACTION_CANCELLED, + // Number of compactions aborted via AbortAllCompactions() + COMPACTION_ABORTED, // Number of keys written to the database via the Put and Write call's NUMBER_KEYS_WRITTEN, diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h index afb9651faf27..c3eeb082c3ed 100644 --- a/include/rocksdb/status.h +++ b/include/rocksdb/status.h @@ -117,6 +117,7 @@ class Status { kMergeOperandThresholdExceeded = 16, kPrefetchLimitReached = 17, kNotExpectedCodePath = 18, + kCompactionAborted = 19, kMaxSubCode }; @@ -483,6 +484,13 @@ class Status { return (code() == kIncomplete) && (subcode() == kManualCompactionPaused); } + + // Returns true iff the status indicates compaction aborted. This + // is caused by a call to AbortAllCompactions(). + bool IsCompactionAborted() const { + MarkChecked(); + return (code() == kIncomplete) && (subcode() == kCompactionAborted); + } + + // Returns true iff the status indicates a TxnNotPrepared error.
bool IsTxnNotPrepared() const { MarkChecked(); diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 8cd4057fd553..f48acb2433db 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -375,6 +375,8 @@ class StackableDB : public DB { void DisableManualCompaction() override { return db_->DisableManualCompaction(); } + void AbortAllCompactions() override { return db_->AbortAllCompactions(); } + void ResumeAllCompactions() override { return db_->ResumeAllCompactions(); } Status WaitForCompact( const WaitForCompactOptions& wait_for_compact_options) override { diff --git a/java/rocksjni/rocksjni.cc b/java/rocksjni/rocksjni.cc index 57272170c326..af47c0e95644 100644 --- a/java/rocksjni/rocksjni.cc +++ b/java/rocksjni/rocksjni.cc @@ -2978,6 +2978,28 @@ void Java_org_rocksdb_RocksDB_continueBackgroundWork(JNIEnv* env, jclass, } } +/* + * Class: org_rocksdb_RocksDB + * Method: abortAllCompactions + * Signature: (J)V + */ +void Java_org_rocksdb_RocksDB_abortAllCompactions(JNIEnv*, jclass, + jlong jdb_handle) { + auto* db = reinterpret_cast<ROCKSDB_NAMESPACE::DB*>(jdb_handle); + db->AbortAllCompactions(); +} + +/* + * Class: org_rocksdb_RocksDB + * Method: resumeAllCompactions + * Signature: (J)V + */ +void Java_org_rocksdb_RocksDB_resumeAllCompactions(JNIEnv*, jclass, + jlong jdb_handle) { + auto* db = reinterpret_cast<ROCKSDB_NAMESPACE::DB*>(jdb_handle); + db->ResumeAllCompactions(); +} + /* * Class: org_rocksdb_RocksDB * Method: enableAutoCompaction diff --git a/java/src/main/java/org/rocksdb/RocksDB.java b/java/src/main/java/org/rocksdb/RocksDB.java index fe2f38af64f9..ebe134726982 100644 --- a/java/src/main/java/org/rocksdb/RocksDB.java +++ b/java/src/main/java/org/rocksdb/RocksDB.java @@ -4084,6 +4084,23 @@ public void continueBackgroundWork() throws RocksDBException { continueBackgroundWork(nativeHandle_); } + + /** + * Abort all running and pending compaction jobs. This method will signal + * all active compactions to terminate and wait for them to complete. + * No new compactions will be scheduled until {@link #resumeAllCompactions()} is called. + */ + public void abortAllCompactions() { + abortAllCompactions(nativeHandle_); + } + + /** + * Resume compaction scheduling after {@link #abortAllCompactions()} was called. + * Must be called the same number of times as {@link #abortAllCompactions()}. + */ + public void resumeAllCompactions() { + resumeAllCompactions(nativeHandle_); + } + /** * Enable automatic compactions for the given column * families if they were previously disabled.
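Because the API is reference-counted (each abortAllCompactions() must eventually be matched by one resumeAllCompactions(), per the Javadoc above), C++ callers may want to pair the two calls with a scope guard. A small sketch, not part of the patch:

#include "rocksdb/db.h"

// Keeps compactions aborted for the lifetime of the scope.
class CompactionAbortScope {
 public:
  explicit CompactionAbortScope(ROCKSDB_NAMESPACE::DB* db) : db_(db) {
    db_->AbortAllCompactions();  // blocks until running compactions stop
  }
  ~CompactionAbortScope() { db_->ResumeAllCompactions(); }
  CompactionAbortScope(const CompactionAbortScope&) = delete;
  CompactionAbortScope& operator=(const CompactionAbortScope&) = delete;

 private:
  ROCKSDB_NAMESPACE::DB* db_;
};

Nesting two scopes is safe: compactions resume only when the outermost scope unwinds, mirroring the counter check in ResumeAllCompactions().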
@@ -5036,6 +5053,8 @@ private static native String[] compactFiles(final long handle, final long compac private static native void cancelAllBackgroundWork(final long handle, final boolean wait); private static native void pauseBackgroundWork(final long handle) throws RocksDBException; private static native void continueBackgroundWork(final long handle) throws RocksDBException; + private static native void abortAllCompactions(final long handle); + private static native void resumeAllCompactions(final long handle); private static native void enableAutoCompaction( final long handle, final long[] columnFamilyHandles) throws RocksDBException; private static native int numberLevels(final long handle, final long columnFamilyHandle); diff --git a/monitoring/statistics.cc b/monitoring/statistics.cc index 01b123d195e8..77792f559b53 100644 --- a/monitoring/statistics.cc +++ b/monitoring/statistics.cc @@ -93,6 +93,7 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = { {COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE, "rocksdb.compaction.optimized.del.drop.obsolete"}, {COMPACTION_CANCELLED, "rocksdb.compaction.cancelled"}, + {COMPACTION_ABORTED, "rocksdb.compaction.aborted"}, {NUMBER_KEYS_WRITTEN, "rocksdb.number.keys.written"}, {NUMBER_KEYS_READ, "rocksdb.number.keys.read"}, {NUMBER_KEYS_UPDATED, "rocksdb.number.keys.updated"}, diff --git a/monitoring/stats_history_test.cc b/monitoring/stats_history_test.cc index ee29bd20921a..f98917a5f4a3 100644 --- a/monitoring/stats_history_test.cc +++ b/monitoring/stats_history_test.cc @@ -185,7 +185,7 @@ TEST_F(StatsHistoryTest, GetStatsHistoryInMemory) { TEST_F(StatsHistoryTest, InMemoryStatsHistoryPurging) { constexpr int kPeriodSec = 1; - constexpr int kEstimatedOneSliceSize = 22000; + constexpr int kEstimatedOneSliceSize = 22100; Options options; options.create_if_missing = true; diff --git a/src.mk b/src.mk index fc54f2804f90..0bae5ee333fd 100644 --- a/src.mk +++ b/src.mk @@ -490,6 +490,7 @@ TEST_MAIN_SOURCES = \ db/db_basic_test.cc \ db/db_block_cache_test.cc \ db/db_bloom_filter_test.cc \ + db/db_compaction_abort_test.cc \ db/db_compaction_filter_test.cc \ db/db_compaction_test.cc \ db/db_clip_test.cc \ diff --git a/unreleased_history/public_api_changes/abort_compaction_apis.md b/unreleased_history/public_api_changes/abort_compaction_apis.md new file mode 100644 index 000000000000..d55882b3935d --- /dev/null +++ b/unreleased_history/public_api_changes/abort_compaction_apis.md @@ -0,0 +1 @@ +Added new virtual methods `AbortAllCompactions()` and `ResumeAllCompactions()` to the `DB` class. Added new `Status::SubCode::kCompactionAborted` to indicate a compaction was aborted. Added `Status::IsCompactionAborted()` helper method to check if a status represents an aborted compaction.
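The two observability hooks added above can be read together; a sketch (not part of the patch) assuming statistics were enabled in Options at open time. The property reports the current abort nesting depth, while the ticker counts aborted compaction jobs cumulatively:

#include <cstdint>
#include <cstdio>
#include <memory>
#include "rocksdb/db.h"
#include "rocksdb/statistics.h"

void ReportAbortState(ROCKSDB_NAMESPACE::DB* db) {
  // "rocksdb.compaction-abort-count": outstanding AbortAllCompactions() calls.
  uint64_t depth = 0;
  if (db->GetIntProperty(
          ROCKSDB_NAMESPACE::DB::Properties::kCompactionAbortCount, &depth)) {
    std::printf("abort nesting depth: %llu\n",
                static_cast<unsigned long long>(depth));
  }
  // "rocksdb.compaction.aborted": compactions that ended with
  // kCompactionAborted.
  std::shared_ptr<ROCKSDB_NAMESPACE::Statistics> stats =
      db->GetOptions().statistics;
  if (stats != nullptr) {
    std::printf("compactions aborted: %llu\n",
                static_cast<unsigned long long>(stats->getTickerCount(
                    ROCKSDB_NAMESPACE::COMPACTION_ABORTED)));
  }
}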
diff --git a/util/status.cc b/util/status.cc index 56d62b66190a..cf9e59e96757 100644 --- a/util/status.cc +++ b/util/status.cc @@ -46,7 +46,9 @@ static const char* msgs[static_cast<int>(Status::kMaxSubCode)] = { "IO fenced off", // kIOFenced "Merge operator failed", // kMergeOperatorFailed "Number of operands merged exceeded threshold", // kMergeOperandThresholdExceeded - "MultiScan reached file prefetch limit", // kMultiScanPrefetchLimit + "MultiScan reached file prefetch limit", // kPrefetchLimitReached + "Not expected code path", // kNotExpectedCodePath + "All compactions aborted", // kCompactionAborted }; Status::Status(Code _code, SubCode _subcode, const Slice& msg, From 6760a3477a39298baba47d2bb6751371b0e49729 Mon Sep 17 00:00:00 2001 From: Xingbo Wang Date: Thu, 29 Jan 2026 06:08:01 -0800 Subject: [PATCH 2/7] address feedback --- db/compaction/compaction_job.cc | 6 +++--- db/db_compaction_abort_test.cc | 11 +++-------- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index f76e9c8de244..bcf2c4f5f1fa 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -1576,8 +1576,8 @@ Status CompactionJob::ProcessKeyValue( CompactionIterator* c_iter, const CompactionFileOpenFunc& open_file_func, const CompactionFileCloseFunc& close_file_func, uint64_t& prev_cpu_micros) { // Cron interval for periodic operations: stats update, abort check, - // and sync points. Uses 1000 to maintain responsive abort checking. - const uint64_t kCronEvery = 1000; + // and sync points. Uses 1024 to maintain responsive abort checking. + const uint64_t kCronEveryMask = (1 << 10) - 1; [[maybe_unused]] const std::optional<Slice> end = sub_compact->end; // Check for abort signal before starting key processing @@ -1601,7 +1601,7 @@ Status CompactionJob::ProcessKeyValue( const uint64_t num_records = c_iter->iter_stats().num_input_records; // Periodic cron operations: stats update, abort check, and sync points - if (num_records % kCronEvery == kCronEvery - 1) { + if ((num_records & kCronEveryMask) == 0) { // Check for abort signal periodically if (compaction_aborted_.load(std::memory_order_acquire) > 0) { status = Status::Incomplete(Status::SubCode::kCompactionAborted); diff --git a/db/db_compaction_abort_test.cc b/db/db_compaction_abort_test.cc index d8b7a8511620..a76e1d689f1f 100644 --- a/db/db_compaction_abort_test.cc +++ b/db/db_compaction_abort_test.cc @@ -283,7 +283,7 @@ class DBCompactionAbortTest : public DBTestBase { } }; -// Parameterized test for abort with different subcompaction configurations +// Parameterized test for abort with different numbers of max subcompactions. // This consolidates tests that were essentially duplicates with different // max_subcompactions values class DBCompactionAbortSubcompactionTest @@ -355,14 +355,9 @@ TEST_P(DBCompactionAbortStyleTest, AbortCompaction) { ConfigureOptionsForStyle(options, style); Reopen(options); - // Use larger value size for Universal compaction to ensure compaction work - int value_size = (style == kCompactionStyleUniversal) ? 1000 : 100; - PopulateData(/*num_files=*/4, /*keys_per_file=*/100, - /*value_size=*/value_size); + PopulateData(/*num_files=*/4, /*keys_per_file=*/100, /*value_size=*/100); - RunSyncPointAbortTest((style == kCompactionStyleUniversal) ?
"CompactionJob::ProcessKeyValueCompaction:Start" - : "CompactionJob::RunSubcompactions:BeforeStart"); + RunSyncPointAbortTest("CompactionJob::RunSubcompactions:BeforeStart"); VerifyDataIntegrity(/*num_keys=*/100); } From 449f65f193a0037dea9537cb6a7f91af3df2a5ef Mon Sep 17 00:00:00 2001 From: Xingbo Wang Date: Thu, 29 Jan 2026 06:30:53 -0800 Subject: [PATCH 3/7] support abort_and_resume_compactions_one_in --- tools/db_crashtest.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 6c55f84b6011..321a54cfe5bf 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -199,6 +199,7 @@ def apply_random_seed_per_iteration(): "pause_background_one_in": lambda: random.choice([10000, 1000000]), "disable_file_deletions_one_in": lambda: random.choice([10000, 1000000]), "disable_manual_compaction_one_in": lambda: random.choice([10000, 1000000]), + "abort_and_resume_compactions_one_in": lambda: random.choice([10000, 1000000]), "prefix_size": lambda: random.choice([-1, 1, 5, 7, 8]), "prefixpercent": 5, "progress_reports": 0, From 9550630b2a04054cf1809a6ee66f52c162d885db Mon Sep 17 00:00:00 2001 From: Xingbo Wang Date: Thu, 29 Jan 2026 08:07:11 -0800 Subject: [PATCH 4/7] fix build --- db/compaction/compaction_job.cc | 6 +++--- db/db_test.cc | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index bcf2c4f5f1fa..d5ac5738527b 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -1576,7 +1576,7 @@ Status CompactionJob::ProcessKeyValue( CompactionIterator* c_iter, const CompactionFileOpenFunc& open_file_func, const CompactionFileCloseFunc& close_file_func, uint64_t& prev_cpu_micros) { // Cron interval for periodic operations: stats update, abort check, - // and sync points. Uses 1024 to maintain responsive abort checking. + // and sync points. Uses 1024 (power of 2) for efficient bitwise check. const uint64_t kCronEveryMask = (1 << 10) - 1; [[maybe_unused]] const std::optional end = sub_compact->end; @@ -1600,8 +1600,8 @@ Status CompactionJob::ProcessKeyValue( const uint64_t num_records = c_iter->iter_stats().num_input_records; - // Periodic cron operations: stats update, abort check, and sync points - if ((num_records & kCronEveryMask) == 0) { + // Periodic cron operations: stats update, abort check. + if ((num_records & kCronEveryMask) == kCronEveryMask) { // Check for abort signal periodically if (compaction_aborted_.load(std::memory_order_acquire) > 0) { status = Status::Incomplete(Status::SubCode::kCompactionAborted); diff --git a/db/db_test.cc b/db/db_test.cc index 062130d1d525..7909763ed0a5 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -6486,7 +6486,8 @@ TEST_P(DBTestWithParam, CompactionTotalTimeTest) { ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); // Hard-coded number in CompactionJob::ProcessKeyValueCompaction(). - const int kRecordStatsEvery = 1000; + // Uses 1024 (power of 2) for efficient bitwise check. + const int kRecordStatsEvery = 1024; // The stat COMPACTION_CPU_TOTAL_TIME should be recorded // during compaction and once more after compaction. 
ASSERT_EQ(n / kRecordStatsEvery + 1, record_count); From e6c0a807fe988916ca626f40b4cefad17a305381 Mon Sep 17 00:00:00 2001 From: Xingbo Wang Date: Thu, 29 Jan 2026 08:33:48 -0800 Subject: [PATCH 5/7] allow abort error in compaction --- db_stress_tool/db_stress_test_base.cc | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 43cbd4229a37..4e74a445b623 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -3052,8 +3052,9 @@ void StressTest::TestCompactFiles(ThreadState* thread, // TOOD (hx235): allow an exact list of tolerable failures under stress // test bool non_ok_status_allowed = - s.IsManualCompactionPaused() || IsErrorInjectedAndRetryable(s) || - s.IsAborted() || s.IsInvalidArgument() || s.IsNotSupported(); + s.IsManualCompactionPaused() || s.IsCompactionAborted() || + IsErrorInjectedAndRetryable(s) || s.IsAborted() || + s.IsInvalidArgument() || s.IsNotSupported(); if (!non_ok_status_allowed) { fprintf(stderr, "Unable to perform CompactFiles(): %s under specified " @@ -3335,7 +3336,7 @@ void StressTest::TestCompactRange(ThreadState* thread, int64_t rand_key, if (!status.ok()) { // TOOD (hx235): allow an exact list of tolerable failures under stress test bool non_ok_status_allowed = - status.IsManualCompactionPaused() || + status.IsManualCompactionPaused() || status.IsCompactionAborted() || IsErrorInjectedAndRetryable(status) || status.IsAborted() || status.IsInvalidArgument() || status.IsNotSupported(); if (!non_ok_status_allowed) { From e63666842e320d284103dc00bfb7b6e98dab3064 Mon Sep 17 00:00:00 2001 From: Xingbo Wang Date: Thu, 29 Jan 2026 09:51:24 -0800 Subject: [PATCH 6/7] fix crash test failure --- db/db_impl/db_impl_compaction_flush.cc | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 77df3b922fd7..2e31ff943cf9 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1719,6 +1719,12 @@ Status DBImpl::CompactFilesImpl( "[%s] [JOB %d] Stopping manual compaction", c->column_family_data()->GetName().c_str(), job_context->job_id); + } else if (status.IsCompactionAborted()) { + // Don't report aborted compaction as error + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "[%s] [JOB %d] Compaction aborted", + c->column_family_data()->GetName().c_str(), + job_context->job_id); } else { ROCKS_LOG_WARN(immutable_db_options_.info_log, "[%s] [JOB %d] Compaction error: %s", @@ -3654,6 +3660,7 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, // case of a failure). 
Thus, we force full scan in FindObsoleteFiles() FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() && !s.IsManualCompactionPaused() && + !s.IsCompactionAborted() && !s.IsColumnFamilyDropped() && !s.IsBusy()); TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"); From 563df0ff6bc34f627652bc63275811f390fce8f5 Mon Sep 17 00:00:00 2001 From: Xingbo Wang Date: Thu, 29 Jan 2026 10:17:34 -0800 Subject: [PATCH 7/7] fix format --- db/db_impl/db_impl_compaction_flush.cc | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 2e31ff943cf9..9cf25f639da0 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1721,10 +1721,9 @@ Status DBImpl::CompactFilesImpl( job_context->job_id); } else if (status.IsCompactionAborted()) { // Don't report aborted compaction as error - ROCKS_LOG_INFO(immutable_db_options_.info_log, - "[%s] [JOB %d] Compaction aborted", - c->column_family_data()->GetName().c_str(), - job_context->job_id); + ROCKS_LOG_INFO( + immutable_db_options_.info_log, "[%s] [JOB %d] Compaction aborted", + c->column_family_data()->GetName().c_str(), job_context->job_id); } else { ROCKS_LOG_WARN(immutable_db_options_.info_log, "[%s] [JOB %d] Compaction error: %s",
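A standalone sketch of the cadence arithmetic settled in patches 2 and 4: because the interval is a power of two, the bitwise test (num_records & kCronEveryMask) == kCronEveryMask fires on records 1023, 2047, 3071, ..., the same cadence as a modulo check num_records % 1024 == 1023 but without a division. Patch 4's comparison against kCronEveryMask (rather than patch 2's == 0) restores the original end-of-interval phase, which the updated db_test expectation (n / kRecordStatsEvery + 1) relies on.

#include <cassert>
#include <cstdint>

int main() {
  const uint64_t kCronEveryMask = (1 << 10) - 1;  // interval 1024, mask 1023
  uint64_t fired = 0;
  for (uint64_t num_records = 1; num_records <= 4096; ++num_records) {
    if ((num_records & kCronEveryMask) == kCronEveryMask) {
      // For power-of-two intervals the AND form equals the modulo form.
      assert(num_records % 1024 == 1023);
      ++fired;
    }
  }
  assert(fired == 4);  // records 1023, 2047, 3071, 4095
  return 0;
}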