Skip to content

Commit bc1ff22

Browse files
author
caijieming
committed
issue #934: multithread compactiong support
1 parent b66c065 commit bc1ff22

File tree

10 files changed

+526
-199
lines changed

10 files changed

+526
-199
lines changed

src/leveldb/db/db_impl.cc

Lines changed: 180 additions & 106 deletions
Large diffs are not rendered by default.

src/leveldb/db/db_impl.h

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,10 @@ class DBImpl : public DB {
111111

112112
// Compact the in-memory write buffer to disk. Switches to a new
113113
// log-file/memtable and writes a new descriptor iff successful.
114-
Status CompactMemTable()
114+
Status CompactMemTable(bool* sched_idle = NULL)
115115
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
116116

117-
Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base)
117+
Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base, uint64_t* number = NULL)
118118
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
119119

120120
Status MakeRoomForWrite(bool force /* compact even if there is room? */)
@@ -124,7 +124,7 @@ class DBImpl : public DB {
124124
void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
125125
static void BGWork(void* db);
126126
void BackgroundCall();
127-
Status BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
127+
Status BackgroundCompaction(bool* sched_idle) EXCLUSIVE_LOCKS_REQUIRED(mutex_);
128128
void CleanupCompaction(CompactionState* compact)
129129
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
130130
Status DoCompactionWork(CompactionState* compact)
@@ -193,17 +193,24 @@ class DBImpl : public DB {
193193
std::set<uint64_t> pending_outputs_;
194194

195195
// Has a background compaction been scheduled or is running?
196-
bool bg_compaction_scheduled_;
197-
double bg_compaction_score_;
198-
int64_t bg_schedule_id_;
196+
int bg_compaction_scheduled_;
197+
std::vector<double> bg_compaction_score_;
198+
std::vector<int64_t> bg_schedule_id_;
199199

200200
// Information for a manual compaction
201+
enum ManualCompactState {
202+
kManualCompactIdle,
203+
kManualCompactConflict,
204+
kManualCompactWakeup,
205+
};
201206
struct ManualCompaction {
202207
int level;
203208
bool done;
209+
bool being_sched;
204210
const InternalKey* begin; // NULL means beginning of key range
205211
const InternalKey* end; // NULL means end of key range
206212
InternalKey tmp_storage; // Used to keep track of compaction progress
213+
int compaction_conflict; // 0 == idle, 1 == conflict, 2 == wake
207214
};
208215
ManualCompaction* manual_compaction_;
209216

src/leveldb/db/db_table.cc

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -292,22 +292,6 @@ Status DBTable::Init() {
292292
uint32_t i = *it;
293293
DBImpl* impl = lg_list_[i];
294294
s = impl->RecoverLastDumpToLevel0(lg_edits[i]);
295-
296-
// LogAndApply to lg's manifest
297-
if (s.ok()) {
298-
MutexLock lock(&impl->mutex_);
299-
s = impl->versions_->LogAndApply(lg_edits[i], &impl->mutex_);
300-
if (s.ok()) {
301-
impl->DeleteObsoleteFiles();
302-
impl->MaybeScheduleCompaction();
303-
} else {
304-
Log(options_.info_log, "[%s] Fail to modify manifest of lg %d",
305-
dbname_.c_str(),
306-
i);
307-
}
308-
} else {
309-
Log(options_.info_log, "[%s] Fail to dump log to level 0", dbname_.c_str());
310-
}
311295
delete lg_edits[i];
312296
}
313297

@@ -820,7 +804,6 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit,
820804
if (this->status != NULL && this->status->ok()) *this->status = s;
821805
}
822806
};
823-
824807
mutex_.AssertHeld();
825808

826809
// Open the log file
@@ -860,7 +843,7 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit,
860843
// dbname_.c_str(), batch_seq, last_sequence_, WriteBatchInternal::Count(&batch));
861844
if (last_seq >= recover_limit) {
862845
Log(options_.info_log, "[%s] exceed limit %lu, ignore %lu ~ %lu",
863-
dbname_.c_str(), recover_limit, first_seq, last_seq);
846+
dbname_.c_str(), recover_limit, first_seq, last_seq);
864847
continue;
865848
}
866849

@@ -915,7 +898,6 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit,
915898
}
916899
}
917900
delete file;
918-
919901
return status;
920902
}
921903

src/leveldb/db/memtable.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, CompactStrategyFactory* com
2626
: last_seq_(0),
2727
comparator_(cmp),
2828
refs_(0),
29+
being_flushed_(false),
2930
table_(comparator_, &arena_),
3031
empty_(true),
3132
compact_strategy_factory_(compact_strategy_factory) {

src/leveldb/db/memtable.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,13 @@ class MemTable {
7979
empty_ = false;
8080
}
8181

82+
bool BeingFlushed() { return being_flushed_;}
83+
void SetBeingFlushed(bool flag) {
84+
assert(flag ? !being_flushed_
85+
: being_flushed_);
86+
being_flushed_ = flag;
87+
}
88+
8289
virtual ~MemTable();
8390

8491
protected:
@@ -97,6 +104,7 @@ class MemTable {
97104

98105
KeyComparator comparator_;
99106
int refs_;
107+
bool being_flushed_;
100108

101109
Arena arena_;
102110
Table table_;

src/leveldb/db/version_edit.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,16 @@ struct FileMetaData {
3030
InternalKey largest; // Largest internal key served by table
3131
bool smallest_fake; // smallest is not real, have out-of-range keys
3232
bool largest_fake; // largest is not real, have out-of-range keys
33+
bool being_compacted; // Is this file undergoing compaction?
3334

3435
FileMetaData() :
3536
refs(0),
3637
allowed_seeks(1 << 30),
3738
file_size(0),
3839
data_size(0),
3940
smallest_fake(false),
40-
largest_fake(false) { }
41+
largest_fake(false),
42+
being_compacted(false) { }
4143
};
4244

4345
class VersionEdit {

0 commit comments

Comments
 (0)