Skip to content
This repository was archived by the owner on Sep 27, 2019. It is now read-only.

Commit e5379f2

Browse files
committed
Cleaned up query interface to be synchronous.
1 parent 1a9db41 commit e5379f2

File tree

6 files changed

+63
-86
lines changed

6 files changed

+63
-86
lines changed

src/codegen/query.cpp

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,14 @@ namespace codegen {
2424
Query::Query(const planner::AbstractPlan &query_plan)
2525
: query_plan_(query_plan) {}
2626

27-
void Query::Execute(std::unique_ptr<executor::ExecutorContext> executor_context,
28-
ExecutionConsumer &consumer,
29-
std::function<void(executor::ExecutionResult)> on_complete,
30-
RuntimeStats *stats) {
27+
void Query::Execute(executor::ExecutorContext &executor_context,
28+
ExecutionConsumer &consumer, RuntimeStats *stats) {
3129
CodeGen codegen{code_context_};
3230

3331
llvm::Type *query_state_type = query_state_.GetType();
3432
size_t parameter_size = codegen.SizeOf(query_state_type);
35-
PELOTON_ASSERT((parameter_size % 8 == 0) && "parameter size not multiple of 8");
33+
PELOTON_ASSERT((parameter_size % 8 == 0) &&
34+
"parameter size not multiple of 8");
3635

3736
// Allocate some space for the function arguments
3837
std::unique_ptr<char[]> param_data{new char[parameter_size]};
@@ -41,18 +40,14 @@ void Query::Execute(std::unique_ptr<executor::ExecutorContext> executor_context,
4140

4241
// We use this handy class to avoid complex casting and pointer manipulation
4342
struct FunctionArguments {
44-
// storage::StorageManager *storage_manager;
4543
executor::ExecutorContext *executor_context;
46-
// QueryParameters *query_parameters;
4744
char *consumer_arg;
4845
char rest[0];
4946
} PACKED;
5047

5148
// Set up the function arguments
5249
auto *func_args = reinterpret_cast<FunctionArguments *>(param_data.get());
53-
// func_args->storage_manager = storage::StorageManager::GetInstance();
54-
func_args->executor_context = executor_context.get();
55-
// func_args->query_parameters = &executor_context->GetParams();
50+
func_args->executor_context = &executor_context;
5651
func_args->consumer_arg = consumer.GetConsumerState();
5752

5853
// Timer
@@ -104,11 +99,6 @@ void Query::Execute(std::unique_ptr<executor::ExecutorContext> executor_context,
10499
timer.Stop();
105100
stats->tear_down_ms = timer.GetDuration();
106101
}
107-
108-
executor::ExecutionResult result;
109-
result.m_result = ResultType::SUCCESS;
110-
result.m_processed = executor_context->num_processed;
111-
on_complete(result);
112102
}
113103

114104
bool Query::Prepare(const QueryFunctions &query_funcs) {

src/executor/plan_executor.cpp

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -49,36 +49,47 @@ static void CompileAndExecutePlan(
4949
plan->GetOutputColumns(columns);
5050
codegen::BufferingConsumer consumer{columns, context};
5151

52-
std::unique_ptr<executor::ExecutorContext> executor_context(
53-
new executor::ExecutorContext(txn,
54-
codegen::QueryParameters(*plan, params)));
52+
// The executor context for this execution
53+
executor::ExecutorContext executor_context{
54+
txn, codegen::QueryParameters(*plan, params)};
5555

56-
// Compile the query
56+
// Check if we have a cached compiled plan already
5757
codegen::Query *query = codegen::QueryCache::Instance().Find(plan);
5858
if (query == nullptr) {
59+
// Cached plan doesn't exist, let's compile the query
5960
codegen::QueryCompiler compiler;
6061
auto compiled_query = compiler.Compile(
61-
*plan, executor_context->GetParams().GetQueryParametersMap(), consumer);
62+
*plan, executor_context.GetParams().GetQueryParametersMap(), consumer);
63+
64+
// Grab an instance to the plan
6265
query = compiled_query.get();
66+
67+
// Insert the compiled plan into the cache
6368
codegen::QueryCache::Instance().Add(plan, std::move(compiled_query));
6469
}
6570

66-
auto on_query_result =
67-
[&on_complete, &consumer, plan](executor::ExecutionResult result) {
68-
std::vector<ResultValue> values;
69-
for (const auto &tuple : consumer.GetOutputTuples()) {
70-
for (uint32_t i = 0; i < tuple.tuple_.size(); i++) {
71-
auto column_val = tuple.GetValue(i);
72-
auto str = column_val.IsNull() ? "" : column_val.ToString();
73-
LOG_TRACE("column content: [%s]", str.c_str());
74-
values.push_back(std::move(str));
75-
}
76-
}
77-
plan->ClearParameterValues();
78-
on_complete(result, std::move(values));
79-
};
71+
// Execute the query!
72+
query->Execute(executor_context, consumer);
73+
74+
// Execution complete, setup the results
75+
executor::ExecutionResult result;
76+
result.m_processed = executor_context.num_processed;
77+
result.m_result = ResultType::SUCCESS;
8078

81-
query->Execute(std::move(executor_context), consumer, on_query_result);
79+
// Iterate over results
80+
std::vector<ResultValue> values;
81+
for (const auto &tuple : consumer.GetOutputTuples()) {
82+
for (uint32_t i = 0; i < tuple.tuple_.size(); i++) {
83+
auto column_val = tuple.GetValue(i);
84+
auto str = column_val.IsNull() ? "" : column_val.ToString();
85+
LOG_TRACE("column content: [%s]", str.c_str());
86+
values.push_back(std::move(str));
87+
}
88+
}
89+
90+
// Done, invoke callback
91+
plan->ClearParameterValues();
92+
on_complete(result, std::move(values));
8293
}
8394

8495
static void InterpretPlan(

src/include/codegen/query.h

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -66,19 +66,18 @@ class Query {
6666
/**
6767
* @brief Executes the compiled query.
6868
*
69-
* This function is **asynchronous** - it returns before execution completes,
70-
* and invokes a user-provided callback on completion. It is the user's
71-
* responsibility that the result consumer object has a lifetime as far as
72-
* the return of the callback.
73-
*
7469
* @param executor_context Stores transaction and parameters.
7570
* @param consumer Stores the result.
76-
* @param on_complete The callback to be invoked when execution completes.
71+
* @param stats Handy struct to collect various runtime timing statistics
7772
*/
78-
void Execute(std::unique_ptr<executor::ExecutorContext> executor_context,
79-
ExecutionConsumer &consumer,
80-
std::function<void(executor::ExecutionResult)> on_complete,
81-
RuntimeStats *stats = nullptr);
73+
void Execute(executor::ExecutorContext &executor_context,
74+
ExecutionConsumer &consumer, RuntimeStats *stats = nullptr);
75+
76+
//////////////////////////////////////////////////////////////////////////////
77+
///
78+
/// Accessors
79+
///
80+
//////////////////////////////////////////////////////////////////////////////
8281

8382
/// Return the query plan
8483
const planner::AbstractPlan &GetPlan() const { return query_plan_; }
@@ -92,7 +91,7 @@ class Query {
9291
private:
9392
friend class QueryCompiler;
9493

95-
/// Constructor
94+
/// Constructor. Private so callers use the QueryCompiler class.
9695
explicit Query(const planner::AbstractPlan &query_plan);
9796

9897
private:

test/codegen/bloom_filter_test.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
namespace peloton {
3737
namespace test {
3838

39-
class BloomFilterCodegenTest : public PelotonTest {
39+
class BloomFilterCodegenTest : public PelotonCodeGenTest {
4040
public:
4141
BloomFilterCodegenTest() {
4242
// Create test db
@@ -299,19 +299,20 @@ double BloomFilterCodegenTest::ExecuteJoin(std::string query,
299299
// Binding
300300
planner::BindingContext context;
301301
plan->PerformBinding(context);
302+
303+
executor::ExecutorContext executor_context{txn};
304+
302305
// Use simple CountConsumer since we don't care about the result
303306
codegen::CountingConsumer consumer;
307+
304308
// Compile the query
305309
codegen::QueryCompiler compiler;
306310
codegen::Query::RuntimeStats stats;
307-
std::unique_ptr<executor::ExecutorContext> executor_context(
308-
new executor::ExecutorContext{txn});
309311
auto compiled_query = compiler.Compile(
310-
*plan, executor_context->GetParams().GetQueryParametersMap(), consumer);
312+
*plan, executor_context.GetParams().GetQueryParametersMap(), consumer);
311313

312314
// Run
313-
PelotonCodeGenTest::ExecuteSync(*compiled_query,
314-
std::move(executor_context), consumer);
315+
compiled_query->Execute(executor_context, consumer, &stats);
315316

316317
LOG_INFO("Execution Time: %0.0f ms", stats.plan_ms);
317318
total_runtime += stats.plan_ms;

test/codegen/testing_codegen_util.cpp

Lines changed: 10 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -252,24 +252,6 @@ void PelotonCodeGenTest::CreateAndLoadTableWithLayout(
252252
txn_manager.CommitTransaction(txn);
253253
}
254254

255-
void PelotonCodeGenTest::ExecuteSync(
256-
codegen::Query &query,
257-
std::unique_ptr<executor::ExecutorContext> executor_context,
258-
codegen::ExecutionConsumer &consumer) {
259-
std::mutex mu;
260-
std::condition_variable cond;
261-
bool finished = false;
262-
query.Execute(std::move(executor_context), consumer,
263-
[&](executor::ExecutionResult) {
264-
std::unique_lock<decltype(mu)> lock(mu);
265-
finished = true;
266-
cond.notify_one();
267-
});
268-
269-
std::unique_lock<decltype(mu)> lock(mu);
270-
cond.wait(lock, [&] { return finished; });
271-
}
272-
273255
codegen::QueryCompiler::CompileStats PelotonCodeGenTest::CompileAndExecute(
274256
planner::AbstractPlan &plan, codegen::ExecutionConsumer &consumer) {
275257
codegen::QueryParameters parameters(plan, {});
@@ -280,14 +262,14 @@ codegen::QueryCompiler::CompileStats PelotonCodeGenTest::CompileAndExecute(
280262

281263
// Compile the query.
282264
codegen::QueryCompiler::CompileStats stats;
283-
auto compiled_query = codegen::QueryCompiler().Compile(
265+
auto query = codegen::QueryCompiler().Compile(
284266
plan, parameters.GetQueryParametersMap(), consumer, &stats);
285267

286-
// Execute the query.
287-
ExecuteSync(*compiled_query,
288-
std::unique_ptr<executor::ExecutorContext>(
289-
new executor::ExecutorContext(txn, std::move(parameters))),
290-
consumer);
268+
// Executor context
269+
executor::ExecutorContext exec_ctx{txn, std::move(parameters)};
270+
271+
// Execute the query
272+
query->Execute(exec_ctx, consumer);
291273

292274
// Commit the transaction.
293275
txn_manager.CommitTransaction(txn);
@@ -303,9 +285,8 @@ codegen::QueryCompiler::CompileStats PelotonCodeGenTest::CompileAndExecuteCache(
303285
auto &txn_manager = concurrency::TransactionManagerFactory::GetInstance();
304286
auto *txn = txn_manager.BeginTransaction();
305287

306-
std::unique_ptr<executor::ExecutorContext> executor_context(
307-
new executor::ExecutorContext(txn,
308-
codegen::QueryParameters(*plan, params)));
288+
executor::ExecutorContext exec_ctx{txn,
289+
codegen::QueryParameters(*plan, params)};
309290

310291
// Compile
311292
codegen::QueryCompiler::CompileStats stats;
@@ -314,13 +295,13 @@ codegen::QueryCompiler::CompileStats PelotonCodeGenTest::CompileAndExecuteCache(
314295
if (query == nullptr) {
315296
codegen::QueryCompiler compiler;
316297
auto compiled_query = compiler.Compile(
317-
*plan, executor_context->GetParams().GetQueryParametersMap(), consumer);
298+
*plan, exec_ctx.GetParams().GetQueryParametersMap(), consumer);
318299
query = compiled_query.get();
319300
codegen::QueryCache::Instance().Add(plan, std::move(compiled_query));
320301
}
321302

322303
// Execute the query.
323-
ExecuteSync(*query, std::move(executor_context), consumer);
304+
query->Execute(exec_ctx, consumer);
324305

325306
// Commit the transaction.
326307
txn_manager.CommitTransaction(txn);

test/include/codegen/testing_codegen_util.h

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,11 +89,6 @@ class PelotonCodeGenTest : public PelotonTest {
8989
oid_t tile_group_count, oid_t column_count,
9090
bool is_inlined);
9191

92-
static void ExecuteSync(
93-
codegen::Query &query,
94-
std::unique_ptr<executor::ExecutorContext> executor_context,
95-
codegen::ExecutionConsumer &consumer);
96-
9792
// Compile and execute the given plan
9893
codegen::QueryCompiler::CompileStats CompileAndExecute(
9994
planner::AbstractPlan &plan, codegen::ExecutionConsumer &consumer);

0 commit comments

Comments
 (0)