Commit 7735a56

support: use producer and consumer tokens in mpmc queue of byte buffer pool
1 parent 7391926 commit 7735a56

File tree

1 file changed: +36 −12 lines

include/srsran/support/memory_pool/fixed_size_memory_block_pool.h

Lines changed: 36 additions & 12 deletions
@@ -60,19 +60,30 @@ class fixed_size_memory_block_pool
   /// The number of batches of blocks that a worker can store in its own thread for non-contended access.
   constexpr static size_t MAX_LOCAL_BATCH_CAPACITY = 64U;
 
-  using local_cache_type = static_vector<free_memory_block_list, MAX_LOCAL_BATCH_CAPACITY>;
+  /// A batch of memory blocks that is exchanged in bulk between the central and local caches.
+  using memory_block_batch = free_memory_block_list;
+
+  /// Thread-local cache that stores a list of batches of memory blocks.
+  using local_cache_type = static_vector<memory_block_batch, MAX_LOCAL_BATCH_CAPACITY>;
 
   /// Ctor of the memory pool. It is set as private because the class works as a singleton.
   explicit fixed_size_memory_block_pool(size_t nof_blocks_, size_t memory_block_size_) :
+    // Make sure that there are no gaps between blocks when they are allocated as part of a single array.
     mblock_size(align_next(memory_block_size_, alignof(std::max_align_t))),
-    nof_blocks(nof_blocks_),
+    // Make sure all batches are filled with block_batch_size blocks.
+    nof_blocks(ceil(nof_blocks_ / (double)block_batch_size) * block_batch_size),
+    // Calculate the maximum number of batches that can be stored in the local cache.
     max_local_batches(
         std::max(std::min((size_t)MAX_LOCAL_BATCH_CAPACITY, static_cast<size_t>(nof_blocks / block_batch_size / 32U)),
                  static_cast<size_t>(2U))),
     // Allocate the required memory for the given number of segments and segment size.
     allocated_memory(mblock_size * nof_blocks),
-    // Push all segments to the central cache.
-    central_mem_cache(ceil(nof_blocks / (double)block_batch_size))
+    // Pre-reserve space in the central cache to hold all batches and avoid reallocations.
+    // Given that we use the MPMC queue in https://github.com/cameron314/concurrentqueue, we have to over-dimension it
+    // to account for the potential number of producers. The way to exactly over-dimension this queue is inconvenient,
+    // so we just try to conservatively ensure it can accommodate up to 32 producers for a block size of 32. If this is
+    // not enough, the queue will resize itself and malloc in the process.
+    central_mem_cache(nof_total_batches() + 2 * 32 * 32)
   {
     srsran_assert(nof_blocks > max_local_cache_size(),
                   "The number of segments in the pool must be much larger than the thread cache size ({} <= {})",
@@ -83,7 +94,8 @@ class fixed_size_memory_block_pool
                   mblock_size,
                   free_memory_block_list::min_memory_block_align());
 
-    const unsigned nof_batches = ceil(nof_blocks / (double)block_batch_size);
+    // Push all memory blocks to the central cache in batches.
+    const unsigned nof_batches = nof_total_batches();
     for (unsigned i = 0; i != nof_batches; ++i) {
       free_memory_block_list batch;
       for (unsigned j = 0; j != block_batch_size; ++j) {
@@ -139,7 +151,7 @@ class fixed_size_memory_block_pool
 
     // Local cache is empty. Attempt memory block pop from central cache.
     free_memory_block_list batch;
-    if (central_mem_cache.try_dequeue(batch)) {
+    if (central_mem_cache.try_dequeue(w_ctx->consumer_token, batch)) {
       w_ctx->local_cache.push_back(batch);
       node = w_ctx->local_cache.back().try_pop();
     }
@@ -180,7 +192,7 @@ class fixed_size_memory_block_pool
     // Local cache is full. Rebalance by sending batches of blocks to central cache.
     // We leave one batch in the local cache.
     for (unsigned i = 0; i != max_local_batches - 1; ++i) {
-      report_fatal_error_if_not(central_mem_cache.enqueue(w_ctx->local_cache.back()),
+      report_fatal_error_if_not(central_mem_cache.enqueue(w_ctx->producer_token, w_ctx->local_cache.back()),
                                 "Failed to push allocated batch back to central cache");
       w_ctx->local_cache.pop_back();
     }
@@ -203,10 +215,19 @@ class fixed_size_memory_block_pool
 
 private:
   struct worker_ctxt {
-    std::thread::id id;
+    /// Thread ID of the worker.
+    std::thread::id id;
+    /// Thread-local cache of memory blocks.
     local_cache_type local_cache;
+    /// Producer Token for fast enqueueing to the central cache.
+    moodycamel::ProducerToken producer_token;
+    /// Consumer Token for fast dequeueing from the central cache.
+    moodycamel::ConsumerToken consumer_token;
 
-    worker_ctxt() : id(std::this_thread::get_id()) {}
+    worker_ctxt(fixed_size_memory_block_pool& parent) :
+      id(std::this_thread::get_id()), producer_token(parent.central_mem_cache), consumer_token(parent.central_mem_cache)
+    {
+    }
     ~worker_ctxt()
     {
       pool_type& pool = pool_type::get_instance();
@@ -219,7 +240,7 @@ class fixed_size_memory_block_pool
           pool.incomplete_batch.push(local_cache.back().try_pop());
           if (pool.incomplete_batch.size() >= block_batch_size) {
             // The incomplete batch is now complete and can be pushed to the central cache.
-            report_error_if_not(pool.central_mem_cache.enqueue(pool.incomplete_batch),
+            report_error_if_not(pool.central_mem_cache.enqueue(producer_token, pool.incomplete_batch),
                                 "Failed to push blocks to central cache");
             pool.incomplete_batch.clear();
           }
@@ -228,7 +249,7 @@ class fixed_size_memory_block_pool
           local_cache.pop_back();
           continue;
         }
-        report_error_if_not(pool.central_mem_cache.enqueue(local_cache.back()),
+        report_error_if_not(pool.central_mem_cache.enqueue(producer_token, local_cache.back()),
                             "Failed to push blocks back to central cache");
         local_cache.pop_back();
       }
@@ -237,10 +258,13 @@ class fixed_size_memory_block_pool
 
   worker_ctxt* get_worker_cache()
   {
-    thread_local worker_ctxt worker_cache;
+    thread_local worker_ctxt worker_cache{*this};
     return &worker_cache;
   }
 
+  /// Number of batches of memory blocks stored in the pool.
+  size_t nof_total_batches() const { return (nof_blocks + block_batch_size - 1) / block_batch_size; }
+
   const size_t mblock_size;
   const size_t nof_blocks;
   const size_t max_local_batches;
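
Editorial aside: the commit's central change is passing per-thread tokens to the moodycamel::ConcurrentQueue. Each worker creates a ProducerToken and a ConsumerToken bound to the shared queue and passes them to enqueue/try_dequeue, which lets the queue use the thread's dedicated sub-queue instead of the slower token-free path. A simplified, self-contained sketch of that pattern follows (not the pool code itself; identifiers and values are illustrative):

#include <thread>
#include <vector>
#include "concurrentqueue.h"  // https://github.com/cameron314/concurrentqueue

int main()
{
  // Shared MPMC queue, pre-sized to reduce reallocation under load.
  moodycamel::ConcurrentQueue<int> queue(1024);

  auto worker = [&queue](int base) {
    // One producer and one consumer token per thread, bound to the shared queue.
    moodycamel::ProducerToken ptok(queue);
    moodycamel::ConsumerToken ctok(queue);

    // Token-based enqueue: items go into this thread's dedicated sub-queue.
    for (int i = 0; i != 100; ++i) {
      queue.enqueue(ptok, base + i);
    }

    // Token-based dequeue: the consumer token remembers which sub-queue to poll next.
    int item;
    while (queue.try_dequeue(ctok, item)) {
      // ... use item ...
    }
  };

  std::vector<std::thread> threads;
  for (int t = 0; t != 4; ++t) {
    threads.emplace_back(worker, t * 1000);
  }
  for (auto& th : threads) {
    th.join();
  }
}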
