Skip to content

Commit d658954

Browse files
author
Duc Hieu Pham
committed
Make add and remove pluggable compaction service thread-safe
Summary: We just need to add and remove under the lock, as well as make a copy of remote compaction service under the lock during creation of a CompactionJob Test Plan: unit test still passes Reviewers: dhruba, #platform Reviewed By: dhruba, #platform Subscribers: joe Differential Revision: https://rockset.phacility.com/D5827
1 parent 368e0bb commit d658954

File tree

2 files changed

+14
-8
lines changed

2 files changed

+14
-8
lines changed

db/db_impl/db_impl.h

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -828,14 +828,19 @@ class DBImpl : public DB {
828828
bool sanitize) override;
829829

830830
// This registered service will be called to do a remote compaction
831-
virtual Status RegisterPluggableCompactionService(std::unique_ptr<PluggableCompactionService> rservice) override {
831+
virtual Status RegisterPluggableCompactionService(
832+
std::unique_ptr<PluggableCompactionService> rservice) override {
833+
mutex_.Lock();
832834
remote_compaction_service_ = std::move(rservice);
835+
mutex_.Unlock();
833836
return Status::OK();
834837
}
835838

836839
// Clearoff any registered pluggable compaction service
837840
virtual void UnRegisterPluggableCompactionService() override {
838-
remote_compaction_service_.reset(nullptr);
841+
mutex_.Lock();
842+
remote_compaction_service_ = nullptr;
843+
mutex_.Unlock();
839844
}
840845

841846
// Print information of all tombstones of all iterators to the std::string
@@ -1973,7 +1978,8 @@ class DBImpl : public DB {
19731978
InstrumentedCondVar atomic_flush_install_cv_;
19741979

19751980
// The pluggable compaction service, if registered.
1976-
std::unique_ptr<PluggableCompactionService> remote_compaction_service_;
1981+
// Protected by mutex_
1982+
std::shared_ptr<PluggableCompactionService> remote_compaction_service_;
19771983

19781984
Status doCompact(const CompactionOptions& compact_options,
19791985
ColumnFamilyData* cfd,

db/db_impl/db_impl_compaction_flush.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2750,15 +2750,15 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
27502750
NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
27512751
compaction_job_stats, job_context->job_id);
27522752

2753+
// Make a copy of remote compaction service here, because it could be
2754+
// cleared once mutex_ is unlocked.
2755+
auto remote_compaction_service = remote_compaction_service_;
27532756
mutex_.Unlock();
27542757
TEST_SYNC_POINT_CALLBACK(
27552758
"DBImpl::BackgroundCompaction:NonTrivial:BeforeRun", nullptr);
27562759

2757-
// This check is not thread safe. XXX
2758-
bool remote = (remote_compaction_service_ != nullptr);
2759-
2760-
if (remote) {
2761-
compaction_job.RunRemote(remote_compaction_service_.get());
2760+
if (remote_compaction_service) {
2761+
compaction_job.RunRemote(remote_compaction_service.get());
27622762
} else {
27632763
compaction_job.Run();
27642764
}

0 commit comments

Comments
 (0)