Skip to content
This repository was archived by the owner on Sep 27, 2019. It is now read-only.

Commit 5243395

Browse files
committed
Address review
1 parent 700f986 commit 5243395

File tree

5 files changed

+24
-21
lines changed

5 files changed

+24
-21
lines changed

src/codegen/operator/order_by_translator.cpp

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -303,13 +303,6 @@ void OrderByTranslator::Produce() const {
303303
CodeGen &codegen = GetCodeGen();
304304
auto *sorter_ptr = LoadStatePtr(sorter_id_);
305305

306-
#if 0
307-
// The tuples have been materialized into the buffer space, NOW SORT!!!
308-
if (!child_pipeline_.IsParallel()) {
309-
sorter_.Sort(codegen, sorter_ptr);
310-
}
311-
#endif
312-
313306
// Now iterate over the sorted list
314307
auto *i32_type = codegen.Int32Type();
315308
auto vec_size = Vector::kDefaultVectorSize.load();

src/codegen/operator/table_scan_translator.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,10 @@ TableScanTranslator::TableScanTranslator(const planner::SeqScanPlan &scan,
147147
}
148148
}
149149

150-
// TODO merge serial and parallel ...
151-
// TODO cleanup zonemap stuff
150+
// TODO merge serial and parallel since there is a lot of duplication
151+
// TODO cleanup zonemap stuff - zonemaps hardcode memory pointers which may not
152+
// live as long as the compiled query. Hence, it may point to invalid memory
153+
// at some invocation of the query.
152154

153155
llvm::Value *TableScanTranslator::LoadTablePtr(CodeGen &codegen) const {
154156
const storage::DataTable &table = *GetScanPlan().GetTable();
@@ -202,7 +204,7 @@ void TableScanTranslator::ProduceParallel() const {
202204
// The input arguments
203205
const storage::DataTable &table = *GetScanPlan().GetTable();
204206

205-
// The use RuntimeFunctions::ExecuteTableScan() to launch a parallel scan.
207+
// We use RuntimeFunctions::ExecuteTableScan() to launch a parallel scan.
206208
// We pass in the database and table IDs to scan the correct table.
207209
auto *dispatcher =
208210
RuntimeFunctionsProxy::ExecuteTableScan.GetFunction(codegen);

src/codegen/runtime_functions.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,11 +169,11 @@ void RuntimeFunctions::ExecuteTableScan(
169169
thread_states.Allocate(num_tasks);
170170

171171
// Create count down latch
172-
common::synchronization::CountDownLatch latch(num_tasks);
172+
common::synchronization::CountDownLatch latch{num_tasks};
173173

174174
// Now, submit the tasks
175175
for (uint32_t task_id = 0; task_id < num_tasks; task_id++) {
176-
bool last_task = task_id == num_tasks - 1;
176+
bool last_task = (task_id == num_tasks - 1);
177177
auto tilegroup_start = task_id * num_tilegroups_per_task;
178178
auto tilegroup_stop =
179179
last_task ? num_tilegroups : tilegroup_start + num_tilegroups_per_task;
@@ -216,7 +216,7 @@ void RuntimeFunctions::ExecutePerState(
216216

217217
// Create count down latch
218218
uint32_t num_tasks = thread_states.NumThreads();
219-
common::synchronization::CountDownLatch latch(num_tasks);
219+
common::synchronization::CountDownLatch latch{num_tasks};
220220

221221
// Loop over states
222222
for (uint32_t tid = 0; tid < num_tasks; tid++) {

src/codegen/util/sorter.cpp

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -115,25 +115,31 @@ void Sorter::SortParallel(
115115
sorters.push_back(sorter);
116116
}
117117

118-
// Sort each run in parallel
118+
// This is a 2D matrix that stores, for each sorter, a list of separators. A
119+
// separator separates a sorted run into equal, evenly spaces peices.
120+
// separators[i][j] lists the j-th separator key in the i-th separator.
121+
// We use these separator from each sorter to evenly split the inputs across
122+
// all worker threads by taking a median-of-medians.
119123
std::vector<std::vector<char *>> separators(sorters.size());
120124
for (uint32_t i = 0; i < separators.size(); i++) {
121125
separators[i].resize(sorters.size());
122126
}
123127

124-
// Time
128+
// Time it all
125129
Timer<std::milli> timer;
126130
timer.Start();
127131

128-
common::synchronization::CountDownLatch sort_latch(sorters.size());
132+
common::synchronization::CountDownLatch sort_latch{sorters.size()};
133+
134+
// Sort each run in parallel
129135
auto &work_pool = threadpool::MonoQueuePool::GetExecutionInstance();
130136
for (uint32_t sort_idx = 0; sort_idx < sorters.size(); sort_idx++) {
131137
work_pool.SubmitTask([&sorters, &separators, &sort_latch, sort_idx]() {
132138
// First sort
133139
auto *sorter = sorters[sort_idx];
134140
sorter->Sort();
135141

136-
// Now compute local separators
142+
// Now compute local separators that "evenly" divide the input
137143
auto part_size = sorter->NumTuples() / sorters.size();
138144
for (uint32_t idx = 0; idx < separators.size() - 1; idx++) {
139145
separators[idx][sort_idx] = sorter->tuples_[(idx + 1) * part_size];
@@ -201,7 +207,9 @@ void Sorter::SortParallel(
201207
timer.Reset();
202208
timer.Start();
203209

204-
common::synchronization::CountDownLatch merge_latch(merge_work.size());
210+
common::synchronization::CountDownLatch merge_latch{merge_work.size()};
211+
212+
// Divide up the merging work across all threads
205213
auto heap_cmp = [this](const std::pair<char **, char **> &l,
206214
const std::pair<char **, char **> &r) {
207215
return !(cmp_func_(*l.first, *r.first) < 0);

src/include/common/synchronization/count_down_latch.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ namespace synchronization {
2828
class CountDownLatch {
2929
public:
3030
/// Constructor
31-
explicit CountDownLatch(uint32_t count);
31+
explicit CountDownLatch(uint64_t count);
3232

3333
/// This class cannot be copy or move-constructed
3434
DISALLOW_COPY_AND_MOVE(CountDownLatch);
@@ -43,7 +43,7 @@ class CountDownLatch {
4343
uint32_t GetCount();
4444

4545
private:
46-
uint32_t count_;
46+
uint64_t count_;
4747
std::mutex mutex_;
4848
std::condition_variable cv_;
4949
};
@@ -54,7 +54,7 @@ class CountDownLatch {
5454
///
5555
////////////////////////////////////////////////////////////////////////////////
5656

57-
inline CountDownLatch::CountDownLatch(uint32_t count) : count_(count) {}
57+
inline CountDownLatch::CountDownLatch(uint64_t count) : count_(count) {}
5858

5959
inline bool CountDownLatch::Await(uint64_t nanos) {
6060
std::unique_lock<std::mutex> lock(mutex_);

0 commit comments

Comments
 (0)