Skip to content
This repository was archived by the owner on Sep 27, 2019. It is now read-only.

Commit dd4c067

Browse files
committed
Sorter uses memory pool
1 parent cc1a693 commit dd4c067

File tree

7 files changed

+122
-78
lines changed

7 files changed

+122
-78
lines changed

src/codegen/operator/order_by_translator.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,9 @@ OrderByTranslator::OrderByTranslator(const planner::OrderByPlan &plan,
191191
}
192192

193193
void OrderByTranslator::InitializeQueryState() {
194-
sorter_.Init(GetCodeGen(), LoadStatePtr(sorter_id_), compare_func_);
194+
auto *sorter_ptr = LoadStatePtr(sorter_id_);
195+
auto *exec_ctx_ptr = GetExecutorContextPtr();
196+
sorter_.Init(GetCodeGen(), sorter_ptr, exec_ctx_ptr, compare_func_);
195197
}
196198

197199
void OrderByTranslator::TearDownQueryState() {
@@ -364,7 +366,8 @@ void OrderByTranslator::InitializePipelineState(PipelineContext &pipeline_ctx) {
364366
pipeline_ctx.IsParallel()) {
365367
CodeGen &codegen = GetCodeGen();
366368
auto *sorter_ptr = pipeline_ctx.LoadStatePtr(codegen, thread_sorter_id_);
367-
sorter_.Init(codegen, sorter_ptr, compare_func_);
369+
auto *exec_ctx_ptr = GetExecutorContextPtr();
370+
sorter_.Init(codegen, sorter_ptr, exec_ctx_ptr, compare_func_);
368371
}
369372
}
370373

src/codegen/sorter.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,11 @@ Sorter::Sorter(CodeGen &codegen, const std::vector<type::Type> &row_desc) {
3535
}
3636

3737
void Sorter::Init(CodeGen &codegen, llvm::Value *sorter_ptr,
38+
llvm::Value *executor_ctx,
3839
llvm::Value *comparison_func) const {
3940
auto *tuple_size = codegen.Const32(storage_format_.GetStorageSize());
40-
codegen.Call(SorterProxy::Init, {sorter_ptr, comparison_func, tuple_size});
41+
codegen.Call(SorterProxy::Init,
42+
{sorter_ptr, executor_ctx, comparison_func, tuple_size});
4143
}
4244

4345
void Sorter::Append(CodeGen &codegen, llvm::Value *sorter_ptr,

src/codegen/util/sorter.cpp

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,16 @@
1818

1919
#include "common/synchronization/count_down_latch.h"
2020
#include "common/timer.h"
21-
#include "storage/backend_manager.h"
2221
#include "threadpool/mono_queue_pool.h"
2322

2423
namespace peloton {
2524
namespace codegen {
2625
namespace util {
2726

28-
Sorter::Sorter(ComparisonFunction func, uint32_t tuple_size)
29-
: cmp_func_(func),
27+
Sorter::Sorter(::peloton::type::AbstractPool &memory, ComparisonFunction func,
28+
uint32_t tuple_size)
29+
: memory_(memory),
30+
cmp_func_(func),
3031
tuple_size_(tuple_size),
3132
buffer_pos_(nullptr),
3233
buffer_end_(nullptr),
@@ -39,12 +40,11 @@ Sorter::Sorter(ComparisonFunction func, uint32_t tuple_size)
3940

4041
Sorter::~Sorter() {
4142
uint64_t total_alloc = 0;
42-
auto &backend_manager = storage::BackendManager::GetInstance();
4343
for (const auto &iter : blocks_) {
4444
void *block = iter.first;
4545
total_alloc += iter.second;
4646
PELOTON_ASSERT(block != nullptr);
47-
backend_manager.Release(BackendType::MM, block);
47+
memory_.Free(block);
4848
}
4949
buffer_pos_ = buffer_end_ = nullptr;
5050
tuples_start_ = tuples_end_ = nullptr;
@@ -54,9 +54,9 @@ Sorter::~Sorter() {
5454
tuples_.size(), blocks_.size(), total_alloc / 1024.0);
5555
}
5656

57-
void Sorter::Init(Sorter &sorter, ComparisonFunction func,
58-
uint32_t tuple_size) {
59-
new (&sorter) Sorter(func, tuple_size);
57+
void Sorter::Init(Sorter &sorter, executor::ExecutorContext &exec_ctx,
58+
ComparisonFunction func, uint32_t tuple_size) {
59+
new (&sorter) Sorter(*exec_ctx.GetPool(), func, tuple_size);
6060
}
6161

6262
void Sorter::Destroy(Sorter &sorter) { sorter.~Sorter(); }
@@ -254,9 +254,7 @@ void Sorter::MakeRoomForNewTuple() {
254254
LOG_TRACE("Allocating block of size %.2lf KB ...", next_alloc_size_ / 1024.0);
255255

256256
// We need to allocate another block
257-
void *block = storage::BackendManager::GetInstance().Allocate(
258-
BackendType::MM, next_alloc_size_);
259-
PELOTON_ASSERT(block != nullptr);
257+
void *block = memory_.Allocate(next_alloc_size_);
260258
blocks_.emplace_back(block, next_alloc_size_);
261259

262260
// Setup new buffer boundaries

src/include/codegen/proxy/sorter_proxy.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ namespace codegen {
2121
PROXY(Sorter) {
2222
// clang-format off
2323
DECLARE_MEMBER(0,
24-
char[sizeof(void *) + // comparison function
24+
char[sizeof(void *) + // abstract pool reference
25+
sizeof(void *) + // comparison function
2526
sizeof(uint32_t) + // tuple size
2627
sizeof(char *) + // buffer start
2728
sizeof(char *) + // buffer end

src/include/codegen/sorter.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class Sorter {
3535
* @brief Initialize the given sorter instance with the comparison function
3636
*/
3737
void Init(CodeGen &codegen, llvm::Value *sorter_ptr,
38-
llvm::Value *comparison_func) const;
38+
llvm::Value *executor_ctx, llvm::Value *comparison_func) const;
3939

4040
/**
4141
* @brief Append the given tuple into the sorter instance

src/include/codegen/util/sorter.h

Lines changed: 55 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
//
77
// Identification: src/include/codegen/util/sorter.h
88
//
9-
// Copyright (c) 2015-2017, Carnegie Mellon University Database Group
9+
// Copyright (c) 2015-2018, Carnegie Mellon University Database Group
1010
//
1111
//===----------------------------------------------------------------------===//
1212

@@ -32,58 +32,83 @@ namespace util {
3232
* This class only supports forward iteration since this is all that's required.
3333
* Additionally, Sorter does not serialize elements into its memory space.
3434
* Instead, it allocates space for incoming tuples on demand and returns a
35-
* pointer to the call, relying on her to serialize into the space. This assumes
36-
* well-behaved callers. We **could** accept a Serializer type as part of
37-
* Init(..), but we don't need it at this moment.
35+
* pointer to the call, relying on her to serialize into the space.
3836
*/
3937
class Sorter {
4038
private:
4139
// We allocate 4KB of buffer space upon initialization
4240
static constexpr uint64_t kInitialBufferSize = 4 * 1024;
4341

4442
public:
45-
typedef int (*ComparisonFunction)(const char *left_tuple,
46-
const char *right_tuple);
43+
using ComparisonFunction = int (*)(const char *left_tuple,
44+
const char *right_tuple);
4745

48-
/// Constructor
49-
Sorter(ComparisonFunction func, uint32_t tuple_size);
46+
/**
47+
* Constructor to create and setup this sorter instance.
48+
*
49+
* TODO(pmenon): Right now, Sorters assume that all tuples that are inserted
50+
* have the same size. If we choose to perform null-suppression or any
51+
* compression, this is no longer true.
52+
*
53+
* @param memory A memory pool that is used for all allocations during the
54+
* lifetime of the sorter.
55+
* @param func The comparison function used to compare two tuples stored in
56+
* this sorter
57+
* @param tuple_size The size of the tuples stored in this sorter
58+
*/
59+
Sorter(::peloton::type::AbstractPool &memory, ComparisonFunction func,
60+
uint32_t tuple_size);
5061

51-
/// Destructor
62+
/**
63+
* Destructor. This destructor cleans up returns all memory it has allocated
64+
* back to the memory pool that injected at construction.
65+
*/
5266
~Sorter();
5367

5468
/**
55-
* @brief This static function initializes the given sorter instance with the
56-
* given comparison function and assumes all input tuples have the given size.
69+
* This static function initializes the given sorter instance with the given
70+
* comparison function and assumes all input tuples have the given size. This
71+
* method is used from codegen to invoke the constructor of the sorter
72+
* sorter instance.
5773
*
5874
* @param sorter The sorter instance we are initializing
5975
* @param func The comparison function used during sort
6076
* @param tuple_size The size of the tuple in bytes
6177
*/
62-
static void Init(Sorter &sorter, ComparisonFunction func,
63-
uint32_t tuple_size);
78+
static void Init(Sorter &sorter, executor::ExecutorContext &ctx,
79+
ComparisonFunction func, uint32_t tuple_size);
6480

6581
/**
66-
* @brief Cleans up all resources maintained by the given sorter instance
82+
* Cleans up all resources maintained by the given sorter instance. This
83+
* method is used from codegen to invoke the destructor of a sorter instance.
84+
*
85+
* @param sorter The sorter instance we're destroying the resources from
6786
*/
6887
static void Destroy(Sorter &sorter);
6988

7089
/**
71-
* @brief Allocate space for a new input tuple in this sorter. It is assumed
72-
* that the size of the tuple is equivalent to the tuple size provided when
73-
* this sorter was initialized.
90+
* Allocate space for a new input tuple in this sorter. It is assumed that the
91+
* size of the tuple is equivalent to the tuple size provided when this sorter
92+
* was initialized.
7493
*
7594
* @return A pointer to a memory space large enough to store one tuple
7695
*/
7796
char *StoreInputTuple();
7897

7998
/**
80-
* @brief Sort all tuples stored in this sorter.
99+
* Sort all tuples stored in this sorter instance. This is a single-threaded
100+
* synchronous call.
81101
*/
82102
void Sort();
83103

84104
/**
85-
* @brief Perform a parallel sort of all sorter instances stored in the thread
86-
* states object.
105+
* Perform a parallel sort of all sorter instances stored in the thread states
106+
* object.
107+
*
108+
* @param thread_states The states object where all the sorter instances are
109+
* stored.
110+
* @param sorter_offset The offset in each thread state where the sorters are
111+
* stored.
87112
*/
88113
void SortParallel(
89114
const executor::ExecutorContext::ThreadStates &thread_states,
@@ -95,17 +120,25 @@ class Sorter {
95120
///
96121
//////////////////////////////////////////////////////////////////////////////
97122

98-
/// Return the number of tuples
123+
/** Return the number tuples stored in this sorter instance */
99124
uint64_t NumTuples() const { return tuples_.size(); }
100125

101-
/// Iterators
126+
/** Iterators */
102127
std::vector<char *>::iterator begin() { return tuples_.begin(); }
103128
std::vector<char *>::iterator end() { return tuples_.end(); }
104129

105130
private:
131+
/**
132+
* Allocate room for a new tuple. If room is already available, return
133+
* immediately. If room has to be made, allocate a block of memory from the
134+
* memory pool.
135+
*/
106136
void MakeRoomForNewTuple();
107137

108138
private:
139+
// The memory pool where this sorter sources memory from
140+
::peloton::type::AbstractPool &memory_;
141+
109142
// The comparison function
110143
ComparisonFunction cmp_func_;
111144

test/codegen/sorter_test.cpp

Lines changed: 47 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
//
77
// Identification: test/codegen/sorter_test.cpp
88
//
9-
// Copyright (c) 2015-17, Carnegie Mellon University Database Group
9+
// Copyright (c) 2015-2018, Carnegie Mellon University Database Group
1010
//
1111
//===----------------------------------------------------------------------===//
1212

@@ -37,11 +37,6 @@ static int CompareTuplesForAscending(const char *a, const char *b) {
3737

3838
class SorterTest : public PelotonTest {
3939
public:
40-
// The sorter instance
41-
codegen::util::Sorter sorter_;
42-
43-
SorterTest() : sorter_(CompareTuplesForAscending, sizeof(TestTuple)) {}
44-
4540
static void LoadSorter(codegen::util::Sorter &sorter, uint64_t num_inserts) {
4641
std::random_device r;
4742
std::default_random_engine e(r());
@@ -72,30 +67,37 @@ class SorterTest : public PelotonTest {
7267
}
7368

7469
void TestSort(uint64_t num_tuples_to_insert = 100) {
75-
// Time this stuff
76-
Timer<std::ratio<1, 1000>> timer;
77-
timer.Start();
70+
::peloton::type::EphemeralPool pool;
7871

79-
// Load the sorter
80-
LoadSorter(sorter_, num_tuples_to_insert);
72+
{
73+
codegen::util::Sorter sorter{pool, CompareTuplesForAscending,
74+
sizeof(TestTuple)};
8175

82-
timer.Stop();
83-
LOG_INFO("Loading %" PRId64 " tuples into sort took %.2f ms",
84-
num_tuples_to_insert, timer.GetDuration());
85-
timer.Reset();
86-
timer.Start();
76+
// Time this stuff
77+
Timer<std::ratio<1, 1000>> timer;
78+
timer.Start();
8779

88-
// Sort
89-
sorter_.Sort();
80+
// Load the sorter
81+
LoadSorter(sorter, num_tuples_to_insert);
9082

91-
timer.Stop();
92-
LOG_INFO("Sorting %" PRId64 " tuples took %.2f ms", num_tuples_to_insert,
93-
timer.GetDuration());
83+
timer.Stop();
84+
LOG_INFO("Loading %" PRId64 " tuples into sort took %.2f ms",
85+
num_tuples_to_insert, timer.GetDuration());
86+
timer.Reset();
87+
timer.Start();
88+
89+
// Sort
90+
sorter.Sort();
91+
92+
timer.Stop();
93+
LOG_INFO("Sorting %" PRId64 " tuples took %.2f ms", num_tuples_to_insert,
94+
timer.GetDuration());
9495

95-
// Check sorted results
96-
CheckSorted(sorter_, true);
96+
// Check sorted results
97+
CheckSorted(sorter, true);
9798

98-
EXPECT_EQ(num_tuples_to_insert, sorter_.NumTuples());
99+
EXPECT_EQ(num_tuples_to_insert, sorter.NumTuples());
100+
}
99101
}
100102
};
101103

@@ -110,6 +112,7 @@ TEST_F(SorterTest, BenchmarkSorter) {
110112
}
111113

112114
TEST_F(SorterTest, ParallelSortTest) {
115+
// A fake executor context associated to no transaction
113116
executor::ExecutorContext ctx(nullptr);
114117

115118
uint32_t num_threads = 4;
@@ -127,31 +130,35 @@ TEST_F(SorterTest, ParallelSortTest) {
127130
for (uint32_t i = 0; i < num_threads; i++) {
128131
auto *sorter = reinterpret_cast<codegen::util::Sorter *>(
129132
thread_states.AccessThreadState(i));
130-
codegen::util::Sorter::Init(*sorter, CompareTuplesForAscending,
133+
codegen::util::Sorter::Init(*sorter, ctx, CompareTuplesForAscending,
131134
sizeof(TestTuple));
132135
LoadSorter(*sorter, ntuples_per_sorter);
133136
}
134137

135-
Timer<std::milli> timer;
136-
timer.Start();
138+
{
139+
codegen::util::Sorter main_sorter{*ctx.GetPool(), CompareTuplesForAscending,
140+
sizeof(TestTuple)};
141+
Timer<std::milli> timer;
142+
timer.Start();
137143

138-
// Sort parallel
139-
sorter_.SortParallel(thread_states, 0);
144+
// Sort parallel
145+
main_sorter.SortParallel(thread_states, 0);
140146

141-
timer.Stop();
142-
LOG_INFO("Parallel sort took: %.2lf ms", timer.GetDuration());
147+
timer.Stop();
148+
LOG_INFO("Parallel sort took: %.2lf ms", timer.GetDuration());
143149

144-
// Check main sorter is sorted
145-
CheckSorted(sorter_, true);
150+
// Check main sorter is sorted
151+
CheckSorted(main_sorter, true);
146152

147-
// Check result size
148-
EXPECT_EQ(num_tuples, sorter_.NumTuples());
153+
// Check result size
154+
EXPECT_EQ(num_tuples, main_sorter.NumTuples());
149155

150-
// Clean up
151-
for (uint32_t i = 0; i < num_threads; i++) {
152-
auto *sorter = reinterpret_cast<codegen::util::Sorter *>(
153-
thread_states.AccessThreadState(i));
154-
codegen::util::Sorter::Destroy(*sorter);
156+
// Clean up
157+
for (uint32_t i = 0; i < num_threads; i++) {
158+
auto *sorter = reinterpret_cast<codegen::util::Sorter *>(
159+
thread_states.AccessThreadState(i));
160+
codegen::util::Sorter::Destroy(*sorter);
161+
}
155162
}
156163
}
157164

0 commit comments

Comments
 (0)