Skip to content

Commit 44eea30

Browse files
authored
fest: add more concurrent tests and htable-bench for p2 (#620)
Signed-off-by: Yuchen Liang <[email protected]>
1 parent d9bdaa6 commit 44eea30

File tree

6 files changed

+642
-0
lines changed

6 files changed

+642
-0
lines changed

src/container/disk/hash/disk_extendible_hash_table_utils.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ void DiskExtendibleHashTable<K, V, KC>::VerifyIntegrity() const {
9191
}
9292
}
9393

94+
/**
 * Accessor for the header page id.
 *
 * @return the current value of the header_page_id_ member of this hash table
 */
template <typename K, typename V, typename KC>
95+
auto DiskExtendibleHashTable<K, V, KC>::GetHeaderPageId() const -> page_id_t {
96+
return header_page_id_;
97+
}
98+
9499
template class DiskExtendibleHashTable<int, int, IntComparator>;
95100
template class DiskExtendibleHashTable<GenericKey<4>, RID, GenericComparator<4>>;
96101
template class DiskExtendibleHashTable<GenericKey<8>, RID, GenericComparator<8>>;

src/include/container/disk/hash/disk_extendible_hash_table.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,14 @@ class DiskExtendibleHashTable {
9090
*/
9191
void VerifyIntegrity() const;
9292

93+
/**
94+
* Helper function to expose the header page id.
95+
*/
96+
auto GetHeaderPageId() const -> page_id_t;
97+
98+
/**
99+
* Helper function to print out the HashTable.
100+
*/
93101
void PrintHT() const;
94102

95103
private:
Lines changed: 359 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,359 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// BusTub
4+
//
5+
// extendible_htable_concurrent_test.cpp
6+
//
7+
// Identification: test/container/disk/hash/extendible_htable_concurrent_test.cpp
8+
//
9+
// Copyright (c) 2015-2023, Carnegie Mellon University Database Group
10+
//
11+
//===----------------------------------------------------------------------===//
12+
13+
#include <cstdint>
#include <functional>
#include <memory>
#include <thread>  // NOLINT
#include <vector>

#include "buffer/buffer_pool_manager.h"
#include "common/logger.h"
#include "container/disk/hash/disk_extendible_hash_table.h"
#include "gtest/gtest.h"
#include "murmur3/MurmurHash3.h"
#include "storage/disk/disk_manager_memory.h"
#include "test_util.h"  // NOLINT
23+
24+
namespace bustub {
25+
26+
using bustub::DiskManagerUnlimitedMemory;
27+
28+
//===----------------------------------------------------------------------===//
29+
// Helper that runs the same callable on `num_threads` worker threads and waits for all of them.
//
// `args` is the callable followed by its leading arguments. Every worker additionally receives its
// zero-based index as the last argument. The pack is intentionally not forwarded: it is reused for
// each thread, and std::thread takes its own copy of every argument.
template <typename... Args>
void LaunchParallelTest(uint64_t num_threads, Args &&...args) {
  std::vector<std::thread> thread_group;
  thread_group.reserve(num_threads);

  // Launch a group of threads
  try {
    for (uint64_t thread_itr = 0; thread_itr < num_threads; ++thread_itr) {
      thread_group.emplace_back(args..., thread_itr);
    }
  } catch (...) {
    // Destroying a joinable std::thread calls std::terminate, so wait for the workers that did
    // start before letting the launch failure propagate to the caller.
    for (auto &worker : thread_group) {
      worker.join();
    }
    throw;
  }

  // Join the threads with the main thread
  for (auto &worker : thread_group) {
    worker.join();
  }
}
44+
45+
// helper function to insert
46+
void InsertHelper(DiskExtendibleHashTable<GenericKey<8>, RID, GenericComparator<8>> *ht,
47+
const std::vector<int64_t> &keys, __attribute__((unused)) uint64_t thread_itr = 0) {
48+
GenericKey<8> index_key;
49+
RID rid;
50+
for (auto key : keys) {
51+
int64_t value = key & 0xFFFFFFFF;
52+
rid.Set(static_cast<int32_t>(key >> 32), value);
53+
index_key.SetFromInteger(key);
54+
ht->Insert(index_key, rid);
55+
}
56+
}
57+
58+
// helper function to seperate insert
59+
void InsertHelperSplit(DiskExtendibleHashTable<GenericKey<8>, RID, GenericComparator<8>> *ht,
60+
const std::vector<int64_t> &keys, int total_threads,
61+
__attribute__((unused)) uint64_t thread_itr) {
62+
GenericKey<8> index_key;
63+
RID rid;
64+
for (auto key : keys) {
65+
if (static_cast<uint64_t>(key) % total_threads == thread_itr) {
66+
int64_t value = key & 0xFFFFFFFF;
67+
rid.Set(static_cast<int32_t>(key >> 32), value);
68+
index_key.SetFromInteger(key);
69+
ht->Insert(index_key, rid);
70+
}
71+
}
72+
}
73+
74+
// helper function to delete
75+
void DeleteHelper(DiskExtendibleHashTable<GenericKey<8>, RID, GenericComparator<8>> *ht,
76+
const std::vector<int64_t> &remove_keys, __attribute__((unused)) uint64_t thread_itr = 0) {
77+
GenericKey<8> index_key;
78+
for (auto key : remove_keys) {
79+
index_key.SetFromInteger(key);
80+
ht->Remove(index_key);
81+
}
82+
}
83+
84+
// helper function to seperate delete
85+
void DeleteHelperSplit(DiskExtendibleHashTable<GenericKey<8>, RID, GenericComparator<8>> *ht,
86+
const std::vector<int64_t> &remove_keys, int total_threads,
87+
__attribute__((unused)) uint64_t thread_itr) {
88+
GenericKey<8> index_key;
89+
for (auto key : remove_keys) {
90+
if (static_cast<uint64_t>(key) % total_threads == thread_itr) {
91+
index_key.SetFromInteger(key);
92+
ht->Remove(index_key);
93+
}
94+
}
95+
}
96+
97+
// Helper that looks up every key in `keys` and asserts that GetValue reports success and yields
// exactly one RID, equal to the RID built from the key's upper and lower 32 bits.
// Neither `tid` nor `thread_itr` is read by this helper.
void LookupHelper(DiskExtendibleHashTable<GenericKey<8>, RID, GenericComparator<8>> *ht,
                  const std::vector<int64_t> &keys, uint64_t tid, __attribute__((unused)) uint64_t thread_itr = 0) {
  RID expected_rid;
  GenericKey<8> table_key;
  for (const int64_t raw_key : keys) {
    table_key.SetFromInteger(raw_key);

    const int64_t low_bits = raw_key & 0xFFFFFFFF;
    expected_rid.Set(static_cast<int32_t>(raw_key >> 32), low_bits);

    std::vector<RID> result;
    bool res = ht->GetValue(table_key, &result);
    ASSERT_EQ(res, true);
    ASSERT_EQ(result.size(), 1);
    ASSERT_EQ(result[0], expected_rid);
  }
}
112+
113+
//===----------------------------------------------------------------------===//
114+
115+
// NOLINTNEXTLINE
116+
TEST(ExtendibleHTableConcurrentTest, DISABLED_InsertTest1) {
117+
// create KeyComparator and index schema
118+
auto key_schema = ParseCreateStatement("a bigint");
119+
GenericComparator<8> comparator(key_schema.get());
120+
121+
auto disk_mgr = std::make_unique<DiskManagerUnlimitedMemory>();
122+
auto bpm = std::make_unique<BufferPoolManager>(50, disk_mgr.get());
123+
124+
// create hash table
125+
DiskExtendibleHashTable<GenericKey<8>, RID, GenericComparator<8>> ht("blah", bpm.get(), comparator,
126+
HashFunction<GenericKey<8>>());
127+
128+
// keys to Insert
129+
std::vector<int64_t> keys;
130+
int64_t scale_factor = 100;
131+
for (int64_t key = 1; key < scale_factor; key++) {
132+
keys.push_back(key);
133+
}
134+
LaunchParallelTest(2, InsertHelper, &ht, keys);
135+
136+
std::vector<RID> rids;
137+
GenericKey<8> index_key;
138+
for (auto key : keys) {
139+
rids.clear();
140+
index_key.SetFromInteger(key);
141+
ht.GetValue(index_key, &rids);
142+
EXPECT_EQ(rids.size(), 1);
143+
144+
int64_t value = key & 0xFFFFFFFF;
145+
EXPECT_EQ(rids[0].GetSlotNum(), value);
146+
}
147+
}
148+
149+
// NOLINTNEXTLINE
150+
TEST(ExtendibleHTableConcurrentTest, DISABLED_InsertTest2) {
151+
// create KeyComparator and index schema
152+
auto key_schema = ParseCreateStatement("a bigint");
153+
GenericComparator<8> comparator(key_schema.get());
154+
auto disk_mgr = std::make_unique<DiskManagerUnlimitedMemory>();
155+
auto bpm = std::make_unique<BufferPoolManager>(50, disk_mgr.get());
156+
157+
// create hash table
158+
DiskExtendibleHashTable<GenericKey<8>, RID, GenericComparator<8>> ht("blah", bpm.get(), comparator,
159+
HashFunction<GenericKey<8>>());
160+
161+
// keys to Insert
162+
std::vector<int64_t> keys;
163+
int64_t scale_factor = 100;
164+
for (int64_t key = 1; key < scale_factor; key++) {
165+
keys.push_back(key);
166+
}
167+
LaunchParallelTest(2, InsertHelperSplit, &ht, keys, 2);
168+
169+
std::vector<RID> rids;
170+
GenericKey<8> index_key;
171+
for (auto key : keys) {
172+
rids.clear();
173+
index_key.SetFromInteger(key);
174+
ht.GetValue(index_key, &rids);
175+
EXPECT_EQ(rids.size(), 1);
176+
177+
int64_t value = key & 0xFFFFFFFF;
178+
EXPECT_EQ(rids[0].GetSlotNum(), value);
179+
}
180+
}
181+
182+
// NOLINTNEXTLINE
183+
TEST(ExtendibleHTableConcurrentTest, DISABLED_DeleteTest1) {
184+
// create KeyComparator and index schema
185+
auto key_schema = ParseCreateStatement("a bigint");
186+
GenericComparator<8> comparator(key_schema.get());
187+
188+
auto disk_mgr = std::make_unique<DiskManagerUnlimitedMemory>();
189+
auto bpm = std::make_unique<BufferPoolManager>(50, disk_mgr.get());
190+
191+
// create hash table
192+
DiskExtendibleHashTable<GenericKey<8>, RID, GenericComparator<8>> ht("blah", bpm.get(), comparator,
193+
HashFunction<GenericKey<8>>());
194+
// sequential insert
195+
std::vector<int64_t> keys = {1, 2, 3, 4, 5};
196+
InsertHelper(&ht, keys);
197+
198+
std::vector<int64_t> remove_keys = {1, 5, 3, 4};
199+
LaunchParallelTest(2, DeleteHelper, &ht, remove_keys);
200+
201+
std::vector<RID> rids;
202+
GenericKey<8> index_key;
203+
for (auto key : keys) {
204+
rids.clear();
205+
index_key.SetFromInteger(key);
206+
ht.GetValue(index_key, &rids);
207+
if (key != 2) {
208+
EXPECT_EQ(rids.size(), 0);
209+
continue;
210+
}
211+
EXPECT_EQ(rids.size(), 1);
212+
213+
int64_t value = key & 0xFFFFFFFF;
214+
EXPECT_EQ(rids[0].GetSlotNum(), value);
215+
}
216+
}
217+
218+
// NOLINTNEXTLINE
219+
TEST(ExtendibleHTableConcurrentTest, DISABLED_DeleteTest2) {
220+
// create KeyComparator and index schema
221+
auto key_schema = ParseCreateStatement("a bigint");
222+
GenericComparator<8> comparator(key_schema.get());
223+
224+
auto disk_mgr = std::make_unique<DiskManagerUnlimitedMemory>();
225+
auto bpm = std::make_unique<BufferPoolManager>(50, disk_mgr.get());
226+
227+
// create hash table
228+
DiskExtendibleHashTable<GenericKey<8>, RID, GenericComparator<8>> ht("blah", bpm.get(), comparator,
229+
HashFunction<GenericKey<8>>());
230+
231+
// sequential insert
232+
std::vector<int64_t> keys = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
233+
InsertHelper(&ht, keys);
234+
235+
std::vector<int64_t> remove_keys = {1, 4, 3, 2, 5, 6};
236+
LaunchParallelTest(2, DeleteHelperSplit, &ht, remove_keys, 2);
237+
238+
std::vector<RID> rids;
239+
GenericKey<8> index_key;
240+
for (auto key : keys) {
241+
rids.clear();
242+
index_key.SetFromInteger(key);
243+
ht.GetValue(index_key, &rids);
244+
if (key <= 6) {
245+
EXPECT_EQ(rids.size(), 0);
246+
continue;
247+
}
248+
EXPECT_EQ(rids.size(), 1);
249+
250+
int64_t value = key & 0xFFFFFFFF;
251+
EXPECT_EQ(rids[0].GetSlotNum(), value);
252+
}
253+
}
254+
255+
// Keys 1..5 are inserted directly, keys 6..10 are inserted on one worker thread, and then
// {1, 4, 3, 5, 6} are removed on one worker thread. Each phase is joined before the next one
// starts, so the phases do not overlap. Keys {2, 7, 8, 9, 10} must remain afterwards.
// NOLINTNEXTLINE
TEST(ExtendibleHTableConcurrentTest, DISABLED_MixTest1) {
  // create KeyComparator and index schema
  auto key_schema = ParseCreateStatement("a bigint");
  GenericComparator<8> comparator(key_schema.get());

  auto disk_mgr = std::make_unique<DiskManagerUnlimitedMemory>();
  auto bpm = std::make_unique<BufferPoolManager>(50, disk_mgr.get());

  // create hash table
  DiskExtendibleHashTable<GenericKey<8>, RID, GenericComparator<8>> ht("blah", bpm.get(), comparator,
                                                                      HashFunction<GenericKey<8>>());

  // first, populate index
  std::vector<int64_t> keys = {1, 2, 3, 4, 5};
  InsertHelper(&ht, keys);

  // insert phase: a single worker thread adds keys 6..10 and is joined before continuing
  keys.clear();
  for (int i = 6; i <= 10; i++) {
    keys.push_back(i);
  }
  LaunchParallelTest(1, InsertHelper, &ht, keys);
  // delete phase: a single worker thread removes the keys below and is joined before verification
  std::vector<int64_t> remove_keys = {1, 4, 3, 5, 6};
  LaunchParallelTest(1, DeleteHelper, &ht, remove_keys);

  std::vector<int64_t> valid_keys = {2, 7, 8, 9, 10};
  std::vector<int64_t> invalid_keys = {1, 3, 4, 5, 6};

  std::vector<RID> rids;
  GenericKey<8> index_key;
  for (auto key : valid_keys) {
    rids.clear();
    index_key.SetFromInteger(key);
    ht.GetValue(index_key, &rids);
    // Fatal assertion on purpose: rids[0] below must never be evaluated on an empty vector.
    ASSERT_EQ(rids.size(), 1);
    int64_t value = key & 0xFFFFFFFF;
    EXPECT_EQ(rids[0].GetSlotNum(), value);
  }
  for (auto key : invalid_keys) {
    rids.clear();
    index_key.SetFromInteger(key);
    ht.GetValue(index_key, &rids);
    EXPECT_EQ(rids.size(), 0);
  }
}
301+
302+
// Multiples of 5 in 1..50 ("preserved" keys) are inserted up front. Six threads then run
// concurrently: two insert the remaining ("dynamic") keys, two remove them, and two look up the
// preserved keys. The preserved keys must be readable during the run and still present afterwards.
// NOLINTNEXTLINE
TEST(ExtendibleHTableConcurrentTest, DISABLED_MixTest2) {
  // create KeyComparator and index schema
  auto key_schema = ParseCreateStatement("a bigint");
  GenericComparator<8> comparator(key_schema.get());

  auto disk_mgr = std::make_unique<DiskManagerUnlimitedMemory>();
  auto bpm = std::make_unique<BufferPoolManager>(50, disk_mgr.get());

  // create hash table
  DiskExtendibleHashTable<GenericKey<8>, RID, GenericComparator<8>> ht("blah", bpm.get(), comparator,
                                                                      HashFunction<GenericKey<8>>());

  // Add preserved_keys
  std::vector<int64_t> preserved_keys;
  std::vector<int64_t> dynamic_keys;
  int64_t total_keys = 50;
  int64_t sieve = 5;
  for (int64_t i = 1; i <= total_keys; i++) {
    if (i % sieve == 0) {
      preserved_keys.push_back(i);
    } else {
      dynamic_keys.push_back(i);
    }
  }
  InsertHelper(&ht, preserved_keys, 1);

  auto insert_task = [&](int tid) { InsertHelper(&ht, dynamic_keys, tid); };
  auto delete_task = [&](int tid) { DeleteHelper(&ht, dynamic_keys, tid); };
  auto lookup_task = [&](int tid) { LookupHelper(&ht, preserved_keys, tid); };

  std::vector<std::thread> threads;
  std::vector<std::function<void(int)>> tasks;
  tasks.emplace_back(insert_task);
  tasks.emplace_back(delete_task);
  tasks.emplace_back(lookup_task);

  // Thread i runs task i % 3, so with six threads each task kind runs on two threads.
  size_t num_threads = 6;
  threads.reserve(num_threads);
  for (size_t i = 0; i < num_threads; i++) {
    threads.emplace_back(tasks[i % tasks.size()], i);
  }
  for (auto &worker : threads) {
    worker.join();
  }

  // Check all preserved keys exist
  std::vector<RID> rids;
  GenericKey<8> index_key;
  for (auto key : preserved_keys) {
    rids.clear();
    index_key.SetFromInteger(key);
    ht.GetValue(index_key, &rids);
    // Fatal assertion on purpose: rids[0] below must never be evaluated on an empty vector.
    ASSERT_EQ(rids.size(), 1);
    int64_t value = key & 0xFFFFFFFF;
    EXPECT_EQ(rids[0].GetSlotNum(), value);
  }
}
358+
359+
} // namespace bustub

tools/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ add_subdirectory(wasm-bpt-printer)
66
add_subdirectory(terrier_bench)
77
add_subdirectory(bpm_bench)
88
add_subdirectory(btree_bench)
9+
add_subdirectory(htable_bench)
910

1011
add_backward(shell)
1112
add_backward(sqllogictest)

0 commit comments

Comments
 (0)