-
Notifications
You must be signed in to change notification settings - Fork 619
Namespace(Schema) support + Catalog refactoring #1302
Changes from all commits
ab15a85
69ad271
eb2bab5
8a0b1ac
b3dd1a4
d4815e4
bbf369b
08b845e
4436a68
666b8d2
3dd4bfe
2f6d937
5a8d30e
c4a7317
792f660
88c2cef
f15b3c3
e9d118d
98d6321
f63fcd9
0aed96f
a21bb13
5d877bf
ec6d27a
ec7e9d2
00ebf5f
69b1be3
ea6f79f
f9bac0c
2eb87de
01fecac
d5fd964
ef7ed74
237d3c9
fa328bb
a97d296
835a7b6
3ab195c
c1853a2
0fa160a
094e448
fccc88b
3928724
48c91e5
9ced761
7f7fc44
bac11dd
781b6d7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,19 +10,19 @@ | |
// | ||
//===----------------------------------------------------------------------===// | ||
|
||
#include "expression/expression_util.h" | ||
#include "binder/bind_node_visitor.h" | ||
#include "expression/star_expression.h" | ||
#include "catalog/catalog.h" | ||
#include "expression/expression_util.h" | ||
#include "expression/star_expression.h" | ||
#include "type/type_id.h" | ||
|
||
#include "expression/aggregate_expression.h" | ||
#include "expression/case_expression.h" | ||
#include "expression/function_expression.h" | ||
#include "expression/operator_expression.h" | ||
#include "expression/star_expression.h" | ||
#include "expression/tuple_value_expression.h" | ||
#include "expression/subquery_expression.h" | ||
#include "expression/tuple_value_expression.h" | ||
|
||
namespace peloton { | ||
namespace binder { | ||
|
@@ -155,8 +155,8 @@ void BindNodeVisitor::Visit(parser::UpdateStatement *node) { | |
void BindNodeVisitor::Visit(parser::DeleteStatement *node) { | ||
context_ = std::make_shared<BinderContext>(nullptr); | ||
node->TryBindDatabaseName(default_database_name_); | ||
context_->AddRegularTable(node->GetDatabaseName(), node->GetTableName(), | ||
node->GetTableName(), txn_); | ||
context_->AddRegularTable(node->GetDatabaseName(), node->GetSchemaName(), | ||
node->GetTableName(), node->GetTableName(), txn_); | ||
|
||
if (node->expr != nullptr) { | ||
node->expr->Accept(this); | ||
|
@@ -174,8 +174,8 @@ void BindNodeVisitor::Visit(parser::CreateStatement *node) { | |
void BindNodeVisitor::Visit(parser::InsertStatement *node) { | ||
node->TryBindDatabaseName(default_database_name_); | ||
context_ = std::make_shared<BinderContext>(nullptr); | ||
context_->AddRegularTable(node->GetDatabaseName(), node->GetTableName(), | ||
node->GetTableName(), txn_); | ||
context_->AddRegularTable(node->GetDatabaseName(), node->GetSchemaName(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See above |
||
node->GetTableName(), node->GetTableName(), txn_); | ||
if (node->select != nullptr) { | ||
node->select->Accept(this); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,10 +12,9 @@ | |
|
||
#include "catalog/abstract_catalog.h" | ||
|
||
#include "binder/bind_node_visitor.h" | ||
|
||
#include "common/statement.h" | ||
|
||
#include "catalog/catalog.h" | ||
#include "catalog/database_catalog.h" | ||
#include "catalog/table_catalog.h" | ||
|
||
|
@@ -32,7 +31,9 @@ | |
#include "executor/delete_executor.h" | ||
#include "executor/index_scan_executor.h" | ||
#include "executor/insert_executor.h" | ||
#include "executor/plan_executor.h" | ||
#include "executor/seq_scan_executor.h" | ||
#include "executor/update_executor.h" | ||
|
||
#include "storage/database.h" | ||
#include "storage/storage_manager.h" | ||
|
@@ -45,84 +46,95 @@ AbstractCatalog::AbstractCatalog(oid_t catalog_table_oid, | |
std::string catalog_table_name, | ||
catalog::Schema *catalog_table_schema, | ||
storage::Database *pg_catalog) { | ||
// set database_oid | ||
database_oid = pg_catalog->GetOid(); | ||
// Create catalog_table_ | ||
catalog_table_ = storage::TableFactory::GetDataTable( | ||
CATALOG_DATABASE_OID, catalog_table_oid, catalog_table_schema, | ||
catalog_table_name, DEFAULT_TUPLES_PER_TILEGROUP, true, false, true); | ||
|
||
database_oid, catalog_table_oid, catalog_table_schema, catalog_table_name, | ||
DEFAULT_TUPLES_PER_TILEGROUP, true, false, true); | ||
// Add catalog_table_ into pg_catalog database | ||
pg_catalog->AddTable(catalog_table_, true); | ||
} | ||
|
||
AbstractCatalog::AbstractCatalog(const std::string &catalog_table_ddl, | ||
concurrency::TransactionContext *txn) { | ||
// Get catalog table schema | ||
// get catalog table schema | ||
auto &peloton_parser = parser::PostgresParser::GetInstance(); | ||
|
||
// Build the parse tree | ||
const auto parse_tree_list = peloton_parser.BuildParseTree(catalog_table_ddl); | ||
if (parse_tree_list->GetStatements().empty()) { | ||
throw CatalogException( | ||
"Parse tree list has no parse trees. Cannot build plan"); | ||
} | ||
// TODO: support multi-statement queries | ||
auto parse_tree = parse_tree_list->GetStatement(0); | ||
|
||
// Run binder | ||
auto bind_node_visitor = binder::BindNodeVisitor(txn, DATABASE_CATALOG_NAME); | ||
bind_node_visitor.BindNameToNode(parse_tree); | ||
|
||
// Create the plan tree | ||
auto create_plan = std::dynamic_pointer_cast<planner::CreatePlan>( | ||
optimizer::Optimizer().BuildPelotonPlanTree(parse_tree_list, txn)); | ||
optimizer::Optimizer().BuildPelotonPlanTree( | ||
peloton_parser.BuildParseTree(catalog_table_ddl), txn)); | ||
auto catalog_table_schema = create_plan->GetSchema(); | ||
auto catalog_table_name = create_plan->GetTableName(); | ||
|
||
// Create catalog table | ||
auto catalog_schema_name = create_plan->GetSchemaName(); | ||
auto catalog_database_name = create_plan->GetDatabaseName(); | ||
PELOTON_ASSERT(catalog_schema_name == std::string(CATALOG_SCHEMA_NAME)); | ||
// create catalog table | ||
Catalog::GetInstance()->CreateTable( | ||
CATALOG_DATABASE_NAME, catalog_table_name, | ||
catalog_database_name, catalog_schema_name, catalog_table_name, | ||
std::unique_ptr<catalog::Schema>(catalog_table_schema), txn, true); | ||
|
||
// Get catalog table oid | ||
// get catalog table oid | ||
auto catalog_table_object = Catalog::GetInstance()->GetTableObject( | ||
CATALOG_DATABASE_NAME, catalog_table_name, txn); | ||
catalog_database_name, catalog_schema_name, catalog_table_name, txn); | ||
|
||
// Set catalog_table_ | ||
// set catalog_table_ | ||
try { | ||
catalog_table_ = storage::StorageManager::GetInstance()->GetTableWithOid( | ||
CATALOG_DATABASE_OID, catalog_table_object->GetTableOid()); | ||
catalog_table_object->GetDatabaseOid(), | ||
catalog_table_object->GetTableOid()); | ||
// set database_oid | ||
database_oid = catalog_table_object->GetDatabaseOid(); | ||
} catch (CatalogException &e) { | ||
LOG_TRACE("Can't find table %d! Return false", | ||
catalog_table_object->GetTableOid()); | ||
} | ||
} | ||
|
||
/*@brief insert tuple(reord) helper function | ||
* @param tuple tuple to be inserted | ||
* @param txn TransactionContext | ||
* @return Whether insertion is Successful | ||
*/ | ||
* @param tuple tuple to be inserted | ||
* @param txn TransactionContext | ||
* @return Whether insertion is Successful | ||
*/ | ||
bool AbstractCatalog::InsertTuple(std::unique_ptr<storage::Tuple> tuple, | ||
concurrency::TransactionContext *txn) { | ||
if (txn == nullptr) | ||
throw CatalogException("Insert tuple requires transaction"); | ||
|
||
std::unique_ptr<executor::ExecutorContext> context( | ||
new executor::ExecutorContext(txn)); | ||
planner::InsertPlan node(catalog_table_, std::move(tuple)); | ||
executor::InsertExecutor executor(&node, context.get()); | ||
executor.Init(); | ||
bool status = executor.Execute(); | ||
std::vector<type::Value> params; | ||
std::vector<std::string> columns; | ||
std::vector<std::vector<std::unique_ptr<expression::AbstractExpression>>> | ||
values; | ||
values.push_back( | ||
std::vector<std::unique_ptr<expression::AbstractExpression>>()); | ||
std::vector<int> result_format(tuple->GetSchema()->GetColumnCount(), 0); | ||
for (size_t i = 0; i < tuple->GetSchema()->GetColumnCount(); i++) { | ||
params.push_back(tuple->GetValue(i)); | ||
columns.push_back(tuple->GetSchema()->GetColumn(i).GetName()); | ||
values[0].emplace_back( | ||
new expression::ConstantValueExpression(tuple->GetValue(i))); | ||
} | ||
auto node = | ||
std::make_shared<planner::InsertPlan>(catalog_table_, &columns, &values); | ||
|
||
return status; | ||
executor::ExecutionResult this_p_status; | ||
auto on_complete = [&this_p_status]( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a reason for changing this from the old There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are trying to convert the old execution engine to llvm execution engine. |
||
executor::ExecutionResult p_status, | ||
std::vector<ResultValue> &&values UNUSED_ATTRIBUTE) { | ||
this_p_status = p_status; | ||
}; | ||
|
||
executor::PlanExecutor::ExecutePlan(node, txn, params, result_format, | ||
on_complete); | ||
|
||
return this_p_status.m_result == peloton::ResultType::SUCCESS; | ||
} | ||
|
||
/*@brief Delete a tuple using index scan | ||
* @param index_offset Offset of index for scan | ||
* @param values Values for search | ||
* @param txn TransactionContext | ||
* @return Whether deletion is Successful | ||
*/ | ||
* @param index_offset Offset of index for scan | ||
* @param values Values for search | ||
* @param txn TransactionContext | ||
* @return Whether deletion is Successful | ||
*/ | ||
bool AbstractCatalog::DeleteWithIndexScan( | ||
oid_t index_offset, std::vector<type::Value> values, | ||
concurrency::TransactionContext *txn) { | ||
|
@@ -167,12 +179,12 @@ bool AbstractCatalog::DeleteWithIndexScan( | |
} | ||
|
||
/*@brief Index scan helper function | ||
* @param column_offsets Column ids for search (projection) | ||
* @param index_offset Offset of index for scan | ||
* @param values Values for search | ||
* @param txn TransactionContext | ||
* @return Unique pointer of vector of logical tiles | ||
*/ | ||
* @param column_offsets Column ids for search (projection) | ||
* @param index_offset Offset of index for scan | ||
* @param values Values for search | ||
* @param txn TransactionContext | ||
* @return Unique pointer of vector of logical tiles | ||
*/ | ||
std::unique_ptr<std::vector<std::unique_ptr<executor::LogicalTile>>> | ||
AbstractCatalog::GetResultWithIndexScan( | ||
std::vector<oid_t> column_offsets, oid_t index_offset, | ||
|
@@ -215,14 +227,14 @@ AbstractCatalog::GetResultWithIndexScan( | |
} | ||
|
||
/*@brief Sequential scan helper function | ||
* NOTE: try to use efficient index scan instead of sequential scan, but you | ||
* shouldn't build too many indexes on one catalog table | ||
* @param column_offsets Column ids for search (projection) | ||
* @param predicate predicate for this sequential scan query | ||
* @param txn TransactionContext | ||
* | ||
* @return Unique pointer of vector of logical tiles | ||
*/ | ||
* NOTE: try to use efficient index scan instead of sequential scan, but you | ||
* shouldn't build too many indexes on one catalog table | ||
* @param column_offsets Column ids for search (projection) | ||
* @param predicate predicate for this sequential scan query | ||
* @param txn TransactionContext | ||
* | ||
* @return Unique pointer of vector of logical tiles | ||
*/ | ||
std::unique_ptr<std::vector<std::unique_ptr<executor::LogicalTile>>> | ||
AbstractCatalog::GetResultWithSeqScan(std::vector<oid_t> column_offsets, | ||
expression::AbstractExpression *predicate, | ||
|
@@ -250,14 +262,14 @@ AbstractCatalog::GetResultWithSeqScan(std::vector<oid_t> column_offsets, | |
} | ||
|
||
/*@brief Add index on catalog table | ||
* @param key_attrs indexed column offset(position) | ||
* @param index_oid index id(global unique) | ||
* @param index_name index name(global unique) | ||
* @param index_constraint index constraints | ||
* @return Unique pointer of vector of logical tiles | ||
* Note: Use catalog::Catalog::CreateIndex() if you can, only ColumnCatalog and | ||
* IndexCatalog should need this | ||
*/ | ||
* @param key_attrs indexed column offset(position) | ||
* @param index_oid index id(global unique) | ||
* @param index_name index name(global unique) | ||
* @param index_constraint index constraints | ||
* @return Unique pointer of vector of logical tiles | ||
* Note: Use catalog::Catalog::CreateIndex() if you can, only ColumnCatalog and | ||
* IndexCatalog should need this | ||
*/ | ||
void AbstractCatalog::AddIndex(const std::vector<oid_t> &key_attrs, | ||
oid_t index_oid, const std::string &index_name, | ||
IndexConstraintType index_constraint) { | ||
|
@@ -286,5 +298,74 @@ void AbstractCatalog::AddIndex(const std::vector<oid_t> &key_attrs, | |
index_name.c_str(), (int)catalog_table_->GetOid()); | ||
} | ||
|
||
/*@brief Update specific columns using index scan | ||
* @param update_columns Columns to be updated | ||
* @param update_values Values to be updated | ||
* @param scan_values Value to be scaned (used in index scan) | ||
* @param index_offset Offset of index for scan | ||
* @return true if successfully executes | ||
*/ | ||
bool AbstractCatalog::UpdateWithIndexScan( | ||
std::vector<oid_t> update_columns, std::vector<type::Value> update_values, | ||
std::vector<type::Value> scan_values, oid_t index_offset, | ||
concurrency::TransactionContext *txn) { | ||
if (txn == nullptr) throw CatalogException("Scan table requires transaction"); | ||
|
||
std::unique_ptr<executor::ExecutorContext> context( | ||
new executor::ExecutorContext(txn)); | ||
// Construct index scan executor | ||
auto index = catalog_table_->GetIndex(index_offset); | ||
std::vector<oid_t> key_column_offsets = | ||
index->GetMetadata()->GetKeySchema()->GetIndexedColumns(); | ||
|
||
// NOTE: For indexed scan on catalog tables, we expect it not to be "partial | ||
// indexed scan"(efficiency purpose).That being said, indexed column number | ||
// must be equal to passed in "scan_values" size | ||
PELOTON_ASSERT(scan_values.size() == key_column_offsets.size()); | ||
std::vector<ExpressionType> expr_types(scan_values.size(), | ||
ExpressionType::COMPARE_EQUAL); | ||
std::vector<expression::AbstractExpression *> runtime_keys; | ||
|
||
planner::IndexScanPlan::IndexScanDesc index_scan_desc( | ||
index->GetOid(), key_column_offsets, expr_types, scan_values, | ||
runtime_keys); | ||
|
||
planner::IndexScanPlan index_scan_node(catalog_table_, nullptr, | ||
update_columns, index_scan_desc); | ||
|
||
executor::IndexScanExecutor index_scan_executor(&index_scan_node, | ||
context.get()); | ||
// Construct update executor | ||
TargetList target_list; | ||
DirectMapList direct_map_list; | ||
|
||
size_t column_count = catalog_table_->GetSchema()->GetColumnCount(); | ||
for (size_t col_itr = 0; col_itr < column_count; col_itr++) { | ||
// Skip any column for update | ||
if (std::find(std::begin(update_columns), std::end(update_columns), | ||
col_itr) == std::end(update_columns)) { | ||
direct_map_list.emplace_back(col_itr, std::make_pair(0, col_itr)); | ||
} | ||
} | ||
|
||
PELOTON_ASSERT(update_columns.size() == update_values.size()); | ||
for (size_t i = 0; i < update_values.size(); i++) { | ||
planner::DerivedAttribute update_attribute{ | ||
new expression::ConstantValueExpression(update_values[i])}; | ||
target_list.emplace_back(update_columns[i], update_attribute); | ||
} | ||
|
||
std::unique_ptr<const planner::ProjectInfo> project_info( | ||
new planner::ProjectInfo(std::move(target_list), | ||
std::move(direct_map_list))); | ||
planner::UpdatePlan update_node(catalog_table_, std::move(project_info)); | ||
|
||
executor::UpdateExecutor update_executor(&update_node, context.get()); | ||
update_executor.AddChild(&index_scan_executor); | ||
// Execute | ||
update_executor.Init(); | ||
return update_executor.Execute(); | ||
} | ||
|
||
} // namespace catalog | ||
} // namespace peloton |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GetTableName()
is invoked twice. Why?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The second table name is const std::string table_alias, I didn't change the logic here except adding namespace parameter(I'll confirm with bowei)