Skip to content

Commit f37946d

Browse files
BenHuddleston authored
daverigby committed
MB-33805: Create 1 shard per core in topkeys
There is a lot of cache contention in topkeys. Triton 2 node 80/20 R/W test: No change - ~3,200,000 ops/s Topkeys disabled - 3,868,560 ops/s Topkeys sharded per core - 3,441,297 ops/s Previously this patch hit 3,731,861 ops/s but some other regression or contention has been introduced. Change-Id: I4b4b87fdf053453390c317c4477cade67bd1503e Reviewed-on: http://review.couchbase.org/107975 Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 0ec87f9 commit f37946d

File tree

3 files changed

+160
-65
lines changed

3 files changed

+160
-65
lines changed

daemon/topkeys.cc

Lines changed: 98 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,26 +17,43 @@
1717

1818
#include "topkeys.h"
1919
#include "settings.h"
20+
21+
#include <platform/sysinfo.h>
22+
23+
#include <folly/concurrency/CacheLocality.h>
2024
#include <inttypes.h>
2125
#include <nlohmann/json.hpp>
2226
#include <stdlib.h>
2327
#include <sys/types.h>
28+
2429
#include <algorithm>
2530
#include <cstring>
2631
#include <gsl/gsl>
32+
#include <set>
2733
#include <stdexcept>
2834

2935
/*
3036
* Implementation Details
3137
*
3238
* === TopKeys ===
3339
*
34-
* The TopKeys class is split into NUM_SHARDS shards, each which owns
35-
* 1/NUM_SHARDS of the keyspace. This is to allow some level of
36-
* concurrent access - each shard has a mutex guarding all access.
37-
* Other than that the TopKeys class is pretty uninteresting - it
38-
* simply passes on requests to the correct Shard, and when statistics
39-
* are requested it aggregates information from each shard.
40+
* The TopKeys class is split into shards for performance reasons. We create one
41+
* shard per (logical) core to:
42+
*
43+
* a) prevent any cache contention
44+
* b) allow as much concurrent access as possible (each shard is guarded by a
45+
* mutex)
46+
*
47+
* Topkeys passes on requests to the correct Shard (determined by the core id of
48+
* the calling thread), and when statistics are requested it aggregates
49+
* information from each shard. When aggregating information, the TopKeys class
50+
* has to remove duplicates from the possible pool of top keys because we shard
51+
* per core for performance. Previously, TopKeys would create 8 shards
52+
* (regardless of machine size), each with storage for a configurable amount of
53+
* keys and shard by key hash (which meant that we would not have duplicate keys
54+
* across shards). Now that we shard per core instead of by key hash, to keep
55+
* the size of the stat output the same we need a minimum of 8 X N keys per
56+
* shard (because each shard could be an exact duplicate of the others).
4057
*
4158
* === TopKeys::Shard ===
4259
*
@@ -86,10 +103,10 @@
86103
* contents is replaced by the incoming key. Finally the linked-list
87104
* is updated to move the updated element to the head of the list.
88105
*/
89-
90-
TopKeys::TopKeys(int mkeys) {
106+
TopKeys::TopKeys(int mkeys)
107+
: keys_to_return(mkeys * legacy_multiplier), shards(cb::get_cpu_count()) {
91108
for (auto& shard : shards) {
92-
shard.setMaxKeys(mkeys);
109+
shard->setMaxKeys(keys_to_return);
93110
}
94111
}
95112

@@ -123,14 +140,13 @@ ENGINE_ERROR_CODE TopKeys::json_stats(nlohmann::json& object,
123140
return ENGINE_SUCCESS;
124141
}
125142

126-
TopKeys::Shard& TopKeys::getShard(size_t key_hash) {
127-
/* This is special-cased for 8 */
128-
static_assert(NUM_SHARDS == 8,
129-
"Topkeys::getShard() special-cased for SHARDS==8");
130-
return shards[key_hash & 0x7];
143+
TopKeys::Shard& TopKeys::getShard() {
144+
auto stripe =
145+
folly::AccessSpreader<std::atomic>::cachedCurrent(shards.size());
146+
return *shards[stripe];
131147
}
132148

133-
TopKeys::Shard::topkey_t* TopKeys::Shard::searchForKey(
149+
TopKeys::topkey_t* TopKeys::Shard::searchForKey(
134150
size_t key_hash, const cb::const_char_buffer& key) {
135151
for (auto& topkey : storage) {
136152
if (topkey.first.hash == key_hash) {
@@ -153,7 +169,7 @@ bool TopKeys::Shard::updateKey(const cb::const_char_buffer& key,
153169

154170
topkey_t* found_key = searchForKey(key_hash, key);
155171

156-
if (found_key == NULL) {
172+
if (!found_key) {
157173
// Key not found.
158174
if (storage.size() == max_keys) {
159175
// Re-use the lowest keys' storage.
@@ -202,29 +218,40 @@ void TopKeys::doUpdateKey(const void* key,
202218
}
203219

204220
try {
221+
// We store a key hash to make lookup of topkeys faster and because the
222+
// memory footprint is relatively small.
205223
cb::const_char_buffer key_buf(static_cast<const char*>(key), nkey);
206224
std::hash<cb::const_char_buffer> hash_fn;
207225
const size_t key_hash = hash_fn(key_buf);
208226

209-
getShard(key_hash).updateKey(key_buf, key_hash, operation_time);
227+
getShard().updateKey(key_buf, key_hash, operation_time);
210228
} catch (const std::bad_alloc&) {
211229
// Failed to increment topkeys, continue...
212230
}
213231
}
214232

215233
struct tk_context {
234+
using CallbackFn = void (*)(const std::string&,
235+
const topkey_item_t&,
236+
void*);
216237
tk_context(const void* c,
217238
const AddStatFn& a,
218239
rel_time_t t,
219-
nlohmann::json* arr)
220-
: cookie(c), add_stat(a), current_time(t), array(arr) {
240+
nlohmann::json* arr,
241+
CallbackFn callbackFn)
242+
: cookie(c),
243+
add_stat(a),
244+
current_time(t),
245+
array(arr),
246+
callbackFunction(callbackFn) {
221247
// empty
222248
}
223249

224250
const void* cookie;
225251
AddStatFn add_stat;
226252
rel_time_t current_time;
227253
nlohmann::json* array;
254+
CallbackFn callbackFunction;
228255
};
229256

230257
static void tk_iterfunc(const std::string& key,
@@ -284,14 +311,23 @@ static void tk_jsonfunc(const std::string& key,
284311
c->array->push_back(obj);
285312
}
286313

314+
static void tk_aggregate_func(const TopKeys::topkey_t& it, void* arg) {
315+
auto* map = (std::unordered_map<std::string, topkey_item_t>*)arg;
316+
317+
auto res = map->insert(std::make_pair(it.first.key, it.second));
318+
319+
// If insert failed, then we have a duplicate top key. Add the stats
320+
if (!res.second) {
321+
res.first->second.ti_access_count += it.second.ti_access_count;
322+
}
323+
}
324+
287325
ENGINE_ERROR_CODE TopKeys::doStats(const void* cookie,
288326
rel_time_t current_time,
289327
const AddStatFn& add_stat) {
290-
struct tk_context context(cookie, add_stat, current_time, nullptr);
291-
292-
for (auto& shard : shards) {
293-
shard.accept_visitor(tk_iterfunc, &context);
294-
}
328+
struct tk_context context(
329+
cookie, add_stat, current_time, nullptr, &tk_iterfunc);
330+
doStatsInner(context);
295331

296332
return ENGINE_SUCCESS;
297333
}
@@ -309,21 +345,51 @@ ENGINE_ERROR_CODE TopKeys::doStats(const void* cookie,
309345
ENGINE_ERROR_CODE TopKeys::do_json_stats(nlohmann::json& object,
310346
rel_time_t current_time) {
311347
nlohmann::json topkeys = nlohmann::json::array();
312-
struct tk_context context(nullptr, nullptr, current_time, &topkeys);
313-
314-
/* Collate the topkeys JSON object */
315-
for (auto& shard : shards) {
316-
shard.accept_visitor(tk_jsonfunc, &context);
317-
}
348+
struct tk_context context(
349+
nullptr, nullptr, current_time, &topkeys, &tk_jsonfunc);
350+
doStatsInner(context);
318351

352+
auto str = topkeys.dump();
319353
object["topkeys"] = topkeys;
354+
320355
return ENGINE_SUCCESS;
321356
}
322357

323358
void TopKeys::Shard::accept_visitor(iterfunc_t visitor_func,
324359
void* visitor_ctx) {
325360
std::lock_guard<std::mutex> lock(mutex);
326-
for (const auto key : list) {
327-
visitor_func(key->first.key, key->second, visitor_ctx);
361+
for (const auto* key : list) {
362+
visitor_func(*key, visitor_ctx);
328363
}
329364
}
365+
366+
void TopKeys::doStatsInner(const tk_context& stat_context) {
367+
// 1) Find the unique set of top keys by putting every top key in a map
368+
std::unordered_map<std::string, topkey_item_t> map =
369+
std::unordered_map<std::string, topkey_item_t>();
370+
371+
for (auto& shard : shards) {
372+
shard->accept_visitor(tk_aggregate_func, &map);
373+
}
374+
375+
// Easiest way to sort this by access_count is to drop the contents of the
376+
// map into a vector and sort that.
377+
auto items = std::vector<topkey_stat_t>(map.begin(), map.end());
378+
std::sort(items.begin(),
379+
items.end(),
380+
[](const TopKeys::topkey_stat_t& a,
381+
const TopKeys::topkey_stat_t& b) {
382+
// Sort by number of accesses
383+
return a.second.ti_access_count > b.second.ti_access_count;
384+
});
385+
386+
// 2) Iterate on this set making the required callback for each key. We only
387+
// iterate from the start of the container (highest access count) to the
388+
// number of keys to return.
389+
std::for_each(items.begin(),
390+
std::min(items.begin() + keys_to_return, items.end()),
391+
[stat_context](const topkey_stat_t& t) {
392+
stat_context.callbackFunction(
393+
t.first, t.second, (void*)&stat_context);
394+
});
395+
}

daemon/topkeys.h

Lines changed: 41 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@
2121
#include <platform/sized_buffer.h>
2222
#include <array>
2323

24+
#include <folly/CachelinePadded.h>
2425
#include <list>
2526
#include <mutex>
2627
#include <string>
28+
#include <unordered_map>
2729
#include <vector>
28-
30+
struct tk_context;
2931
/*
3032
* TopKeys
3133
*
@@ -48,13 +50,24 @@ struct topkey_item_t {
4850
*/
4951
class TopKeys {
5052
public:
51-
/* Constructor.
53+
/** Constructor.
5254
* @param mkeys Number of keys stored in each shard (i.e. up to
53-
* mkeys * SHARDS will be tracked).
55+
* mkeys * SHARDS will be tracked).
5456
*/
5557
explicit TopKeys(int mkeys);
5658
~TopKeys();
5759

60+
// Key type used in Shard. We store hashes of the key for faster lookup.
61+
// Public for use in shard visitor functions.
62+
struct KeyId {
63+
size_t hash;
64+
std::string key;
65+
};
66+
67+
// Pair of the key's string and the statistics related to it.
68+
typedef std::pair<KeyId, topkey_item_t> topkey_t;
69+
typedef std::pair<std::string, topkey_item_t> topkey_stat_t;
70+
5871
void updateKey(const void* key, size_t nkey, rel_time_t operation_time);
5972

6073
ENGINE_ERROR_CODE stats(const void* cookie,
@@ -81,28 +94,37 @@ class TopKeys {
8194
protected:
8295
void doUpdateKey(const void* key, size_t nkey, rel_time_t operation_time);
8396

97+
void doStatsInner(const tk_context& stat_context);
8498
ENGINE_ERROR_CODE doStats(const void* cookie,
8599
rel_time_t current_time,
86100
const AddStatFn& add_stat);
87101

88102
ENGINE_ERROR_CODE do_json_stats(nlohmann::json& object,
89103
rel_time_t current_time);
90-
91104
private:
92-
// Number of shards the keyspace is broken into. Permits some level of
93-
// concurrent update (there is one mutex per shard).
94-
static const int NUM_SHARDS = 8;
105+
/**
106+
* Topkeys previously worked by storing 8 shards with variable size. As we
107+
* now shard per core and not by key, topkeys may exist in multiple shards.
108+
* We need to multiply our requested shard size by 8 to ensure that we
109+
* have the same topkeys capacity.
110+
*/
111+
const size_t legacy_multiplier = 8;
112+
113+
/**
114+
* The number of topkeys that we return via stats
115+
*/
116+
const size_t keys_to_return;
95117

96118
class Shard;
97119

98-
Shard& getShard(size_t key_hash);
120+
Shard& getShard();
99121

100122
// One of N Shards which the keyspace has been broken
101123
// into.
102124
// Responsible for tracking the top {mkeys} within it's keyspace.
103125
class Shard {
104126
public:
105-
void setMaxKeys(int mkeys) {
127+
void setMaxKeys(size_t mkeys) {
106128
max_keys = mkeys;
107129
storage.reserve(max_keys);
108130
// reallocating storage invalidates the LRU list.
@@ -119,47 +141,38 @@ class TopKeys {
119141
size_t key_hash,
120142
rel_time_t operation_time);
121143

122-
typedef void (*iterfunc_t)(const std::string& key,
123-
const topkey_item_t& it,
124-
void* arg);
144+
typedef void (*iterfunc_t)(const topkey_t& it, void* arg);
125145

126146
/* For each key in this shard, invoke the given callback function.
127147
*/
128148
void accept_visitor(iterfunc_t visitor_func, void* visitor_ctx);
129149

130150
private:
131-
struct KeyId {
132-
size_t hash;
133-
std::string key;
134-
};
135-
136-
// Pair of the key's string and the statistics related to it.
137-
typedef std::pair<KeyId, topkey_item_t> topkey_t;
138-
139151
// An ordered list of topkey_t*, used for LRU.
140152
typedef std::list<topkey_t*> key_history_t;
141153

142-
// Vector topket_t, used for actual topke storage.
154+
// Vector topkey_t, used for actual topkey storage.
143155
typedef std::vector<topkey_t> key_storage_t;
144156

145157
// Searches for the given key. If found returns a pointer to the
146158
// topkey_t, else returns NULL.
147159
topkey_t* searchForKey(size_t hash, const cb::const_char_buffer& key);
148160

149-
// Maxumum numbers of keys to be tracked per shard.
150-
unsigned int max_keys;
151-
152-
// mutex to serial access to this shard.
153-
std::mutex mutex;
161+
// Maximum numbers of keys to be tracked per shard.
162+
size_t max_keys;
154163

155164
// list of keys, ordered from most-recently used (front) to least
156165
// recently used (back).
157166
key_history_t list;
158167

159168
// Underlying topkey storage.
160169
key_storage_t storage;
170+
171+
// Mutex to serialize access to this shard.
172+
std::mutex mutex;
161173
};
162174

163-
// array of topkey shards.
164-
std::array<Shard, NUM_SHARDS> shards;
175+
// Array of topkey shards. We have one shard per core so we need to
176+
// cache line pad the shards to prevent any contention.
177+
std::vector<folly::CachelinePadded<Shard>> shards;
165178
};

0 commit comments

Comments
 (0)