Skip to content
This repository was archived by the owner on Sep 27, 2019. It is now read-only.

Commit b63b0af

Browse files
committed
1. Force serial execution for deletes, inserts, updates, and aggregations (for now ...)
2. Cleaned up names in Pipeline/PipelineContext. 3. Moved decision for parallel scans into optimizer. 4. Added setting to determine minimum table size before considering parallel execution.
1 parent f8f75a6 commit b63b0af

22 files changed

+183
-130
lines changed

src/codegen/aggregation.cpp

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ void Aggregation::TearDownQueryState(CodeGen &codegen) {
199199
void Aggregation::CreateInitialGlobalValues(CodeGen &codegen,
200200
llvm::Value *space) const {
201201
PELOTON_ASSERT(IsGlobal());
202-
UpdateableStorage::NullBitmap null_bitmap(codegen, storage_, space);
202+
UpdateableStorage::NullBitmap null_bitmap{codegen, storage_, space};
203203
null_bitmap.InitAllNull(codegen);
204204
null_bitmap.WriteBack(codegen);
205205
}
@@ -213,7 +213,7 @@ void Aggregation::CreateInitialValues(
213213
PELOTON_ASSERT(!IsGlobal());
214214

215215
// The null bitmap tracker
216-
UpdateableStorage::NullBitmap null_bitmap(codegen, storage_, space);
216+
UpdateableStorage::NullBitmap null_bitmap{codegen, storage_, space};
217217

218218
// Initialize bitmap to all NULLs
219219
null_bitmap.InitAllNull(codegen);
@@ -247,7 +247,7 @@ void Aggregation::CreateInitialValues(
247247
"Unexpected aggregate type [%s] when creating initial values",
248248
ExpressionTypeToString(agg_info.aggregate_type).c_str());
249249
LOG_ERROR("%s", message.c_str());
250-
throw Exception(ExceptionType::UNKNOWN_TYPE, message);
250+
throw Exception{ExceptionType::UNKNOWN_TYPE, message};
251251
}
252252
}
253253

@@ -295,13 +295,13 @@ void Aggregation::DoInitializeValue(
295295
} else {
296296
raw_initial = codegen.Const64(1);
297297
}
298-
codegen::Value initial_val(type::BigInt::Instance(), raw_initial);
298+
codegen::Value initial_val{type::BigInt::Instance(), raw_initial};
299299
storage_.SetValueSkipNull(codegen, space, storage_index, initial_val);
300300
break;
301301
}
302302
case ExpressionType::AGGREGATE_COUNT_STAR: {
303303
// The initial value for COUNT(*) is 1
304-
codegen::Value one(type::BigInt::Instance(), codegen.Const64(1));
304+
codegen::Value one{type::BigInt::Instance(), codegen.Const64(1)};
305305
storage_.SetValueSkipNull(codegen, space, storage_index, one);
306306
break;
307307
}
@@ -343,11 +343,11 @@ void Aggregation::DoAdvanceValue(CodeGen &codegen, llvm::Value *space,
343343
if (update.IsNullable()) {
344344
llvm::Value *not_null = update.IsNotNull(codegen);
345345
raw_update =
346-
codegen::Value(type::BigInt::Instance(),
347-
codegen->CreateZExt(not_null, codegen.Int64Type()));
346+
codegen::Value{type::BigInt::Instance(),
347+
codegen->CreateZExt(not_null, codegen.Int64Type())};
348348
} else {
349349
raw_update =
350-
codegen::Value(type::BigInt::Instance(), codegen.Const64(1));
350+
codegen::Value{type::BigInt::Instance(), codegen.Const64(1)};
351351
}
352352

353353
// Add to aggregate
@@ -356,7 +356,7 @@ void Aggregation::DoAdvanceValue(CodeGen &codegen, llvm::Value *space,
356356
}
357357
case ExpressionType::AGGREGATE_COUNT_STAR: {
358358
auto curr = storage_.GetValueSkipNull(codegen, space, storage_index);
359-
auto delta = codegen::Value(type::BigInt::Instance(), codegen.Const64(1));
359+
auto delta = codegen::Value{type::BigInt::Instance(), codegen.Const64(1)};
360360
next = curr.Add(codegen, delta);
361361
break;
362362
}
@@ -397,9 +397,9 @@ void Aggregation::DoNullCheck(
397397
// Fetch null byte so we can phi-resolve it after all the branches
398398
llvm::Value *null_byte_snapshot = null_bitmap.ByteFor(codegen, storage_index);
399399

400-
lang::If valid_update(codegen, update_not_null, "Agg.IfValidUpdate");
400+
lang::If valid_update{codegen, update_not_null, "Agg.IfValidUpdate"};
401401
{
402-
lang::If agg_is_null(codegen, agg_null, "Agg.IfAggIsNull");
402+
lang::If agg_is_null{codegen, agg_null, "Agg.IfAggIsNull"};
403403
{
404404
// (2)
405405
switch (type) {
@@ -410,7 +410,7 @@ void Aggregation::DoNullCheck(
410410
break;
411411
}
412412
case ExpressionType::AGGREGATE_COUNT: {
413-
codegen::Value one(type::BigInt::Instance(), codegen.Const64(1));
413+
codegen::Value one{type::BigInt::Instance(), codegen.Const64(1)};
414414
storage_.SetValue(codegen, space, storage_index, one, null_bitmap);
415415
break;
416416
}
@@ -484,7 +484,7 @@ void Aggregation::AdvanceValue(
484484
"Unexpected aggregate type [%s] when advancing aggregator",
485485
ExpressionTypeToString(aggregate_info.aggregate_type).c_str());
486486
LOG_ERROR("%s", message.c_str());
487-
throw Exception(ExceptionType::UNKNOWN_TYPE, message);
487+
throw Exception{ExceptionType::UNKNOWN_TYPE, message};
488488
}
489489
}
490490
}
@@ -546,7 +546,7 @@ void Aggregation::AdvanceValues(
546546
// table)
547547
auto name = "agg" + std::to_string(aggregate_info.source_index) +
548548
".advanceValues.ifAggValueIsDistinct";
549-
lang::If agg_is_distinct(codegen, condition, name);
549+
lang::If agg_is_distinct{codegen, condition, name};
550550
{
551551
// Advance value
552552
AdvanceValue(codegen, space, next_vals, aggregate_info, null_bitmap);
@@ -631,7 +631,7 @@ void Aggregation::FinalizeValues(
631631
"Unexpected aggregate type [%s] when finalizing aggregator",
632632
ExpressionTypeToString(agg_type).c_str());
633633
LOG_ERROR("%s", message.c_str());
634-
throw Exception(ExceptionType::UNKNOWN_TYPE, message);
634+
throw Exception{ExceptionType::UNKNOWN_TYPE, message};
635635
}
636636
}
637637
}

src/codegen/oa_hash_table.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ OAHashTable::ProbeResult OAHashTable::TranslateProbing(
281281
LoadHashEntryField(codegen, entry_ptr, 0, 1);
282282
llvm::Value *is_hash_match =
283283
codegen->CreateICmpEQ(entry_hash_value, hash_value);
284-
lang::If hash_match_branch(codegen, is_hash_match, "hashMatch");
284+
lang::If hash_match_branch{codegen, is_hash_match, "hashMatch"};
285285
{
286286
// Load the key from the HashEntry *
287287
std::vector<codegen::Value> entry_key{};
@@ -291,7 +291,7 @@ OAHashTable::ProbeResult OAHashTable::TranslateProbing(
291291
// Check if the provided key matches what's in the HashEntry
292292
llvm::Value *is_key_match =
293293
Value::TestEquality(codegen, key, entry_key).GetValue();
294-
lang::If key_match_branch(codegen, is_key_match, "keyMatch");
294+
lang::If key_match_branch{codegen, is_key_match, "keyMatch"};
295295
{
296296
// Set result value to true if key was found
297297
probe_result.key_exists = codegen.ConstBool(true);
@@ -565,7 +565,7 @@ void OAHashTable::Iterate(CodeGen &codegen, llvm::Value *hash_table,
565565
llvm::Value *status_neq_zero = IsPtrUnEqualTo(codegen, kv_p, 0UL);
566566

567567
// If the bucket is not free
568-
lang::If bucket_occupied(codegen, status_neq_zero, "bucketIsOccupied");
568+
lang::If bucket_occupied{codegen, status_neq_zero, "bucketIsOccupied"};
569569
{
570570
// Read keys and return the pointer to value
571571
std::vector<codegen::Value> entry_key{};

src/codegen/operator/delete_translator.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ DeleteTranslator::DeleteTranslator(const planner::DeletePlan &delete_plan,
2626
Pipeline &pipeline)
2727
: OperatorTranslator(delete_plan, context, pipeline),
2828
table_(*delete_plan.GetTable()) {
29+
pipeline.SetSerial();
30+
2931
// Also create the translator for our child.
3032
context.Prepare(*delete_plan.GetChild(0), pipeline);
3133

src/codegen/operator/hash_group_by_translator.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ void HashGroupByTranslator::ProduceResults::ProcessEntries(
360360
// Iterate over the batch, performing a branching predicate check
361361
batch.Iterate(codegen, [&](RowBatch::Row &row) {
362362
codegen::Value valid_row = row.DeriveValue(codegen, *predicate);
363-
lang::If is_valid_row(codegen, valid_row);
363+
lang::If is_valid_row{codegen, valid_row};
364364
{
365365
// The row is valid, send along the pipeline
366366
ctx_.Consume(row);

src/codegen/operator/hash_translator.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ namespace codegen {
3030
HashTranslator::HashTranslator(const planner::HashPlan &hash_plan,
3131
CompilationContext &context, Pipeline &pipeline)
3232
: OperatorTranslator(hash_plan, context, pipeline) {
33+
// Distincts are serial (for now ...)
34+
pipeline.SetSerial();
35+
3336
CodeGen &codegen = GetCodeGen();
3437
QueryState &query_state = context.GetQueryState();
3538

src/codegen/operator/insert_translator.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ InsertTranslator::InsertTranslator(const planner::InsertPlan &insert_plan,
2727
Pipeline &pipeline)
2828
: OperatorTranslator(insert_plan, context, pipeline),
2929
table_storage_(*insert_plan.GetTable()->GetSchema()) {
30+
// Inserts happen serially
31+
pipeline.SetSerial();
32+
3033
// Create the translator for its child only when there is a child
3134
if (insert_plan.GetChildrenSize() != 0) {
3235
context.Prepare(*insert_plan.GetChild(0), pipeline);

src/codegen/operator/order_by_translator.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,10 @@ OrderByTranslator::OrderByTranslator(const planner::OrderByPlan &plan,
124124
CompilationContext &context,
125125
Pipeline &pipeline)
126126
: OperatorTranslator(plan, context, pipeline),
127-
child_pipeline_(this, Pipeline::Parallelism::Flexible) {
127+
child_pipeline_(this, Pipeline::Parallelism::Serial) {
128+
// Aggregations happen serially (for now ...)
129+
pipeline.SetSerial();
130+
128131
// Prepare the child
129132
context.Prepare(*plan.GetChild(0), child_pipeline_);
130133

src/codegen/operator/table_scan_translator.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,10 @@ TableScanTranslator::TableScanTranslator(const planner::SeqScanPlan &scan,
136136
Pipeline &pipeline)
137137
: OperatorTranslator(scan, context, pipeline), table_(*scan.GetTable()) {
138138
// Set ourselves as the source of the pipeline
139-
pipeline.MarkSource(this, Pipeline::Parallelism::Serial);
139+
auto parallelism = scan.IsParallel() ? Pipeline::Parallelism::Parallel
140+
: Pipeline::Parallelism::Serial;
141+
pipeline.MarkSource(this, parallelism);
142+
140143
// If there is a predicate, prepare a translator for it
141144
const auto *predicate = scan.GetPredicate();
142145
if (predicate != nullptr) {

src/codegen/operator/update_translator.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ UpdateTranslator::UpdateTranslator(const planner::UpdatePlan &update_plan,
2727
Pipeline &pipeline)
2828
: OperatorTranslator(update_plan, context, pipeline),
2929
table_storage_(*update_plan.GetTable()->GetSchema()) {
30+
// Updates happen serially
31+
pipeline.SetSerial();
32+
3033
// Create the translator for our child and derived attributes
3134
context.Prepare(*update_plan.GetChild(0), pipeline);
3235

src/codegen/pipeline.cpp

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,7 @@ void PipelineContext::LoopOverStates::DoParallel(
105105
auto *thread_state_ptr = func.GetArgumentByPosition(1);
106106

107107
// Setup access to the thread state
108-
PipelineContext::ScopedStateAccess state_access{
109-
ctx_, func.GetArgumentByPosition(1)};
108+
PipelineContext::SetState state_access{ctx_, func.GetArgumentByPosition(1)};
110109

111110
// Execute function body
112111
body(thread_state_ptr);
@@ -417,8 +416,8 @@ void Pipeline::InitializePipeline(PipelineContext &pipeline_ctx) {
417416
FunctionDeclaration init_decl{cc, func_name, visibility, ret_type, args};
418417
FunctionBuilder init_func{cc, init_decl};
419418
{
420-
PipelineContext::ScopedStateAccess state_access{
421-
pipeline_ctx, init_func.GetArgumentByPosition(1)};
419+
PipelineContext::SetState state_access{pipeline_ctx,
420+
init_func.GetArgumentByPosition(1)};
422421

423422
// Set initialized flag
424423
pipeline_ctx.MarkInitialized(codegen);
@@ -453,7 +452,7 @@ void Pipeline::CompletePipeline(PipelineContext &pipeline_ctx) {
453452
// Loop over all states to allow operators to clean up components
454453
PipelineContext::LoopOverStates loop_state{pipeline_ctx};
455454
loop_state.Do([this, &pipeline_ctx](llvm::Value *thread_state) {
456-
PipelineContext::ScopedStateAccess state_access{pipeline_ctx, thread_state};
455+
PipelineContext::SetState state_access{pipeline_ctx, thread_state};
457456
// Let operators in the pipeline clean up any pipeline state
458457
for (auto riter = pipeline_.rbegin(), rend = pipeline_.rend();
459458
riter != rend; ++riter) {
@@ -500,7 +499,7 @@ void Pipeline::Run(
500499
}
501500

502501
void Pipeline::DoRun(
503-
PipelineContext &pipeline_context, llvm::Function *dispatch_func,
502+
PipelineContext &pipeline_ctx, llvm::Function *dispatch_func,
504503
const std::vector<llvm::Value *> &dispatch_args,
505504
const std::vector<llvm::Type *> &pipeline_args_types,
506505
const std::function<void(ConsumerContext &,
@@ -532,19 +531,18 @@ void Pipeline::DoRun(
532531
// If the pipeline is parallel, we need to call the generated init function
533532
if (IsParallel()) {
534533
thread_state = codegen->CreatePointerCast(
535-
thread_state, pipeline_context.GetThreadStateType()->getPointerTo());
534+
thread_state, pipeline_ctx.GetThreadStateType()->getPointerTo());
536535

537-
auto *init_func = pipeline_context.thread_init_func_;
536+
auto *init_func = pipeline_ctx.thread_init_func_;
538537
codegen.CallFunc(init_func, {query_state, thread_state});
539538
}
540539

541540
// Setup the thread state access for the pipeline context
542-
PipelineContext::ScopedStateAccess state_access(pipeline_context,
543-
thread_state);
541+
PipelineContext::SetState state_access{pipeline_ctx, thread_state};
544542

545543
// First initialize the execution consumer
546544
auto &execution_consumer = compilation_ctx_.GetExecutionConsumer();
547-
execution_consumer.InitializePipelineState(pipeline_context);
545+
execution_consumer.InitializePipelineState(pipeline_ctx);
548546

549547
// Pull out the input parameters
550548
std::vector<llvm::Value *> pipeline_args;
@@ -553,13 +551,13 @@ void Pipeline::DoRun(
553551
}
554552

555553
// Generate pipeline body
556-
ConsumerContext ctx(compilation_ctx_, *this, &pipeline_context);
554+
ConsumerContext ctx{compilation_ctx_, *this, &pipeline_ctx};
557555
body(ctx, pipeline_args);
558556

559557
// Finish
560558
func.ReturnAndFinish();
561559
}
562-
pipeline_context.pipeline_func_ = func.GetFunction();
560+
pipeline_ctx.pipeline_func_ = func.GetFunction();
563561

564562
// The pipeline function we generated above encapsulates the logic for all
565563
// operators in the pipeline. If we're executing it serially then we directly

0 commit comments

Comments
 (0)