Skip to content

Commit 0dc050b

Browse files
author
caijieming
committed
support multi-thread compaction
1 parent 343cfc7 commit 0dc050b

File tree

10 files changed

+656
-258
lines changed

10 files changed

+656
-258
lines changed

src/leveldb/db/db_impl.cc

Lines changed: 230 additions & 126 deletions
Large diffs are not rendered by default.

src/leveldb/db/db_impl.h

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,12 @@ class DBImpl : public DB {
9696
friend class DBTable;
9797
struct CompactionState;
9898
struct Writer;
99+
struct CompactionTask {
100+
int64_t id;
101+
double score;
102+
uint64_t timeout;
103+
DBImpl* db;
104+
};
99105

100106
Iterator* NewInternalIterator(const ReadOptions&,
101107
SequenceNumber* latest_snapshot);
@@ -110,19 +116,19 @@ class DBImpl : public DB {
110116

111117
// Compact the in-memory write buffer to disk. Switches to a new
112118
// log-file/memtable and writes a new descriptor iff successful.
113-
Status CompactMemTable()
119+
Status CompactMemTable(bool* sched_idle = NULL)
114120
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
115121

116-
Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base)
122+
Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base, uint64_t* number = NULL)
117123
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
118124

119125
Status MakeRoomForWrite(bool force /* compact even if there is room? */)
120126
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
121127

122128
void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
123129
static void BGWork(void* db);
124-
void BackgroundCall();
125-
Status BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
130+
void BackgroundCall(CompactionTask* task);
131+
Status BackgroundCompaction(bool* sched_idle) EXCLUSIVE_LOCKS_REQUIRED(mutex_);
126132
void CleanupCompaction(CompactionState* compact)
127133
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
128134
Status DoCompactionWork(CompactionState* compact)
@@ -197,18 +203,26 @@ class DBImpl : public DB {
197203
std::set<uint64_t> pending_outputs_;
198204

199205
// Has a background compaction been scheduled or is running?
200-
bool bg_compaction_scheduled_;
201-
double bg_compaction_score_;
202-
uint64_t bg_compaction_timeout_;
203-
int64_t bg_schedule_id_;
206+
std::vector<CompactionTask*> bg_compaction_tasks_;
207+
int bg_compaction_scheduled_;
208+
std::vector<double> bg_compaction_score_;
209+
std::vector<int64_t> bg_schedule_id_;
210+
uint64_t bg_compaction_timeout_; // the lastest timeout compaction
204211

205212
// Information for a manual compaction
213+
enum ManualCompactState {
214+
kManualCompactIdle,
215+
kManualCompactConflict,
216+
kManualCompactWakeup,
217+
};
206218
struct ManualCompaction {
207219
int level;
208220
bool done;
221+
bool being_sched;
209222
const InternalKey* begin; // NULL means beginning of key range
210223
const InternalKey* end; // NULL means end of key range
211224
InternalKey tmp_storage; // Used to keep track of compaction progress
225+
ManualCompactState compaction_conflict; // 0 == idle, 1 == conflict, 2 == wake
212226
};
213227
ManualCompaction* manual_compaction_;
214228

src/leveldb/db/db_table.cc

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -301,22 +301,6 @@ Status DBTable::Init() {
301301
uint32_t i = *it;
302302
DBImpl* impl = lg_list_[i];
303303
s = impl->RecoverLastDumpToLevel0(lg_edits[i]);
304-
305-
// LogAndApply to lg's manifest
306-
if (s.ok()) {
307-
MutexLock lock(&impl->mutex_);
308-
s = impl->versions_->LogAndApply(lg_edits[i], &impl->mutex_);
309-
if (s.ok()) {
310-
impl->DeleteObsoleteFiles();
311-
impl->MaybeScheduleCompaction();
312-
} else {
313-
Log(options_.info_log, "[%s] Fail to modify manifest of lg %d",
314-
dbname_.c_str(),
315-
i);
316-
}
317-
} else {
318-
Log(options_.info_log, "[%s] Fail to dump log to level 0", dbname_.c_str());
319-
}
320304
delete lg_edits[i];
321305
}
322306

@@ -925,7 +909,6 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit,
925909
}
926910
}
927911
delete file;
928-
929912
return status;
930913
}
931914

src/leveldb/db/memtable.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, CompactStrategyFactory* com
2626
: last_seq_(0),
2727
comparator_(cmp),
2828
refs_(0),
29+
being_flushed_(false),
2930
table_(comparator_, &arena_),
3031
empty_(true),
3132
compact_strategy_factory_(compact_strategy_factory) {

src/leveldb/db/memtable.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,13 @@ class MemTable {
7979
empty_ = false;
8080
}
8181

82+
bool BeingFlushed() { return being_flushed_;}
83+
void SetBeingFlushed(bool flag) {
84+
assert(flag ? !being_flushed_
85+
: being_flushed_);
86+
being_flushed_ = flag;
87+
}
88+
8289
virtual ~MemTable();
8390

8491
protected:
@@ -97,6 +104,7 @@ class MemTable {
97104

98105
KeyComparator comparator_;
99106
int refs_;
107+
bool being_flushed_;
100108

101109
Arena arena_;
102110
Table table_;

src/leveldb/db/version_edit.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ struct FileMetaData {
3333
InternalKey largest; // Largest internal key served by table
3434
bool smallest_fake; // smallest is not real, have out-of-range keys
3535
bool largest_fake; // largest is not real, have out-of-range keys
36+
bool being_compacted; // Is this file undergoing compaction?
3637

3738
FileMetaData() :
3839
refs(0),
@@ -43,7 +44,8 @@ struct FileMetaData {
4344
file_size(0),
4445
data_size(0),
4546
smallest_fake(false),
46-
largest_fake(false) { }
47+
largest_fake(false),
48+
being_compacted(false) { }
4749
};
4850

4951
class VersionEdit {

0 commit comments

Comments
 (0)