Skip to content

Commit b45434e

Browse files
author
caijieming
committed
issue #934: multi-thread compaction support
1 parent 6b17c28 commit b45434e

File tree

10 files changed

+645
-256
lines changed

10 files changed

+645
-256
lines changed

src/leveldb/db/db_impl.cc

Lines changed: 218 additions & 122 deletions
Large diffs are not rendered by default.

src/leveldb/db/db_impl.h

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,12 @@ class DBImpl : public DB {
9696
friend class DBTable;
9797
struct CompactionState;
9898
struct Writer;
99+
struct CompactionTask {
100+
int64_t id; // compaction thread id
101+
double score; // compaction score
102+
uint64_t timeout; // compaction task delay time
103+
DBImpl* db;
104+
};
99105

100106
Iterator* NewInternalIterator(const ReadOptions&,
101107
SequenceNumber* latest_snapshot);
@@ -110,19 +116,19 @@ class DBImpl : public DB {
110116

111117
// Compact the in-memory write buffer to disk. Switches to a new
112118
// log-file/memtable and writes a new descriptor iff successful.
113-
Status CompactMemTable()
119+
Status CompactMemTable(bool* sched_idle = NULL)
114120
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
115121

116-
Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base)
122+
Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base, uint64_t* number = NULL)
117123
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
118124

119125
Status MakeRoomForWrite(bool force /* compact even if there is room? */)
120126
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
121127

122128
void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
123129
static void BGWork(void* db);
124-
void BackgroundCall();
125-
Status BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
130+
void BackgroundCall(CompactionTask* task);
131+
Status BackgroundCompaction(bool* sched_idle) EXCLUSIVE_LOCKS_REQUIRED(mutex_);
126132
void CleanupCompaction(CompactionState* compact)
127133
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
128134
Status DoCompactionWork(CompactionState* compact)
@@ -196,18 +202,24 @@ class DBImpl : public DB {
196202
std::set<uint64_t> pending_outputs_;
197203

198204
// Has a background compaction been scheduled or is running?
199-
bool bg_compaction_scheduled_;
200-
double bg_compaction_score_;
201-
uint64_t bg_compaction_timeout_;
202-
int64_t bg_schedule_id_;
205+
std::vector<CompactionTask*> bg_compaction_tasks_;
206+
std::vector<double> bg_compaction_score_;
207+
std::vector<int64_t> bg_schedule_id_;
203208

204209
// Information for a manual compaction
210+
enum ManualCompactState {
211+
kManualCompactIdle, // manual compact inited
212+
kManualCompactConflict, // manual compact run simultaneously
213+
kManualCompactWakeup, // restart delay compact task
214+
};
205215
struct ManualCompaction {
206216
int level;
207217
bool done;
218+
bool being_sched;
208219
const InternalKey* begin; // NULL means beginning of key range
209220
const InternalKey* end; // NULL means end of key range
210221
InternalKey tmp_storage; // Used to keep track of compaction progress
222+
ManualCompactState compaction_conflict; // 0 == idle, 1 == conflict, 2 == wake
211223
};
212224
ManualCompaction* manual_compaction_;
213225

src/leveldb/db/db_table.cc

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -311,22 +311,6 @@ Status DBTable::Init() {
311311
uint32_t i = *it;
312312
DBImpl* impl = lg_list_[i];
313313
s = impl->RecoverLastDumpToLevel0(lg_edits[i]);
314-
315-
// LogAndApply to lg's manifest
316-
if (s.ok()) {
317-
MutexLock lock(&impl->mutex_);
318-
s = impl->versions_->LogAndApply(lg_edits[i], &impl->mutex_);
319-
if (s.ok()) {
320-
impl->DeleteObsoleteFiles();
321-
impl->MaybeScheduleCompaction();
322-
} else {
323-
Log(options_.info_log, "[%s] Fail to modify manifest of lg %d",
324-
dbname_.c_str(),
325-
i);
326-
}
327-
} else {
328-
Log(options_.info_log, "[%s] Fail to dump log to level 0", dbname_.c_str());
329-
}
330314
delete lg_edits[i];
331315
}
332316

@@ -936,7 +920,6 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit,
936920
}
937921
}
938922
delete file;
939-
940923
return status;
941924
}
942925

src/leveldb/db/memtable.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, CompactStrategyFactory* com
2626
: last_seq_(0),
2727
comparator_(cmp),
2828
refs_(0),
29+
being_flushed_(false),
2930
table_(comparator_, &arena_),
3031
empty_(true),
3132
compact_strategy_factory_(compact_strategy_factory) {

src/leveldb/db/memtable.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,13 @@ class MemTable {
7979
empty_ = false;
8080
}
8181

82+
bool BeingFlushed() { return being_flushed_;}
83+
void SetBeingFlushed(bool flag) {
84+
assert(flag ? !being_flushed_
85+
: being_flushed_);
86+
being_flushed_ = flag;
87+
}
88+
8289
virtual ~MemTable();
8390

8491
protected:
@@ -97,6 +104,7 @@ class MemTable {
97104

98105
KeyComparator comparator_;
99106
int refs_;
107+
bool being_flushed_;
100108

101109
Arena arena_;
102110
Table table_;

src/leveldb/db/version_edit.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ struct FileMetaData {
3333
InternalKey largest; // Largest internal key served by table
3434
bool smallest_fake; // smallest is not real, have out-of-range keys
3535
bool largest_fake; // largest is not real, have out-of-range keys
36+
bool being_compacted; // Is this file undergoing compaction?
3637

3738
FileMetaData() :
3839
refs(0),
@@ -44,7 +45,8 @@ struct FileMetaData {
4445
file_size(0),
4546
data_size(0),
4647
smallest_fake(false),
47-
largest_fake(false) { }
48+
largest_fake(false),
49+
being_compacted(false) { }
4850
};
4951

5052
class VersionEdit {

0 commit comments

Comments
 (0)