Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -4823,6 +4823,12 @@ cpp_unittest_wrapper(name="db_clip_test",
extra_compiler_flags=[])


cpp_unittest_wrapper(name="db_compaction_abort_test",
srcs=["db/db_compaction_abort_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])


cpp_unittest_wrapper(name="db_compaction_filter_test",
srcs=["db/db_compaction_filter_test.cc"],
deps=[":rocksdb_test_lib"],
Expand Down
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1381,6 +1381,7 @@ if(WITH_TESTS)
db/db_bloom_filter_test.cc
db/db_compaction_filter_test.cc
db/db_compaction_test.cc
db/db_compaction_abort_test.cc
db/db_clip_test.cc
db/db_dynamic_level_test.cc
db/db_encryption_test.cc
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1442,6 +1442,9 @@ db_compaction_filter_test: $(OBJ_DIR)/db/db_compaction_filter_test.o $(TEST_LIBR
db_compaction_test: $(OBJ_DIR)/db/db_compaction_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

db_compaction_abort_test: $(OBJ_DIR)/db/db_compaction_abort_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

db_clip_test: $(OBJ_DIR)/db/db_clip_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

Expand Down
8 changes: 8 additions & 0 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8570,6 +8570,14 @@ void rocksdb_enable_manual_compaction(rocksdb_t* db) {
db->rep->EnableManualCompaction();
}

void rocksdb_abort_all_compactions(rocksdb_t* db) {
db->rep->AbortAllCompactions();
}

void rocksdb_resume_all_compactions(rocksdb_t* db) {
db->rep->ResumeAllCompactions();
}

rocksdb_statistics_histogram_data_t*
rocksdb_statistics_histogram_data_create() {
return new rocksdb_statistics_histogram_data_t{};
Expand Down
2 changes: 1 addition & 1 deletion db/c_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -4447,7 +4447,7 @@ int main(int argc, char** argv) {

StartPhase("statistics");
{
const uint32_t BYTES_WRITTEN_TICKER = 60;
const uint32_t BYTES_WRITTEN_TICKER = 61;
const uint32_t DB_WRITE_HIST = 1;

rocksdb_statistics_histogram_data_t* hist =
Expand Down
145 changes: 122 additions & 23 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ const char* GetCompactionProximalOutputRangeTypeString(
}
}

// Static constant for compaction abort flag - always false, used for
// compaction service jobs that don't support abort signaling
const std::atomic<int> CompactionJob::kCompactionAbortedFalse{0};

CompactionJob::CompactionJob(
int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
const MutableDBOptions& mutable_db_options, const FileOptions& file_options,
Expand All @@ -141,10 +145,10 @@ CompactionJob::CompactionJob(
CompactionJobStats* compaction_job_stats, Env::Priority thread_pri,
const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<bool>& manual_compaction_canceled,
const std::string& db_id, const std::string& db_session_id,
std::string full_history_ts_low, std::string trim_ts,
BlobFileCompletionCallback* blob_callback, int* bg_compaction_scheduled,
int* bg_bottom_compaction_scheduled)
const std::atomic<int>& compaction_aborted, const std::string& db_id,
const std::string& db_session_id, std::string full_history_ts_low,
std::string trim_ts, BlobFileCompletionCallback* blob_callback,
int* bg_compaction_scheduled, int* bg_bottom_compaction_scheduled)
: compact_(new CompactionState(compaction)),
internal_stats_(compaction->compaction_reason(), 1),
db_options_(db_options),
Expand All @@ -168,6 +172,7 @@ CompactionJob::CompactionJob(
versions_(versions),
shutting_down_(shutting_down),
manual_compaction_canceled_(manual_compaction_canceled),
compaction_aborted_(compaction_aborted),
db_directory_(db_directory),
blob_output_directory_(blob_output_directory),
db_mutex_(db_mutex),
Expand Down Expand Up @@ -708,6 +713,7 @@ void CompactionJob::InitializeCompactionRun() {
}

void CompactionJob::RunSubcompactions() {
TEST_SYNC_POINT("CompactionJob::RunSubcompactions:BeforeStart");
const size_t num_threads = compact_->sub_compact_states.size();
assert(num_threads > 0);
compact_->compaction->GetOrInitInputTableProperties();
Expand Down Expand Up @@ -753,6 +759,71 @@ void CompactionJob::RemoveEmptyOutputs() {
}
}

void CompactionJob::CleanupAbortedSubcompactions() {
ColumnFamilyData* cfd = compact_->compaction->column_family_data();

uint64_t total_sst_files_deleted = 0;
uint64_t total_blob_files_deleted = 0;

// Track the first file deletion error to report at the end
Status first_error;
int deletion_errors = 0;

// Mark all subcompactions as aborted and delete their output files
for (auto& sub_compact : compact_->sub_compact_states) {
// Mark this subcompaction as aborted
sub_compact.status =
Status::Incomplete(Status::SubCode::kCompactionAborted);

// Delete all files (SST and blob) tracked during compaction.
// GetOutputFilePaths() contains ALL file paths created, including
// in-progress files that may have been removed from outputs_ or
// blob_file_additions_.
for (const bool is_proximal_level : {false, true}) {
if (is_proximal_level &&
!compact_->compaction->SupportsPerKeyPlacement()) {
continue;
}
for (const std::string& file_path :
sub_compact.Outputs(is_proximal_level)->GetOutputFilePaths()) {
Status s = env_->DeleteFile(file_path);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't a compaction with non-ok status get automatically cleaned up? Why do we need to explicitly do the cleanup here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The normal cleanup path (Cleanup() in compaction_outputs.h lines 247-253) only abandons in-progress builders. This does not delete already-finished output files that were successfully written to disk.

When compaction runs with multiple subcompactions in parallel:

  1. Subcompaction A completes successfully → produces finished SST/blob files on disk
  2. Subcompaction B gets aborted (or the overall compaction is paused)

The overall compaction status becomes CompactionAborted or ManualCompactionPaused.
At this point, Subcompaction A's output files are fully written and finished on disk
But the overall compaction is aborted, so these files will never be installed to the LSM tree
Without explicit cleanup, these files become orphans on disk

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. Thanks for the clarification. I think you're right. Is this a problem with subcompactions in general then? For example, if subcompaction B fails due to IO error, then there's no cleanup of subcompaction A's files. Not saying that needs to be addressed in this PR since its a separate issue, but is it something to be tracked?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did some more investigation around this. There is another function FindObsoleteFiles that scan directories to find files not in any live version, and perform clean up, on compaction or flush failure. We could rely on that for rare failure such as IO error. For abort operation, we could switch to that as well. However, it would break resumable compaction, as FindObsoleteFiles does not know a compaction is resumable or not.

if (s.ok()) {
// Count SST vs blob files by checking extension
if (file_path.find(".sst") != std::string::npos) {
total_sst_files_deleted++;
} else if (file_path.find(".blob") != std::string::npos) {
total_blob_files_deleted++;
}
} else if (!s.IsNotFound()) {
if (first_error.ok()) {
first_error = s;
}
deletion_errors++;
}
}
}
sub_compact.CleanupOutputs();
}

if (stats_) {
RecordTick(stats_, COMPACTION_ABORTED);
}

ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Compaction aborted: deleted %" PRIu64
" SST files and %" PRIu64 " blob files",
cfd->GetName().c_str(), job_id_, total_sst_files_deleted,
total_blob_files_deleted);

if (!first_error.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"[%s] [JOB %d] Cleanup completed with %d file deletion "
"errors. First error: %s",
cfd->GetName().c_str(), job_id_, deletion_errors,
first_error.ToString().c_str());
}
}

bool CompactionJob::HasNewBlobFiles() const {
for (const auto& state : compact_->sub_compact_states) {
if (state.Current().HasBlobFileAdditions()) {
Expand Down Expand Up @@ -1004,6 +1075,15 @@ Status CompactionJob::Run() {

Status status = CollectSubcompactionErrors();

// If compaction was aborted or manually paused, clean up any output files
// from completed subcompactions to prevent orphaned files on disk.
// Skip cleanup for resumable compaction (when progress writer is set)
// because the output files are needed for resumption.
if ((status.IsCompactionAborted() || status.IsManualCompactionPaused()) &&
compaction_progress_writer_ == nullptr) {
CleanupAbortedSubcompactions();
}

if (status.ok()) {
status = SyncOutputDirectories();
}
Expand Down Expand Up @@ -1415,10 +1495,10 @@ InternalIterator* CompactionJob::CreateInputIterator(
return input;
}

void CompactionJob::CreateBlobFileBuilder(SubcompactionState* sub_compact,
ColumnFamilyData* cfd,
BlobFileResources& blob_resources,
const WriteOptions& write_options) {
void CompactionJob::CreateBlobFileBuilder(
SubcompactionState* sub_compact, ColumnFamilyData* cfd,
std::unique_ptr<BlobFileBuilder>& blob_file_builder,
const WriteOptions& write_options) {
const auto& mutable_cf_options =
sub_compact->compaction->mutable_cf_options();

Expand All @@ -1427,24 +1507,24 @@ void CompactionJob::CreateBlobFileBuilder(SubcompactionState* sub_compact,
if (mutable_cf_options.enable_blob_files &&
sub_compact->compaction->output_level() >=
mutable_cf_options.blob_file_starting_level) {
blob_resources.blob_file_builder = std::make_unique<BlobFileBuilder>(
blob_file_builder = std::make_unique<BlobFileBuilder>(
versions_, fs_.get(), &sub_compact->compaction->immutable_options(),
&mutable_cf_options, &file_options_, &write_options, db_id_,
db_session_id_, job_id_, cfd->GetID(), cfd->GetName(), write_hint_,
io_tracer_, blob_callback_, BlobFileCreationReason::kCompaction,
&blob_resources.blob_file_paths,
sub_compact->Current().GetOutputFilePathsPtr(),
sub_compact->Current().GetBlobFileAdditionsPtr());
} else {
blob_resources.blob_file_builder = nullptr;
blob_file_builder = nullptr;
}
}

std::unique_ptr<CompactionIterator> CompactionJob::CreateCompactionIterator(
SubcompactionState* sub_compact, ColumnFamilyData* cfd,
InternalIterator* input, const CompactionFilter* compaction_filter,
MergeHelper& merge, BlobFileResources& blob_resources,
MergeHelper& merge, std::unique_ptr<BlobFileBuilder>& blob_file_builder,
const WriteOptions& write_options) {
CreateBlobFileBuilder(sub_compact, cfd, blob_resources, write_options);
CreateBlobFileBuilder(sub_compact, cfd, blob_file_builder, write_options);

const std::string* const full_history_ts_low =
full_history_ts_low_.empty() ? nullptr : &full_history_ts_low_;
Expand All @@ -1456,7 +1536,7 @@ std::unique_ptr<CompactionIterator> CompactionJob::CreateCompactionIterator(
job_context_->earliest_write_conflict_snapshot,
job_context_->GetJobSnapshotSequence(), job_context_->snapshot_checker,
env_, ShouldReportDetailedTime(env_, stats_), sub_compact->RangeDelAgg(),
blob_resources.blob_file_builder.get(), db_options_.allow_data_in_errors,
blob_file_builder.get(), db_options_.allow_data_in_errors,
db_options_.enforce_single_del_contracts, manual_compaction_canceled_,
sub_compact->compaction
->DoesInputReferenceBlobFiles() /* must_count_input_entries */,
Expand Down Expand Up @@ -1495,10 +1575,17 @@ Status CompactionJob::ProcessKeyValue(
SubcompactionState* sub_compact, ColumnFamilyData* cfd,
CompactionIterator* c_iter, const CompactionFileOpenFunc& open_file_func,
const CompactionFileCloseFunc& close_file_func, uint64_t& prev_cpu_micros) {
Status status;
const uint64_t kRecordStatsEvery = 1000;
// Cron interval for periodic operations: stats update, abort check,
// and sync points. Uses 1024 (power of 2) for efficient bitwise check.
const uint64_t kCronEveryMask = (1 << 10) - 1;
[[maybe_unused]] const std::optional<const Slice> end = sub_compact->end;

// Check for abort signal before starting key processing
if (compaction_aborted_.load(std::memory_order_acquire) > 0) {
return Status::Incomplete(Status::SubCode::kCompactionAborted);
}

Status status;
IterKey prev_iter_output_key;
ParsedInternalKey prev_iter_output_internal_key;

Expand All @@ -1511,8 +1598,16 @@ Status CompactionJob::ProcessKeyValue(
assert(!end.has_value() ||
cfd->user_comparator()->Compare(c_iter->user_key(), *end) < 0);

if (c_iter->iter_stats().num_input_records % kRecordStatsEvery ==
kRecordStatsEvery - 1) {
const uint64_t num_records = c_iter->iter_stats().num_input_records;

// Periodic cron operations: stats update, abort check.
if ((num_records & kCronEveryMask) == kCronEveryMask) {
// Check for abort signal periodically
if (compaction_aborted_.load(std::memory_order_acquire) > 0) {
status = Status::Incomplete(Status::SubCode::kCompactionAborted);
break;
}

UpdateSubcompactionJobStatsIncrementally(
c_iter, &sub_compact->compaction_job_stats,
db_options_.clock->CPUMicros(), prev_cpu_micros);
Expand Down Expand Up @@ -1719,6 +1814,7 @@ Status CompactionJob::FinalizeBlobFiles(SubcompactionState* sub_compact,
}

void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
TEST_SYNC_POINT("CompactionJob::ProcessKeyValueCompaction:Start");
assert(sub_compact);
assert(sub_compact->compaction);

Expand Down Expand Up @@ -1772,11 +1868,11 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
false /* internal key corruption is expected */,
job_context_->GetLatestSnapshotSequence(), job_context_->snapshot_checker,
compact_->compaction->level(), db_options_.stats);
BlobFileResources blob_resources;
std::unique_ptr<BlobFileBuilder> blob_file_builder;

auto c_iter =
CreateCompactionIterator(sub_compact, cfd, input_iter, compaction_filter,
merge, blob_resources, write_options);
merge, blob_file_builder, write_options);
assert(c_iter);
c_iter->SeekToFirst();

Expand All @@ -1794,9 +1890,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
status = FinalizeProcessKeyValueStatus(cfd, input_iter, c_iter.get(), status);

FinalizeSubcompaction(sub_compact, status, open_file_func, close_file_func,
blob_resources.blob_file_builder.get(), c_iter.get(),
input_iter, start_cpu_micros, prev_cpu_micros,
io_stats);
blob_file_builder.get(), c_iter.get(), input_iter,
start_cpu_micros, prev_cpu_micros, io_stats);

NotifyOnSubcompactionCompleted(sub_compact);
}
Expand Down Expand Up @@ -2295,6 +2390,10 @@ Status CompactionJob::OpenCompactionOutputFile(SubcompactionState* sub_compact,
Status s;
IOStatus io_s = NewWritableFile(fs_.get(), fname, &writable_file, fo_copy);
s = io_s;
if (io_s.ok()) {
// Track the SST file path for cleanup on abort.
outputs.AddOutputFilePath(fname);
}
if (sub_compact->io_status.ok()) {
sub_compact->io_status = io_s;
// Since this error is really a copy of the io_s that is checked below as s,
Expand Down
21 changes: 11 additions & 10 deletions db/compaction/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ class SubcompactionState;

class CompactionJob {
public:
// Constant false aborted flag, used for compaction service jobs
static const std::atomic<int> kCompactionAbortedFalse;

CompactionJob(int job_id, Compaction* compaction,
const ImmutableDBOptions& db_options,
const MutableDBOptions& mutable_db_options,
Expand All @@ -157,6 +160,7 @@ class CompactionJob {
Env::Priority thread_pri,
const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<bool>& manual_compaction_canceled,
const std::atomic<int>& compaction_aborted,
const std::string& db_id = "",
const std::string& db_session_id = "",
std::string full_history_ts_low = "", std::string trim_ts = "",
Expand Down Expand Up @@ -299,6 +303,7 @@ class CompactionJob {
void RunSubcompactions();
void UpdateTimingStats(uint64_t start_micros);
void RemoveEmptyOutputs();
void CleanupAbortedSubcompactions();
bool HasNewBlobFiles() const;
Status CollectSubcompactionErrors();
Status SyncOutputDirectories();
Expand Down Expand Up @@ -363,11 +368,6 @@ class CompactionJob {
std::unique_ptr<InternalIterator> trim_history_iter;
};

struct BlobFileResources {
std::vector<std::string> blob_file_paths;
std::unique_ptr<BlobFileBuilder> blob_file_builder;
};

bool ShouldUseLocalCompaction(SubcompactionState* sub_compact);
CompactionIOStatsSnapshot InitializeIOStats();
Status SetupAndValidateCompactionFilter(
Expand All @@ -382,14 +382,14 @@ class CompactionJob {
SubcompactionState* sub_compact, ColumnFamilyData* cfd,
SubcompactionInternalIterators& iterators,
SubcompactionKeyBoundaries& boundaries, ReadOptions& read_options);
void CreateBlobFileBuilder(SubcompactionState* sub_compact,
ColumnFamilyData* cfd,
BlobFileResources& blob_resources,
const WriteOptions& write_options);
void CreateBlobFileBuilder(
SubcompactionState* sub_compact, ColumnFamilyData* cfd,
std::unique_ptr<BlobFileBuilder>& blob_file_builder,
const WriteOptions& write_options);
std::unique_ptr<CompactionIterator> CreateCompactionIterator(
SubcompactionState* sub_compact, ColumnFamilyData* cfd,
InternalIterator* input_iter, const CompactionFilter* compaction_filter,
MergeHelper& merge, BlobFileResources& blob_resources,
MergeHelper& merge, std::unique_ptr<BlobFileBuilder>& blob_file_builder,
const WriteOptions& write_options);
std::pair<CompactionFileOpenFunc, CompactionFileCloseFunc> CreateFileHandlers(
SubcompactionState* sub_compact, SubcompactionKeyBoundaries& boundaries);
Expand Down Expand Up @@ -461,6 +461,7 @@ class CompactionJob {
VersionSet* versions_;
const std::atomic<bool>* shutting_down_;
const std::atomic<bool>& manual_compaction_canceled_;
const std::atomic<int>& compaction_aborted_;
FSDirectory* db_directory_;
FSDirectory* blob_output_directory_;
InstrumentedMutex* db_mutex_;
Expand Down
Loading
Loading