Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit 12cf4fc

Browse files
committed
Introduce per-thread memory pools for lock-free aloocations in quantile.
Signed-off-by: ienkovich <[email protected]>
1 parent 1a8c101 commit 12cf4fc

File tree

3 files changed

+76
-7
lines changed

3 files changed

+76
-7
lines changed

omniscidb/ResultSet/RowSetMemoryOwner.h

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "Logger/Logger.h"
3232
#include "Shared/approx_quantile.h"
3333
#include "Shared/quantile.h"
34+
#include "Shared/thread_count.h"
3435
#include "StringDictionary/StringDictionaryProxy.h"
3536
#include "ThirdParty/robin_hood.h"
3637

@@ -41,6 +42,19 @@ class ResultSet;
4142
* managed allocator object
4243
*/
4344
class RowSetMemoryOwner final : public SimpleAllocator, boost::noncopyable {
45+
private:
46+
struct ThreadMemPool {
47+
ThreadMemPool() : data(nullptr), size(0) {}
48+
ThreadMemPool(const ThreadMemPool& other) = default;
49+
ThreadMemPool& operator=(const ThreadMemPool& other) = default;
50+
51+
int8_t* data;
52+
size_t size;
53+
};
54+
55+
constexpr static size_t SMALL_MEM_POOL_SIZE = 10 << 20; // 10MB
56+
constexpr static size_t MAX_IGNORED_FRAGMENT = 1 << 20; // 1MB
57+
4458
public:
4559
RowSetMemoryOwner(DataProvider* data_provider,
4660
const size_t arena_block_size,
@@ -52,6 +66,7 @@ class RowSetMemoryOwner final : public SimpleAllocator, boost::noncopyable {
5266
// size up to 256 bytes to avoid such cache conflicts. This allows to significantly
5367
// reduce amount of allocated virtual memory which is important for ASAN runs.
5468
allocator_ = std::make_unique<Arena>(arena_block_size);
69+
small_mem_pools_.resize(cpu_threads());
5570
}
5671

5772
enum class StringTranslationType { SOURCE_INTERSECTION, SOURCE_UNION };
@@ -67,6 +82,35 @@ class RowSetMemoryOwner final : public SimpleAllocator, boost::noncopyable {
6782
allocator_->allocate(std::max(num_bytes, (size_t)256)));
6883
}
6984

85+
int8_t* allocateSmallMtNoLock(size_t size, size_t thread_idx = 0) override {
86+
if (size > SMALL_MEM_POOL_SIZE) {
87+
return allocate(size);
88+
}
89+
90+
// Round-up size to keep 8-byte alignment.
91+
size = (size + 7) & (~7);
92+
93+
// Normally, we use TBB thread index and don't expect it to be greater than
94+
// cpu_threads() but we don't respect g_cpu_threads_override currently for TBB.
95+
if (thread_idx >= small_mem_pools_.size()) {
96+
return allocate(size);
97+
}
98+
99+
auto& pool = small_mem_pools_[thread_idx];
100+
if (size > pool.size) {
101+
if (pool.size > MAX_IGNORED_FRAGMENT) {
102+
return allocate(size);
103+
}
104+
pool.data = allocate(SMALL_MEM_POOL_SIZE);
105+
pool.size = SMALL_MEM_POOL_SIZE;
106+
}
107+
108+
auto res = pool.data;
109+
pool.data += size;
110+
pool.size -= size;
111+
return res;
112+
}
113+
70114
int8_t* allocateCountDistinctBuffer(const size_t num_bytes,
71115
const size_t thread_idx = 0) {
72116
int8_t* buffer = allocate(num_bytes, thread_idx);
@@ -267,6 +311,10 @@ class RowSetMemoryOwner final : public SimpleAllocator, boost::noncopyable {
267311
size_t arena_block_size_; // for cloning
268312
std::unique_ptr<Arena> allocator_;
269313

314+
// Small memory pools that get memory from the base arena and are used
315+
// for lock-free allocation of small memory batches in execution kernels.
316+
std::vector<ThreadMemPool> small_mem_pools_;
317+
270318
mutable std::mutex state_mutex_;
271319

272320
friend class ResultSet;

omniscidb/Shared/SimpleAllocator.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,12 @@ class SimpleAllocator {
2222

2323
public:
2424
virtual int8_t* allocate(const size_t num_bytes, const size_t thread_idx = 0) = 0;
25+
// This allocation method is supposed to be used by execution kernels for allocating
26+
// small memory batches. Callers are responsible for not using the same thread_idx
27+
// values from different threads. This enables lock-free thread local memory pools
28+
// usage for better performance. Implementations are likely to fallback to a regular
29+
// allocation for big memory chunks and for thread indexes exceeding cpu_count().
30+
virtual int8_t* allocateSmallMtNoLock(size_t size, size_t thread_idx = 0) {
31+
return allocate(size);
32+
}
2533
};

omniscidb/Shared/quantile.h

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
#include "IR/OpTypeEnums.h"
1313

14+
#include <tbb/task_arena.h>
15+
1416
#include <algorithm>
1517
#include <cmath>
1618
#include <cstdint>
@@ -30,6 +32,8 @@ class ChunkedArray {
3032
size_t max_elems;
3133
};
3234

35+
using ChunkVector = std::vector<Chunk>;
36+
3337
// Random access iterator to be used with std::nth_element.
3438
template <typename T>
3539
class Iterator {
@@ -40,7 +44,7 @@ class ChunkedArray {
4044
typedef T& reference;
4145
typedef std::random_access_iterator_tag iterator_category;
4246

43-
Iterator(const std::vector<Chunk>* chunks, size_t chunk_idx, size_t chunk_offs)
47+
Iterator(const ChunkVector* chunks, size_t chunk_idx, size_t chunk_offs)
4448
: chunks_(chunks), chunk_idx_(chunk_idx), chunk_offs_(chunk_offs) {}
4549

4650
Iterator(const Iterator& other) = default;
@@ -166,7 +170,7 @@ class ChunkedArray {
166170
}
167171

168172
private:
169-
const std::vector<Chunk>* chunks_;
173+
const ChunkVector* chunks_;
170174
// Current chunk index. Can be equal to size of chunks_ vector for `end` iterator.
171175
size_t chunk_idx_;
172176
// Offset in the current chunk. Should always be less than chunk size when the
@@ -180,11 +184,16 @@ class ChunkedArray {
180184
void push(T value) {
181185
// Check if we need to allocate a new chunk.
182186
if (chunks_.empty() || cur_idx_ == chunks_.back().max_elems) {
183-
// Allocator is most probably a RowSetMemoryOwner object. It is not supposed to be
184-
// used to allocate very small objects, so we start with 1 KB and double it each
185-
// time with 64KB limit.
187+
if (thread_idx_ < 0) {
188+
thread_idx_ = tbb::this_task_arena::current_thread_index();
189+
} else if (thread_idx_ != tbb::this_task_arena::current_thread_index()) {
190+
// Pushing elements from different threads is not allowed bacause can cause
191+
// memory corruption.
192+
abort();
193+
}
186194
size_t size_to_allocate = std::max((size_t)64, (size_t)1 << chunks_.size()) << 10;
187-
Chunk chunk{allocator_->allocate(size_to_allocate), size_to_allocate / sizeof(T)};
195+
Chunk chunk{allocator_->allocateSmallMtNoLock(size_to_allocate, thread_idx_),
196+
size_to_allocate / sizeof(T)};
188197
chunks_.emplace_back(chunk);
189198
cur_idx_ = 0;
190199
}
@@ -243,9 +252,13 @@ class ChunkedArray {
243252
SimpleAllocator* allocator_;
244253
// All chunks except the last one should be full, i.e. they hold
245254
// chunk.max_elems elements.
246-
std::vector<Chunk> chunks_;
255+
ChunkVector chunks_;
247256
// Insertion position in the last chunk.
248257
size_t cur_idx_;
258+
// Thread index working with this quantile object. We assume elements are pushed
259+
// by a single thread only. Index is determined on the first push. Merge can be
260+
// done from different threads.
261+
int thread_idx_ = -1;
249262
};
250263

251264
class Quantile {

0 commit comments

Comments
 (0)