Skip to content
This repository was archived by the owner on Sep 27, 2019. It is now read-only.

Commit cc9a025

Browse files
committed
Fix memory leaks when parallel merging hash tables by transferring memory blocks from thread-local tables into the global table.
1 parent 6b9edc2 commit cc9a025

File tree

3 files changed

+60
-10
lines changed

3 files changed

+60
-10
lines changed

src/codegen/util/hash_table.cpp

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,30 @@ HashTable::Entry *HashTable::EntryBuffer::NextFree() {
7676
return entry;
7777
}
7878

79+
/**
 * Hand every memory block owned by this entry buffer over to the target
 * buffer by splicing our whole block chain onto the front of the target's
 * chain. Afterwards this buffer no longer references any block.
 *
 * The head of the target chain is swapped in with a compare-and-swap so that
 * several source buffers can be merged into the same target at the same time.
 * NOTE(review): this buffer itself is assumed to be touched by one thread
 * only while the transfer runs -- confirm against callers.
 *
 * @param target Buffer where all blocks are transferred to
 */
void HashTable::EntryBuffer::TransferMemoryBlocks(
    HashTable::EntryBuffer &target) {
  // Check if there is anything to transfer
  if (block_ == nullptr) {
    return;
  }

  // Find the end of our memory block chain
  MemoryBlock *tail = block_;
  while (tail->next != nullptr) {
    tail = tail->next;
  }

  // Transfer everything to the target entry buffer.
  //
  // The target's head must be read exactly ONCE per attempt and that same
  // snapshot used both as our tail's successor and as the expected value of
  // the CAS. Reading target.block_ twice lets another thread install a new
  // head between the two reads: the CAS would then succeed against the fresh
  // head while our tail still points at the stale one, unlinking (and
  // leaking) the blocks that the other thread just transferred.
  MemoryBlock *expected_head = nullptr;
  do {
    expected_head = target.block_;
    tail->next = expected_head;
  } while (!::peloton::atomic_cas(&target.block_, expected_head, block_));

  // Success: ownership of every block now lies with the target
  block_ = nullptr;
}
102+
79103
////////////////////////////////////////////////////////////////////////////////
80104
///
81105
/// Hash Table
@@ -224,7 +248,7 @@ void HashTable::ReserveLazy(
224248
PELOTON_MEMSET(directory_, 0, alloc_size);
225249
}
226250

227-
void HashTable::MergeLazyUnfinished(const HashTable &other) {
251+
void HashTable::MergeLazyUnfinished(HashTable &other) {
228252
// Begin with the head of the linked list of entries, stored in the first
229253
// directory entry
230254
PELOTON_ASSERT(other.directory_[0] != nullptr);
@@ -248,6 +272,10 @@ void HashTable::MergeLazyUnfinished(const HashTable &other) {
248272

249273
// Increment number of elements
250274
::peloton::atomic_add(&num_elems_, other.NumElements());
275+
276+
// Transfer all allocated memory blocks in the other table into this one
277+
other.num_elems_ = other.capacity_ = 0;
278+
other.entry_buffer_.TransferMemoryBlocks(entry_buffer_);
251279
}
252280

253281
void HashTable::Resize() {

src/include/codegen/util/hash_table.h

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ class HashTable {
112112
*
113113
* @param other The hash table whose contents we will merge into this one.
114114
*/
115-
void MergeLazyUnfinished(const HashTable &other);
115+
void MergeLazyUnfinished(HashTable &other);
116116

117117
//////////////////////////////////////////////////////////////////////////////
118118
///
@@ -216,21 +216,34 @@ class HashTable {
216216
*/
217217
Entry *NextFree();
218218

219+
/**
220+
* Transfer all allocated memory blocks in this entry buffer into the target
221+
* buffer.
222+
*
223+
* @param target Buffer where all blocks are transferred to
224+
*/
225+
void TransferMemoryBlocks(EntryBuffer &target);
226+
219227
private:
220-
// This struct allows us to chain together chunks of memory
228+
// This struct represents a chunk of heap memory. We chain together these
229+
// chunks to avoid the need for a std::vector.
221230
struct MemoryBlock {
222231
// Link to the next block in the chain; nullptr marks the end of the chain
MemoryBlock *next;
223232
// Zero-length trailing array (compiler extension).
// NOTE(review): presumably marks where the block's entry storage begins,
// directly after the header -- confirm against the allocation code.
char data[0];
224233
};
225234

226235
// The memory pool where block allocations are sourced
227236
::peloton::type::AbstractPool &memory_;
237+
228238
// The sizes of each entry
229239
uint32_t entry_size_;
240+
230241
// The current active block
231242
MemoryBlock *block_;
243+
232244
// A pointer into the block where the next position is
233245
char *next_entry_;
246+
234247
// The number of available bytes left in the block
235248
uint64_t available_bytes_;
236249
};

test/codegen/hash_table_test.cpp

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -195,22 +195,24 @@ TEST_F(HashTableTest, CanInsertLazilyWithDups) {
195195
}
196196

197197
TEST_F(HashTableTest, ParallelMerge) {
198-
codegen::util::HashTable global_table{GetMemPool(), sizeof(Key),
199-
sizeof(Value)};
200-
201198
constexpr uint32_t num_threads = 4;
202199
constexpr uint32_t to_insert = 20000;
203200

204-
std::mutex keys_mutex;
205-
std::vector<Key> keys;
206-
201+
// Allocate hash tables for each thread
207202
executor::ExecutorContext exec_ctx{nullptr};
208203

209-
// Allocate hash tables for each thread
210204
auto &thread_states = exec_ctx.GetThreadStates();
211205
thread_states.Reset(sizeof(codegen::util::HashTable));
212206
thread_states.Allocate(num_threads);
213207

208+
// The keys we insert
209+
std::mutex keys_mutex;
210+
std::vector<Key> keys;
211+
212+
// The global hash table
213+
codegen::util::HashTable global_table{*exec_ctx.GetPool(), sizeof(Key),
214+
sizeof(Value)};
215+
214216
auto add_key = [&keys_mutex, &keys](const Key &k) {
215217
std::lock_guard<std::mutex> lock{keys_mutex};
216218
keys.emplace_back(k);
@@ -260,6 +262,13 @@ TEST_F(HashTableTest, ParallelMerge) {
260262
// Now merge thread-local tables into global table in parallel
261263
LaunchParallelTest(num_threads, merge_fn);
262264

265+
// Clean up local tables
266+
for (uint32_t tid = 0; tid < num_threads; tid++) {
267+
auto *table = reinterpret_cast<codegen::util::HashTable *>(
268+
thread_states.AccessThreadState(tid));
269+
codegen::util::HashTable::Destroy(*table);
270+
}
271+
263272
// Now probe global
264273
EXPECT_EQ(to_insert * num_threads, global_table.NumElements());
265274
EXPECT_LE(global_table.NumElements(), global_table.Capacity());

0 commit comments

Comments
 (0)