Skip to content

Commit d6b4454

Browse files
frankistcodebot
authored and committed
adt: implementation of a new concurrent byte buffer pool that groups memory blocks into batches for quicker rebalancing
1 parent 50dffd3 commit d6b4454

File tree

1 file changed

+75
-46
lines changed

1 file changed

+75
-46
lines changed

include/srsran/support/memory_pool/fixed_size_memory_block_pool.h

Lines changed: 75 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#pragma once
1212

1313
#include "memory_block_list.h"
14+
#include "srsran/adt/static_vector.h"
1415
#include "srsran/support/error_handling.h"
1516
#include "srsran/support/srsran_assert.h"
1617
#include <mutex>
@@ -21,7 +22,7 @@ namespace srsran {
2122

2223
/**
2324
* Concurrent memory pool of memory blocks of equal size. This pool is thread-safe.
24-
* Each worker keeps a separate thread-local memory block cache that it uses for fast, uncontended allocation and
25+
* Each worker keeps a separate thread-local memory block cache that it uses for fast, non-contended allocation and
2526
* deallocation. When accessing a thread-local cache, no locks are required.
2627
*
2728
* When the local cache gets depleted, the worker tries to obtain a batch of segments from a central memory block cache.
@@ -51,32 +52,43 @@ class fixed_size_memory_block_pool
5152
{
5253
using pool_type = fixed_size_memory_block_pool<IdTag, DebugSanitizeAddress>;
5354

54-
/// The number of blocks a worker tries to steal from the central memory block cache in a single batch.
55-
constexpr static size_t max_local_cache1_size = 16;
55+
/// The number of blocks in batch that the worker can steal from the central cache.
56+
constexpr static size_t block_batch_size = 32U;
57+
58+
/// The number of batches of blocks that a worker can store in its own thread for non-contended access.
59+
constexpr static size_t MAX_LOCAL_BATCH_CAPACITY = 64U;
60+
61+
using local_cache_type = static_vector<free_memory_block_list, MAX_LOCAL_BATCH_CAPACITY>;
5662

5763
/// Ctor of the memory pool. It is set as private because the class works as a singleton.
5864
explicit fixed_size_memory_block_pool(size_t nof_blocks_, size_t memory_block_size_) :
59-
mblock_size(align_next(memory_block_size_, alignof(std::max_align_t))), nof_blocks(nof_blocks_)
65+
mblock_size(align_next(memory_block_size_, alignof(std::max_align_t))),
66+
nof_blocks(nof_blocks_),
67+
max_local_batches(
68+
std::max(std::min((size_t)MAX_LOCAL_BATCH_CAPACITY, static_cast<size_t>(nof_blocks / block_batch_size / 32U)),
69+
static_cast<size_t>(1U)))
6070
{
61-
srsran_assert(nof_blocks > max_local_cache1_size,
62-
"The number of segments in the pool must be larger than the thread cache size ({} <= {})",
71+
srsran_assert(nof_blocks > max_local_cache_size(),
72+
"The number of segments in the pool must be much larger than the thread cache size ({} <= {})",
6373
nof_blocks,
64-
(size_t)max_local_cache1_size);
74+
max_local_cache_size());
6575
srsran_assert(mblock_size > free_memory_block_list::min_memory_block_align(),
6676
"Segment size is too small ({} <= {})",
6777
mblock_size,
6878
free_memory_block_list::min_memory_block_align());
6979

7080
// Allocate the required memory for the given number of segments and segment size.
71-
size_t total_mem = mblock_size * nof_blocks;
81+
const size_t total_mem = mblock_size * nof_blocks;
7282
allocated_memory.resize(total_mem);
7383

7484
// Push all segments to the central cache.
85+
const size_t nof_batches = ceil(nof_blocks / (double)block_batch_size);
86+
central_mem_cache.resize(nof_batches);
7587
for (unsigned i = 0; i != nof_blocks; ++i) {
76-
central_mem_cache.push(static_cast<void*>(allocated_memory.data() + (mblock_size * i)));
88+
const unsigned batch_idx = i / block_batch_size;
89+
const unsigned offset = i * mblock_size;
90+
central_mem_cache[batch_idx].push(allocated_memory.data() + offset);
7791
}
78-
79-
max_cache2_size = max_local_cache_size() - max_local_cache1_size;
8092
}
8193

8294
public:
@@ -87,7 +99,7 @@ class fixed_size_memory_block_pool
8799

88100
~fixed_size_memory_block_pool()
89101
{
90-
std::lock_guard<std::mutex> lock(mutex);
102+
std::lock_guard<std::mutex> lock(central_cache_mutex);
91103
allocated_memory.clear();
92104
}
93105

@@ -106,10 +118,7 @@ class fixed_size_memory_block_pool
106118
size_t nof_memory_blocks() const { return nof_blocks; }
107119

108120
/// Maximum number of blocks that can be stored in the thread-local memory block cache.
109-
size_t max_local_cache_size() const
110-
{
111-
return max_local_cache1_size + std::max((size_t)max_local_cache1_size, nof_memory_blocks() / 32U);
112-
}
121+
size_t max_local_cache_size() const { return max_local_batches * block_batch_size; }
113122

114123
/// Allocate a node from the memory pool with the maximum size.
115124
void* allocate_node() noexcept { return allocate_node(memory_block_size()); }
@@ -120,16 +129,22 @@ class fixed_size_memory_block_pool
120129
srsran_assert(sz <= mblock_size, "Allocated node size={} exceeds max object size={}", sz, mblock_size);
121130
worker_ctxt* w_ctx = get_worker_cache();
122131

123-
// Attempt memory block pop from cache 1.
124-
void* node = w_ctx->local_cache1.try_pop();
125-
if (node == nullptr) {
126-
// Cache 1 is empty. Attempt memory block pop from cache 2.
127-
node = w_ctx->local_cache2.try_pop();
132+
// Attempt memory block pop from local cache.
133+
void* node = nullptr;
134+
while (not w_ctx->local_cache.empty()) {
135+
node = w_ctx->local_cache.back().try_pop();
136+
if (node != nullptr) {
137+
return node;
138+
}
139+
w_ctx->local_cache.pop_back();
128140
}
129-
if (node == nullptr) {
130-
// Local caches are depleted. Pop a batch of memory blocks from central cache.
131-
w_ctx->local_cache1 = central_mem_cache.try_pop_list(max_local_cache1_size + 1);
132-
node = w_ctx->local_cache1.try_pop();
141+
142+
// Local cache is empty. Attempt memory block pop from central cache.
143+
std::lock_guard<std::mutex> lock(central_cache_mutex);
144+
if (not central_mem_cache.empty()) {
145+
w_ctx->local_cache.push_back(central_mem_cache.back());
146+
central_mem_cache.pop_back();
147+
node = w_ctx->local_cache.back().try_pop();
133148
}
134149

135150
return node;
@@ -144,7 +159,7 @@ class fixed_size_memory_block_pool
144159

145160
if (DebugSanitizeAddress) {
146161
// For debug purposes.
147-
std::lock_guard<std::mutex> lock(mutex);
162+
std::lock_guard<std::mutex> lock(debug_mutex);
148163
bool found = false;
149164
for (unsigned i = 0; i != nof_blocks; ++i) {
150165
if (allocated_memory.data() + i * mblock_size == static_cast<uint8_t*>(p)) {
@@ -156,40 +171,53 @@ class fixed_size_memory_block_pool
156171
}
157172
}
158173

159-
// push to local memory block cache.
160-
if (w_ctx->local_cache1.size() < max_local_cache1_size) {
161-
w_ctx->local_cache1.push(p);
162-
} else {
163-
w_ctx->local_cache2.push(p);
174+
// Verify if new batch needs to be created in local cache.
175+
if (w_ctx->local_cache.empty() or w_ctx->local_cache.back().size() >= block_batch_size) {
176+
w_ctx->local_cache.emplace_back();
177+
}
178+
179+
// Push block to local cache.
180+
w_ctx->local_cache.back().push(p);
164181

165-
if (w_ctx->local_cache2.size() >= max_cache2_size) {
166-
// if local cache 2 reached max capacity, send all its blocks back to central cache
167-
central_mem_cache.steal_blocks(w_ctx->local_cache2);
182+
if (w_ctx->local_cache.size() >= max_local_batches and w_ctx->local_cache.back().size() >= block_batch_size) {
183+
// Local cache is full. Rebalance by sending batches of blocks to central cache.
184+
// We leave one batch in the local cache.
185+
std::lock_guard<std::mutex> lock(central_cache_mutex);
186+
for (unsigned i = 0; i != max_local_batches - 1; ++i) {
187+
central_mem_cache.push_back(w_ctx->local_cache.back());
188+
w_ctx->local_cache.pop_back();
168189
}
169190
}
170191
}
171192

172193
void print_all_buffers()
173194
{
174-
auto* worker = get_worker_cache();
195+
auto* worker = get_worker_cache();
196+
unsigned count = 0;
197+
for (const auto& l : worker->local_cache) {
198+
count += l.size();
199+
}
200+
175201
fmt::print("There are {}/{} buffers in central memory block cache. This thread contains {} in its local cache.\n",
176202
central_mem_cache.size(),
177203
nof_memory_blocks(),
178-
worker->local_cache1.size() + worker->local_cache2.size());
204+
count);
179205
}
180206

181207
private:
182208
struct worker_ctxt {
183-
std::thread::id id;
184-
free_memory_block_list local_cache1;
185-
free_memory_block_list local_cache2;
209+
std::thread::id id;
210+
local_cache_type local_cache;
186211

187212
worker_ctxt() : id(std::this_thread::get_id()) {}
188213
~worker_ctxt()
189214
{
190-
concurrent_free_memory_block_list& central_cache = pool_type::get_instance().central_mem_cache;
191-
central_cache.steal_blocks(local_cache1);
192-
central_cache.steal_blocks(local_cache2);
215+
pool_type& pool = pool_type::get_instance();
216+
std::lock_guard<std::mutex> lock(pool.central_cache_mutex);
217+
while (not local_cache.empty()) {
218+
pool.central_mem_cache.push_back(local_cache.back());
219+
local_cache.pop_back();
220+
}
193221
}
194222
};
195223

@@ -201,12 +229,13 @@ class fixed_size_memory_block_pool
201229

202230
const size_t mblock_size;
203231
const size_t nof_blocks;
232+
const size_t max_local_batches;
204233

205-
size_t max_cache2_size = 0;
234+
std::mutex central_cache_mutex;
235+
std::vector<free_memory_block_list> central_mem_cache;
206236

207-
concurrent_free_memory_block_list central_mem_cache;
208-
std::mutex mutex;
209-
std::vector<uint8_t> allocated_memory;
237+
std::mutex debug_mutex;
238+
std::vector<uint8_t> allocated_memory;
210239
};
211240

212241
} // namespace srsran

0 commit comments

Comments (0)