1212
1313#include " catalog/abstract_catalog.h"
1414
15- #include " binder/bind_node_visitor.h"
16-
1715#include " common/statement.h"
1816
17+ #include " catalog/catalog.h"
1918#include " catalog/database_catalog.h"
2019#include " catalog/table_catalog.h"
2120
3231#include " executor/delete_executor.h"
3332#include " executor/index_scan_executor.h"
3433#include " executor/insert_executor.h"
34+ #include " executor/plan_executor.h"
3535#include " executor/seq_scan_executor.h"
3636#include " executor/update_executor.h"
3737
@@ -46,84 +46,95 @@ AbstractCatalog::AbstractCatalog(oid_t catalog_table_oid,
4646 std::string catalog_table_name,
4747 catalog::Schema *catalog_table_schema,
4848 storage::Database *pg_catalog) {
49+ // set database_oid
50+ database_oid = pg_catalog->GetOid ();
4951 // Create catalog_table_
5052 catalog_table_ = storage::TableFactory::GetDataTable (
51- CATALOG_DATABASE_OID, catalog_table_oid, catalog_table_schema,
52- catalog_table_name, DEFAULT_TUPLES_PER_TILEGROUP, true , false , true );
53-
53+ database_oid, catalog_table_oid, catalog_table_schema, catalog_table_name,
54+ DEFAULT_TUPLES_PER_TILEGROUP, true , false , true );
5455 // Add catalog_table_ into pg_catalog database
5556 pg_catalog->AddTable (catalog_table_, true );
5657}
5758
5859AbstractCatalog::AbstractCatalog (const std::string &catalog_table_ddl,
5960 concurrency::TransactionContext *txn) {
60- // Get catalog table schema
61+ // get catalog table schema
6162 auto &peloton_parser = parser::PostgresParser::GetInstance ();
62-
63- // Build the parse tree
64- const auto parse_tree_list = peloton_parser.BuildParseTree (catalog_table_ddl);
65- if (parse_tree_list->GetStatements ().empty ()) {
66- throw CatalogException (
67- " Parse tree list has no parse trees. Cannot build plan" );
68- }
69- // TODO: support multi-statement queries
70- auto parse_tree = parse_tree_list->GetStatement (0 );
71-
72- // Run binder
73- auto bind_node_visitor = binder::BindNodeVisitor (txn, DATABASE_CATALOG_NAME);
74- bind_node_visitor.BindNameToNode (parse_tree);
75-
76- // Create the plan tree
7763 auto create_plan = std::dynamic_pointer_cast<planner::CreatePlan>(
78- optimizer::Optimizer ().BuildPelotonPlanTree (parse_tree_list, txn));
64+ optimizer::Optimizer ().BuildPelotonPlanTree (
65+ peloton_parser.BuildParseTree (catalog_table_ddl), txn));
7966 auto catalog_table_schema = create_plan->GetSchema ();
8067 auto catalog_table_name = create_plan->GetTableName ();
81-
82- // Create catalog table
68+ auto catalog_schema_name = create_plan->GetSchemaName ();
69+ auto catalog_database_name = create_plan->GetDatabaseName ();
70+ PELOTON_ASSERT (catalog_schema_name == std::string (CATALOG_SCHEMA_NAME));
71+ // create catalog table
8372 Catalog::GetInstance ()->CreateTable (
84- CATALOG_DATABASE_NAME , catalog_table_name,
73+ catalog_database_name, catalog_schema_name , catalog_table_name,
8574 std::unique_ptr<catalog::Schema>(catalog_table_schema), txn, true );
8675
87- // Get catalog table oid
76+ // get catalog table oid
8877 auto catalog_table_object = Catalog::GetInstance ()->GetTableObject (
89- CATALOG_DATABASE_NAME , catalog_table_name, txn);
78+ catalog_database_name, catalog_schema_name , catalog_table_name, txn);
9079
91- // Set catalog_table_
80+ // set catalog_table_
9281 try {
9382 catalog_table_ = storage::StorageManager::GetInstance ()->GetTableWithOid (
94- CATALOG_DATABASE_OID, catalog_table_object->GetTableOid ());
83+ catalog_table_object->GetDatabaseOid (),
84+ catalog_table_object->GetTableOid ());
85+ // set database_oid
86+ database_oid = catalog_table_object->GetDatabaseOid ();
9587 } catch (CatalogException &e) {
9688 LOG_TRACE (" Can't find table %d! Return false" ,
9789 catalog_table_object->GetTableOid ());
9890 }
9991}
10092
10193/* @brief insert tuple(reord) helper function
102- * @param tuple tuple to be inserted
103- * @param txn TransactionContext
104- * @return Whether insertion is Successful
105- */
94+ * @param tuple tuple to be inserted
95+ * @param txn TransactionContext
96+ * @return Whether insertion is Successful
97+ */
10698bool AbstractCatalog::InsertTuple (std::unique_ptr<storage::Tuple> tuple,
10799 concurrency::TransactionContext *txn) {
108100 if (txn == nullptr )
109101 throw CatalogException (" Insert tuple requires transaction" );
110102
111- std::unique_ptr<executor::ExecutorContext> context (
112- new executor::ExecutorContext (txn));
113- planner::InsertPlan node (catalog_table_, std::move (tuple));
114- executor::InsertExecutor executor (&node, context.get ());
115- executor.Init ();
116- bool status = executor.Execute ();
103+ std::vector<type::Value> params;
104+ std::vector<std::string> columns;
105+ std::vector<std::vector<std::unique_ptr<expression::AbstractExpression>>>
106+ values;
107+ values.push_back (
108+ std::vector<std::unique_ptr<expression::AbstractExpression>>());
109+ std::vector<int > result_format (tuple->GetSchema ()->GetColumnCount (), 0 );
110+ for (size_t i = 0 ; i < tuple->GetSchema ()->GetColumnCount (); i++) {
111+ params.push_back (tuple->GetValue (i));
112+ columns.push_back (tuple->GetSchema ()->GetColumn (i).GetName ());
113+ values[0 ].emplace_back (
114+ new expression::ConstantValueExpression (tuple->GetValue (i)));
115+ }
116+ auto node =
117+ std::make_shared<planner::InsertPlan>(catalog_table_, &columns, &values);
117118
118- return status;
119+ executor::ExecutionResult this_p_status;
120+ auto on_complete = [&this_p_status](
121+ executor::ExecutionResult p_status,
122+ std::vector<ResultValue> &&values UNUSED_ATTRIBUTE) {
123+ this_p_status = p_status;
124+ };
125+
126+ executor::PlanExecutor::ExecutePlan (node, txn, params, result_format,
127+ on_complete);
128+
129+ return this_p_status.m_result == peloton::ResultType::SUCCESS;
119130}
120131
121132/* @brief Delete a tuple using index scan
122- * @param index_offset Offset of index for scan
123- * @param values Values for search
124- * @param txn TransactionContext
125- * @return Whether deletion is Successful
126- */
133+ * @param index_offset Offset of index for scan
134+ * @param values Values for search
135+ * @param txn TransactionContext
136+ * @return Whether deletion is Successful
137+ */
127138bool AbstractCatalog::DeleteWithIndexScan (
128139 oid_t index_offset, std::vector<type::Value> values,
129140 concurrency::TransactionContext *txn) {
@@ -235,12 +246,12 @@ bool AbstractCatalog::UpdateWithIndexScan(
235246}
236247
237248/* @brief Index scan helper function
238- * @param column_offsets Column ids for search (projection)
239- * @param index_offset Offset of index for scan
240- * @param values Values for search
241- * @param txn TransactionContext
242- * @return Unique pointer of vector of logical tiles
243- */
249+ * @param column_offsets Column ids for search (projection)
250+ * @param index_offset Offset of index for scan
251+ * @param values Values for search
252+ * @param txn TransactionContext
253+ * @return Unique pointer of vector of logical tiles
254+ */
244255std::unique_ptr<std::vector<std::unique_ptr<executor::LogicalTile>>>
245256AbstractCatalog::GetResultWithIndexScan (
246257 std::vector<oid_t > column_offsets, oid_t index_offset,
@@ -283,14 +294,14 @@ AbstractCatalog::GetResultWithIndexScan(
283294}
284295
285296/* @brief Sequential scan helper function
286- * NOTE: try to use efficient index scan instead of sequential scan, but you
287- * shouldn't build too many indexes on one catalog table
288- * @param column_offsets Column ids for search (projection)
289- * @param predicate predicate for this sequential scan query
290- * @param txn TransactionContext
291- *
292- * @return Unique pointer of vector of logical tiles
293- */
297+ * NOTE: try to use efficient index scan instead of sequential scan, but you
298+ * shouldn't build too many indexes on one catalog table
299+ * @param column_offsets Column ids for search (projection)
300+ * @param predicate predicate for this sequential scan query
301+ * @param txn TransactionContext
302+ *
303+ * @return Unique pointer of vector of logical tiles
304+ */
294305std::unique_ptr<std::vector<std::unique_ptr<executor::LogicalTile>>>
295306AbstractCatalog::GetResultWithSeqScan (std::vector<oid_t > column_offsets,
296307 expression::AbstractExpression *predicate,
@@ -318,14 +329,14 @@ AbstractCatalog::GetResultWithSeqScan(std::vector<oid_t> column_offsets,
318329}
319330
320331/* @brief Add index on catalog table
321- * @param key_attrs indexed column offset(position)
322- * @param index_oid index id(global unique)
323- * @param index_name index name(global unique)
324- * @param index_constraint index constraints
325- * @return Unique pointer of vector of logical tiles
326- * Note: Use catalog::Catalog::CreateIndex() if you can, only ColumnCatalog and
327- * IndexCatalog should need this
328- */
332+ * @param key_attrs indexed column offset(position)
333+ * @param index_oid index id(global unique)
334+ * @param index_name index name(global unique)
335+ * @param index_constraint index constraints
336+ * @return Unique pointer of vector of logical tiles
337+ * Note: Use catalog::Catalog::CreateIndex() if you can, only ColumnCatalog and
338+ * IndexCatalog should need this
339+ */
329340void AbstractCatalog::AddIndex (const std::vector<oid_t > &key_attrs,
330341 oid_t index_oid, const std::string &index_name,
331342 IndexConstraintType index_constraint) {
@@ -354,5 +365,74 @@ void AbstractCatalog::AddIndex(const std::vector<oid_t> &key_attrs,
354365 index_name.c_str (), (int )catalog_table_->GetOid ());
355366}
356367
368+ /* @brief Update specific columns using index scan
369+ * @param update_columns Columns to be updated
370+ * @param update_values Values to be updated
371+ * @param scan_values Value to be scaned (used in index scan)
372+ * @param index_offset Offset of index for scan
373+ * @return true if successfully executes
374+ */
375+ bool AbstractCatalog::UpdateWithIndexScan (
376+ std::vector<oid_t > update_columns, std::vector<type::Value> update_values,
377+ std::vector<type::Value> scan_values, oid_t index_offset,
378+ concurrency::TransactionContext *txn) {
379+ if (txn == nullptr ) throw CatalogException (" Scan table requires transaction" );
380+
381+ std::unique_ptr<executor::ExecutorContext> context (
382+ new executor::ExecutorContext (txn));
383+ // Construct index scan executor
384+ auto index = catalog_table_->GetIndex (index_offset);
385+ std::vector<oid_t > key_column_offsets =
386+ index->GetMetadata ()->GetKeySchema ()->GetIndexedColumns ();
387+
388+ // NOTE: For indexed scan on catalog tables, we expect it not to be "partial
389+ // indexed scan"(efficiency purpose).That being said, indexed column number
390+ // must be equal to passed in "scan_values" size
391+ PELOTON_ASSERT (scan_values.size () == key_column_offsets.size ());
392+ std::vector<ExpressionType> expr_types (scan_values.size (),
393+ ExpressionType::COMPARE_EQUAL);
394+ std::vector<expression::AbstractExpression *> runtime_keys;
395+
396+ planner::IndexScanPlan::IndexScanDesc index_scan_desc (
397+ index->GetOid (), key_column_offsets, expr_types, scan_values,
398+ runtime_keys);
399+
400+ planner::IndexScanPlan index_scan_node (catalog_table_, nullptr ,
401+ update_columns, index_scan_desc);
402+
403+ executor::IndexScanExecutor index_scan_executor (&index_scan_node,
404+ context.get ());
405+ // Construct update executor
406+ TargetList target_list;
407+ DirectMapList direct_map_list;
408+
409+ size_t column_count = catalog_table_->GetSchema ()->GetColumnCount ();
410+ for (size_t col_itr = 0 ; col_itr < column_count; col_itr++) {
411+ // Skip any column for update
412+ if (std::find (std::begin (update_columns), std::end (update_columns),
413+ col_itr) == std::end (update_columns)) {
414+ direct_map_list.emplace_back (col_itr, std::make_pair (0 , col_itr));
415+ }
416+ }
417+
418+ PELOTON_ASSERT (update_columns.size () == update_values.size ());
419+ for (size_t i = 0 ; i < update_values.size (); i++) {
420+ planner::DerivedAttribute update_attribute{
421+ new expression::ConstantValueExpression (update_values[i])};
422+ target_list.emplace_back (update_columns[i], update_attribute);
423+ }
424+
425+ std::unique_ptr<const planner::ProjectInfo> project_info (
426+ new planner::ProjectInfo (std::move (target_list),
427+ std::move (direct_map_list)));
428+ planner::UpdatePlan update_node (catalog_table_, std::move (project_info));
429+
430+ executor::UpdateExecutor update_executor (&update_node, context.get ());
431+ update_executor.AddChild (&index_scan_executor);
432+ // Execute
433+ update_executor.Init ();
434+ return update_executor.Execute ();
435+ }
436+
357437} // namespace catalog
358438} // namespace peloton
0 commit comments