Skip to content

Commit 3e40e3f

Browse files
authored
Support dynamic expansion of RDMA block pool (#3155)
1 parent d2ea819 commit 3e40e3f

File tree

3 files changed

+81
-75
lines changed

3 files changed

+81
-75
lines changed

src/brpc/rdma/block_pool.cpp

Lines changed: 78 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,9 @@
2525
#include "butil/iobuf.h"
2626
#include "butil/object_pool.h"
2727
#include "butil/thread_local.h"
28-
#include "bthread/bthread.h"
28+
#include "butil/memory/scope_guard.h"
2929
#include "brpc/rdma/block_pool.h"
3030

31-
3231
namespace brpc {
3332
namespace rdma {
3433

@@ -98,6 +97,8 @@ struct GlobalInfo {
9897
std::vector<size_t> idle_size[BLOCK_SIZE_COUNT];
9998
int region_num[BLOCK_SIZE_COUNT];
10099
butil::Mutex extend_lock;
100+
std::vector<IdleNode*> expansion_list[BLOCK_SIZE_COUNT];
101+
std::vector<size_t> expansion_size[BLOCK_SIZE_COUNT];
101102
};
102103
static GlobalInfo* g_info = NULL;
103104

@@ -129,36 +130,20 @@ uint32_t GetRegionId(const void* buf) {
129130
return r->id;
130131
}
131132

132-
// When both rdma_memory_pool_max_regions and rdma_memory_pool_buckets are
133-
// greater than 1, dynamic memory expansion may cause concurrent modification
134-
// issues in the memory linked list due to lock contention problems. To address
135-
// this, we increase the region_num count for each block_type. Dynamic memory
136-
// expansion is only permitted when both of the following conditions are met:
137-
// rdma_memory_pool_buckets equals 1
138-
// g_info->region_num[block_type] is less than 1
139-
static bool CanExtendBlockRuntime(int block_type) {
140-
return FLAGS_rdma_memory_pool_buckets == 1 ||
141-
g_info->region_num[block_type] < 1;
142-
}
143-
144-
static void* ExtendBlockPoolImpl(void* region_base, size_t region_size,
145-
int block_type) {
146-
if (CanExtendBlockRuntime(block_type) == false) {
147-
LOG(INFO) << "Runtime extend memory only support one bucket or region "
148-
"num is zero for per block_type";
133+
static void* ExtendBlockPoolImpl(void* region_base, size_t region_size, int block_type) {
134+
auto region_base_guard = butil::MakeScopeGuard([region_base]() {
149135
free(region_base);
150-
errno = ENOMEM;
151-
return NULL;
152-
}
136+
});
137+
153138
if (g_region_num == FLAGS_rdma_memory_pool_max_regions) {
154-
LOG(INFO) << "Memory pool reaches max regions";
155-
free(region_base);
139+
LOG_EVERY_SECOND(ERROR) << "Memory pool reaches max regions";
156140
errno = ENOMEM;
157141
return NULL;
158142
}
143+
159144
uint32_t id = g_cb(region_base, region_size);
160145
if (id == 0) {
161-
free(region_base);
146+
errno = EINVAL;
162147
return NULL;
163148
}
164149

@@ -170,7 +155,7 @@ static void* ExtendBlockPoolImpl(void* region_base, size_t region_size,
170155
for (size_t j = 0; j < i; ++j) {
171156
butil::return_object<IdleNode>(node[j]);
172157
}
173-
free(region_base);
158+
errno = ENOMEM;
174159
return NULL;
175160
}
176161
}
@@ -184,12 +169,15 @@ static void* ExtendBlockPoolImpl(void* region_base, size_t region_size,
184169
for (size_t i = 0; i < g_buckets; ++i) {
185170
node[i]->start = (void*)(region->start + i * (region_size / g_buckets));
186171
node[i]->len = region_size / g_buckets;
187-
node[i]->next = g_info->idle_list[block_type][i];
188-
g_info->idle_list[block_type][i] = node[i];
189-
g_info->idle_size[block_type][i] += node[i]->len;
172+
node[i]->next = g_info->expansion_list[block_type][i];
173+
g_info->expansion_list[block_type][i] = node[i];
174+
g_info->expansion_size[block_type][i] += node[i]->len;
190175
}
191176
g_info->region_num[block_type]++;
192177

178+
// `region_base' is inuse, cannot be freed.
179+
region_base_guard.dismiss();
180+
193181
return region_base;
194182
}
195183

@@ -203,7 +191,7 @@ static void* ExtendBlockPool(size_t region_size, int block_type) {
203191
if (FLAGS_rdma_memory_pool_user_specified_memory) {
204192
LOG_EVERY_SECOND(ERROR) << "Fail to extend new region, "
205193
"rdma_memory_pool_user_specified_memory is "
206-
"true, ExtendBlockPool is disabled";
194+
"true, ExtendBlockPool is disabled";
207195
return NULL;
208196
}
209197

@@ -222,24 +210,27 @@ static void* ExtendBlockPool(size_t region_size, int block_type) {
222210
return ExtendBlockPoolImpl(region_base, region_size, block_type);
223211
}
224212

225-
void* ExtendBlockPoolByUser(void* region_base, size_t region_size,
226-
int block_type) {
227-
if (FLAGS_rdma_memory_pool_user_specified_memory == false) {
213+
void* ExtendBlockPoolByUser(void* region_base, size_t region_size, int block_type) {
214+
auto region_base_guard = butil::MakeScopeGuard([region_base]() {
215+
free(region_base);
216+
});
217+
218+
if (!FLAGS_rdma_memory_pool_user_specified_memory) {
228219
LOG_EVERY_SECOND(ERROR) << "User extend memory is disabled";
229220
return NULL;
230221
}
231222
if (reinterpret_cast<uintptr_t>(region_base) % 4096 != 0) {
232223
LOG_EVERY_SECOND(ERROR) << "region_base must be 4096 aligned";
224+
errno = EINVAL;
233225
return NULL;
234226
}
235227

236-
uint64_t index = butil::fast_rand() % g_buckets;
237-
BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]);
238-
BAIDU_SCOPED_LOCK(g_info->extend_lock);
239228
region_size =
240229
region_size * BYTES_IN_MB / g_block_size[block_type] / g_buckets;
241230
region_size *= g_block_size[block_type] * g_buckets;
242231

232+
region_base_guard.dismiss();
233+
BAIDU_SCOPED_LOCK(g_info->extend_lock);
243234
return ExtendBlockPoolImpl(region_base, region_size, block_type);
244235
}
245236

@@ -316,6 +307,14 @@ bool InitBlockPool(RegisterCallback cb) {
316307
return false;
317308
}
318309
}
310+
g_info->expansion_list[i].resize(g_buckets, NULL);
311+
if (g_info->expansion_list[i].size() != g_buckets) {
312+
return false;
313+
}
314+
g_info->expansion_size[i].resize(g_buckets, 0);
315+
if (g_info->expansion_size[i].size() != g_buckets) {
316+
return false;
317+
}
319318
}
320319

321320
g_dump_mutex = new butil::Mutex;
@@ -332,66 +331,74 @@ bool InitBlockPool(RegisterCallback cb) {
332331
return false;
333332
}
334333

334+
static void MoveExpansionList2EmptyIdleList(int block_type, size_t index) {
335+
CHECK(NULL == g_info->idle_list[block_type][index]);
336+
337+
g_info->idle_list[block_type][index] = g_info->expansion_list[block_type][index];
338+
g_info->idle_size[block_type][index] += g_info->expansion_size[block_type][index];
339+
g_info->expansion_list[block_type][index] = NULL;
340+
g_info->expansion_size[block_type][index] = 0;
341+
}
342+
335343
static void* AllocBlockFrom(int block_type) {
336344
bool locked = false;
337345
if (BAIDU_UNLIKELY(g_dump_enable)) {
338346
g_dump_mutex->lock();
339347
locked = true;
340348
}
349+
BUTIL_SCOPE_EXIT {
350+
if (locked) {
351+
g_dump_mutex->unlock();
352+
}
353+
};
354+
341355
void* ptr = NULL;
342-
if (block_type == 0 && tls_idle_list != NULL){
356+
if (0 == block_type && NULL != tls_idle_list) {
343357
CHECK(tls_idle_num > 0);
344358
IdleNode* n = tls_idle_list;
345359
tls_idle_list = n->next;
346360
ptr = n->start;
347361
butil::return_object<IdleNode>(n);
348362
tls_idle_num--;
349-
if (locked) {
350-
g_dump_mutex->unlock();
351-
}
352363
return ptr;
353364
}
354365

355-
uint64_t index = butil::fast_rand() % g_buckets;
366+
size_t index = butil::fast_rand() % g_buckets;
356367
BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]);
357368
IdleNode* node = g_info->idle_list[block_type][index];
358-
if (!node) {
369+
if (NULL == node) {
359370
BAIDU_SCOPED_LOCK(g_info->extend_lock);
360371
node = g_info->idle_list[block_type][index];
361-
if (!node) {
362-
// There is no block left, extend a new region
363-
if (!ExtendBlockPool(FLAGS_rdma_memory_pool_increase_size_mb,
364-
block_type)) {
372+
if (NULL == node && NULL != g_info->expansion_list[block_type][index]) {
373+
MoveExpansionList2EmptyIdleList(block_type, index);
374+
node = g_info->idle_list[block_type][index];
375+
}
376+
if (NULL == node) {
377+
// There is no block left, extend a new region.
378+
if (!ExtendBlockPool(FLAGS_rdma_memory_pool_increase_size_mb, block_type)) {
365379
LOG_EVERY_SECOND(ERROR) << "Fail to extend new region. "
366380
<< "You can set the size of memory pool larger. "
367381
<< "Refer to the help message of these flags: "
368382
<< "rdma_memory_pool_initial_size_mb, "
369383
<< "rdma_memory_pool_increase_size_mb, "
370384
<< "rdma_memory_pool_max_regions.";
371-
if (locked) {
372-
g_dump_mutex->unlock();
373-
}
374385
return NULL;
375386
}
387+
MoveExpansionList2EmptyIdleList(block_type, index);
376388
node = g_info->idle_list[block_type][index];
377389
}
378390
}
379-
if (node) {
380-
ptr = node->start;
381-
if (node->len > g_block_size[block_type]) {
382-
node->start = (char*)node->start + g_block_size[block_type];
383-
node->len -= g_block_size[block_type];
384-
} else {
385-
g_info->idle_list[block_type][index] = node->next;
386-
butil::return_object<IdleNode>(node);
387-
}
388-
g_info->idle_size[block_type][index] -= g_block_size[block_type];
391+
CHECK(NULL != node);
392+
393+
ptr = node->start;
394+
if (node->len > g_block_size[block_type]) {
395+
node->start = (char*)node->start + g_block_size[block_type];
396+
node->len -= g_block_size[block_type];
389397
} else {
390-
if (locked) {
391-
g_dump_mutex->unlock();
392-
}
393-
return NULL;
398+
g_info->idle_list[block_type][index] = node->next;
399+
butil::return_object<IdleNode>(node);
394400
}
401+
g_info->idle_size[block_type][index] -= g_block_size[block_type];
395402

396403
// Move more blocks from global list to tls list
397404
if (block_type == 0) {
@@ -417,9 +424,6 @@ static void* AllocBlockFrom(int block_type) {
417424
}
418425
}
419426

420-
if (locked) {
421-
g_dump_mutex->unlock();
422-
}
423427
return ptr;
424428
}
425429

@@ -482,6 +486,12 @@ int DeallocBlock(void* buf) {
482486
g_dump_mutex->lock();
483487
locked = true;
484488
}
489+
BUTIL_SCOPE_EXIT {
490+
if (locked) {
491+
g_dump_mutex->unlock();
492+
}
493+
};
494+
485495
if (block_type == 0 && tls_idle_num < (uint32_t)FLAGS_rdma_memory_pool_tls_cache_num) {
486496
if (!tls_inited) {
487497
tls_inited = true;
@@ -494,9 +504,6 @@ int DeallocBlock(void* buf) {
494504
tls_idle_num++;
495505
node->next = tls_idle_list;
496506
tls_idle_list = node;
497-
if (locked) {
498-
g_dump_mutex->unlock();
499-
}
500507
return 0;
501508
}
502509

@@ -527,9 +534,6 @@ int DeallocBlock(void* buf) {
527534
g_info->idle_list[block_type][index] = node;
528535
g_info->idle_size[block_type][index] += node->len;
529536
}
530-
if (locked) {
531-
g_dump_mutex->unlock();
532-
}
533537
return 0;
534538
}
535539

@@ -557,7 +561,8 @@ void DumpMemoryPoolInfo(std::ostream& os) {
557561
for (int i = 0; i < BLOCK_SIZE_COUNT; ++i) {
558562
os << "\tFor block size " << GetBlockSize(i) << ":\n";
559563
for (size_t j = 0; j < g_buckets; ++j) {
560-
os << "\t\tBucket " << j << ": " << g_info->idle_size[i][j] << "\n";
564+
os << "\t\tBucket " << j << ": {" << g_info->idle_size[i][j]
565+
<< ", " << g_info->expansion_size[i][j] << "}\n";
561566
}
562567
}
563568
os << "Thread Local Cache Info:\n";

src/brpc/rdma/block_pool.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,7 @@ bool InitBlockPool(RegisterCallback cb);
8080
// FLAGS_rdma_memory_pool_user_specified_memory is true, user is responsibility
8181
// of extending memory blocks , this ensuring flexibility for advanced use
8282
// cases.
83-
void* ExtendBlockPoolByUser(void* region_base, size_t region_size,
84-
int block_type);
83+
void* ExtendBlockPoolByUser(void* region_base, size_t region_size, int block_type);
8584

8685
// Allocate a buf with length at least @a size (require: size>0)
8786
// Return the address allocated, NULL if failed and errno is set.

src/butil/memory/scope_guard.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,4 +104,6 @@ operator+(ScopeExitHelper, Callback&& callback) {
104104
auto BRPC_ANONYMOUS_VARIABLE(SCOPE_EXIT) = \
105105
::butil::internal::ScopeExitHelper() + [&]() noexcept
106106

107+
#define BUTIL_SCOPE_EXIT BRPC_SCOPE_EXIT
108+
107109
#endif // BUTIL_SCOPED_GUARD_H

0 commit comments

Comments
 (0)