Skip to content
This repository was archived by the owner on Sep 27, 2019. It is now read-only.

Commit f7f56f0

Browse files
Tianyi Chenapavlo
authored andcommitted
done throwing error handling in execution
1 parent 8f2128d commit f7f56f0

File tree

9 files changed

+110
-100
lines changed

9 files changed

+110
-100
lines changed

src/common/init.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,14 @@
1212

1313
#include "common/init.h"
1414

15-
#include <google/protobuf/stubs/common.h>
1615
#include <gflags/gflags.h>
16+
#include <google/protobuf/stubs/common.h>
1717

1818
#include "brain/index_tuner.h"
1919
#include "brain/layout_tuner.h"
2020
#include "catalog/catalog.h"
21-
#include "common/thread_pool.h"
2221
#include "common/statement_cache_manager.h"
22+
#include "common/thread_pool.h"
2323
#include "concurrency/transaction_manager_factory.h"
2424
#include "gc/gc_manager_factory.h"
2525
#include "settings/settings_manager.h"
@@ -83,7 +83,7 @@ void PelotonInit::Initialize() {
8383
// initialize the catalog and add the default database, so we don't do this on
8484
// the first query
8585
pg_catalog->CreateDatabase(DEFAULT_DB_NAME, txn);
86-
86+
8787
txn_manager.CommitTransaction(txn);
8888

8989
// Initialize the Statement Cache Manager

src/executor/plan_executor.cpp

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
#include "codegen/buffering_consumer.h"
1616
#include "codegen/query.h"
1717
#include "codegen/query_cache.h"
18-
#include "concurrency/transaction_manager_factory.h"
1918
#include "codegen/query_compiler.h"
2019
#include "common/logger.h"
2120
#include "concurrency/transaction_manager_factory.h"
@@ -64,19 +63,19 @@ static void CompileAndExecutePlan(
6463
codegen::QueryCache::Instance().Add(plan, std::move(compiled_query));
6564
}
6665

67-
auto on_query_result =
68-
[&on_complete, &consumer](executor::ExecutionResult result) {
69-
std::vector<ResultValue> values;
70-
for (const auto &tuple : consumer.GetOutputTuples()) {
71-
for (uint32_t i = 0; i < tuple.tuple_.size(); i++) {
72-
auto column_val = tuple.GetValue(i);
73-
auto str = column_val.IsNull() ? "" : column_val.ToString();
74-
LOG_TRACE("column content: [%s]", str.c_str());
75-
values.push_back(std::move(str));
76-
}
77-
}
78-
on_complete(result, std::move(values));
79-
};
66+
auto on_query_result = [&on_complete,
67+
&consumer](executor::ExecutionResult result) {
68+
std::vector<ResultValue> values;
69+
for (const auto &tuple : consumer.GetOutputTuples()) {
70+
for (uint32_t i = 0; i < tuple.tuple_.size(); i++) {
71+
auto column_val = tuple.GetValue(i);
72+
auto str = column_val.IsNull() ? "" : column_val.ToString();
73+
LOG_TRACE("column content: [%s]", str.c_str());
74+
values.push_back(std::move(str));
75+
}
76+
}
77+
on_complete(result, std::move(values));
78+
};
8079

8180
query->Execute(std::move(executor_context), consumer, on_query_result);
8281
}
@@ -101,6 +100,7 @@ static void InterpretPlan(
101100
status = executor_tree->Init();
102101
if (status != true) {
103102
result.m_result = ResultType::FAILURE;
103+
result.m_error_message = "Executor tree init failed";
104104
CleanExecutorTree(executor_tree.get());
105105
on_complete(result, std::move(values));
106106
return;
@@ -146,16 +146,18 @@ void PlanExecutor::ExecutePlan(
146146

147147
bool codegen_enabled =
148148
settings::SettingsManager::GetBool(settings::SettingId::codegen);
149-
149+
150150
try {
151151
if (codegen_enabled && codegen::QueryCompiler::IsSupported(*plan)) {
152-
CompileAndExecutePlan(plan, txn, params, std::move(on_complete));
152+
CompileAndExecutePlan(plan, txn, params, on_complete);
153153
} else {
154-
InterpretPlan(plan, txn, params, result_format, std::move(on_complete));
154+
InterpretPlan(plan, txn, params, result_format, on_complete);
155155
}
156156
} catch (Exception &e) {
157157
ExecutionResult result;
158158
result.m_result = ResultType::FAILURE;
159+
result.m_error_message = e.what();
160+
LOG_ERROR("error thrown in Execution: %s", result.m_error_message.c_str());
159161
on_complete(result, {});
160162
}
161163
}

src/include/executor/plan_executor.h

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212

1313
#pragma once
1414

15+
#include "common/internal_types.h"
1516
#include "common/statement.h"
1617
#include "executor/abstract_executor.h"
17-
#include "common/internal_types.h"
1818

1919
namespace peloton {
2020

@@ -34,9 +34,13 @@ struct ExecutionResult {
3434
// number of tuples processed
3535
uint32_t m_processed;
3636

37+
// string of error message
38+
std::string m_error_message;
39+
3740
ExecutionResult() {
3841
m_processed = 0;
3942
m_result = ResultType::SUCCESS;
43+
m_error_message = "";
4044
}
4145
};
4246

@@ -51,13 +55,13 @@ class PlanExecutor {
5155
* Before ExecutePlan, a node first receives value list, so we should
5256
* pass value list directly rather than passing Postgres's ParamListInfo
5357
*/
54-
static void ExecutePlan(
55-
std::shared_ptr<planner::AbstractPlan> plan,
56-
concurrency::TransactionContext *txn,
57-
const std::vector<type::Value> &params,
58-
const std::vector<int> &result_format,
59-
std::function<void(executor::ExecutionResult,
60-
std::vector<ResultValue> &&)> on_complete);
58+
static void ExecutePlan(std::shared_ptr<planner::AbstractPlan> plan,
59+
concurrency::TransactionContext *txn,
60+
const std::vector<type::Value> &params,
61+
const std::vector<int> &result_format,
62+
std::function<void(executor::ExecutionResult,
63+
std::vector<ResultValue> &&)>
64+
on_complete);
6165

6266
/*
6367
* @brief When a peloton node recvs a query plan, this function is invoked

src/include/traffic_cop/traffic_cop.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -133,10 +133,6 @@ class TrafficCop {
133133

134134
std::vector<type::Value> &GetParamVal() { return param_values_; }
135135

136-
void SetErrorMessage(std::string error_message) {
137-
error_message_ = std::move(error_message);
138-
}
139-
140136
std::string &GetErrorMessage() { return error_message_; }
141137

142138
void SetQueuing(bool is_queuing) { is_queuing_ = is_queuing; }

src/network/postgres_protocol_handler.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -306,9 +306,9 @@ ProcessResult PostgresProtocolHandler::ExecQueryMessage(
306306
}
307307
// Did not find statement with same name
308308
else {
309-
traffic_cop_->SetErrorMessage("The prepared statement does not exist");
310-
SendErrorResponse({{NetworkMessageType::HUMAN_READABLE_ERROR,
311-
traffic_cop_->GetErrorMessage()}});
309+
std::string error_message = "The prepared statement does not exist";
310+
SendErrorResponse(
311+
{{NetworkMessageType::HUMAN_READABLE_ERROR, error_message}});
312312
SendReadyForQuery(NetworkTransactionStateType::IDLE);
313313
return ProcessResult::COMPLETE;
314314
}
@@ -373,11 +373,11 @@ void PostgresProtocolHandler::ExecQueryMessageGetResult(ResultType status) {
373373
SendReadyForQuery(NetworkTransactionStateType::IDLE);
374374
return;
375375
} else if (status == ResultType::TO_ABORT) {
376-
traffic_cop_->SetErrorMessage(
376+
std::string error_message =
377377
"current transaction is aborted, commands ignored until end of "
378-
"transaction block");
379-
SendErrorResponse({{NetworkMessageType::HUMAN_READABLE_ERROR,
380-
traffic_cop_->GetErrorMessage()}});
378+
"transaction block";
379+
SendErrorResponse(
380+
{{NetworkMessageType::HUMAN_READABLE_ERROR, error_message}});
381381
SendReadyForQuery(NetworkTransactionStateType::IDLE);
382382
return;
383383
}

src/planner/insert_plan.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ InsertPlan::InsertPlan(storage::DataTable *table,
3737
for (uint32_t tuple_idx = 0; tuple_idx < insert_values->size();
3838
tuple_idx++) {
3939
auto &values = (*insert_values)[tuple_idx];
40-
PL_ASSERT(values.size() <= schema->GetColumnCount());
40+
if (values.size() > schema->GetColumnCount()) {
41+
throw Exception("Column size does not match");
42+
}
4143
uint32_t param_idx = 0;
4244
for (uint32_t column_id = 0; column_id < values.size(); column_id++) {
4345
auto &exp = values[column_id];
@@ -66,7 +68,9 @@ InsertPlan::InsertPlan(storage::DataTable *table,
6668
}
6769
// INSERT INTO table_name (col1, col2, ...) VALUES (val1, val2, ...);
6870
else {
69-
PL_ASSERT(columns->size() <= schema->GetColumnCount());
71+
if (columns->size() > schema->GetColumnCount()) {
72+
throw Exception("Column size does not match");
73+
}
7074
for (uint32_t tuple_idx = 0; tuple_idx < insert_values->size();
7175
tuple_idx++) {
7276
auto &values = (*insert_values)[tuple_idx];

src/storage/data_table.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,14 +318,14 @@ bool DataTable::InstallVersion(const AbstractTuple *tuple,
318318
ItemPointer *index_entry_ptr) {
319319
if (CheckConstraints(tuple) == false) {
320320
LOG_TRACE("InsertVersion(): Constraint violated");
321-
return false;
321+
throw Exception("Constraint violated");
322322
}
323323

324324
// Index checks and updates
325325
if (InsertInSecondaryIndexes(tuple, targets_ptr, transaction,
326326
index_entry_ptr) == false) {
327327
LOG_TRACE("Index constraint violated");
328-
return false;
328+
throw Exception("Constraint violated");
329329
}
330330
return true;
331331
}

src/traffic_cop/traffic_cop.cpp

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ ResultType TrafficCop::CommitQueryHelper() {
101101
auto txn = curr_state.first;
102102
auto &txn_manager = concurrency::TransactionManagerFactory::GetInstance();
103103
// I catch the exception (ex. table not found) explicitly,
104-
// If this exception if caused by a query in a transaction,
104+
// If this exception is caused by a query in a transaction,
105105
// I will block following queries in that transaction until 'COMMIT' or
106106
// 'ROLLBACK' After receive 'COMMIT', see if it is rollback or really commit.
107107
if (curr_state.second != ResultType::ABORTED) {
@@ -183,6 +183,9 @@ executor::ExecutionResult TrafficCop::ExecuteHelper(
183183
auto on_complete = [&result, this](executor::ExecutionResult p_status,
184184
std::vector<ResultValue> &&values) {
185185
this->p_status_ = p_status;
186+
// TODO (Tianyi) I would make a decision on keeping one of p_status or
187+
// error_message in my next PR
188+
this->error_message_ = std::move(p_status.m_error_message);
186189
result = std::move(values);
187190
task_callback_(task_callback_arg_);
188191
};
@@ -201,19 +204,13 @@ executor::ExecutionResult TrafficCop::ExecuteHelper(
201204
}
202205

203206
void TrafficCop::ExecuteStatementPlanGetResult() {
204-
bool init_failure = false;
205-
if (p_status_.m_result == ResultType::FAILURE) {
206-
// only possible if init failed
207-
init_failure = true;
208-
}
207+
if (p_status_.m_result == ResultType::FAILURE)
208+
return;
209209

210210
auto txn_result = GetCurrentTxnState().first->GetResult();
211-
if (single_statement_txn_ || init_failure ||
212-
txn_result == ResultType::FAILURE) {
213-
LOG_TRACE(
214-
"About to commit: single stmt: %d, init_failure: %d, txn_result: %s",
215-
single_statement_txn_, init_failure,
216-
ResultTypeToString(txn_result).c_str());
211+
if (single_statement_txn_ || txn_result == ResultType::FAILURE) {
212+
LOG_TRACE("About to commit/abort: single stmt: %d,txn_result: %s",
213+
single_statement_txn_, ResultTypeToString(txn_result).c_str());
217214
switch (txn_result) {
218215
case ResultType::SUCCESS:
219216
// Commit single statement

0 commit comments

Comments
 (0)