Skip to content

Commit 691adcf

Browse files
author
caijieming
committed
issue #934: multi-thread compaction support
1 parent b66c065 commit 691adcf

File tree

10 files changed

+552
-205
lines changed

10 files changed

+552
-205
lines changed

src/leveldb/db/db_impl.cc

Lines changed: 199 additions & 111 deletions
Large diffs are not rendered by default.

src/leveldb/db/db_impl.h

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,11 @@ class DBImpl : public DB {
9797
friend class DBTable;
9898
struct CompactionState;
9999
struct Writer;
100+
struct CompactionTask {
101+
int64_t id;
102+
double score;
103+
DBImpl* db;
104+
};
100105

101106
Iterator* NewInternalIterator(const ReadOptions&,
102107
SequenceNumber* latest_snapshot);
@@ -111,10 +116,10 @@ class DBImpl : public DB {
111116

112117
// Compact the in-memory write buffer to disk. Switches to a new
113118
// log-file/memtable and writes a new descriptor iff successful.
114-
Status CompactMemTable()
119+
Status CompactMemTable(bool* sched_idle = NULL)
115120
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
116121

117-
Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base)
122+
Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base, uint64_t* number = NULL)
118123
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
119124

120125
Status MakeRoomForWrite(bool force /* compact even if there is room? */)
@@ -123,8 +128,8 @@ class DBImpl : public DB {
123128

124129
void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
125130
static void BGWork(void* db);
126-
void BackgroundCall();
127-
Status BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
131+
void BackgroundCall(CompactionTask* task);
132+
Status BackgroundCompaction(bool* sched_idle) EXCLUSIVE_LOCKS_REQUIRED(mutex_);
128133
void CleanupCompaction(CompactionState* compact)
129134
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
130135
Status DoCompactionWork(CompactionState* compact)
@@ -193,17 +198,25 @@ class DBImpl : public DB {
193198
std::set<uint64_t> pending_outputs_;
194199

195200
// Has a background compaction been scheduled or is running?
196-
bool bg_compaction_scheduled_;
197-
double bg_compaction_score_;
198-
int64_t bg_schedule_id_;
201+
std::set<CompactionTask*> bg_compaction_tasks_;
202+
int bg_compaction_scheduled_;
203+
std::vector<double> bg_compaction_score_;
204+
std::vector<int64_t> bg_schedule_id_;
199205

200206
// Information for a manual compaction
207+
enum ManualCompactState {
208+
kManualCompactIdle,
209+
kManualCompactConflict,
210+
kManualCompactWakeup,
211+
};
201212
struct ManualCompaction {
202213
int level;
203214
bool done;
215+
bool being_sched;
204216
const InternalKey* begin; // NULL means beginning of key range
205217
const InternalKey* end; // NULL means end of key range
206218
InternalKey tmp_storage; // Used to keep track of compaction progress
219+
int compaction_conflict; // 0 == idle, 1 == conflict, 2 == wake
207220
};
208221
ManualCompaction* manual_compaction_;
209222

src/leveldb/db/db_table.cc

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -292,22 +292,6 @@ Status DBTable::Init() {
292292
uint32_t i = *it;
293293
DBImpl* impl = lg_list_[i];
294294
s = impl->RecoverLastDumpToLevel0(lg_edits[i]);
295-
296-
// LogAndApply to lg's manifest
297-
if (s.ok()) {
298-
MutexLock lock(&impl->mutex_);
299-
s = impl->versions_->LogAndApply(lg_edits[i], &impl->mutex_);
300-
if (s.ok()) {
301-
impl->DeleteObsoleteFiles();
302-
impl->MaybeScheduleCompaction();
303-
} else {
304-
Log(options_.info_log, "[%s] Fail to modify manifest of lg %d",
305-
dbname_.c_str(),
306-
i);
307-
}
308-
} else {
309-
Log(options_.info_log, "[%s] Fail to dump log to level 0", dbname_.c_str());
310-
}
311295
delete lg_edits[i];
312296
}
313297

@@ -820,7 +804,6 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit,
820804
if (this->status != NULL && this->status->ok()) *this->status = s;
821805
}
822806
};
823-
824807
mutex_.AssertHeld();
825808

826809
// Open the log file
@@ -860,7 +843,7 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit,
860843
// dbname_.c_str(), batch_seq, last_sequence_, WriteBatchInternal::Count(&batch));
861844
if (last_seq >= recover_limit) {
862845
Log(options_.info_log, "[%s] exceed limit %lu, ignore %lu ~ %lu",
863-
dbname_.c_str(), recover_limit, first_seq, last_seq);
846+
dbname_.c_str(), recover_limit, first_seq, last_seq);
864847
continue;
865848
}
866849

@@ -915,7 +898,6 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit,
915898
}
916899
}
917900
delete file;
918-
919901
return status;
920902
}
921903

src/leveldb/db/memtable.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, CompactStrategyFactory* com
2626
: last_seq_(0),
2727
comparator_(cmp),
2828
refs_(0),
29+
being_flushed_(false),
2930
table_(comparator_, &arena_),
3031
empty_(true),
3132
compact_strategy_factory_(compact_strategy_factory) {

src/leveldb/db/memtable.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,13 @@ class MemTable {
7979
empty_ = false;
8080
}
8181

82+
bool BeingFlushed() { return being_flushed_;}
83+
void SetBeingFlushed(bool flag) {
84+
assert(flag ? !being_flushed_
85+
: being_flushed_);
86+
being_flushed_ = flag;
87+
}
88+
8289
virtual ~MemTable();
8390

8491
protected:
@@ -97,6 +104,7 @@ class MemTable {
97104

98105
KeyComparator comparator_;
99106
int refs_;
107+
bool being_flushed_;
100108

101109
Arena arena_;
102110
Table table_;

src/leveldb/db/version_edit.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,16 @@ struct FileMetaData {
3030
InternalKey largest; // Largest internal key served by table
3131
bool smallest_fake; // smallest is not real, have out-of-range keys
3232
bool largest_fake; // largest is not real, have out-of-range keys
33+
bool being_compacted; // Is this file undergoing compaction?
3334

3435
FileMetaData() :
3536
refs(0),
3637
allowed_seeks(1 << 30),
3738
file_size(0),
3839
data_size(0),
3940
smallest_fake(false),
40-
largest_fake(false) { }
41+
largest_fake(false),
42+
being_compacted(false) { }
4143
};
4244

4345
class VersionEdit {

0 commit comments

Comments
 (0)