Skip to content

Commit 1a928c2

Browse files
Jing118facebook-github-bot
authored andcommitted
Add insert hints for each writebatch (facebook#5728)
Summary: Add insert hints for each writebatch so that they can be used in concurrent write, and add write option to enable it. Bench result (qps): `./db_bench --benchmarks=fillseq -allow_concurrent_memtable_write=true -num=4000000 -batch-size=1 -threads=1 -db=/data3/ylj/tmp -write_buffer_size=536870912 -num_column_families=4` master: | batch size \ thread num | 1 | 2 | 4 | 8 | | ----------------------- | ------- | ------- | ------- | ------- | | 1 | 387883 | 220790 | 308294 | 490998 | | 10 | 1397208 | 978911 | 1275684 | 1733395 | | 100 | 2045414 | 1589927 | 1798782 | 2681039 | | 1000 | 2228038 | 1698252 | 1839877 | 2863490 | fillseq with writebatch hint: | batch size \ thread num | 1 | 2 | 4 | 8 | | ----------------------- | ------- | ------- | ------- | ------- | | 1 | 286005 | 223570 | 300024 | 466981 | | 10 | 970374 | 813308 | 1399299 | 1753588 | | 100 | 1962768 | 1983023 | 2676577 | 3086426 | | 1000 | 2195853 | 2676782 | 3231048 | 3638143 | Pull Request resolved: facebook#5728 Differential Revision: D17297240 fbshipit-source-id: b053590a6d77871f1ef2f911a7bd013b3899b26c
1 parent a378a4c commit 1a928c2

File tree

13 files changed

+155
-20
lines changed

13 files changed

+155
-20
lines changed

db/c.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3224,6 +3224,11 @@ void rocksdb_writeoptions_set_low_pri(
32243224
opt->rep.low_pri = v;
32253225
}
32263226

3227+
void rocksdb_writeoptions_set_memtable_insert_hint_per_batch(
3228+
rocksdb_writeoptions_t* opt, unsigned char v) {
3229+
opt->rep.memtable_insert_hint_per_batch = v;
3230+
}
3231+
32273232
rocksdb_compactoptions_t* rocksdb_compactoptions_create() {
32283233
return new rocksdb_compactoptions_t;
32293234
}

db/db_impl/db_impl_write.cc

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
174174
&trim_history_scheduler_,
175175
write_options.ignore_missing_column_families, 0 /*log_number*/, this,
176176
true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt,
177-
batch_per_txn_);
177+
batch_per_txn_, write_options.memtable_insert_hint_per_batch);
178178

179179
PERF_TIMER_START(write_pre_and_post_process_time);
180180
}
@@ -397,7 +397,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
397397
&trim_history_scheduler_,
398398
write_options.ignore_missing_column_families, 0 /*log_number*/,
399399
this, true /*concurrent_memtable_writes*/, seq_per_batch_,
400-
w.batch_cnt, batch_per_txn_);
400+
w.batch_cnt, batch_per_txn_,
401+
write_options.memtable_insert_hint_per_batch);
401402
}
402403
}
403404
if (seq_used != nullptr) {
@@ -564,7 +565,9 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
564565
w.status = WriteBatchInternal::InsertInto(
565566
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
566567
&trim_history_scheduler_, write_options.ignore_missing_column_families,
567-
0 /*log_number*/, this, true /*concurrent_memtable_writes*/);
568+
0 /*log_number*/, this, true /*concurrent_memtable_writes*/,
569+
false /*seq_per_batch*/, 0 /*batch_cnt*/, true /*batch_per_txn*/,
570+
write_options.memtable_insert_hint_per_batch);
568571
if (write_thread_.CompleteParallelMemTableWriter(&w)) {
569572
MemTableInsertStatusCheck(w.status);
570573
versions_->SetLastSequence(w.write_group->last_sequence);
@@ -603,7 +606,8 @@ Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options,
603606
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
604607
&trim_history_scheduler_, write_options.ignore_missing_column_families,
605608
0 /*log_number*/, this, true /*concurrent_memtable_writes*/,
606-
seq_per_batch_, sub_batch_cnt);
609+
seq_per_batch_, sub_batch_cnt, true /*batch_per_txn*/,
610+
write_options.memtable_insert_hint_per_batch);
607611

608612
WriteStatusCheck(w.status);
609613
if (write_options.disableWAL) {

db/memtable.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,7 @@ MemTable::MemTableStats MemTable::ApproximateStats(const Slice& start_ikey,
469469
bool MemTable::Add(SequenceNumber s, ValueType type,
470470
const Slice& key, /* user key */
471471
const Slice& value, bool allow_concurrent,
472-
MemTablePostProcessInfo* post_process_info) {
472+
MemTablePostProcessInfo* post_process_info, void** hint) {
473473
// Format of an entry is concatenation of:
474474
// key_size : varint32 of internal_key.size()
475475
// key bytes : char[internal_key.size()]
@@ -547,7 +547,9 @@ bool MemTable::Add(SequenceNumber s, ValueType type,
547547
assert(post_process_info == nullptr);
548548
UpdateFlushState();
549549
} else {
550-
bool res = table->InsertKeyConcurrently(handle);
550+
bool res = (hint == nullptr)
551+
? table->InsertKeyConcurrently(handle)
552+
: table->InsertKeyWithHintConcurrently(handle, hint);
551553
if (UNLIKELY(!res)) {
552554
return res;
553555
}

db/memtable.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,8 @@ class MemTable {
182182
// the <key, seq> already exists.
183183
bool Add(SequenceNumber seq, ValueType type, const Slice& key,
184184
const Slice& value, bool allow_concurrent = false,
185-
MemTablePostProcessInfo* post_process_info = nullptr);
185+
MemTablePostProcessInfo* post_process_info = nullptr,
186+
void** hint = nullptr);
186187

187188
// Used to Get value associated with key or Get Merge Operands associated
188189
// with key.

db/write_batch.cc

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
#include <stack>
4040
#include <stdexcept>
4141
#include <type_traits>
42+
#include <unordered_map>
4243
#include <vector>
4344

4445
#include "db/column_family.h"
@@ -1225,6 +1226,22 @@ class MemTableInserter : public WriteBatch::Handler {
12251226
DupDetector duplicate_detector_;
12261227
bool dup_dectector_on_;
12271228

1229+
bool hint_per_batch_;
1230+
bool hint_created_;
1231+
// Hints for this batch
1232+
using HintMap = std::unordered_map<MemTable*, void*>;
1233+
using HintMapType = std::aligned_storage<sizeof(HintMap)>::type;
1234+
HintMapType hint_;
1235+
1236+
HintMap& GetHintMap() {
1237+
assert(hint_per_batch_);
1238+
if (!hint_created_) {
1239+
new (&hint_) HintMap();
1240+
hint_created_ = true;
1241+
}
1242+
return *reinterpret_cast<HintMap*>(&hint_);
1243+
}
1244+
12281245
MemPostInfoMap& GetPostMap() {
12291246
assert(concurrent_memtable_writes_);
12301247
if(!post_info_created_) {
@@ -1258,7 +1275,7 @@ class MemTableInserter : public WriteBatch::Handler {
12581275
uint64_t recovering_log_number, DB* db,
12591276
bool concurrent_memtable_writes,
12601277
bool* has_valid_writes = nullptr, bool seq_per_batch = false,
1261-
bool batch_per_txn = true)
1278+
bool batch_per_txn = true, bool hint_per_batch = false)
12621279
: sequence_(_sequence),
12631280
cf_mems_(cf_mems),
12641281
flush_scheduler_(flush_scheduler),
@@ -1282,7 +1299,9 @@ class MemTableInserter : public WriteBatch::Handler {
12821299
write_before_prepare_(!batch_per_txn),
12831300
unprepared_batch_(false),
12841301
duplicate_detector_(),
1285-
dup_dectector_on_(false) {
1302+
dup_dectector_on_(false),
1303+
hint_per_batch_(hint_per_batch),
1304+
hint_created_(false) {
12861305
assert(cf_mems_);
12871306
}
12881307

@@ -1295,6 +1314,12 @@ class MemTableInserter : public WriteBatch::Handler {
12951314
reinterpret_cast<MemPostInfoMap*>
12961315
(&mem_post_info_map_)->~MemPostInfoMap();
12971316
}
1317+
if (hint_created_) {
1318+
for (auto iter : GetHintMap()) {
1319+
delete[] reinterpret_cast<char*>(iter.second);
1320+
}
1321+
reinterpret_cast<HintMap*>(&hint_)->~HintMap();
1322+
}
12981323
delete rebuilding_trx_;
12991324
}
13001325

@@ -1404,7 +1429,8 @@ class MemTableInserter : public WriteBatch::Handler {
14041429
if (!moptions->inplace_update_support) {
14051430
bool mem_res =
14061431
mem->Add(sequence_, value_type, key, value,
1407-
concurrent_memtable_writes_, get_post_process_info(mem));
1432+
concurrent_memtable_writes_, get_post_process_info(mem),
1433+
hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
14081434
if (UNLIKELY(!mem_res)) {
14091435
assert(seq_per_batch_);
14101436
ret_status = Status::TryAgain("key+seq exists");
@@ -1487,7 +1513,8 @@ class MemTableInserter : public WriteBatch::Handler {
14871513
MemTable* mem = cf_mems_->GetMemTable();
14881514
bool mem_res =
14891515
mem->Add(sequence_, delete_type, key, value,
1490-
concurrent_memtable_writes_, get_post_process_info(mem));
1516+
concurrent_memtable_writes_, get_post_process_info(mem),
1517+
hint_per_batch_ ? &GetHintMap()[mem] : nullptr);
14911518
if (UNLIKELY(!mem_res)) {
14921519
assert(seq_per_batch_);
14931520
ret_status = Status::TryAgain("key+seq exists");
@@ -1962,7 +1989,7 @@ Status WriteBatchInternal::InsertInto(
19621989
TrimHistoryScheduler* trim_history_scheduler,
19631990
bool ignore_missing_column_families, uint64_t log_number, DB* db,
19641991
bool concurrent_memtable_writes, bool seq_per_batch, size_t batch_cnt,
1965-
bool batch_per_txn) {
1992+
bool batch_per_txn, bool hint_per_batch) {
19661993
#ifdef NDEBUG
19671994
(void)batch_cnt;
19681995
#endif
@@ -1971,7 +1998,7 @@ Status WriteBatchInternal::InsertInto(
19711998
sequence, memtables, flush_scheduler, trim_history_scheduler,
19721999
ignore_missing_column_families, log_number, db,
19732000
concurrent_memtable_writes, nullptr /*has_valid_writes*/, seq_per_batch,
1974-
batch_per_txn);
2001+
batch_per_txn, hint_per_batch);
19752002
SetSequence(writer->batch, sequence);
19762003
inserter.set_log_number_ref(writer->log_ref);
19772004
Status s = writer->batch->Iterate(&inserter);

db/write_batch_internal.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,8 @@ class WriteBatchInternal {
188188
uint64_t log_number = 0, DB* db = nullptr,
189189
bool concurrent_memtable_writes = false,
190190
bool seq_per_batch = false, size_t batch_cnt = 0,
191-
bool batch_per_txn = true);
191+
bool batch_per_txn = true,
192+
bool hint_per_batch = false);
192193

193194
static Status Append(WriteBatch* dst, const WriteBatch* src,
194195
const bool WAL_only = false);

include/rocksdb/c.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1263,6 +1263,9 @@ extern ROCKSDB_LIBRARY_API void rocksdb_writeoptions_set_no_slowdown(
12631263
rocksdb_writeoptions_t*, unsigned char);
12641264
extern ROCKSDB_LIBRARY_API void rocksdb_writeoptions_set_low_pri(
12651265
rocksdb_writeoptions_t*, unsigned char);
1266+
extern ROCKSDB_LIBRARY_API void
1267+
rocksdb_writeoptions_set_memtable_insert_hint_per_batch(rocksdb_writeoptions_t*,
1268+
unsigned char);
12661269

12671270
/* Compact range options */
12681271

include/rocksdb/memtablerep.h

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,28 @@ class MemTableRep {
120120
return true;
121121
}
122122

123+
// Same as ::InsertWithHint, but allow concurrnet write
124+
//
125+
// If hint points to nullptr, a new hint will be allocated on heap, otherwise
126+
// the hint will be updated to reflect the last insert location. The hint is
127+
// owned by the caller and it is the caller's responsibility to delete the
128+
// hint later.
129+
//
130+
// Currently only skip-list based memtable implement the interface. Other
131+
// implementations will fallback to InsertConcurrently() by default.
132+
virtual void InsertWithHintConcurrently(KeyHandle handle, void** /*hint*/) {
133+
// Ignore the hint by default.
134+
InsertConcurrently(handle);
135+
}
136+
137+
// Same as ::InsertWithHintConcurrently
138+
// Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and
139+
// the <key, seq> already exists.
140+
virtual bool InsertKeyWithHintConcurrently(KeyHandle handle, void** hint) {
141+
InsertWithHintConcurrently(handle, hint);
142+
return true;
143+
}
144+
123145
// Like Insert(handle), but may be called concurrent with other calls
124146
// to InsertConcurrently for other handles.
125147
//

include/rocksdb/options.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1356,6 +1356,15 @@ struct WriteOptions {
13561356
// Default: false
13571357
bool low_pri;
13581358

1359+
// If true, this writebatch will maintain the last insert positions of each
1360+
// memtable as hints in concurrent write. It can improve write performance
1361+
// in concurrent writes if keys in one writebatch are sequential. In
1362+
// non-concurrent writes (when concurrent_memtable_writes is false) this
1363+
// option will be ignored.
1364+
//
1365+
// Default: false
1366+
bool memtable_insert_hint_per_batch;
1367+
13591368
// Timestamp of write operation, e.g. Put. All timestamps of the same
13601369
// database must share the same length and format. The user is also
13611370
// responsible for providing a customized compare function via Comparator to
@@ -1373,6 +1382,7 @@ struct WriteOptions {
13731382
ignore_missing_column_families(false),
13741383
no_slowdown(false),
13751384
low_pri(false),
1385+
memtable_insert_hint_per_batch(false),
13761386
timestamp(nullptr) {}
13771387
};
13781388

memtable/inlineskiplist.h

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ class InlineSkipList {
8686
// Allocate a splice using allocator.
8787
Splice* AllocateSplice();
8888

89+
// Allocate a splice on heap.
90+
Splice* AllocateSpliceOnHeap();
91+
8992
// Inserts a key allocated by AllocateKey, after the actual key value
9093
// has been filled in.
9194
//
@@ -105,6 +108,12 @@ class InlineSkipList {
105108
// REQUIRES: no concurrent calls to any of inserts.
106109
bool InsertWithHint(const char* key, void** hint);
107110

111+
// Like InsertConcurrently, but with a hint
112+
//
113+
// REQUIRES: nothing that compares equal to key is currently in the list.
114+
// REQUIRES: no concurrent calls that use same hint
115+
bool InsertWithHintConcurrently(const char* key, void** hint);
116+
108117
// Like Insert, but external synchronization is not required.
109118
bool InsertConcurrently(const char* key);
110119

@@ -642,6 +651,18 @@ InlineSkipList<Comparator>::AllocateSplice() {
642651
return splice;
643652
}
644653

654+
template <class Comparator>
655+
typename InlineSkipList<Comparator>::Splice*
656+
InlineSkipList<Comparator>::AllocateSpliceOnHeap() {
657+
size_t array_size = sizeof(Node*) * (kMaxHeight_ + 1);
658+
char* raw = new char[sizeof(Splice) + array_size * 2];
659+
Splice* splice = reinterpret_cast<Splice*>(raw);
660+
splice->height_ = 0;
661+
splice->prev_ = reinterpret_cast<Node**>(raw + sizeof(Splice));
662+
splice->next_ = reinterpret_cast<Node**>(raw + sizeof(Splice) + array_size);
663+
return splice;
664+
}
665+
645666
template <class Comparator>
646667
bool InlineSkipList<Comparator>::Insert(const char* key) {
647668
return Insert<false>(key, seq_splice_, false);
@@ -668,6 +689,18 @@ bool InlineSkipList<Comparator>::InsertWithHint(const char* key, void** hint) {
668689
return Insert<false>(key, splice, true);
669690
}
670691

692+
template <class Comparator>
693+
bool InlineSkipList<Comparator>::InsertWithHintConcurrently(const char* key,
694+
void** hint) {
695+
assert(hint != nullptr);
696+
Splice* splice = reinterpret_cast<Splice*>(*hint);
697+
if (splice == nullptr) {
698+
splice = AllocateSpliceOnHeap();
699+
*hint = reinterpret_cast<void*>(splice);
700+
}
701+
return Insert<true>(key, splice, true);
702+
}
703+
671704
template <class Comparator>
672705
template <bool prefetch_before>
673706
void InlineSkipList<Comparator>::FindSpliceForLevel(const DecodedKey& key,

0 commit comments

Comments
 (0)