diff --git a/BUCK b/BUCK
index 8fa8f35d3d9e..7037c44e778f 100644
--- a/BUCK
+++ b/BUCK
@@ -4823,6 +4823,12 @@ cpp_unittest_wrapper(name="db_clip_test",
                      extra_compiler_flags=[])
 
+cpp_unittest_wrapper(name="db_compaction_abort_test",
+                     srcs=["db/db_compaction_abort_test.cc"],
+                     deps=[":rocksdb_test_lib"],
+                     extra_compiler_flags=[])
+
+
 cpp_unittest_wrapper(name="db_compaction_filter_test",
                      srcs=["db/db_compaction_filter_test.cc"],
                      deps=[":rocksdb_test_lib"],
                      extra_compiler_flags=[])
diff --git a/CMakeLists.txt b/CMakeLists.txt
index c0194e58d61c..b3fc440fe311 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1381,6 +1381,7 @@ if(WITH_TESTS)
         db/db_bloom_filter_test.cc
         db/db_compaction_filter_test.cc
         db/db_compaction_test.cc
+        db/db_compaction_abort_test.cc
         db/db_clip_test.cc
         db/db_dynamic_level_test.cc
         db/db_encryption_test.cc
diff --git a/Makefile b/Makefile
index 4f62ad5b576e..b2c3a8f6b741 100644
--- a/Makefile
+++ b/Makefile
@@ -1442,6 +1442,9 @@ db_compaction_filter_test: $(OBJ_DIR)/db/db_compaction_filter_test.o $(TEST_LIBR
 db_compaction_test: $(OBJ_DIR)/db/db_compaction_test.o $(TEST_LIBRARY) $(LIBRARY)
 	$(AM_LINK)
 
+db_compaction_abort_test: $(OBJ_DIR)/db/db_compaction_abort_test.o $(TEST_LIBRARY) $(LIBRARY)
+	$(AM_LINK)
+
 db_clip_test: $(OBJ_DIR)/db/db_clip_test.o $(TEST_LIBRARY) $(LIBRARY)
 	$(AM_LINK)
diff --git a/db/c.cc b/db/c.cc
index 7abab13a6fda..dae0d0ebb569 100644
--- a/db/c.cc
+++ b/db/c.cc
@@ -8570,6 +8570,14 @@ void rocksdb_enable_manual_compaction(rocksdb_t* db) {
   db->rep->EnableManualCompaction();
 }
 
+void rocksdb_abort_all_compactions(rocksdb_t* db) {
+  db->rep->AbortAllCompactions();
+}
+
+void rocksdb_resume_all_compactions(rocksdb_t* db) {
+  db->rep->ResumeAllCompactions();
+}
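+
+// Illustrative usage from the C API (example only; assumes an open
+// rocksdb_t* handle named db). Abort blocks until running compaction jobs
+// have stopped; resume is the balanced counterpart:
+//
+//   rocksdb_abort_all_compactions(db);   /* blocks until jobs abort */
+//   /* ... maintenance while compactions are held off ... */
+//   rocksdb_resume_all_compactions(db);  /* re-enables compactions */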
 
 rocksdb_statistics_histogram_data_t*
 rocksdb_statistics_histogram_data_create() {
   return new rocksdb_statistics_histogram_data_t{};
diff --git a/db/c_test.c b/db/c_test.c
index 7f05dd2ab4b2..6811fe4ae8cb 100644
--- a/db/c_test.c
+++ b/db/c_test.c
@@ -4447,7 +4447,7 @@ int main(int argc, char** argv) {
 
   StartPhase("statistics");
   {
-    const uint32_t BYTES_WRITTEN_TICKER = 60;
+    const uint32_t BYTES_WRITTEN_TICKER = 61;
     const uint32_t DB_WRITE_HIST = 1;
 
     rocksdb_statistics_histogram_data_t* hist =
diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc
index 3d51f8fd5410..d5ac5738527b 100644
--- a/db/compaction/compaction_job.cc
+++ b/db/compaction/compaction_job.cc
@@ -128,6 +128,10 @@ const char* GetCompactionProximalOutputRangeTypeString(
   }
 }
 
+// Static constant for the compaction abort flag - always zero ("not
+// aborted"), used for compaction service jobs that don't support abort
+// signaling.
+const std::atomic<int> CompactionJob::kCompactionAbortedFalse{0};
+
 CompactionJob::CompactionJob(
     int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
     const MutableDBOptions& mutable_db_options, const FileOptions& file_options,
@@ -141,10 +145,10 @@ CompactionJob::CompactionJob(
     CompactionJobStats* compaction_job_stats, Env::Priority thread_pri,
     const std::shared_ptr<IOTracer>& io_tracer,
     const std::atomic<bool>& manual_compaction_canceled,
-    const std::string& db_id, const std::string& db_session_id,
-    std::string full_history_ts_low, std::string trim_ts,
-    BlobFileCompletionCallback* blob_callback, int* bg_compaction_scheduled,
-    int* bg_bottom_compaction_scheduled)
+    const std::atomic<int>& compaction_aborted, const std::string& db_id,
+    const std::string& db_session_id, std::string full_history_ts_low,
+    std::string trim_ts, BlobFileCompletionCallback* blob_callback,
+    int* bg_compaction_scheduled, int* bg_bottom_compaction_scheduled)
     : compact_(new CompactionState(compaction)),
       internal_stats_(compaction->compaction_reason(), 1),
      db_options_(db_options),
@@ -168,6 +172,7 @@ CompactionJob::CompactionJob(
       versions_(versions),
       shutting_down_(shutting_down),
       manual_compaction_canceled_(manual_compaction_canceled),
+      compaction_aborted_(compaction_aborted),
       db_directory_(db_directory),
       blob_output_directory_(blob_output_directory),
       db_mutex_(db_mutex),
@@ -708,6 +713,7 @@ void CompactionJob::InitializeCompactionRun() {
 }
 
 void CompactionJob::RunSubcompactions() {
+  TEST_SYNC_POINT("CompactionJob::RunSubcompactions:BeforeStart");
   const size_t num_threads = compact_->sub_compact_states.size();
   assert(num_threads > 0);
   compact_->compaction->GetOrInitInputTableProperties();
@@ -753,6 +759,71 @@ void CompactionJob::RemoveEmptyOutputs() {
   }
 }
 
+void CompactionJob::CleanupAbortedSubcompactions() {
+  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
+
+  uint64_t total_sst_files_deleted = 0;
+  uint64_t total_blob_files_deleted = 0;
+
+  // Track the first file deletion error to report at the end
+  Status first_error;
+  int deletion_errors = 0;
+
+  // Mark all subcompactions as aborted and delete their output files
+  for (auto& sub_compact : compact_->sub_compact_states) {
+    // Mark this subcompaction as aborted
+    sub_compact.status =
+        Status::Incomplete(Status::SubCode::kCompactionAborted);
+
+    // Delete all files (SST and blob) tracked during compaction.
+    // GetOutputFilePaths() contains ALL file paths created, including
+    // in-progress files that may have been removed from outputs_ or
+    // blob_file_additions_.
+    for (const bool is_proximal_level : {false, true}) {
+      if (is_proximal_level &&
+          !compact_->compaction->SupportsPerKeyPlacement()) {
+        continue;
+      }
+      for (const std::string& file_path :
+           sub_compact.Outputs(is_proximal_level)->GetOutputFilePaths()) {
+        Status s = env_->DeleteFile(file_path);
+        if (s.ok()) {
+          // Count SST vs. blob files by checking the extension
+          if (file_path.find(".sst") != std::string::npos) {
+            total_sst_files_deleted++;
+          } else if (file_path.find(".blob") != std::string::npos) {
+            total_blob_files_deleted++;
+          }
+        } else if (!s.IsNotFound()) {
+          if (first_error.ok()) {
+            first_error = s;
+          }
+          deletion_errors++;
+        }
+      }
+    }
+    sub_compact.CleanupOutputs();
+  }
+
+  if (stats_) {
+    RecordTick(stats_, COMPACTION_ABORTED);
+  }
+
+  ROCKS_LOG_INFO(db_options_.info_log,
+                 "[%s] [JOB %d] Compaction aborted: deleted %" PRIu64
+                 " SST files and %" PRIu64 " blob files",
+                 cfd->GetName().c_str(), job_id_, total_sst_files_deleted,
+                 total_blob_files_deleted);
+
+  if (!first_error.ok()) {
+    ROCKS_LOG_ERROR(db_options_.info_log,
+                    "[%s] [JOB %d] Cleanup completed with %d file deletion "
+                    "errors. First error: %s",
+                    cfd->GetName().c_str(), job_id_, deletion_errors,
+                    first_error.ToString().c_str());
+  }
+}
+
 bool CompactionJob::HasNewBlobFiles() const {
   for (const auto& state : compact_->sub_compact_states) {
     if (state.Current().HasBlobFileAdditions()) {
@@ -1004,6 +1075,15 @@ Status CompactionJob::Run() {
 
   Status status = CollectSubcompactionErrors();
 
+  // If the compaction was aborted or manually paused, clean up any output
+  // files from completed subcompactions to prevent orphaned files on disk.
+  // Skip cleanup for resumable compaction (when a progress writer is set)
+  // because the output files are needed for resumption.
+  if ((status.IsCompactionAborted() || status.IsManualCompactionPaused()) &&
+      compaction_progress_writer_ == nullptr) {
+    CleanupAbortedSubcompactions();
+  }
+
   if (status.ok()) {
     status = SyncOutputDirectories();
   }
@@ -1415,10 +1495,10 @@ InternalIterator* CompactionJob::CreateInputIterator(
   return input;
 }
 
-void CompactionJob::CreateBlobFileBuilder(SubcompactionState* sub_compact,
-                                          ColumnFamilyData* cfd,
-                                          BlobFileResources& blob_resources,
-                                          const WriteOptions& write_options) {
+void CompactionJob::CreateBlobFileBuilder(
+    SubcompactionState* sub_compact, ColumnFamilyData* cfd,
+    std::unique_ptr<BlobFileBuilder>& blob_file_builder,
+    const WriteOptions& write_options) {
   const auto& mutable_cf_options =
       sub_compact->compaction->mutable_cf_options();
 
@@ -1427,24 +1507,24 @@ void CompactionJob::CreateBlobFileBuilder(SubcompactionState* sub_compact,
   if (mutable_cf_options.enable_blob_files &&
       sub_compact->compaction->output_level() >=
           mutable_cf_options.blob_file_starting_level) {
-    blob_resources.blob_file_builder = std::make_unique<BlobFileBuilder>(
+    blob_file_builder = std::make_unique<BlobFileBuilder>(
         versions_, fs_.get(), &sub_compact->compaction->immutable_options(),
         &mutable_cf_options, &file_options_, &write_options, db_id_,
         db_session_id_, job_id_, cfd->GetID(), cfd->GetName(), write_hint_,
         io_tracer_, blob_callback_, BlobFileCreationReason::kCompaction,
-        &blob_resources.blob_file_paths,
+        sub_compact->Current().GetOutputFilePathsPtr(),
         sub_compact->Current().GetBlobFileAdditionsPtr());
   } else {
-    blob_resources.blob_file_builder = nullptr;
+    blob_file_builder = nullptr;
   }
 }
 
 std::unique_ptr<CompactionIterator> CompactionJob::CreateCompactionIterator(
     SubcompactionState* sub_compact, ColumnFamilyData* cfd,
     InternalIterator* input, const CompactionFilter* compaction_filter,
-    MergeHelper& merge, BlobFileResources& blob_resources,
+    MergeHelper& merge, std::unique_ptr<BlobFileBuilder>& blob_file_builder,
     const WriteOptions& write_options) {
-  CreateBlobFileBuilder(sub_compact, cfd, blob_resources, write_options);
+  CreateBlobFileBuilder(sub_compact, cfd, blob_file_builder, write_options);
 
   const std::string* const full_history_ts_low =
       full_history_ts_low_.empty() ? nullptr : &full_history_ts_low_;
@@ -1456,7 +1536,7 @@ std::unique_ptr<CompactionIterator> CompactionJob::CreateCompactionIterator(
       job_context_->earliest_write_conflict_snapshot,
       job_context_->GetJobSnapshotSequence(), job_context_->snapshot_checker,
       env_, ShouldReportDetailedTime(env_, stats_), sub_compact->RangeDelAgg(),
-      blob_resources.blob_file_builder.get(), db_options_.allow_data_in_errors,
+      blob_file_builder.get(), db_options_.allow_data_in_errors,
       db_options_.enforce_single_del_contracts, manual_compaction_canceled_,
       sub_compact->compaction
          ->DoesInputReferenceBlobFiles() /* must_count_input_entries */,
@@ -1495,10 +1575,17 @@ Status CompactionJob::ProcessKeyValue(
     SubcompactionState* sub_compact, ColumnFamilyData* cfd,
     CompactionIterator* c_iter, const CompactionFileOpenFunc& open_file_func,
     const CompactionFileCloseFunc& close_file_func, uint64_t& prev_cpu_micros) {
-  Status status;
-  const uint64_t kRecordStatsEvery = 1000;
+  // Cron interval for periodic operations: stats updates, abort checks,
+  // and sync points. Uses 1024 (a power of 2) for an efficient bitwise check.
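+  // As a worked example of the mask arithmetic: with kCronEveryMask = 1023,
+  // the condition (num_records & 1023) == 1023 holds at records 1023, 2047,
+  // 3071, ..., i.e. once every 1024 input records.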
+  const uint64_t kCronEveryMask = (1 << 10) - 1;
 
   [[maybe_unused]] const std::optional<Slice> end = sub_compact->end;
 
+  // Check for the abort signal before starting key processing
+  if (compaction_aborted_.load(std::memory_order_acquire) > 0) {
+    return Status::Incomplete(Status::SubCode::kCompactionAborted);
+  }
+
+  Status status;
   IterKey prev_iter_output_key;
   ParsedInternalKey prev_iter_output_internal_key;
 
@@ -1511,8 +1598,16 @@ Status CompactionJob::ProcessKeyValue(
     assert(!end.has_value() ||
            cfd->user_comparator()->Compare(c_iter->user_key(), *end) < 0);
 
-    if (c_iter->iter_stats().num_input_records % kRecordStatsEvery ==
-        kRecordStatsEvery - 1) {
+    const uint64_t num_records = c_iter->iter_stats().num_input_records;
+
+    // Periodic cron operations: stats update, abort check.
+    if ((num_records & kCronEveryMask) == kCronEveryMask) {
+      // Check for the abort signal periodically
+      if (compaction_aborted_.load(std::memory_order_acquire) > 0) {
+        status = Status::Incomplete(Status::SubCode::kCompactionAborted);
+        break;
+      }
+
       UpdateSubcompactionJobStatsIncrementally(
           c_iter, &sub_compact->compaction_job_stats,
           db_options_.clock->CPUMicros(), prev_cpu_micros);
@@ -1719,6 +1814,7 @@ Status CompactionJob::FinalizeBlobFiles(SubcompactionState* sub_compact,
 }
 
 void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
+  TEST_SYNC_POINT("CompactionJob::ProcessKeyValueCompaction:Start");
   assert(sub_compact);
   assert(sub_compact->compaction);
 
@@ -1772,11 +1868,11 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
       false /* internal key corruption is expected */,
       job_context_->GetLatestSnapshotSequence(), job_context_->snapshot_checker,
       compact_->compaction->level(), db_options_.stats);
-  BlobFileResources blob_resources;
+  std::unique_ptr<BlobFileBuilder> blob_file_builder;
 
   auto c_iter =
       CreateCompactionIterator(sub_compact, cfd, input_iter, compaction_filter,
-                               merge, blob_resources, write_options);
+                               merge, blob_file_builder, write_options);
   assert(c_iter);
   c_iter->SeekToFirst();
 
@@ -1794,9 +1890,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   status = FinalizeProcessKeyValueStatus(cfd, input_iter, c_iter.get(), status);
 
   FinalizeSubcompaction(sub_compact, status, open_file_func, close_file_func,
-                        blob_resources.blob_file_builder.get(), c_iter.get(),
-                        input_iter, start_cpu_micros, prev_cpu_micros,
-                        io_stats);
+                        blob_file_builder.get(), c_iter.get(), input_iter,
+                        start_cpu_micros, prev_cpu_micros, io_stats);
 
   NotifyOnSubcompactionCompleted(sub_compact);
 }
@@ -2295,6 +2390,10 @@ Status CompactionJob::OpenCompactionOutputFile(SubcompactionState* sub_compact,
   Status s;
   IOStatus io_s = NewWritableFile(fs_.get(), fname, &writable_file, fo_copy);
   s = io_s;
+  if (io_s.ok()) {
+    // Track the SST file path for cleanup on abort.
+    outputs.AddOutputFilePath(fname);
+  }
   if (sub_compact->io_status.ok()) {
     sub_compact->io_status = io_s;
     // Since this error is really a copy of the io_s that is checked below as s,
diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h
index c9dac611cd6f..8b942c6fe64d 100644
--- a/db/compaction/compaction_job.h
+++ b/db/compaction/compaction_job.h
@@ -142,6 +142,9 @@ class SubcompactionState;
 
 class CompactionJob {
  public:
+  // Constant "not aborted" flag, used for compaction service jobs
+  static const std::atomic<int> kCompactionAbortedFalse;
+
   CompactionJob(int job_id, Compaction* compaction,
                 const ImmutableDBOptions& db_options,
                 const MutableDBOptions& mutable_db_options,
@@ -157,6 +160,7 @@ class CompactionJob {
                 Env::Priority thread_pri,
                 const std::shared_ptr<IOTracer>& io_tracer,
                 const std::atomic<bool>& manual_compaction_canceled,
+                const std::atomic<int>& compaction_aborted,
                 const std::string& db_id = "",
                 const std::string& db_session_id = "",
                 std::string full_history_ts_low = "", std::string trim_ts = "",
@@ -299,6 +303,7 @@ class CompactionJob {
   void RunSubcompactions();
   void UpdateTimingStats(uint64_t start_micros);
   void RemoveEmptyOutputs();
+  void CleanupAbortedSubcompactions();
   bool HasNewBlobFiles() const;
   Status CollectSubcompactionErrors();
   Status SyncOutputDirectories();
@@ -363,11 +368,6 @@ class CompactionJob {
     std::unique_ptr<InternalIterator> trim_history_iter;
   };
 
-  struct BlobFileResources {
-    std::vector<std::string> blob_file_paths;
-    std::unique_ptr<BlobFileBuilder> blob_file_builder;
-  };
-
   bool ShouldUseLocalCompaction(SubcompactionState* sub_compact);
   CompactionIOStatsSnapshot InitializeIOStats();
   Status SetupAndValidateCompactionFilter(
@@ -382,14 +382,14 @@ class CompactionJob {
       SubcompactionState* sub_compact, ColumnFamilyData* cfd,
       SubcompactionInternalIterators& iterators,
       SubcompactionKeyBoundaries& boundaries, ReadOptions& read_options);
-  void CreateBlobFileBuilder(SubcompactionState* sub_compact,
-                             ColumnFamilyData* cfd,
-                             BlobFileResources& blob_resources,
-                             const WriteOptions& write_options);
+  void CreateBlobFileBuilder(
+      SubcompactionState* sub_compact, ColumnFamilyData* cfd,
+      std::unique_ptr<BlobFileBuilder>& blob_file_builder,
+      const WriteOptions& write_options);
   std::unique_ptr<CompactionIterator> CreateCompactionIterator(
       SubcompactionState* sub_compact, ColumnFamilyData* cfd,
       InternalIterator* input_iter, const CompactionFilter* compaction_filter,
-      MergeHelper& merge, BlobFileResources& blob_resources,
+      MergeHelper& merge, std::unique_ptr<BlobFileBuilder>& blob_file_builder,
      const WriteOptions& write_options);
   std::pair<CompactionFileOpenFunc, CompactionFileCloseFunc> CreateFileHandlers(
       SubcompactionState* sub_compact, SubcompactionKeyBoundaries& boundaries);
@@ -461,6 +461,7 @@ class CompactionJob {
   VersionSet* versions_;
   const std::atomic<bool>* shutting_down_;
   const std::atomic<bool>& manual_compaction_canceled_;
+  const std::atomic<int>& compaction_aborted_;
   FSDirectory* db_directory_;
   FSDirectory* blob_output_directory_;
   InstrumentedMutex* db_mutex_;
diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc
index 95d74be4d485..ce55dfe4f8ee 100644
--- a/db/compaction/compaction_job_test.cc
+++ b/db/compaction/compaction_job_test.cc
@@ -676,8 +676,8 @@ class CompactionJobTestBase : public testing::Test {
         &event_logger, false, false, dbname_, &compaction_job_stats_,
         Env::Priority::USER, nullptr /* IOTracer */,
         /*manual_compaction_canceled=*/kManualCompactionCanceledFalse,
-        env_->GenerateUniqueId(), DBImpl::GenerateDbSessionId(nullptr),
-        full_history_ts_low_);
+        CompactionJob::kCompactionAbortedFalse, env_->GenerateUniqueId(),
+        DBImpl::GenerateDbSessionId(nullptr), full_history_ts_low_);
 
     VerifyInitializationOfCompactionJobStats(compaction_job_stats_);
 
     compaction_job.Prepare(std::nullopt /*subcompact to be computed*/);
@@ -2545,7 +2545,8 @@ class ResumableCompactionJobTest : public CompactionJobTestBase {
         versions_.get(), &shutting_down_, &log_buffer, nullptr, nullptr,
         nullptr, stats.get(), &mutex_, &error_handler_, &job_context,
         table_cache_, &event_logger, false, false, dbname_, &job_stats,
-        Env::Priority::USER, nullptr, cancel_, env_->GenerateUniqueId(),
+        Env::Priority::USER, nullptr, cancel_,
+        CompactionJob::kCompactionAbortedFalse, env_->GenerateUniqueId(),
         DBImpl::GenerateDbSessionId(nullptr), "");
 
     compaction_job.Prepare(std::nullopt, compaction_progress,
diff --git a/db/compaction/compaction_outputs.h b/db/compaction/compaction_outputs.h
index 6f9de28efcfd..757e1b6b85ed 100644
--- a/db/compaction/compaction_outputs.h
+++ b/db/compaction/compaction_outputs.h
@@ -84,6 +84,19 @@ class CompactionOutputs {
 
   bool HasBlobFileAdditions() const { return !blob_file_additions_.empty(); }
 
+  // Get all file paths (SST and blob) created during compaction.
+  const std::vector<std::string>& GetOutputFilePaths() const {
+    return output_file_paths_;
+  }
+
+  std::vector<std::string>* GetOutputFilePathsPtr() {
+    return &output_file_paths_;
+  }
+
+  void AddOutputFilePath(const std::string& path) {
+    output_file_paths_.push_back(path);
+  }
+
   BlobGarbageMeter* CreateBlobGarbageMeter() {
     assert(!is_proximal_level_);
     blob_garbage_meter_ = std::make_unique<BlobGarbageMeter>();
@@ -321,6 +334,12 @@ class CompactionOutputs {
   std::vector<BlobFileAddition> blob_file_additions_;
   std::unique_ptr<BlobGarbageMeter> blob_garbage_meter_;
 
+  // All file paths (SST and blob) created during compaction.
+  // Used for cleanup on abort - ensures orphan files are deleted even if
+  // they were removed from outputs_ or blob_file_additions_ (e.g., by
+  // RemoveLastEmptyOutput when file_size is 0 because the builder was
+  // abandoned).
+  std::vector<std::string> output_file_paths_;
+
   // Per level's output stat
   InternalStats::CompactionStats stats_;
diff --git a/db/compaction/compaction_service_job.cc b/db/compaction/compaction_service_job.cc
index d9eea538193f..cb88c53d8f8d 100644
--- a/db/compaction/compaction_service_job.cc
+++ b/db/compaction/compaction_service_job.cc
@@ -117,6 +117,14 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
   std::string debug_str_before_wait =
       compaction->input_version()->DebugString(/*hex=*/true);
 
+  // TODO: Update the CompactionService API to support abort and resume
+  // functionality. Currently, remote compaction jobs cannot be aborted via
+  // AbortAllCompactions() because the CompactionService interface lacks
+  // methods to signal abort to remote workers and to properly resume after
+  // an abort.
+  // The API needs to be extended with:
+  // - A method to signal abort to running remote compaction jobs
+  // - A method to resume/re-enable scheduling after an abort is lifted
+
   ROCKS_LOG_INFO(db_options_.info_log,
                  "[%s] [JOB %d] Waiting for remote compaction...",
                  compaction->column_family_data()->GetName().c_str(), job_id_);
@@ -312,16 +320,17 @@ CompactionServiceCompactionJob::CompactionServiceCompactionJob(
     std::string output_path,
     const CompactionServiceInput& compaction_service_input,
     CompactionServiceResult* compaction_service_result)
-    : CompactionJob(job_id, compaction, db_options, mutable_db_options,
-                    file_options, versions, shutting_down, log_buffer, nullptr,
-                    output_directory, nullptr, stats, db_mutex,
-                    db_error_handler, job_context, std::move(table_cache),
-                    event_logger,
-                    compaction->mutable_cf_options().paranoid_file_checks,
-                    compaction->mutable_cf_options().report_bg_io_stats, dbname,
-                    &(compaction_service_result->stats), Env::Priority::USER,
-                    io_tracer, manual_compaction_canceled, db_id, db_session_id,
-                    compaction->column_family_data()->GetFullHistoryTsLow()),
+    : CompactionJob(
+          job_id, compaction, db_options, mutable_db_options, file_options,
+          versions, shutting_down, log_buffer, nullptr, output_directory,
+          nullptr, stats, db_mutex, db_error_handler, job_context,
+          std::move(table_cache), event_logger,
+          compaction->mutable_cf_options().paranoid_file_checks,
+          compaction->mutable_cf_options().report_bg_io_stats, dbname,
+          &(compaction_service_result->stats), Env::Priority::USER, io_tracer,
+          manual_compaction_canceled, CompactionJob::kCompactionAbortedFalse,
+          db_id, db_session_id,
+          compaction->column_family_data()->GetFullHistoryTsLow()),
       output_path_(std::move(output_path)),
       compaction_input_(compaction_service_input),
       compaction_result_(compaction_service_result) {}
diff --git a/db/compaction/subcompaction_state.h b/db/compaction/subcompaction_state.h
index 09af46540ca9..38785f9ae085 100644
--- a/db/compaction/subcompaction_state.h
+++ b/db/compaction/subcompaction_state.h
@@ -95,6 +95,14 @@ class SubcompactionState {
     proximal_level_outputs_.RemoveLastEmptyOutput();
   }
 
+  // Cleanup output builders for abandoning in-progress files.
+  void CleanupOutputs() {
+    compaction_outputs_.Cleanup();
+    if (compaction->SupportsPerKeyPlacement()) {
+      proximal_level_outputs_.Cleanup();
+    }
+  }
+
   void BuildSubcompactionJobInfo(
       SubcompactionJobInfo& subcompaction_job_info) const {
     const Compaction* c = compaction;
diff --git a/db/db_compaction_abort_test.cc b/db/db_compaction_abort_test.cc
new file mode 100644
index 000000000000..a76e1d689f1f
--- /dev/null
+++ b/db/db_compaction_abort_test.cc
@@ -0,0 +1,993 @@
+// Copyright (c) Meta Platforms, Inc. and affiliates.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include <algorithm>
+#include <atomic>
+#include <thread>
+#include <unordered_map>
+
+#include "db/compaction/compaction_job.h"
+#include "db/db_impl/db_impl_secondary.h"
+#include "db/db_test_util.h"
+#include "options/options_helper.h"
+#include "port/stack_trace.h"
+#include "rocksdb/db.h"
+#include "rocksdb/sst_file_writer.h"
+#include "test_util/sync_point.h"
+#include "test_util/testharness.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Helper class to manage abort synchronization in tests.
+//
+// A compaction abort can happen at various stages of a compaction. To test
+// this, we need to trigger the abort at different stages. This requires
+// precise control over the timing of the abort API invocation. To achieve
+// this consistently across tests, we invoke AbortAllCompactions() from a
+// sync point callback added at various stages of compaction. However, since
+// the abort API is a blocking call, calling it from the sync point callback
+// on the compaction thread would cause a deadlock. This test helper class is
+// designed to solve that challenge.
+//
+// 1. The abort must happen from a different thread:
+//    AbortAllCompactions() is typically called from the compaction thread
+//    via a sync point callback, so that we can precisely control when the
+//    API is invoked and simulate an abort at different stages of compaction.
+//    However, we can't block the compaction thread waiting for the abort to
+//    complete - the compaction needs to continue executing to actually
+//    check the abort flag and exit. So we spawn a separate thread to call
+//    AbortAllCompactions().
+//
+// 2. We need to know when the abort completes:
+//    After compaction returns (with an aborted status), we often need to:
+//    - Verify state (e.g., no output files created)
+//    - Call ResumeAllCompactions()
+//    - Run compaction again to verify it succeeds
+//    We must wait for the abort thread to finish before proceeding;
+//    otherwise we might call Resume before Abort completes, causing race
+//    conditions.
+//
+// 3. Sync point callbacks may fire multiple times:
+//    With multiple subcompactions, a callback like
+//    "CompactionJob::ProcessKeyValueCompaction:Start" fires once per
+//    subcompaction. We only want to trigger the abort once, so we use
+//    abort_triggered_ as a guard.
+//
+// 4. Tests may need multiple abort cycles:
+//    Some tests (e.g., MultipleAbortResumeSequence) do abort->resume->abort
+//    multiple times. The class supports this by auto-resetting when a
+//    previous abort has completed.
+class AbortSynchronizer {
+ public:
+  AbortSynchronizer() : abort_cv_(&abort_mutex_) {}
+
+  ~AbortSynchronizer() {
+    // Join the thread if it was started - ensures clean shutdown
+    if (abort_thread_.joinable()) {
+      abort_thread_.join();
+    }
+  }
+
+  // Non-copyable, non-movable due to the thread member
+  AbortSynchronizer(const AbortSynchronizer&) = delete;
+  AbortSynchronizer& operator=(const AbortSynchronizer&) = delete;
+
+  // Trigger the abort from a separate thread.
+  // - Safe to call multiple times; only the first call in each cycle spawns
+  //   the thread
+  // - If a previous abort has completed, automatically resets state first
+  // - The spawned thread calls AbortAllCompactions() and signals completion
+  void TriggerAbort(DBImpl* db) {
+    // If the previous abort completed, reset state to allow a new abort
+    if (abort_triggered_.load() && abort_completed_.load()) {
+      Reset();
+    }
+
+    if (!abort_triggered_.exchange(true)) {
+      abort_thread_ = std::thread([this, db]() {
+        db->AbortAllCompactions();
+        SignalAbortCompleted();
+      });
+    }
+  }
+
+  // Wait for the abort thread to complete.
+  // Call this AFTER compaction returns, to ensure the abort thread has
+  // finished before proceeding with Resume or other operations.
+  void WaitForAbortCompletion() {
+    MutexLock l(&abort_mutex_);
+    while (!abort_completed_.load()) {
+      abort_cv_.Wait();
+    }
+  }
+
+  // Reset state for reuse. Joins any previous thread first.
+  // Called automatically by TriggerAbort() if the previous abort completed,
+  // but can also be called explicitly for clarity.
+  void Reset() {
+    if (abort_thread_.joinable()) {
+      abort_thread_.join();
+    }
+    abort_triggered_.store(false);
+    abort_completed_.store(false);
+  }
+
+  bool IsAbortTriggered() const { return abort_triggered_.load(); }
+
+ private:
+  void SignalAbortCompleted() {
+    MutexLock l(&abort_mutex_);
+    abort_completed_.store(true);
+    abort_cv_.SignalAll();
+  }
+
+  std::atomic<bool> abort_triggered_{false};  // Guards against multiple spawns
+  std::atomic<bool> abort_completed_{false};  // Signals thread completion
+  port::Mutex abort_mutex_;
+  port::CondVar abort_cv_;
+  std::thread abort_thread_;  // The thread that calls AbortAllCompactions()
+};
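+
+// Minimal usage sketch (illustrative, mirroring AbortAutomaticCompaction
+// below):
+//
+//   AbortSynchronizer abort_sync;
+//   SyncPoint::GetInstance()->SetCallBack(
+//       "CompactionJob::ProcessKeyValueCompaction:Start",
+//       [&](void*) { abort_sync.TriggerAbort(dbfull()); });
+//   SyncPoint::GetInstance()->EnableProcessing();
+//   Status s = dbfull()->CompactRange(CompactRangeOptions(), nullptr,
+//                                     nullptr);  // returns aborted status
+//   abort_sync.WaitForAbortCompletion();
+//   dbfull()->ResumeAllCompactions();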
+
+// Helper to clean up SyncPoint state after tests
+inline void CleanupSyncPoints() {
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+// Helper class that combines AbortSynchronizer with sync point setup for
+// deterministic abort triggering, adding sync point coordination on top of
+// AbortSynchronizer.
+//
+// This is useful when you need deterministic timing - the callback won't
+// return until AbortAllCompactions() has actually set the abort flag,
+// guaranteeing the compaction will see it on the next check.
+class SyncPointAbortHelper {
+ public:
+  explicit SyncPointAbortHelper(const std::string& trigger_point)
+      : trigger_point_(trigger_point) {}
+
+  // Set up sync points and callbacks. Call this before starting compaction.
+  void Setup(DBImpl* db_impl) {
+    db_impl_ = db_impl;
+
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
+        {"DBImpl::AbortAllCompactions:FlagSet", kWaitPointName},
+    });
+
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+        trigger_point_, [this](void* /*arg*/) {
+          // Use AbortSynchronizer to handle the abort in a separate thread
+          abort_sync_.TriggerAbort(db_impl_);
+
+          // Wait for the abort flag to be set via the sync point dependency.
+          // This ensures deterministic timing - the compaction will see the
+          // flag on its next check.
+          TEST_SYNC_POINT_CALLBACK(kWaitPointName, nullptr);
+        });
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  }
+
+  // Wait for the abort to complete. Call this after compaction returns.
+  void WaitForAbortCompletion() { abort_sync_.WaitForAbortCompletion(); }
+
+  // Clean up sync points and wait for abort completion in one call
+  void CleanupAndWait() {
+    CleanupSyncPoints();
+    WaitForAbortCompletion();
+  }
+
+ private:
+  static constexpr const char* kWaitPointName =
+      "SyncPointAbortHelper::WaitForAbort";
+  std::string trigger_point_;
+  DBImpl* db_impl_{nullptr};
+  AbortSynchronizer abort_sync_;
+};
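+
+// Typical usage sketch (mirroring RunSyncPointAbortTest below):
+//
+//   SyncPointAbortHelper helper(
+//       "CompactionJob::RunSubcompactions:BeforeStart");
+//   helper.Setup(dbfull());
+//   Status s = dbfull()->CompactRange(CompactRangeOptions(), nullptr,
+//                                     nullptr);  // aborted
+//   helper.CleanupAndWait();
+//   dbfull()->ResumeAllCompactions();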
+
+class DBCompactionAbortTest : public DBTestBase {
+ public:
+  DBCompactionAbortTest()
+      : DBTestBase("db_compaction_abort_test", /*env_do_fsync=*/false) {}
+
+ protected:
+  // Map to track the latest value of each key for verification
+  std::unordered_map<std::string, std::string> expected_values_;
+
+  // Statistics object for verifying compaction metrics
+  std::shared_ptr<Statistics> stats_;
+
+  // Get the current options with statistics enabled
+  Options GetOptionsWithStats() {
+    Options options = CurrentOptions();
+    stats_ = CreateDBStatistics();
+    options.statistics = stats_;
+    return options;
+  }
+
+  // Populate the database with test data.
+  // If overlapping=true, uses the same key range (0 to keys_per_file-1) in
+  // each file to ensure compaction has work to do.
+  // If overlapping=false, uses non-overlapping keys across files.
+  void PopulateData(int num_files, int keys_per_file, int value_size,
+                    bool overlapping = true, int seed = 301) {
+    Random rnd(seed);
+    for (int i = 0; i < num_files; ++i) {
+      for (int j = 0; j < keys_per_file; ++j) {
+        int key_index = overlapping ? j : (j + i * keys_per_file);
+        std::string key = Key(key_index);
+        std::string value = rnd.RandomString(value_size);
+        ASSERT_OK(Put(key, value));
+        expected_values_[key] = value;
+      }
+      ASSERT_OK(Flush());
+    }
+  }
+
+  // Verify data integrity by reading all keys and comparing with the
+  // expected values
+  void VerifyDataIntegrity(int num_keys, int start_key = 0) {
+    std::string val;
+    for (int j = start_key; j < start_key + num_keys; ++j) {
+      std::string key = Key(j);
+      ASSERT_OK(dbfull()->Get(ReadOptions(), key, &val));
+      auto it = expected_values_.find(key);
+      if (it != expected_values_.end()) {
+        ASSERT_EQ(it->second, val) << "Value mismatch for key: " << key;
+      }
+    }
+  }
+
+  // Clear expected values (useful when reopening the DB or between tests)
+  void ClearExpectedValues() { expected_values_.clear(); }
+
+  // Run the common abort test pattern with SyncPointAbortHelper:
+  // 1. Set up the sync point abort helper
+  // 2. Run compaction and verify it's aborted
+  // 3. Verify COMPACTION_ABORTED stat increased (if stats enabled)
+  // 4. Clean up, resume, and verify compaction succeeds
+  // 5. Verify COMPACT_WRITE_BYTES increased (if stats enabled)
+  void RunSyncPointAbortTest(const std::string& trigger_point,
+                             CompactRangeOptions cro = CompactRangeOptions()) {
+    // Capture stats and file counts before the abort
+    uint64_t aborted_before = 0;
+    uint64_t write_bytes_before = 0;
+    if (stats_) {
+      aborted_before = stats_->getTickerCount(COMPACTION_ABORTED);
+      write_bytes_before = stats_->getTickerCount(COMPACT_WRITE_BYTES);
+    }
+
+    SyncPointAbortHelper helper(trigger_point);
+    helper.Setup(dbfull());
+
+    Status s = dbfull()->CompactRange(cro, nullptr, nullptr);
+    ASSERT_TRUE(s.IsIncomplete());
+    ASSERT_TRUE(s.IsCompactionAborted());
+
+    // Verify the abort was counted
+    if (stats_) {
+      uint64_t aborted_after = stats_->getTickerCount(COMPACTION_ABORTED);
+      ASSERT_GT(aborted_after, aborted_before)
+          << "COMPACTION_ABORTED stat should increase after abort";
+    }
+
+    helper.CleanupAndWait();
+    dbfull()->ResumeAllCompactions();
+
+    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+
+    // Verify the compaction completed and wrote bytes
+    if (stats_) {
+      uint64_t write_bytes_after = stats_->getTickerCount(COMPACT_WRITE_BYTES);
+      ASSERT_GT(write_bytes_after, write_bytes_before)
+          << "COMPACT_WRITE_BYTES should increase after successful compaction";
+    }
+  }
+};
+
+// Parameterized test for abort with different numbers of max subcompactions.
+// This consolidates tests that were essentially duplicates with different
+// max_subcompactions values.
+class DBCompactionAbortSubcompactionTest
+    : public DBCompactionAbortTest,
+      public ::testing::WithParamInterface<int> {};
+
+TEST_P(DBCompactionAbortSubcompactionTest, AbortWithVaryingSubcompactions) {
+  int max_subcompactions = GetParam();
+
+  Options options = GetOptionsWithStats();
+  options.level0_file_num_compaction_trigger = 4;
+  options.max_subcompactions = max_subcompactions;
+  options.disable_auto_compactions = true;
+  Reopen(options);
+
+  PopulateData(/*num_files=*/4, /*keys_per_file=*/100, /*value_size=*/100);
+
+  RunSyncPointAbortTest("CompactionJob::RunSubcompactions:BeforeStart");
+
+  VerifyDataIntegrity(/*num_keys=*/100);
+}
+
+INSTANTIATE_TEST_CASE_P(
+    SubcompactionVariants, DBCompactionAbortSubcompactionTest,
+    ::testing::Values(1, 2, 4),
+    [](const ::testing::TestParamInfo<int>& param_info) {
+      return "MaxSubcompactionCount_" + std::to_string(param_info.param);
+    });
+
+// Parameterized test for abort with different compaction styles.
+// This consolidates tests for the Level, Universal, and FIFO compaction
+// styles.
+class DBCompactionAbortStyleTest
+    : public DBCompactionAbortTest,
+      public ::testing::WithParamInterface<CompactionStyle> {
+ protected:
+  // Configure options based on the compaction style
+  void ConfigureOptionsForStyle(Options& options, CompactionStyle style) {
+    options.compaction_style = style;
+    options.level0_file_num_compaction_trigger = 4;
+    options.disable_auto_compactions = true;
+
+    switch (style) {
+      case kCompactionStyleLevel:
+        // Level compaction uses default settings
+        break;
+      case kCompactionStyleUniversal:
+        options.compaction_options_universal.size_ratio = 10;
+        break;
+      case kCompactionStyleFIFO:
+        // Set a large max_table_files_size to avoid deletion compaction
+        options.compaction_options_fifo.max_table_files_size =
+            100 * 1024 * 1024;
+        // Enable intra-L0 compaction, which goes through the normal
+        // compaction path
+        options.compaction_options_fifo.allow_compaction = true;
+        options.max_open_files = -1;  // Required for FIFO compaction
+        break;
+      default:
+        break;
+    }
+  }
+};
+
+TEST_P(DBCompactionAbortStyleTest, AbortCompaction) {
+  CompactionStyle style = GetParam();
+
+  Options options = GetOptionsWithStats();
+  options.max_subcompactions = 1;
+  ConfigureOptionsForStyle(options, style);
+  Reopen(options);
+
+  PopulateData(/*num_files=*/4, /*keys_per_file=*/100, /*value_size=*/100);
+
+  RunSyncPointAbortTest("CompactionJob::RunSubcompactions:BeforeStart");
+
+  VerifyDataIntegrity(/*num_keys=*/100);
+}
+
+INSTANTIATE_TEST_CASE_P(
+    CompactionStyleVariants, DBCompactionAbortStyleTest,
+    ::testing::Values(kCompactionStyleLevel, kCompactionStyleUniversal,
+                      kCompactionStyleFIFO),
+    [](const ::testing::TestParamInfo<CompactionStyle>& param_info) {
+      return OptionsHelper::compaction_style_to_string.at(param_info.param);
+    });
+
+TEST_F(DBCompactionAbortTest, AbortManualCompaction) {
+  Options options = GetOptionsWithStats();
+  options.level0_file_num_compaction_trigger = 10;
+  options.disable_auto_compactions = true;
+  Reopen(options);
+
+  PopulateData(/*num_files=*/5, /*keys_per_file=*/100, /*value_size=*/1000);
+
+  CompactRangeOptions cro;
+  cro.exclusive_manual_compaction = true;
+  RunSyncPointAbortTest("CompactionJob::ProcessKeyValueCompaction:Start", cro);
+
+  VerifyDataIntegrity(/*num_keys=*/100);
+}
+
+TEST_F(DBCompactionAbortTest, AbortAutomaticCompaction) {
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = 4;
+  options.max_subcompactions = 2;
+  options.disable_auto_compactions = false;
+  Reopen(options);
+
+  Random rnd(301);
+  AbortSynchronizer abort_sync;
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "CompactionJob::ProcessKeyValueCompaction:Start",
+      [&](void* /*arg*/) { abort_sync.TriggerAbort(dbfull()); });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  for (int i = 0; i < 4; ++i) {
+    for (int j = 0; j < 100; ++j) {
+      ASSERT_OK(Put(Key(j), rnd.RandomString(1000)));
+    }
+    ASSERT_OK(Flush());
+  }
+
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  CleanupSyncPoints();
+
+  abort_sync.WaitForAbortCompletion();
+  dbfull()->ResumeAllCompactions();
+
+  for (int j = 0; j < 100; ++j) {
+    ASSERT_OK(Put(Key(j), rnd.RandomString(1000)));
+  }
+  ASSERT_OK(Flush());
+
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  std::string val;
+  for (int j = 0; j < 100; ++j) {
+    ASSERT_OK(dbfull()->Get(ReadOptions(), Key(j), &val));
+  }
+}
+
+TEST_F(DBCompactionAbortTest, AbortAndVerifyNoOutputFiles) {
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = 4;
+  options.max_subcompactions = 2;
+  options.disable_auto_compactions = true;
+  Reopen(options);
+
+  PopulateData(/*num_files=*/4, /*keys_per_file=*/100, /*value_size=*/1000);
+
+  int num_l0_files_before = NumTableFilesAtLevel(0);
+  int num_l1_files_before = NumTableFilesAtLevel(1);
+
+  SyncPointAbortHelper helper("CompactionJob::ProcessKeyValueCompaction:Start");
+  helper.Setup(dbfull());
+
+  CompactRangeOptions cro;
+  Status s = dbfull()->CompactRange(cro, nullptr, nullptr);
+  ASSERT_TRUE(s.IsIncomplete());
+  ASSERT_TRUE(s.IsCompactionAborted());
+
+  CleanupSyncPoints();
+
+  int num_l0_files_after = NumTableFilesAtLevel(0);
+  int num_l1_files_after = NumTableFilesAtLevel(1);
+
+  ASSERT_EQ(num_l0_files_before, num_l0_files_after);
+  ASSERT_EQ(num_l1_files_before, num_l1_files_after);
+
+  helper.WaitForAbortCompletion();
+  dbfull()->ResumeAllCompactions();
+
+  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+
+  int num_l0_files_final = NumTableFilesAtLevel(0);
+  int num_l1_files_final = NumTableFilesAtLevel(1);
+
+  ASSERT_EQ(0, num_l0_files_final);
+  ASSERT_GT(num_l1_files_final, 0);
+
+  VerifyDataIntegrity(/*num_keys=*/100);
+}
+
+TEST_F(DBCompactionAbortTest, MultipleAbortResumeSequence) {
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = 4;
+  options.max_subcompactions = 2;
+  options.disable_auto_compactions = true;
+  Reopen(options);
+
+  PopulateData(/*num_files=*/4, /*keys_per_file=*/100, /*value_size=*/1000);
+
+  for (int round = 0; round < 3; ++round) {
+    // Use SyncPointAbortHelper for deterministic abort timing - it waits
+    // for the abort flag to be set via the sync point dependency
+    SyncPointAbortHelper helper(
+        "CompactionJob::ProcessKeyValueCompaction:Start");
+    helper.Setup(dbfull());
+
+    CompactRangeOptions cro;
+    Status s = dbfull()->CompactRange(cro, nullptr, nullptr);
+    ASSERT_TRUE(s.IsIncomplete());
+    ASSERT_TRUE(s.IsCompactionAborted());
+
+    helper.CleanupAndWait();
+    dbfull()->ResumeAllCompactions();
+  }
+
+  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+
+  VerifyDataIntegrity(/*num_keys=*/100);
+}
+
+TEST_F(DBCompactionAbortTest, AbortWithOutputFilesCleanup) {
+  Options options = CurrentOptions();
+  options.num_levels = 2;  // Ensure compaction output goes to L1
+  options.level0_file_num_compaction_trigger = 4;
+  options.max_subcompactions = 2;
+  options.disable_auto_compactions = true;
+  options.target_file_size_base = 50 * 1024;
+  Reopen(options);
+
+  PopulateData(/*num_files=*/4, /*keys_per_file=*/100, /*value_size=*/100);
+
+  SyncPointAbortHelper helper("CompactionJob::RunSubcompactions:BeforeStart");
+  helper.Setup(dbfull());
+
+  CompactRangeOptions cro;
+  Status s = dbfull()->CompactRange(cro, nullptr, nullptr);
+  ASSERT_TRUE(s.IsIncomplete());
+  ASSERT_TRUE(s.IsCompactionAborted());
+
+  CleanupSyncPoints();
+
+  int num_l1_files_after_abort = NumTableFilesAtLevel(1);
+  ASSERT_EQ(0, num_l1_files_after_abort);
+
+  helper.WaitForAbortCompletion();
+  dbfull()->ResumeAllCompactions();
+
+  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+
+  // Verify L0 files are compacted and L1 has output files
+  int num_l0_files_final = NumTableFilesAtLevel(0);
+  int num_l1_files_final = NumTableFilesAtLevel(1);
+  ASSERT_EQ(0, num_l0_files_final)
+      << "L0 should be empty after successful compaction";
+  ASSERT_GT(num_l1_files_final, 0)
+      << "L1 should have files after successful compaction";
+
+  VerifyDataIntegrity(/*num_keys=*/100);
+}
+
+TEST_F(DBCompactionAbortTest, NestedAbortResumeCalls) {
+  // Test that nested AbortAllCompactions() calls work correctly with the
+  // counter
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = 4;
+  options.max_subcompactions = 2;
+  options.disable_auto_compactions = true;
+  Reopen(options);
+
+  PopulateData(/*num_files=*/4, /*keys_per_file=*/100, /*value_size=*/1000);
+
+  // First abort call
+  dbfull()->AbortAllCompactions();
+
+  // Nested abort call (counter should be 2)
+  dbfull()->AbortAllCompactions();
+
+  // One resume is not enough; compactions should still be blocked
+  dbfull()->ResumeAllCompactions();
+
+  // Compaction should still return aborted because the counter is still 1
+  CompactRangeOptions cro;
+  Status s = dbfull()->CompactRange(cro, nullptr, nullptr);
+  ASSERT_TRUE(s.IsIncomplete());
+  ASSERT_TRUE(s.IsCompactionAborted());
+
+  // Second resume - the counter should be 0 now
+  dbfull()->ResumeAllCompactions();
+
+  // Compaction should succeed now
+  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+
+  VerifyDataIntegrity(/*num_keys=*/100);
+}
+
+TEST_F(DBCompactionAbortTest, AbortCompactFilesAPI) {
+  // Test that AbortAllCompactions works with the CompactFiles API
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = 100;  // Disable auto compaction
+  options.disable_auto_compactions = true;
+  Reopen(options);
+
+  PopulateData(/*num_files=*/4, /*keys_per_file=*/100, /*value_size=*/1000);
+
+  // Get the L0 file names
+  std::vector<std::string> files_to_compact;
+  ColumnFamilyMetaData cf_meta;
+  dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta);
+  for (const auto& file : cf_meta.levels[0].files) {
+    files_to_compact.push_back(file.name);
+  }
+  ASSERT_GE(files_to_compact.size(), 2);
+
+  SyncPointAbortHelper helper("CompactionJob::ProcessKeyValueCompaction:Start");
+  helper.Setup(dbfull());
+
+  CompactionOptions compact_options;
+  Status s = dbfull()->CompactFiles(compact_options, files_to_compact, 1);
+  ASSERT_TRUE(s.IsIncomplete());
+  ASSERT_TRUE(s.IsCompactionAborted());
+
+  helper.CleanupAndWait();
+  dbfull()->ResumeAllCompactions();
+
+  // CompactFiles should work after resume
+  ASSERT_OK(dbfull()->CompactFiles(compact_options, files_to_compact, 1));
+
+  VerifyDataIntegrity(/*num_keys=*/100);
+}
+
+TEST_F(DBCompactionAbortTest, AbortDoesNotAffectFlush) {
+  // Test that AbortAllCompactions does not affect flush operations
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = 100;
+  options.disable_auto_compactions = true;
+  Reopen(options);
+
+  Random rnd(301);
+  for (int j = 0; j < 100; ++j) {
+    ASSERT_OK(Put(Key(j), rnd.RandomString(1000)));
+  }
+
+  // Abort compactions
+  dbfull()->AbortAllCompactions();
+
+  // Flush should still work
+  ASSERT_OK(Flush());
+
+  // Write more data
+  for (int j = 100; j < 200; ++j) {
+    ASSERT_OK(Put(Key(j), rnd.RandomString(1000)));
+  }
+
+  // Flush should still work
+  ASSERT_OK(Flush());
+
+  // Resume compactions
+  dbfull()->ResumeAllCompactions();
+
+  VerifyDataIntegrity(/*num_keys=*/200);
+}
+
+TEST_F(DBCompactionAbortTest, AbortBeforeCompactionStarts) {
+  // Test aborting before any compaction has started
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = 4;
+  options.disable_auto_compactions = true;
+  Reopen(options);
+
+  PopulateData(/*num_files=*/4, /*keys_per_file=*/100, /*value_size=*/1000);
+
+  // Abort before starting compaction
+  dbfull()->AbortAllCompactions();
+
+  // Compaction should immediately return aborted
+  CompactRangeOptions cro;
+  Status s = dbfull()->CompactRange(cro, nullptr, nullptr);
+  ASSERT_TRUE(s.IsIncomplete());
+  ASSERT_TRUE(s.IsCompactionAborted());
+
+  // Resume
+  dbfull()->ResumeAllCompactions();
+
+  // Now compaction should work
+  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+
+  // Verify L0 files are compacted
+  ASSERT_EQ(0, NumTableFilesAtLevel(0));
+}
+
+// Test that in-progress blob and SST files are properly cleaned up when a
+// compaction is aborted. This specifically tests the case where the abort
+// happens while files are being written (opened but not yet
+// completed/closed). It catches the bug where files exist on disk but are
+// removed from the outputs_ vector (e.g., by RemoveLastEmptyOutput when
+// file_size is 0 because the builder was abandoned), leaving orphan files.
+TEST_F(DBCompactionAbortTest, AbortWithInProgressFileCleanup) {
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = 4;
+  options.max_subcompactions =
+      1;  // Single subcompaction for deterministic behavior
+  options.disable_auto_compactions = true;
+  options.target_file_size_base = 32 * 1024;  // 32KB
+
+  // Enable BlobDB with garbage collection to force blob rewriting during
+  // compaction
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;  // All values go to blob files
+  options.blob_file_size =
+      1024 * 1024;  // 1MB - large enough to not close during the test
+  // Enable blob garbage collection - this forces blob data to be rewritten
+  // during compaction, creating new blob files
+  options.enable_blob_garbage_collection = true;
+  options.blob_garbage_collection_age_cutoff = 1.0;  // Include all blob files
+  options.blob_garbage_collection_force_threshold = 0.0;  // Always force GC
+
+  Reopen(options);
+
+  // Write enough data so the periodic abort check (every 1024 input records)
+  // is exercised: 4 files * 2000 overlapping keys yield 8000 input records
+  // during compaction, and the check fires at records 1023, 2047, 3071, ...
+  PopulateData(/*num_files=*/4, /*keys_per_file=*/2000, /*value_size=*/500);
+
+  // Helper to get the blob files on disk, by name
+  auto GetBlobFilesOnDisk = [this]() -> std::vector<std::string> {
+    std::vector<std::string> blob_files;
+    std::vector<std::string> files;
+    EXPECT_OK(env_->GetChildren(dbname_, &files));
+    for (const auto& f : files) {
+      if (f.find(".blob") != std::string::npos) {
+        blob_files.push_back(f);
+      }
+    }
+    std::sort(blob_files.begin(), blob_files.end());
+    return blob_files;
+  };
+
+  // Helper to get the blob file numbers in metadata
+  auto GetBlobFilesInMetadata = [this]() -> std::vector<uint64_t> {
+    std::vector<uint64_t> blob_file_numbers;
+    ColumnFamilyMetaData cf_meta;
+    dbfull()->GetColumnFamilyMetaData(db_->DefaultColumnFamily(), &cf_meta);
+    for (const auto& blob_meta : cf_meta.blob_files) {
+      blob_file_numbers.push_back(blob_meta.blob_file_number);
+    }
+    std::sort(blob_file_numbers.begin(), blob_file_numbers.end());
+    return blob_file_numbers;
+  };
+
+  // Helper to get the SST files on disk
+  auto GetSstFilesOnDisk = [this]() -> std::vector<std::string> {
+    std::vector<std::string> sst_files;
+    std::vector<std::string> files;
+    EXPECT_OK(env_->GetChildren(dbname_, &files));
+    for (const auto& f : files) {
+      if (f.find(".sst") != std::string::npos) {
+        sst_files.push_back(f);
+      }
+    }
+    std::sort(sst_files.begin(), sst_files.end());
+    return sst_files;
+  };
+
+  // Helper to get the SST file numbers in metadata
+  auto GetSstFilesInMetadata = [this]() -> std::vector<uint64_t> {
+    std::vector<uint64_t> sst_file_numbers;
+    ColumnFamilyMetaData cf_meta;
+    dbfull()->GetColumnFamilyMetaData(db_->DefaultColumnFamily(), &cf_meta);
+    for (const auto& level : cf_meta.levels) {
+      for (const auto& file : level.files) {
+        // Extract the file number from the file name
+        // (e.g., "000010.sst" -> 10)
+        uint64_t file_num = 0;
+        std::string fname = file.name;
+        // Remove leading path separators if present
+        size_t pos = fname.rfind('/');
+        if (pos != std::string::npos) {
+          fname = fname.substr(pos + 1);
+        }
+        if (sscanf(fname.c_str(), "%" PRIu64, &file_num) == 1) {
+          sst_file_numbers.push_back(file_num);
+        }
+      }
+    }
+    std::sort(sst_file_numbers.begin(), sst_file_numbers.end());
+    return sst_file_numbers;
+  };
+
+  std::vector<std::string> initial_blob_files = GetBlobFilesOnDisk();
+  std::vector<uint64_t> initial_meta_blobs = GetBlobFilesInMetadata();
+  std::vector<std::string> initial_sst_files = GetSstFilesOnDisk();
+  std::vector<uint64_t> initial_meta_ssts = GetSstFilesInMetadata();
+
+  ASSERT_GT(initial_blob_files.size(), 0u) << "Expected initial blob files";
+  ASSERT_EQ(initial_blob_files.size(), initial_meta_blobs.size())
+      << "Initial blob files should match between disk and metadata";
+  ASSERT_GT(initial_sst_files.size(), 0u) << "Expected initial SST files";
+  ASSERT_EQ(initial_sst_files.size(), initial_meta_ssts.size())
+      << "Initial SST files should match between disk and metadata";
+
+  // Tracking variables for the blob file lifecycle
+  std::atomic<int> blob_writes{0};
+  std::atomic<bool> abort_triggered{false};
+  AbortSynchronizer abort_sync;
+
+  // Set up a dependency: the wait point will block until FlagSet is hit
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
+      {"DBImpl::AbortAllCompactions:FlagSet",
+       "DBCompactionAbortTest::InProgressBlob:WaitForAbort"},
+  });
+
+  // Trigger the abort after some blob writes during compaction output.
+  // This ensures we have an in-progress blob file when the abort happens.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "BlobFileBuilder::WriteBlobToFile:AddRecord", [&](void* /*arg*/) {
+        int count = blob_writes.fetch_add(1) + 1;
+
+        // Trigger the abort after 100 blob writes - this ensures:
+        // 1. A blob file has been opened (for writing)
+        // 2. Some data has been written to it
+        // 3. But it's not yet completed (blob_file_size is 1MB)
+        if (count == 100 && !abort_triggered.exchange(true)) {
+          abort_sync.TriggerAbort(dbfull());
+          // Wait for the abort flag to be set - this sync point blocks
+          // until FlagSet is processed
+          TEST_SYNC_POINT_CALLBACK(
+              "DBCompactionAbortTest::InProgressBlob:WaitForAbort", nullptr);
+        }
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  // Run compaction - it should be aborted while a blob file is in progress
+  CompactRangeOptions cro;
+  Status s = dbfull()->CompactRange(cro, nullptr, nullptr);
+
+  ASSERT_TRUE(s.IsIncomplete())
+      << "Expected compaction to be aborted, got: " << s.ToString();
+
+  CleanupSyncPoints();
+  abort_sync.WaitForAbortCompletion();
+
+  // Check the state after the abort
+  std::vector<std::string> post_abort_disk_blobs = GetBlobFilesOnDisk();
+  std::vector<uint64_t> post_abort_meta_blobs = GetBlobFilesInMetadata();
+  std::vector<std::string> post_abort_disk_ssts = GetSstFilesOnDisk();
+  std::vector<uint64_t> post_abort_meta_ssts = GetSstFilesInMetadata();
+
+  // This is the key assertion for blob files: files on disk should match
+  // the metadata. If the in-progress blob file was NOT cleaned up, there
+  // will be an extra file on disk that's not in the metadata (an orphan).
+  ASSERT_EQ(post_abort_disk_blobs.size(), post_abort_meta_blobs.size())
+      << "Orphan blob file detected! In-progress blob file was not cleaned "
+         "up after abort. Files on disk: "
+      << post_abort_disk_blobs.size()
+      << ", Files in metadata: " << post_abort_meta_blobs.size()
+      << ". The difference indicates orphaned in-progress blob file(s).";
+
+  // This is the key assertion for SST files: files on disk should match
+  // the metadata. If the in-progress SST file was NOT cleaned up, there
+  // will be an extra file on disk that's not in the metadata (an orphan).
+  ASSERT_EQ(post_abort_disk_ssts.size(), post_abort_meta_ssts.size())
+      << "Orphan SST file detected! In-progress SST file was not cleaned up "
+         "after abort. Files on disk: "
+      << post_abort_disk_ssts.size()
+      << ", Files in metadata: " << post_abort_meta_ssts.size()
+      << ". The difference indicates orphaned in-progress SST file(s).";
+
+  // Resume and complete the compaction to verify the DB is still functional
+  dbfull()->ResumeAllCompactions();
+
+  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+
+  // Verify data integrity - we wrote 4 files * 2000 overlapping keys
+  VerifyDataIntegrity(/*num_keys=*/2000);
+}
+
+TEST_F(DBCompactionAbortTest, AbortBottommostLevelCompaction) {
+  Options options = CurrentOptions();
+  options.num_levels = 4;
+  options.level0_file_num_compaction_trigger = 2;
+  options.max_bytes_for_level_base = 1024 * 10;  // 10KB
+  options.max_bytes_for_level_multiplier = 2;
+  options.disable_auto_compactions = true;
+  Reopen(options);
+
+  // Write data to fill multiple levels (non-overlapping keys)
+  PopulateData(/*num_files=*/6, /*keys_per_file=*/100,
+               /*value_size=*/500, /*overlapping=*/false);
+
+  // First compact to push data to lower levels
+  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+
+  // Write more data to L0 (overlapping keys)
+  PopulateData(/*num_files=*/4, /*keys_per_file=*/100, /*value_size=*/500);
+
+  SyncPointAbortHelper helper("CompactionJob::ProcessKeyValueCompaction:Start");
+  helper.Setup(dbfull());
+
+  // Trigger a bottommost-level compaction
+  CompactRangeOptions cro;
+  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+  Status s = dbfull()->CompactRange(cro, nullptr, nullptr);
+  ASSERT_TRUE(s.IsIncomplete());
+  ASSERT_TRUE(s.IsCompactionAborted());
+
+  helper.CleanupAndWait();
+  dbfull()->ResumeAllCompactions();
+
+  ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+
+  VerifyDataIntegrity(/*num_keys=*/600);
+}
+
+// Test that, while compactions are aborted, atomic range replace
+// (IngestExternalFiles with atomic_replace_range) works correctly.
+// This verifies that the abort state doesn't block other write operations
+// like atomic range replace.
+TEST_F(DBCompactionAbortTest, AbortThenAtomicRangeReplace) {
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = 4;
+  options.max_subcompactions = 2;
+  options.disable_auto_compactions = true;
+  Reopen(options);
+
+  // Create a directory for the SST files
+  std::string sst_files_dir = dbname_ + "_sst_files/";
+  ASSERT_OK(env_->CreateDirIfMissing(sst_files_dir));
+
+  // Populate initial data with overlapping keys
+  PopulateData(/*num_files=*/4, /*keys_per_file=*/100, /*value_size=*/500);
+
+  // Verify the initial data
+  VerifyDataIntegrity(/*num_keys=*/100);
+
+  // Trigger a compaction and abort it
+  SyncPointAbortHelper helper("CompactionJob::ProcessKeyValueCompaction:Start");
+  helper.Setup(dbfull());
+
+  CompactRangeOptions cro;
+  Status s = dbfull()->CompactRange(cro, nullptr, nullptr);
+  ASSERT_TRUE(s.IsIncomplete());
+  ASSERT_TRUE(s.IsCompactionAborted());
+
+  helper.CleanupAndWait();
+
+  // While compactions are still aborted, perform an atomic range replace
+  // using IngestExternalFiles with atomic_replace_range. This verifies that
+  // the abort state doesn't block other write operations.
+  // Using RangeOpt() (the default, empty range) means replace everything in
+  // the CF.
+
+  // Create an SST file with new data for keys 0-49 (replacing keys 0-99)
+  std::string sst_file_path = sst_files_dir + "atomic_replace_1.sst";
+  SstFileWriter sst_file_writer(EnvOptions(), options);
+  ASSERT_OK(sst_file_writer.Open(sst_file_path));
+
+  // Write new values for keys 0-49
+  Random rnd(42);
+  std::unordered_map<std::string, std::string> new_values;
+  for (int j = 0; j < 50; ++j) {
+    std::string key = Key(j);
+    std::string value = "replaced_" + rnd.RandomString(100);
+    ASSERT_OK(sst_file_writer.Put(key, value));
+    new_values[key] = value;
+  }
+  ASSERT_OK(sst_file_writer.Finish());
+
+  // Perform the atomic range replace for the entire column family
+  IngestExternalFileArg arg;
+  arg.column_family = db_->DefaultColumnFamily();
+  arg.external_files = {sst_file_path};
+  arg.atomic_replace_range = RangeOpt();
+  // snapshot_consistency must be false when using atomic_replace_range
+  arg.options.snapshot_consistency = false;
+
+  // The atomic range replace should work even while compactions are aborted
+  ASSERT_OK(db_->IngestExternalFiles({arg}));
+
+  // Now resume compactions after the atomic range replace
+  dbfull()->ResumeAllCompactions();
+
+  // Verify that the atomic range replace worked correctly:
+  // 1. Keys 0-49 should have the new replaced values
+  std::string val;
+  for (int j = 0; j < 50; ++j) {
+    std::string key = Key(j);
+    ASSERT_OK(db_->Get(ReadOptions(), key, &val));
+    auto it = new_values.find(key);
+    ASSERT_NE(it, new_values.end());
+    ASSERT_EQ(it->second, val) << "Value mismatch for replaced key: " << key;
+  }
+
+  // 2. Keys 50-99 should not exist (they were replaced/deleted by the
+  //    atomic replace)
+  for (int j = 50; j < 100; ++j) {
+    std::string key = Key(j);
+    Status get_status = db_->Get(ReadOptions(), key, &val);
+    ASSERT_TRUE(get_status.IsNotFound())
+        << "Key " << key << " should not exist after full CF replace";
+  }
+
+  // Clean up the SST files directory
+  ASSERT_OK(DestroyDir(env_, sst_files_dir));
+}
+
+}  // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index 451fbd41c70e..a3f25dd7788f 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -455,6 +455,8 @@ class DBImpl : public DB {
 
   void EnableManualCompaction() override;
   void DisableManualCompaction() override;
+  void AbortAllCompactions() override;
+  void ResumeAllCompactions() override;
 
   using DB::SetOptions;
   Status SetOptions(
@@ -2789,6 +2791,14 @@ class DBImpl : public DB {
   // compaction code paths.
   std::atomic<int> manual_compaction_paused_ = false;
 
+  // If non-zero, all compaction jobs (background automatic compactions,
+  // manual compactions via CompactRange, and foreground CompactFiles calls)
+  // are being aborted. Running compactions are signaled to stop, and any new
+  // compaction job fails immediately. The value is the number of threads
+  // that have called AbortAllCompactions(). It is accessed in read mode
+  // outside the DB mutex in compaction code paths.
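+  //
+  // Illustrative counter semantics (abort/resume calls must be balanced;
+  // see the NestedAbortResumeCalls test):
+  //   db->AbortAllCompactions();   // counter 0 -> 1
+  //   db->AbortAllCompactions();   // counter 1 -> 2
+  //   db->ResumeAllCompactions();  // counter 2 -> 1 (still aborted)
+  //   db->ResumeAllCompactions();  // counter 1 -> 0 (compactions resume)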
+  std::atomic<int> compaction_aborted_ = 0;
+
   // This condition variable is signaled on these conditions:
   // * whenever bg_compaction_scheduled_ goes down to 0
   // * if AnyManualCompaction, whenever a compaction finishes, even if it hasn't
diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
index 877b61007b99..9cf25f639da0 100644
--- a/db/db_impl/db_impl_compaction_flush.cc
+++ b/db/db_impl/db_impl_compaction_flush.cc
@@ -955,6 +955,10 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
     return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
   }
 
+  if (compaction_aborted_.load(std::memory_order_acquire) > 0) {
+    return Status::Incomplete(Status::SubCode::kCompactionAborted);
+  }
+
   if (options.canceled && options.canceled->load(std::memory_order_acquire)) {
     return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
   }
@@ -1487,6 +1491,11 @@ Status DBImpl::CompactFilesImpl(
     return Status::ShutdownInProgress();
   }
 
+  // triggered by AbortAllCompactions
+  if (compaction_aborted_.load(std::memory_order_acquire) > 0) {
+    return Status::Incomplete(Status::SubCode::kCompactionAborted);
+  }
+
   // triggered by DisableManualCompactions or by user-set canceled flag in
   // CompactionOptions
   if (manual_compaction_paused_.load(std::memory_order_acquire) > 0 ||
@@ -1637,9 +1646,9 @@ Status DBImpl::CompactFilesImpl(
       c->mutable_cf_options().paranoid_file_checks,
       c->mutable_cf_options().report_bg_io_stats, dbname_,
       &compaction_job_stats, Env::Priority::USER, io_tracer_,
-      kManualCompactionCanceledFalse_, db_id_, db_session_id_,
-      c->column_family_data()->GetFullHistoryTsLow(), c->trim_ts(),
-      &blob_callback_, &bg_compaction_scheduled_,
+      kManualCompactionCanceledFalse_, compaction_aborted_, db_id_,
+      db_session_id_, c->column_family_data()->GetFullHistoryTsLow(),
+      c->trim_ts(), &blob_callback_, &bg_compaction_scheduled_,
       &bg_bottom_compaction_scheduled_);
 
   // Creating a compaction influences the compaction score because the score
@@ -1710,6 +1719,11 @@ Status DBImpl::CompactFilesImpl(
                     "[%s] [JOB %d] Stopping manual compaction",
                     c->column_family_data()->GetName().c_str(),
                     job_context->job_id);
+  } else if (status.IsCompactionAborted()) {
+    // Don't report an aborted compaction as an error
+    ROCKS_LOG_INFO(
+        immutable_db_options_.info_log, "[%s] [JOB %d] Compaction aborted",
+        c->column_family_data()->GetName().c_str(), job_context->job_id);
   } else {
     ROCKS_LOG_WARN(immutable_db_options_.info_log,
                    "[%s] [JOB %d] Compaction error: %s",
@@ -2170,6 +2184,17 @@ Status DBImpl::RunManualCompaction(
     return manual.status;
   }
 
+  if (compaction_aborted_.load(std::memory_order_acquire) > 0) {
+    // All compactions are being aborted. Return immediately.
+    int counter = compaction_aborted_.load(std::memory_order_acquire);
+    ROCKS_LOG_INFO(
+        immutable_db_options_.info_log,
+        "RunManualCompaction: Aborting due to compaction_aborted_=%d", counter);
+    manual.status = Status::Incomplete(Status::SubCode::kCompactionAborted);
+    manual.done = true;
+    return manual.status;
+  }
+
   // When a manual compaction arrives, temporarily disable scheduling of
   // non-manual compactions and wait until the number of scheduled compaction
   // jobs drops to zero. This used to be needed to ensure that this manual
@@ -2194,6 +2219,13 @@ Status DBImpl::RunManualCompaction(
   // and `CompactRangeOptions::canceled` might not work well together.
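+  // Note: the wait loop below re-checks compaction_aborted_ on every wakeup,
+  // so AbortAllCompactions() can interrupt a manual compaction that is still
+  // waiting for scheduled background jobs to drain.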
  while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0) {
+    if (compaction_aborted_.load(std::memory_order_acquire) > 0) {
+      // Pretend the error came from compaction so the below cleanup/error
+      // handling code can process it.
+      manual.done = true;
+      manual.status = Status::Incomplete(Status::SubCode::kCompactionAborted);
+      break;
+    }
     if (manual_compaction_paused_ > 0 || manual.canceled == true) {
       // Pretend the error came from compaction so the below cleanup/error
       // handling code can process it.
@@ -2312,7 +2344,12 @@ Status DBImpl::RunManualCompaction(
   if (!scheduled) {
     // There is nothing scheduled to wait on, so any cancellation can end the
     // manual now.
-    if (manual_compaction_paused_ > 0 || manual.canceled == true) {
+    if (compaction_aborted_.load(std::memory_order_acquire) > 0) {
+      // Stop waiting since it was canceled. Pretend the error came from
+      // compaction so the below cleanup/error handling code can process it.
+      manual.done = true;
+      manual.status = Status::Incomplete(Status::SubCode::kCompactionAborted);
+    } else if (manual_compaction_paused_ > 0 || manual.canceled == true) {
       // Stop waiting since it was canceled. Pretend the error came from
       // compaction so the below cleanup/error handling code can process it.
       manual.done = true;
@@ -2930,6 +2967,61 @@ void DBImpl::EnableManualCompaction() {
   manual_compaction_paused_.fetch_sub(1, std::memory_order_release);
 }
 
+void DBImpl::AbortAllCompactions() {
+  InstrumentedMutexLock l(&mutex_);
+
+  // Increment the abort counter to signal all compactions to abort
+  compaction_aborted_.fetch_add(1, std::memory_order_release);
+
+  TEST_SYNC_POINT("DBImpl::AbortAllCompactions:FlagSet");
+
+  // Mark all manual compactions as canceled
+  for (const auto& manual_compaction : manual_compaction_dequeue_) {
+    manual_compaction->canceled = true;
+  }
+
+  // Wake up any waiting compaction threads to check the abort signal
+  bg_cv_.SignalAll();
+
+  // Wait for all running compactions (both manual and automatic) to finish
+  // or abort before returning.
+  // Note: bg_cv_.Wait() releases the mutex while waiting, so other threads
+  // can make progress and signal when compactions complete.
+  while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0 ||
+         HasPendingManualCompaction()) {
+    bg_cv_.Wait();
+  }
+}
+
+void DBImpl::ResumeAllCompactions() {
+  InstrumentedMutexLock l(&mutex_);
+  int before = compaction_aborted_.load(std::memory_order_acquire);
+
+  // Guard against calling Resume without a prior Abort
+  if (before <= 0) {
+    ROCKS_LOG_WARN(immutable_db_options_.info_log,
+                   "ResumeAllCompactions called without prior "
+                   "AbortAllCompactions (counter=%d)",
+                   before);
+    return;
+  }
+
+  // Decrement the abort counter
+  compaction_aborted_.fetch_sub(1, std::memory_order_release);
+
+  // Since this runs under the DB mutex, no other thread can modify the
+  // counter concurrently, so the post-decrement value can be computed
+  // directly from `before`.
+ int current = before - 1; + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "ResumeAllCompactions: counter %d -> %d", before, current); + + // If this is the last resume call (abort counter back to 0), schedule + // compactions that may have been waiting + if (current == 0) { + MaybeScheduleFlushOrCompaction(); + } +} + void DBImpl::MaybeScheduleFlushOrCompaction() { mutex_.AssertHeld(); TEST_SYNC_POINT("DBImpl::MaybeScheduleFlushOrCompaction:Start"); @@ -2994,6 +3086,9 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { if (bg_compaction_paused_ > 0) { // we paused the background compaction return; + } else if (compaction_aborted_.load(std::memory_order_acquire) > 0) { + // we are aborting all compactions + return; } else if (error_handler_.IsBGWorkStopped()) { // Compaction is not part of the recovery sequence from a hard error. We // might get here because recovery might do a flush and install a new @@ -3531,7 +3626,8 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, 10000); // prevent hot loop mutex_.Lock(); } else if (!s.ok() && !s.IsShutdownInProgress() && - !s.IsManualCompactionPaused() && !s.IsColumnFamilyDropped()) { + !s.IsManualCompactionPaused() && !s.IsColumnFamilyDropped() && + !s.IsCompactionAborted()) { // Wait a little bit before retrying background compaction in // case this is an environmental problem and we do not want to // chew up resources for failed compactions for the duration of @@ -3563,6 +3659,7 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, // case of a failure). Thus, we force full scan in FindObsoleteFiles() FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() && !s.IsManualCompactionPaused() && + !s.IsCompactionAborted() && !s.IsColumnFamilyDropped() && !s.IsBusy()); TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"); @@ -3667,6 +3764,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, if (!error_handler_.IsBGWorkStopped()) { if (shutting_down_.load(std::memory_order_acquire)) { status = Status::ShutdownInProgress(); + } else if (compaction_aborted_.load(std::memory_order_acquire) > 0) { + status = Status::Incomplete(Status::SubCode::kCompactionAborted); } else if (is_manual && manual_compaction->canceled.load(std::memory_order_acquire)) { status = Status::Incomplete(Status::SubCode::kManualCompactionPaused); @@ -4283,8 +4382,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, &compaction_job_stats, thread_pri, io_tracer_, is_manual ? 
manual_compaction->canceled : kManualCompactionCanceledFalse_, - db_id_, db_session_id_, c->column_family_data()->GetFullHistoryTsLow(), - c->trim_ts(), &blob_callback_, &bg_compaction_scheduled_, + compaction_aborted_, db_id_, db_session_id_, + c->column_family_data()->GetFullHistoryTsLow(), c->trim_ts(), + &blob_callback_, &bg_compaction_scheduled_, &bg_bottom_compaction_scheduled_); compaction_job.Prepare(std::nullopt /*subcompact to be computed*/); @@ -4367,7 +4467,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, } if (status.ok() || status.IsCompactionTooLarge() || - status.IsManualCompactionPaused()) { + status.IsManualCompactionPaused() || status.IsCompactionAborted()) { // Done } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) { // Ignore compaction errors found during shutting down @@ -4630,6 +4730,7 @@ void DBImpl::BuildCompactionJobInfo( compaction_job_info->cf_id = cfd->GetID(); compaction_job_info->cf_name = cfd->GetName(); compaction_job_info->status = st; + compaction_job_info->aborted = st.IsCompactionAborted(); compaction_job_info->thread_id = env_->GetThreadID(); compaction_job_info->job_id = job_id; compaction_job_info->base_input_level = c->start_level(); diff --git a/db/db_test.cc b/db/db_test.cc index 7456679a152a..7909763ed0a5 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3691,6 +3691,8 @@ class ModelDB : public DB { void EnableManualCompaction() override {} void DisableManualCompaction() override {} + void AbortAllCompactions() override {} + void ResumeAllCompactions() override {} Status WaitForCompact( const WaitForCompactOptions& /* wait_for_compact_options */) override { @@ -6484,7 +6486,8 @@ TEST_P(DBTestWithParam, CompactionTotalTimeTest) { ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); // Hard-coded number in CompactionJob::ProcessKeyValueCompaction(). - const int kRecordStatsEvery = 1000; + // Uses 1024 (power of 2) for efficient bitwise check. + const int kRecordStatsEvery = 1024; // The stat COMPACTION_CPU_TOTAL_TIME should be recorded // during compaction and once more after compaction. 
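   // With kRecordStatsEvery = 1024, that is n / 1024 recordings while keys are
   // processed plus one final recording, which the assertion below verifies.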
   ASSERT_EQ(n / kRecordStatsEvery + 1, record_count);
diff --git a/db/internal_stats.cc b/db/internal_stats.cc
index c25f7c589b1f..6b2d75385ba4 100644
--- a/db/internal_stats.cc
+++ b/db/internal_stats.cc
@@ -310,6 +310,7 @@ static const std::string aggregated_table_properties_at_level =
 static const std::string num_running_compactions = "num-running-compactions";
 static const std::string num_running_compaction_sorted_runs =
     "num-running-compaction-sorted-runs";
+static const std::string compaction_abort_count = "compaction-abort-count";
 static const std::string num_running_flushes = "num-running-flushes";
 static const std::string actual_delayed_write_rate =
     "actual-delayed-write-rate";
@@ -362,6 +363,8 @@ const std::string DB::Properties::kNumRunningCompactions =
     rocksdb_prefix + num_running_compactions;
 const std::string DB::Properties::kNumRunningCompactionSortedRuns =
     rocksdb_prefix + num_running_compaction_sorted_runs;
+const std::string DB::Properties::kCompactionAbortCount =
+    rocksdb_prefix + compaction_abort_count;
 const std::string DB::Properties::kNumRunningFlushes =
     rocksdb_prefix + num_running_flushes;
 const std::string DB::Properties::kBackgroundErrors =
@@ -594,6 +597,9 @@ const UnorderedMap<std::string, DBPropertyInfo>
     {DB::Properties::kNumRunningCompactionSortedRuns,
      {false, nullptr, &InternalStats::HandleNumRunningCompactionSortedRuns,
       nullptr, nullptr}},
+    {DB::Properties::kCompactionAbortCount,
+     {false, nullptr, &InternalStats::HandleCompactionAbortCount, nullptr,
+      nullptr}},
     {DB::Properties::kActualDelayedWriteRate,
      {false, nullptr, &InternalStats::HandleActualDelayedWriteRate, nullptr,
      nullptr}},
@@ -1292,6 +1298,13 @@ bool InternalStats::HandleNumRunningCompactionSortedRuns(uint64_t* value,
   return true;
 }
 
+bool InternalStats::HandleCompactionAbortCount(uint64_t* value, DBImpl* db,
+                                               Version* /*version*/) {
+  *value = static_cast<uint64_t>(
+      db->compaction_aborted_.load(std::memory_order_acquire));
+  return true;
+}
+
 bool InternalStats::HandleBackgroundErrors(uint64_t* value, DBImpl* /*db*/,
                                            Version* /*version*/) {
   // Accumulated number of errors in background flushes or compactions.
diff --git a/db/internal_stats.h b/db/internal_stats.h index a1b4fbe6c555..347b3a617aae 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -852,6 +852,8 @@ class InternalStats { Version* version); bool HandleNumRunningCompactionSortedRuns(uint64_t* value, DBImpl* db, Version* version); + bool HandleCompactionAbortCount(uint64_t* value, DBImpl* db, + Version* version); bool HandleBackgroundErrors(uint64_t* value, DBImpl* db, Version* version); bool HandleCurSizeActiveMemTable(uint64_t* value, DBImpl* db, Version* version); diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 2768a1eff1df..eca5656204f1 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -218,6 +218,7 @@ DECLARE_int32(reset_stats_one_in); DECLARE_int32(pause_background_one_in); DECLARE_int32(disable_file_deletions_one_in); DECLARE_int32(disable_manual_compaction_one_in); +DECLARE_int32(abort_and_resume_compactions_one_in); DECLARE_int32(compact_range_width); DECLARE_int32(acquire_snapshot_one_in); DECLARE_bool(compare_full_db_state_snapshot); diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index dead587f5945..0678609ecaa8 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -796,6 +796,10 @@ DEFINE_int32( "If non-zero, then DisableManualCompaction()+Enable will be called " "once for every N ops on average. 0 disables."); +DEFINE_int32(abort_and_resume_compactions_one_in, 0, + "If non-zero, then AbortAllCompactions()+Resume will be called " + "once for every N ops on average. 0 disables."); + DEFINE_int32(compact_range_width, 10000, "The width of the ranges passed to CompactRange()."); diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index e826b2552a78..cbce41a89d2e 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -1271,6 +1271,11 @@ void StressTest::OperateDb(ThreadState* thread) { ProcessStatus(shared, "TestDisableManualCompaction", status); } + if (thread->rand.OneInOpt(FLAGS_abort_and_resume_compactions_one_in)) { + Status status = TestAbortAndResumeCompactions(thread); + ProcessStatus(shared, "TestAbortAndResumeCompactions", status); + } + if (thread->rand.OneInOpt(FLAGS_verify_checksum_one_in)) { ThreadStatusUtil::SetEnableTracking(FLAGS_enable_thread_tracking); ThreadStatusUtil::SetThreadOperation( @@ -3047,8 +3052,9 @@ void StressTest::TestCompactFiles(ThreadState* thread, // TOOD (hx235): allow an exact list of tolerable failures under stress // test bool non_ok_status_allowed = - s.IsManualCompactionPaused() || IsErrorInjectedAndRetryable(s) || - s.IsAborted() || s.IsInvalidArgument() || s.IsNotSupported(); + s.IsManualCompactionPaused() || s.IsCompactionAborted() || + IsErrorInjectedAndRetryable(s) || s.IsAborted() || + s.IsInvalidArgument() || s.IsNotSupported(); if (!non_ok_status_allowed) { fprintf(stderr, "Unable to perform CompactFiles(): %s under specified " @@ -3141,6 +3147,20 @@ Status StressTest::TestDisableManualCompaction(ThreadState* thread) { return Status::OK(); } +Status StressTest::TestAbortAndResumeCompactions(ThreadState* thread) { + // Abort all running compactions and prevent new ones from starting + db_->AbortAllCompactions(); + // Sleep to allow other threads to attempt operations while aborted + // Uses same sleep pattern as TestPauseBackground and + // TestDisableManualCompaction + int pwr2_micros = + std::min(thread->rand.Uniform(25), 
+                             thread->rand.Uniform(25));
+  clock_->SleepForMicroseconds(1 << pwr2_micros);
+  // Resume compactions
+  db_->ResumeAllCompactions();
+  return Status::OK();
+}
+
 void StressTest::TestAcquireSnapshot(ThreadState* thread,
                                      int rand_column_family,
                                      const std::string& keystr, uint64_t i) {
@@ -3316,7 +3336,7 @@ void StressTest::TestCompactRange(ThreadState* thread, int64_t rand_key,
   if (!status.ok()) {
     // TOOD (hx235): allow an exact list of tolerable failures under stress test
     bool non_ok_status_allowed =
-        status.IsManualCompactionPaused() ||
+        status.IsManualCompactionPaused() || status.IsCompactionAborted() ||
         IsErrorInjectedAndRetryable(status) || status.IsAborted() ||
         status.IsInvalidArgument() || status.IsNotSupported();
     if (!non_ok_status_allowed) {
diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h
index da1589be541a..3e8bc2af0def 100644
--- a/db_stress_tool/db_stress_test_base.h
+++ b/db_stress_tool/db_stress_test_base.h
@@ -332,6 +332,8 @@ class StressTest {
 
   Status TestDisableManualCompaction(ThreadState* thread);
 
+  Status TestAbortAndResumeCompactions(ThreadState* thread);
+
   void TestAcquireSnapshot(ThreadState* thread, int rand_column_family,
                            const std::string& keystr, uint64_t i);
 
diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h
index 2abb7eb02513..8b4be252cfd9 100644
--- a/include/rocksdb/db.h
+++ b/include/rocksdb/db.h
@@ -1253,6 +1253,10 @@ class DB {
   //  sorted runs being processed by currently running compactions.
   static const std::string kNumRunningCompactionSortedRuns;
 
+  //  "rocksdb.compaction-abort-count" - returns the current value of the
+  //      compaction abort counter.
+  static const std::string kCompactionAbortCount;
+
   //  "rocksdb.background-errors" - returns accumulated number of background
   //      errors.
   static const std::string kBackgroundErrors;
@@ -1731,6 +1735,46 @@ class DB {
   // DisableManualCompaction() has been called.
   virtual void EnableManualCompaction() = 0;
 
+  // Abort all compaction work/jobs. This function signals all running
+  // compactions (both automatic and manual, background and foreground) to
+  // abort and waits for them to finish or abort before returning. After
+  // this function returns, new compaction work is aborted immediately
+  // until ResumeAllCompactions() is called.
+  //
+  // The compaction abort flag is checked periodically (every 1024 keys
+  // processed), so ongoing compactions will also abort within a reasonable
+  // time. This function blocks until all compactions have completed or
+  // aborted.
+  //
+  // Any output files from aborted compactions are automatically cleaned up,
+  // ensuring no partial compaction results are installed, except for
+  // resumable compactions, whose output files are kept for resumption.
+  //
+  // This function supports concurrent abort requests from multiple callers
+  // without coordination between them. The call count is tracked, and
+  // compactions only resume after the number of ResumeAllCompactions() calls
+  // matches the number of AbortAllCompactions() calls.
+  //
+  // Differences from other compaction control APIs:
+  // - DisableManualCompaction(): Only pauses manual compactions, waiting for
+  //   them to finish naturally. AbortAllCompactions() actively cancels both
+  //   automatic and manual compactions.
+  // - PauseBackgroundWork(): Pauses all background work (flush + compaction),
+  //   waiting for work to finish naturally. AbortAllCompactions() only
+  //   affects compactions and actively cancels them.
+  //
+  // Note: Compaction service (remote compaction) is not currently supported.
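+  //
+  // Illustrative usage (a sketch using only the APIs and property added in
+  // this change):
+  //
+  //   db->AbortAllCompactions();    // blocks until running jobs stop
+  //   // ... perform maintenance while compactions are held off ...
+  //   uint64_t aborts = 0;
+  //   db->GetIntProperty(DB::Properties::kCompactionAbortCount, &aborts);
+  //   db->ResumeAllCompactions();   // counter back to 0; compactions resume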
+  // Aborted compactions return Status::Incomplete with subcode
+  // kCompactionAborted.
+  virtual void AbortAllCompactions() = 0;
+
+  // Resume all compactions that were aborted by AbortAllCompactions().
+  // This function must be called as many times as AbortAllCompactions()
+  // has been called in order to resume compactions. This reference-counting
+  // behavior ensures that if multiple callers independently request an
+  // abort, compactions will not resume until all of them have called
+  // ResumeAllCompactions().
+  virtual void ResumeAllCompactions() = 0;
+
   // Wait for all flush and compactions jobs to finish. Jobs to wait include the
   // unscheduled (queued, but not scheduled yet). If the db is shutting down,
   // Status::ShutdownInProgress will be returned.
diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h
index fe90a7b2ec94..1b41ca81f3d9 100644
--- a/include/rocksdb/listener.h
+++ b/include/rocksdb/listener.h
@@ -488,6 +488,9 @@ struct CompactionJobInfo {
   // Information about blob files deleted during compaction in Integrated
   // BlobDB.
   std::vector<BlobFileGarbageInfo> blob_file_garbage_infos;
+
+  // Whether this compaction was aborted via AbortAllCompactions()
+  bool aborted = false;
 };
 
 struct MemTableInfo {
diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h
index bdffbbb25a03..ae4ef5792408 100644
--- a/include/rocksdb/statistics.h
+++ b/include/rocksdb/statistics.h
@@ -162,6 +162,8 @@ enum Tickers : uint32_t {
   COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE,
   // If a compaction was canceled in sfm to prevent ENOSPC
   COMPACTION_CANCELLED,
+  // Number of compactions aborted via AbortAllCompactions()
+  COMPACTION_ABORTED,
 
   // Number of keys written to the database via the Put and Write call's
   NUMBER_KEYS_WRITTEN,
diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h
index afb9651faf27..c3eeb082c3ed 100644
--- a/include/rocksdb/status.h
+++ b/include/rocksdb/status.h
@@ -117,6 +117,7 @@ class Status {
     kMergeOperandThresholdExceeded = 16,
     kPrefetchLimitReached = 17,
     kNotExpectedCodePath = 18,
+    kCompactionAborted = 19,
     kMaxSubCode
   };
@@ -483,6 +484,13 @@ class Status {
     return (code() == kIncomplete) && (subcode() == kManualCompactionPaused);
   }
 
+  // Returns true iff the status indicates an aborted compaction. This is
+  // caused by a call to AbortAllCompactions().
+  bool IsCompactionAborted() const {
+    MarkChecked();
+    return (code() == kIncomplete) && (subcode() == kCompactionAborted);
+  }
+
   // Returns true iff the status indicates a TxnNotPrepared error.
   bool IsTxnNotPrepared() const {
     MarkChecked();
diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h
index 8cd4057fd553..f48acb2433db 100644
--- a/include/rocksdb/utilities/stackable_db.h
+++ b/include/rocksdb/utilities/stackable_db.h
@@ -375,6 +375,8 @@ class StackableDB : public DB {
   void DisableManualCompaction() override {
     return db_->DisableManualCompaction();
   }
+  void AbortAllCompactions() override { return db_->AbortAllCompactions(); }
+  void ResumeAllCompactions() override { return db_->ResumeAllCompactions(); }
 
   Status WaitForCompact(
       const WaitForCompactOptions& wait_for_compact_options) override {
diff --git a/java/rocksjni/rocksjni.cc b/java/rocksjni/rocksjni.cc
index 57272170c326..af47c0e95644 100644
--- a/java/rocksjni/rocksjni.cc
+++ b/java/rocksjni/rocksjni.cc
@@ -2978,6 +2978,28 @@ void Java_org_rocksdb_RocksDB_continueBackgroundWork(JNIEnv* env, jclass,
   }
 }
 
+/*
+ * Class:     org_rocksdb_RocksDB
+ * Method:    abortAllCompactions
+ * Signature: (J)V
+ */
+void Java_org_rocksdb_RocksDB_abortAllCompactions(JNIEnv*, jclass,
+                                                  jlong jdb_handle) {
+  auto* db = reinterpret_cast<ROCKSDB_NAMESPACE::DB*>(jdb_handle);
+  db->AbortAllCompactions();
+}
+
+/*
+ * Class:     org_rocksdb_RocksDB
+ * Method:    resumeAllCompactions
+ * Signature: (J)V
+ */
+void Java_org_rocksdb_RocksDB_resumeAllCompactions(JNIEnv*, jclass,
+                                                   jlong jdb_handle) {
+  auto* db = reinterpret_cast<ROCKSDB_NAMESPACE::DB*>(jdb_handle);
+  db->ResumeAllCompactions();
+}
+
 /*
  * Class:     org_rocksdb_RocksDB
  * Method:    enableAutoCompaction
diff --git a/java/src/main/java/org/rocksdb/RocksDB.java b/java/src/main/java/org/rocksdb/RocksDB.java
index fe2f38af64f9..ebe134726982 100644
--- a/java/src/main/java/org/rocksdb/RocksDB.java
+++ b/java/src/main/java/org/rocksdb/RocksDB.java
@@ -4084,6 +4084,23 @@ public void continueBackgroundWork() throws RocksDBException {
     continueBackgroundWork(nativeHandle_);
   }
 
+  /**
+   * Abort all running and pending compaction jobs. This method will signal
+   * all active compactions to terminate and wait for them to complete.
+   * No new compactions will be scheduled until {@link #resumeAllCompactions()} is called.
+   */
+  public void abortAllCompactions() {
+    abortAllCompactions(nativeHandle_);
+  }
+
+  /**
+   * Resume compaction scheduling after {@link #abortAllCompactions()} was called.
+   * Must be called the same number of times as {@link #abortAllCompactions()}.
+   */
+  public void resumeAllCompactions() {
+    resumeAllCompactions(nativeHandle_);
+  }
+
   /**
    * Enable automatic compactions for the given column
    * families if they were previously disabled.
@@ -5036,6 +5053,8 @@ private static native String[] compactFiles(final long handle, final long compac
   private static native void cancelAllBackgroundWork(final long handle, final boolean wait);
   private static native void pauseBackgroundWork(final long handle) throws RocksDBException;
   private static native void continueBackgroundWork(final long handle) throws RocksDBException;
+  private static native void abortAllCompactions(final long handle);
+  private static native void resumeAllCompactions(final long handle);
   private static native void enableAutoCompaction(
       final long handle, final long[] columnFamilyHandles) throws RocksDBException;
   private static native int numberLevels(final long handle, final long columnFamilyHandle);
diff --git a/monitoring/statistics.cc b/monitoring/statistics.cc
index 231e5b400288..ccc92bcb6152 100644
--- a/monitoring/statistics.cc
+++ b/monitoring/statistics.cc
@@ -93,6 +93,7 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
    {COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE,
     "rocksdb.compaction.optimized.del.drop.obsolete"},
    {COMPACTION_CANCELLED, "rocksdb.compaction.cancelled"},
+   {COMPACTION_ABORTED, "rocksdb.compaction.aborted"},
    {NUMBER_KEYS_WRITTEN, "rocksdb.number.keys.written"},
    {NUMBER_KEYS_READ, "rocksdb.number.keys.read"},
    {NUMBER_KEYS_UPDATED, "rocksdb.number.keys.updated"},
diff --git a/monitoring/stats_history_test.cc b/monitoring/stats_history_test.cc
index ee29bd20921a..f98917a5f4a3 100644
--- a/monitoring/stats_history_test.cc
+++ b/monitoring/stats_history_test.cc
@@ -185,7 +185,7 @@ TEST_F(StatsHistoryTest, GetStatsHistoryInMemory) {
 
 TEST_F(StatsHistoryTest, InMemoryStatsHistoryPurging) {
   constexpr int kPeriodSec = 1;
-  constexpr int kEstimatedOneSliceSize = 22000;
+  constexpr int kEstimatedOneSliceSize = 22100;
 
   Options options;
   options.create_if_missing = true;
diff --git a/src.mk b/src.mk
index fc54f2804f90..0bae5ee333fd 100644
--- a/src.mk
+++ b/src.mk
@@ -490,6 +490,7 @@ TEST_MAIN_SOURCES = \
   db/db_basic_test.cc \
   db/db_block_cache_test.cc \
   db/db_bloom_filter_test.cc \
+  db/db_compaction_abort_test.cc \
   db/db_compaction_filter_test.cc \
   db/db_compaction_test.cc \
   db/db_clip_test.cc \
diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py
index 30dd435980af..707fdd27b594 100644
--- a/tools/db_crashtest.py
+++ b/tools/db_crashtest.py
@@ -199,6 +199,7 @@ def apply_random_seed_per_iteration():
     "pause_background_one_in": lambda: random.choice([10000, 1000000]),
     "disable_file_deletions_one_in": lambda: random.choice([10000, 1000000]),
     "disable_manual_compaction_one_in": lambda: random.choice([10000, 1000000]),
+    "abort_and_resume_compactions_one_in": lambda: random.choice([10000, 1000000]),
     "prefix_size": lambda: random.choice([-1, 1, 5, 7, 8]),
     "prefixpercent": 5,
     "progress_reports": 0,
diff --git a/unreleased_history/public_api_changes/abort_compaction_apis.md b/unreleased_history/public_api_changes/abort_compaction_apis.md
new file mode 100644
index 000000000000..d55882b3935d
--- /dev/null
+++ b/unreleased_history/public_api_changes/abort_compaction_apis.md
@@ -0,0 +1 @@
+Added new virtual methods `AbortAllCompactions()` and `ResumeAllCompactions()` to the `DB` class. Added new `Status::SubCode::kCompactionAborted` to indicate a compaction was aborted. Added `Status::IsCompactionAborted()` helper method to check if a status represents an aborted compaction.
diff --git a/util/status.cc b/util/status.cc
index 56d62b66190a..cf9e59e96757 100644
--- a/util/status.cc
+++ b/util/status.cc
@@ -46,7 +46,9 @@ static const char* msgs[static_cast<int>(Status::kMaxSubCode)] = {
     "IO fenced off",  // kIOFenced
     "Merge operator failed",  // kMergeOperatorFailed
     "Number of operands merged exceeded threshold",  // kMergeOperandThresholdExceeded
-    "MultiScan reached file prefetch limit",  // kMultiScanPrefetchLimit
+    "MultiScan reached file prefetch limit",  // kPrefetchLimitReached
+    "Not expected code path",  // kNotExpectedCodePath
+    "All compactions aborted",  // kCompactionAborted
 };
 
 Status::Status(Code _code, SubCode _subcode, const Slice& msg,