|
27 | 27 | #include "catalog/trigger_catalog.h"
|
28 | 28 | #include "concurrency/transaction_manager_factory.h"
|
29 | 29 | #include "executor/executor_context.h"
|
| 30 | +#include "executor/insert_executor.h" |
| 31 | +#include "executor/seq_scan_executor.h" |
30 | 32 | #include "function/date_functions.h"
|
31 | 33 | #include "function/decimal_functions.h"
|
32 | 34 | #include "function/old_engine_string_functions.h"
|
33 | 35 | #include "function/timestamp_functions.h"
|
34 | 36 | #include "index/index_factory.h"
|
| 37 | +#include "planner/insert_plan.h" |
| 38 | +#include "planner/seq_scan_plan.h" |
35 | 39 | #include "settings/settings_manager.h"
|
36 | 40 | #include "storage/storage_manager.h"
|
37 | 41 | #include "storage/table_factory.h"
|
@@ -800,13 +804,163 @@ std::shared_ptr<TableCatalogObject> Catalog::GetTableObject(
|
800 | 804 | * @param txn the transaction Context
|
801 | 805 | * @return TransactionContext ResultType(SUCCESS or FAILURE)
|
802 | 806 | */
|
803 |
| -ResultType Catalog::AlterTable( |
804 |
| - UNUSED_ATTRIBUTE oid_t database_oid, UNUSED_ATTRIBUTE oid_t table_oid, |
805 |
| - UNUSED_ATTRIBUTE std::unique_ptr<catalog::Schema> new_schema, |
806 |
| - UNUSED_ATTRIBUTE concurrency::TransactionContext *txn) { |
| 807 | +ResultType Catalog::AlterTable(oid_t database_oid, oid_t table_oid, |
| 808 | + std::unique_ptr<catalog::Schema> new_schema, |
| 809 | + concurrency::TransactionContext *txn) { |
807 | 810 | LOG_TRACE("AlterTable in Catalog");
|
808 | 811 |
|
809 |
| - // TODO: perform AlterTable Operation |
| 812 | + if (txn == nullptr) |
| 813 | + throw CatalogException("Alter table requires transaction"); |
| 814 | + try { |
| 815 | + auto storage_manager = storage::StorageManager::GetInstance(); |
| 816 | + auto database = storage_manager->GetDatabaseWithOid(database_oid); |
| 817 | + try { |
| 818 | + auto old_table = database->GetTableWithOid(table_oid); |
| 819 | + auto old_schema = old_table->GetSchema(); |
| 820 | + |
| 821 | + // Step 1: build empty table with new schema |
| 822 | + bool own_schema = true; |
| 823 | + bool adapt_table = false; |
| 824 | + auto new_table = storage::TableFactory::GetDataTable( |
| 825 | + database_oid, table_oid, |
| 826 | + catalog::Schema::CopySchema(new_schema.get()), old_table->GetName(), |
| 827 | + DEFAULT_TUPLES_PER_TILEGROUP, own_schema, adapt_table); |
| 828 | + |
| 829 | + // Step 2: Copy indexes |
| 830 | + auto old_index_oids = |
| 831 | + IndexCatalog::GetInstance()->GetIndexObjects(table_oid, txn); |
| 832 | + for (auto index_oid_pair : old_index_oids) { |
| 833 | + oid_t index_oid = index_oid_pair.first; |
| 834 | + // delete record in pg_index |
| 835 | + IndexCatalog::GetInstance()->DeleteIndex(index_oid, txn); |
| 836 | + // Check if all indexed columns still exists |
| 837 | + auto old_index = old_table->GetIndexWithOid(index_oid); |
| 838 | + bool index_exist = true; |
| 839 | + std::vector<oid_t> new_key_attrs; |
| 840 | + |
| 841 | + for (oid_t column_id : old_index->GetMetadata()->GetKeyAttrs()) { |
| 842 | + bool is_found = false; |
| 843 | + std::string column_name = old_schema->GetColumn(column_id).GetName(); |
| 844 | + oid_t i = 0; |
| 845 | + for (auto new_column : new_schema->GetColumns()) { |
| 846 | + if (column_name == new_column.GetName()) { |
| 847 | + is_found = true; |
| 848 | + new_key_attrs.push_back(i); |
| 849 | + break; |
| 850 | + } |
| 851 | + i++; |
| 852 | + } |
| 853 | + if (!is_found) { |
| 854 | + index_exist = false; |
| 855 | + break; |
| 856 | + } |
| 857 | + } |
| 858 | + if (!index_exist) continue; |
| 859 | + |
| 860 | + // construct index on new table |
| 861 | + auto index_metadata = new index::IndexMetadata( |
| 862 | + old_index->GetName(), index_oid, table_oid, database_oid, |
| 863 | + old_index->GetMetadata()->GetIndexType(), |
| 864 | + old_index->GetMetadata()->GetIndexConstraintType(), |
| 865 | + new_schema.get(), |
| 866 | + // catalog::Schema::CopySchema(old_index->GetKeySchema()), |
| 867 | + catalog::Schema::CopySchema(new_schema.get(), new_key_attrs), |
| 868 | + new_key_attrs, old_index->GetMetadata()->HasUniqueKeys()); |
| 869 | + |
| 870 | + std::shared_ptr<index::Index> new_index( |
| 871 | + index::IndexFactory::GetIndex(index_metadata)); |
| 872 | + new_table->AddIndex(new_index); |
| 873 | + |
| 874 | + // reinsert record into pg_index |
| 875 | + IndexCatalog::GetInstance()->InsertIndex( |
| 876 | + index_oid, old_index->GetName(), table_oid, |
| 877 | + old_index->GetMetadata()->GetIndexType(), |
| 878 | + old_index->GetMetadata()->GetIndexConstraintType(), |
| 879 | + old_index->GetMetadata()->HasUniqueKeys(), new_key_attrs, |
| 880 | + pool_.get(), txn); |
| 881 | + } |
| 882 | + |
| 883 | + std::unique_ptr<executor::ExecutorContext> context( |
| 884 | + new executor::ExecutorContext(txn, {})); |
| 885 | + // Step 3: build column mapping between old table and new table |
| 886 | + // we're using column name as unique identifier |
| 887 | + std::vector<oid_t> old_column_ids; |
| 888 | + std::unordered_map<oid_t, oid_t> column_map; |
| 889 | + for (oid_t old_column_id = 0; |
| 890 | + old_column_id < old_schema->GetColumnCount(); old_column_id++) { |
| 891 | + old_column_ids.push_back(old_column_id); |
| 892 | + for (oid_t new_column_id = 0; |
| 893 | + new_column_id < new_schema->GetColumnCount(); new_column_id++) { |
| 894 | + if (old_schema->GetColumn(old_column_id).GetName() == |
| 895 | + new_schema->GetColumn(new_column_id).GetName()) { |
| 896 | + column_map[new_column_id] = old_column_id; |
| 897 | + } |
| 898 | + } |
| 899 | + } |
| 900 | + // Step 4: Get tuples from old table with sequential scan |
| 901 | + // TODO: Try to reuse Sequential scan function and insert function in |
| 902 | + // abstract catalog |
| 903 | + planner::SeqScanPlan seq_scan_node(old_table, nullptr, old_column_ids); |
| 904 | + executor::SeqScanExecutor seq_scan_executor(&seq_scan_node, |
| 905 | + context.get()); |
| 906 | + seq_scan_executor.Init(); |
| 907 | + while (seq_scan_executor.Execute()) { |
| 908 | + std::unique_ptr<executor::LogicalTile> result_tile( |
| 909 | + seq_scan_executor.GetOutput()); |
| 910 | + for (size_t i = 0; i < result_tile->GetTupleCount(); i++) { |
| 911 | + // Transform tuple into new schema |
| 912 | + std::unique_ptr<storage::Tuple> tuple( |
| 913 | + new storage::Tuple(new_schema.get(), true)); |
| 914 | + |
| 915 | + for (oid_t new_column_id = 0; |
| 916 | + new_column_id < new_schema->GetColumnCount(); new_column_id++) { |
| 917 | + auto it = column_map.find(new_column_id); |
| 918 | + type::Value val; |
| 919 | + if (it == column_map.end()) { |
| 920 | + // new column, set value to null |
| 921 | + val = type::ValueFactory::GetNullValueByType( |
| 922 | + new_schema->GetColumn(new_column_id).GetType()); |
| 923 | + } else { |
| 924 | + // otherwise, copy value in old table |
| 925 | + // TODO: Change type if necessary |
| 926 | + val = result_tile->GetValue(i, it->second); |
| 927 | + } |
| 928 | + tuple->SetValue(new_column_id, val, pool_.get()); |
| 929 | + } |
| 930 | + // insert new tuple into new table |
| 931 | + planner::InsertPlan node(new_table, std::move(tuple)); |
| 932 | + executor::InsertExecutor executor(&node, context.get()); |
| 933 | + executor.Init(); |
| 934 | + executor.Execute(); |
| 935 | + } |
| 936 | + } |
| 937 | + // Step 5: delete all the column(attribute) records in pg_attribute |
| 938 | + // and reinsert them using new schema(column offset needs to change |
| 939 | + // accordingly) |
| 940 | + catalog::ColumnCatalog::GetInstance()->DeleteColumns(table_oid, txn); |
| 941 | + oid_t column_offset = 0; |
| 942 | + for (auto new_column : new_schema->GetColumns()) { |
| 943 | + catalog::ColumnCatalog::GetInstance()->InsertColumn( |
| 944 | + table_oid, new_column.GetName(), column_offset, |
| 945 | + new_column.GetOffset(), new_column.GetType(), |
| 946 | + new_column.IsInlined(), new_column.GetConstraints(), pool_.get(), |
| 947 | + txn); |
| 948 | + column_offset++; |
| 949 | + } |
| 950 | + |
| 951 | + // Final step of physical change should be moved to commit time |
| 952 | + database->ReplaceTableWithOid(table_oid, new_table); |
| 953 | + |
| 954 | + // Record table drop |
| 955 | + txn->RecordDrop(database_oid, table_oid, INVALID_OID); |
| 956 | + |
| 957 | + } catch (CatalogException &e) { |
| 958 | + LOG_TRACE("Alter table failed."); |
| 959 | + return ResultType::FAILURE; |
| 960 | + } |
| 961 | + } catch (CatalogException &e) { |
| 962 | + return ResultType::FAILURE; |
| 963 | + } |
810 | 964 | return ResultType::SUCCESS;
|
811 | 965 | }
|
812 | 966 |
|
|
0 commit comments