Commit aafd5c5

Merge pull request ceph#55696 from bill-scales/main

common/mempool.cc: Improve performance of sharding

Reviewed-by: Adam Kupczyk <[email protected]>
Reviewed-by: Jose Juan Palacios-Perez <[email protected]>
Reviewed-by: John Agombar <[email protected]>
Reviewed-by: Radoslaw Zarzynski <[email protected]>
2 parents 080c083 + 9dfe5c1 commit aafd5c5

5 files changed: +127 / -124 lines

src/common/mempool.cc

Lines changed: 69 additions & 35 deletions
@@ -12,27 +12,77 @@
  *
  */
 
+#include <thread>
 #include "include/mempool.h"
 #include "include/demangle.h"
 
-#if defined(_GNU_SOURCE) && defined(WITH_SEASTAR)
-#else
-// Thread local variables should save index, not &shard[index],
-// because shard[] is defined in the class
-static thread_local size_t thread_shard_index = mempool::num_shards;
-#endif
-
 // default to debug_mode off
 bool mempool::debug_mode = false;
 
 // --------------------------------------------------------------
 
+namespace mempool {
+
+static size_t num_shard_bits;
+static size_t num_shards;
+
+std::unique_ptr<shard_t[]> shards = std::make_unique<shard_t[]>(get_num_shards());
+}
+
+size_t mempool::get_num_shards(void) {
+  static std::once_flag once;
+  std::call_once(once,[&]() {
+    unsigned int threads = std::thread::hardware_concurrency();
+    if (threads == 0) {
+      threads = DEFAULT_SHARDS;
+    }
+    threads = std::clamp<unsigned int>(threads, MIN_SHARDS, MAX_SHARDS);
+    threads--;
+    while (threads != 0) {
+      num_shard_bits++;
+      threads>>=1;
+    }
+    num_shards = 1 << num_shard_bits;
+  });
+  return num_shards;
+}
+
+// There are 2 implementations of pick_a_shard_int; SCHED_GETCPU is
+// the preferred implementation, ROUND_ROBIN is used if sched_getcpu() is not
+// available.
+int mempool::pick_a_shard_int(void) {
+#if defined(MEMPOOL_SCHED_GETCPU)
+  // SCHED_GETCPU: Shards are assigned to CPU cores. Threads use sched_getcpu()
+  // to query the core before every access to the shard. Other than the (very
+  // rare) situation where a context switch occurs between calling
+  // sched_getcpu() and updating the shard there is no cache line
+  // contention
+  return sched_getcpu() & ((1 << num_shard_bits) - 1);
+#else
+  // ROUND_ROBIN: Static assignment of threads to shards using a round robin
+  // distribution. This minimizes the number of threads sharing the same shard,
+  // but threads sharing the same shard will cause cache line ping pong when
+  // the threads are running on different cores (likely)
+  static int thread_shard_next;
+  static std::mutex thread_shard_mtx;
+  static thread_local size_t thread_shard_index = MAX_SHARDS;
+
+  if (thread_shard_index == MAX_SHARDS) {
+    // Thread has not been assigned to a shard yet
+    std::lock_guard<std::mutex> lck (thread_shard_mtx);
+    thread_shard_index = thread_shard_next++ & ((1 << num_shard_bits) - 1);
+  }
+  return thread_shard_index;
+#endif
+}
+
 mempool::pool_t& mempool::get_pool(mempool::pool_index_t ix)
 {
   // We rely on this array being initialized before any invocation of
   // this function, even if it is called by ctors in other compilation
   // units that are being initialized before this compilation unit.
   static mempool::pool_t table[num_pools];
+  table[ix].pool_index = ix;
   return table[ix];
 }
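The shard count is computed once from std::thread::hardware_concurrency() and rounded up to the next power of two, so that pick_a_shard_int() can map a CPU or thread onto a shard with the mask ((1 << num_shard_bits) - 1). Below is a minimal standalone sketch of that rounding, using the MIN_SHARDS/DEFAULT_SHARDS/MAX_SHARDS values that the header change later in this diff defines for the round-robin (#else) branch; the function name round_up_shards is illustrative, not part of the patch.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <thread>

// Illustrative constants mirroring the non-sched_getcpu() branch of the header diff.
enum { MIN_SHARDS = 1 << 5, DEFAULT_SHARDS = 1 << 5, MAX_SHARDS = 1 << 7 };

// Clamp the thread count and round it up to the next power of two,
// the same way get_num_shards() derives num_shard_bits.
static std::size_t round_up_shards(unsigned threads) {
  if (threads == 0) threads = DEFAULT_SHARDS;
  threads = std::clamp<unsigned>(threads, MIN_SHARDS, MAX_SHARDS);
  std::size_t bits = 0;
  for (unsigned t = threads - 1; t != 0; t >>= 1) ++bits;
  return std::size_t{1} << bits;
}

int main() {
  // e.g. 48 hardware threads -> clamped to 48 -> rounded up to 64 shards;
  //      200 hardware threads -> clamped to 128 -> 128 shards.
  for (unsigned t : {0u, 8u, 48u, 200u, std::thread::hardware_concurrency()}) {
    std::printf("threads=%u -> shards=%zu\n", t, round_up_shards(t));
  }
}
```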

@@ -73,8 +123,8 @@ void mempool::set_debug_mode(bool d)
 size_t mempool::pool_t::allocated_bytes() const
 {
   ssize_t result = 0;
-  for (size_t i = 0; i < num_shards; ++i) {
-    result += shard[i].bytes;
+  for (size_t i = 0; i < get_num_shards(); ++i) {
+    result += shards[i].pool[pool_index].bytes;
   }
   if (result < 0) {
     // we raced with some unbalanced allocations/deallocations
@@ -86,8 +136,8 @@ size_t mempool::pool_t::allocated_bytes() const
 size_t mempool::pool_t::allocated_items() const
 {
   ssize_t result = 0;
-  for (size_t i = 0; i < num_shards; ++i) {
-    result += shard[i].items;
+  for (size_t i = 0; i < get_num_shards(); ++i) {
+    result += shards[i].pool[pool_index].items;
   }
   if (result < 0) {
     // we raced with some unbalanced allocations/deallocations
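allocated_bytes() and allocated_items() walk the shards without any locking, so an allocation counted on one shard and its matching deallocation counted on another shard can be observed out of order by the reader; the sum can then be transiently negative, which is why it is clamped to zero. A sequential sketch of one such interleaving, with plain longs standing in for the per-shard counters (not the Ceph types):

```cpp
#include <cstdio>

int main() {
  // Stand-ins for shards[i].pool[p].bytes on two different shards.
  long shard_bytes[2] = {0, 0};

  // A reader walks the shards in order while two threads are active:
  long sum = shard_bytes[0];   // reader samples shard 0 (allocation not recorded yet)
  shard_bytes[0] += 100;       // thread A records a 100-byte allocation on shard 0
  shard_bytes[1] -= 100;       // thread B frees it and records that on shard 1
  sum += shard_bytes[1];       // reader samples shard 1 and sees only the free

  // sum is now -100 even though nothing was over-freed; allocated_bytes()
  // clamps such transient negatives to zero.
  std::printf("raw sum = %ld, reported = %ld\n", sum, sum < 0 ? 0L : sum);
}
```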
@@ -98,47 +148,31 @@ size_t mempool::pool_t::allocated_items() const
 
 void mempool::pool_t::adjust_count(ssize_t items, ssize_t bytes)
 {
-#if defined(_GNU_SOURCE) && defined(WITH_SEASTAR)
-  // the expected path: we alway pick the shard for a cpu core
-  // a thread is executing on.
-  const size_t shard_index = pick_a_shard_int();
-#else
-  // fallback for lack of sched_getcpu()
-  const size_t shard_index = []() {
-    if (thread_shard_index == num_shards) {
-      thread_shard_index = pick_a_shard_int();
-    }
-    return thread_shard_index;
-  }();
-#endif
-  shard[shard_index].items += items;
-  shard[shard_index].bytes += bytes;
+  const auto shid = pick_a_shard_int();
+  auto& shard = shards[shid].pool[pool_index];
+  shard.items += items;
+  shard.bytes += bytes;
 }
 
 void mempool::pool_t::get_stats(
   stats_t *total,
   std::map<std::string, stats_t> *by_type) const
 {
-  for (size_t i = 0; i < num_shards; ++i) {
-    total->items += shard[i].items;
-    total->bytes += shard[i].bytes;
+  for (size_t i = 0; i < get_num_shards(); ++i) {
+    total->items += shards[i].pool[pool_index].items;
+    total->bytes += shards[i].pool[pool_index].bytes;
   }
   if (debug_mode) {
     std::lock_guard shard_lock(lock);
     for (auto &p : type_map) {
       std::string n = ceph_demangle(p.second.type_name);
       stats_t &s = (*by_type)[n];
-#ifdef WITH_SEASTAR
       s.bytes = 0;
       s.items = 0;
-      for (size_t i = 0 ; i < num_shards; ++i) {
+      for (size_t i = 0 ; i < get_num_shards(); ++i) {
         s.bytes += p.second.shards[i].items * p.second.item_size;
         s.items += p.second.shards[i].items;
       }
-#else
-      s.bytes = p.second.items * p.second.item_size;
-      s.items = p.second.items;
-#endif
     }
   }
 }
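The stats layout is inverted by this change: instead of each pool owning its own array of shards, there is now one global array of cacheline-aligned shards, and each shard holds a stats slot for every pool. adjust_count() touches a single slot in the caller's shard, while the read-side sums one pool's slot across all shards. A simplified standalone model of that layout and of the aggregation loop (small constants and plain names such as kPools, kShards and adjust() are illustrative, not the Ceph types):

```cpp
#include <atomic>
#include <cstdio>
#include <memory>

constexpr int kPools  = 4;   // stand-in for mempool::num_pools
constexpr int kShards = 8;   // stand-in for get_num_shards()

struct Stats { std::atomic<long> items{0}, bytes{0}; };

// New layout: one cacheline-aligned shard holds the stats of every pool.
struct alignas(128) Shard { Stats pool[kPools]; };

std::unique_ptr<Shard[]> shards = std::make_unique<Shard[]>(kShards);

// Analogue of pool_t::adjust_count(): touch only the caller's shard.
void adjust(int shard, int pool, long items, long bytes) {
  shards[shard].pool[pool].items += items;
  shards[shard].pool[pool].bytes += bytes;
}

// Analogue of pool_t::allocated_bytes(): sum that pool across all shards.
long allocated_bytes(int pool) {
  long result = 0;
  for (int i = 0; i < kShards; ++i)
    result += shards[i].pool[pool].bytes;
  return result < 0 ? 0 : result;   // clamp transient negatives
}

int main() {
  adjust(/*shard=*/2, /*pool=*/1, 3, 3 * 64);
  adjust(/*shard=*/5, /*pool=*/1, 1, 1 * 64);
  std::printf("pool 1 bytes = %ld\n", allocated_bytes(1));  // 256
}
```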

src/include/mempool.h

Lines changed: 39 additions & 74 deletions
@@ -26,12 +26,12 @@
 #include <boost/container/flat_set.hpp>
 #include <boost/container/flat_map.hpp>
 
-#if defined(_GNU_SOURCE) && defined(WITH_SEASTAR)
+#if defined(_GNU_SOURCE)
+# define MEMPOOL_SCHED_GETCPU
 # include <sched.h>
 #endif
 
 #include "common/Formatter.h"
-#include "common/ceph_atomic.h"
 #include "include/ceph_assert.h"
 #include "include/compact_map.h"
 #include "include/compact_set.h"
@@ -196,54 +196,23 @@ extern void set_debug_mode(bool d);
 // --------------------------------------------------------------
 class pool_t;
 
-// we shard pool stats across many shard_t's to reduce the amount
-// of cacheline ping pong.
 enum {
-  num_shard_bits = 5
-};
-enum {
-  num_shards = 1 << num_shard_bits
-};
-
-static size_t pick_a_shard_int() {
-#if defined(_GNU_SOURCE) && defined(WITH_SEASTAR)
-  // a thread local storage is actually just an approximation;
-  // what we truly want is a _cpu local storage_.
-  //
-  // on the architectures we care about sched_getcpu() is
-  // a syscall-handled-in-userspace (vdso!). it grabs the cpu
-  // id kernel exposes to a task on context switch.
-  return sched_getcpu() & ((1 << num_shard_bits) - 1);
+#if defined(MEMPOOL_SCHED_GETCPU)
+  MIN_SHARDS = 1,        //1
 #else
-  // Dirt cheap, see:
-  //   https://fossies.org/dox/glibc-2.32/pthread__self_8c_source.html
-  size_t me = (size_t)pthread_self();
-  size_t i = (me >> CEPH_PAGE_SHIFT) & ((1 << num_shard_bits) - 1);
-  return i;
+  MIN_SHARDS = 1<<5,     //32
 #endif
-}
-
-//
-// Align shard to a cacheline.
-//
-// It would be possible to retrieve the value at runtime (for instance
-// with getconf LEVEL1_DCACHE_LINESIZE or grep -m1 cache_alignment
-// /proc/cpuinfo). It is easier to hard code the largest cache
-// linesize for all known processors (128 bytes). If the actual cache
-// linesize is smaller on a given processor, it will just waste a few
-// bytes.
-//
-struct shard_t {
-  ceph::atomic<size_t> bytes = {0};
-  ceph::atomic<size_t> items = {0};
-  char __padding[128 - sizeof(ceph::atomic<size_t>)*2];
-} __attribute__ ((aligned (128)));
+  DEFAULT_SHARDS = 1<<5, //32
+  MAX_SHARDS = 1<<7      //128
+};
 
-static_assert(sizeof(shard_t) == 128, "shard_t should be cacheline-sized");
+int pick_a_shard_int(void);
+size_t get_num_shards(void);
 
 struct stats_t {
-  ssize_t items = 0;
-  ssize_t bytes = 0;
+  std::atomic<size_t> items = {0};
+  std::atomic<size_t> bytes = {0};
+
   void dump(ceph::Formatter *f) const {
     f->dump_int("items", items);
     f->dump_int("bytes", bytes);
f->dump_int("bytes", bytes);
@@ -256,24 +225,36 @@ struct stats_t {
256225
}
257226
};
258227

228+
// Align shard to a cacheline, group stats for all mempools in the
229+
// same shard to improve cache line density.
230+
//
231+
// It would be possible to retrieve the value at runtime (for instance
232+
// with getconf LEVEL1_DCACHE_LINESIZE or grep -m1 cache_alignment
233+
// /proc/cpuinfo). It is easier to hard code the largest cache
234+
// linesize for all known processors (128 bytes). If the actual cache
235+
// linesize is smaller on a given processor, it will just waste a few
236+
// bytes.
237+
//
238+
struct shard_t {
239+
stats_t pool[num_pools];
240+
} __attribute__ ((aligned (128)));
241+
static_assert(sizeof(shard_t)%128 == 0, "shard_t should be cacheline-sized");
242+
243+
extern std::unique_ptr<shard_t[]> shards;
244+
259245
pool_t& get_pool(pool_index_t ix);
260246
const char *get_pool_name(pool_index_t ix);
261247

262248
struct type_t {
263249
const char *type_name;
264250
size_t item_size;
265-
#ifdef WITH_SEASTAR
266251
struct type_shard_t {
267-
ceph::atomic<ssize_t> items = {0}; // signed
268-
char __padding[128 - sizeof(ceph::atomic<ssize_t>)];
252+
std::atomic<ssize_t> items = {0}; // signed
253+
char __padding[128 - sizeof(std::atomic<ssize_t>)];
269254
} __attribute__ ((aligned (128)));
270255
static_assert(sizeof(type_shard_t) == 128,
271256
"type_shard_t should be cacheline-sized");
272-
type_shard_t shards[num_shards];
273-
#else
274-
// XXX: consider dropping this case for classic with perf tests
275-
ceph::atomic<ssize_t> items = {0}; // signed
276-
#endif
257+
std::unique_ptr<type_shard_t[]> shards = std::make_unique<type_shard_t[]>(get_num_shards());
277258
};
278259

279260
struct type_info_hash {
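Because shard_t now bundles one stats_t per pool, it is normally larger than a single cacheline, which is why the assertion changes from sizeof(shard_t) == 128 to sizeof(shard_t) % 128 == 0. A small sketch of the property the new assert guarantees, with simplified types (kPools and Stats are illustrative stand-ins):

```cpp
#include <atomic>
#include <cstddef>

constexpr std::size_t kPools = 4;   // stand-in for mempool::num_pools

struct Stats { std::atomic<std::size_t> items{0}, bytes{0}; };

// Aligning the whole shard to 128 bytes also rounds its size up to a
// multiple of 128 (sizeof is always a multiple of alignof), so
// shards[i] and shards[i+1] never share a cacheline.
struct alignas(128) Shard { Stats pool[kPools]; };

static_assert(alignof(Shard) == 128, "shard must start on its own cacheline");
static_assert(sizeof(Shard) % 128 == 0, "shard size must be a cacheline multiple");

int main() { return 0; }
```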
@@ -283,14 +264,14 @@ struct type_info_hash {
 };
 
 class pool_t {
-  shard_t shard[num_shards];
-
   mutable std::mutex lock;  // only used for types list
   std::unordered_map<const char *, type_t> type_map;
 
   template<pool_index_t, typename T>
   friend class pool_allocator;
 public:
+  pool_index_t pool_index;
+
   //
   // How much this pool consumes. O(<num_shards>)
   //
@@ -362,15 +343,11 @@ class pool_allocator {
   T* allocate(size_t n, void *p = nullptr) {
     size_t total = sizeof(T) * n;
     const auto shid = pick_a_shard_int();
-    auto& shard = pool->shard[shid];
+    auto& shard = shards[shid].pool[pool->pool_index];
     shard.bytes += total;
     shard.items += n;
     if (type) {
-#ifdef WITH_SEASTAR
       type->shards[shid].items += n;
-#else
-      type->items += n;
-#endif
     }
     T* r = reinterpret_cast<T*>(new char[total]);
     return r;
@@ -379,31 +356,23 @@ class pool_allocator {
   void deallocate(T* p, size_t n) {
     size_t total = sizeof(T) * n;
     const auto shid = pick_a_shard_int();
-    auto& shard = pool->shard[shid];
+    auto& shard = shards[shid].pool[pool->pool_index];
     shard.bytes -= total;
     shard.items -= n;
     if (type) {
-#ifdef WITH_SEASTAR
       type->shards[shid].items -= n;
-#else
-      type->items -= n;
-#endif
     }
     delete[] reinterpret_cast<char*>(p);
   }
 
   T* allocate_aligned(size_t n, size_t align, void *p = nullptr) {
     size_t total = sizeof(T) * n;
     const auto shid = pick_a_shard_int();
-    auto& shard = pool->shard[shid];
+    auto& shard = shards[shid].pool[pool->pool_index];
     shard.bytes += total;
     shard.items += n;
     if (type) {
-#ifdef WITH_SEASTAR
       type->shards[shid].items += n;
-#else
-      type->items += n;
-#endif
     }
     char *ptr;
     int rc = ::posix_memalign((void**)(void*)&ptr, align, total);
@@ -416,15 +385,11 @@ class pool_allocator {
   void deallocate_aligned(T* p, size_t n) {
     size_t total = sizeof(T) * n;
     const auto shid = pick_a_shard_int();
-    auto& shard = pool->shard[shid];
+    auto& shard = shards[shid].pool[pool->pool_index];
     shard.bytes -= total;
     shard.items -= n;
     if (type) {
-#ifdef WITH_SEASTAR
       type->shards[shid].items -= n;
-#else
-      type->items -= n;
-#endif
     }
     aligned_free(p);
   }

src/test/objectstore/test_bluefs_ex.cc

Lines changed: 1 addition & 1 deletion
@@ -133,7 +133,7 @@ class BlueFS_ex : virtual public ::testing::Test {
     conf.ApplyChanges();
 
     auto stop_at_fixed_point = [&](uint32_t i) -> void {
-      if (i == stop_point) exit(107);
+      if (i == stop_point) _exit(107);
     };
     BlueFS fs(g_ceph_context);
     fs.tracepoint_async_compact = stop_at_fixed_point;
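The test's stop callback now calls _exit() instead of exit(). _exit() terminates the process immediately, skipping atexit handlers, static destructors, and stdio flushing, which presumably keeps the simulated stop as abrupt as intended at the fixed point; exit() would run all of that cleanup first. A tiny generic illustration of the difference (not the test itself):

```cpp
#include <cstdio>
#include <cstdlib>
#include <unistd.h>

int main() {
  std::atexit([] { std::puts("atexit handler ran"); });

  // exit(107) would run the atexit handler and flush/close stdio first.
  // _exit(107) terminates immediately: no atexit handlers, no static
  // destructors, no stdio cleanup - useful when a test wants the process
  // to stop "mid-operation" at a precise point.
  _exit(107);
}
```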
