Skip to content

Commit 35f38d5

Browse files
authored
Merge pull request #10305 from lichuang/internal_column
feat: Internal column support
2 parents 20ca21a + c166242 commit 35f38d5

File tree

76 files changed

+1494
-135
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

76 files changed

+1494
-135
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/common/exception/src/exception_code.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,8 @@ build_exceptions! {
135135
ReadTableDataError(1107),
136136
AddColumnExistError(1108),
137137
DropColumnEmptyError(1109),
138+
// create table or alter table add column with internal column name
139+
TableWithInternalColumnName(1110),
138140

139141
// Data Related Errors
140142

@@ -207,6 +209,7 @@ build_exceptions! {
207209
/// - and without `IF EXISTS`
208210
CatalogNotFound(2320),
209211

212+
210213
// Cluster error codes.
211214
ClusterUnknownNode(2401),
212215
ClusterNodeAlreadyExists(2402),

src/query/catalog/src/plan/datasource/datasource_plan.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ pub struct DataSourcePlan {
3535

3636
pub tbl_args: Option<TableArgs>,
3737
pub push_downs: Option<PushDownInfo>,
38+
pub query_internal_columns: bool,
3839
}
3940

4041
impl DataSourcePlan {
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
// Copyright 2023 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use common_expression::types::string::StringColumnBuilder;
16+
use common_expression::types::DataType;
17+
use common_expression::types::NumberDataType;
18+
use common_expression::types::UInt64Type;
19+
use common_expression::BlockEntry;
20+
use common_expression::ColumnId;
21+
use common_expression::FromData;
22+
use common_expression::Scalar;
23+
use common_expression::TableDataType;
24+
use common_expression::Value;
25+
26+
// Bit widths reserved for the block id and the segment id when the internal
// column `_row_id` is generated.
// `DEFAULT_BLOCK_PER_SEGMENT` is 1000, so 10 bits are enough for `block_id`.
const NUM_BLOCK_ID_BITS: usize = 10;
const NUM_SEGMENT_ID_BITS: usize = 22;

// Names under which the internal columns are exposed.
pub const ROW_ID: &str = "_row_id";
pub const SNAPSHOT_NAME: &str = "_snapshot_name";
pub const SEGMENT_NAME: &str = "_segment_name";
pub const BLOCK_NAME: &str = "_block_name";
35+
36+
/// Metadata from which the values of the internal columns are generated.
#[derive(Debug)]
pub struct InternalColumnMeta {
    /// Segment id; combined with `block_id` to build `_row_id` values.
    pub segment_id: usize,
    /// Block id; combined with `segment_id` to build `_row_id` values.
    pub block_id: usize,
    /// Value reported for the block-name internal column.
    pub block_location: String,
    /// Value reported for the segment-name internal column.
    pub segment_location: String,
    /// Value reported for the snapshot-name internal column.
    pub snapshot_location: String,
}
45+
46+
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
47+
pub enum InternalColumnType {
48+
RowId,
49+
BlockName,
50+
SegmentName,
51+
SnapshotName,
52+
}
53+
54+
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
55+
pub struct InternalColumn {
56+
pub column_name: String,
57+
pub column_type: InternalColumnType,
58+
}
59+
60+
impl InternalColumn {
61+
pub fn new(name: &str, column_type: InternalColumnType) -> Self {
62+
InternalColumn {
63+
column_name: name.to_string(),
64+
column_type,
65+
}
66+
}
67+
68+
pub fn column_type(&self) -> &InternalColumnType {
69+
&self.column_type
70+
}
71+
72+
pub fn table_data_type(&self) -> TableDataType {
73+
match &self.column_type {
74+
InternalColumnType::RowId => TableDataType::Number(NumberDataType::UInt64),
75+
InternalColumnType::BlockName => TableDataType::String,
76+
InternalColumnType::SegmentName => TableDataType::String,
77+
InternalColumnType::SnapshotName => TableDataType::String,
78+
}
79+
}
80+
81+
pub fn data_type(&self) -> DataType {
82+
let t = &self.table_data_type();
83+
t.into()
84+
}
85+
86+
pub fn column_name(&self) -> &String {
87+
&self.column_name
88+
}
89+
90+
pub fn column_id(&self) -> ColumnId {
91+
match &self.column_type {
92+
InternalColumnType::RowId => u32::MAX,
93+
InternalColumnType::BlockName => u32::MAX - 1,
94+
InternalColumnType::SegmentName => u32::MAX - 2,
95+
InternalColumnType::SnapshotName => u32::MAX - 3,
96+
}
97+
}
98+
99+
pub fn generate_column_values(&self, meta: &InternalColumnMeta, num_rows: usize) -> BlockEntry {
100+
match &self.column_type {
101+
InternalColumnType::RowId => {
102+
let block_id = meta.block_id as u64;
103+
let seg_id = meta.segment_id as u64;
104+
let high_32bit = (seg_id << NUM_SEGMENT_ID_BITS) + (block_id << NUM_BLOCK_ID_BITS);
105+
let mut row_ids = Vec::with_capacity(num_rows);
106+
for i in 0..num_rows {
107+
let row_id = high_32bit + i as u64;
108+
row_ids.push(row_id);
109+
}
110+
BlockEntry {
111+
data_type: DataType::Number(NumberDataType::UInt64),
112+
value: Value::Column(UInt64Type::from_data(row_ids)),
113+
}
114+
}
115+
InternalColumnType::BlockName => {
116+
let mut builder = StringColumnBuilder::with_capacity(1, meta.block_location.len());
117+
builder.put_str(&meta.block_location);
118+
builder.commit_row();
119+
BlockEntry {
120+
data_type: DataType::String,
121+
value: Value::Scalar(Scalar::String(builder.build_scalar())),
122+
}
123+
}
124+
InternalColumnType::SegmentName => {
125+
let mut builder =
126+
StringColumnBuilder::with_capacity(1, meta.segment_location.len());
127+
builder.put_str(&meta.segment_location);
128+
builder.commit_row();
129+
BlockEntry {
130+
data_type: DataType::String,
131+
value: Value::Scalar(Scalar::String(builder.build_scalar())),
132+
}
133+
}
134+
InternalColumnType::SnapshotName => {
135+
let mut builder =
136+
StringColumnBuilder::with_capacity(1, meta.snapshot_location.len());
137+
builder.put_str(&meta.snapshot_location);
138+
builder.commit_row();
139+
BlockEntry {
140+
data_type: DataType::String,
141+
value: Value::Scalar(Scalar::String(builder.build_scalar())),
142+
}
143+
}
144+
}
145+
}
146+
}

src/query/catalog/src/plan/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,15 @@
1313
// limitations under the License.
1414

1515
mod datasource;
16+
mod internal_column;
1617
mod partition;
1718
mod partition_statistics;
1819
mod projection;
1920
mod pruning_statistics;
2021
mod pushdown;
2122

2223
pub use datasource::*;
24+
pub use internal_column::*;
2325
pub use partition::*;
2426
pub use partition_statistics::PartStatistics;
2527
pub use projection::Projection;

src/query/expression/src/block.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -440,8 +440,6 @@ impl DataBlock {
440440
default_vals: &[Scalar],
441441
) -> Result<DataBlock> {
442442
let num_rows = data_block.num_rows();
443-
let mut new_data_block = DataBlock::empty();
444-
new_data_block.num_rows = num_rows;
445443
let mut data_block_columns_idx: usize = 0;
446444
let data_block_columns = data_block.columns();
447445

src/query/expression/src/schema.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,17 @@ impl TableSchema {
370370
Ok(())
371371
}
372372

373+
// Every internal column has constant column id, no need to generate column id of internal columns.
374+
pub fn add_internal_column(
375+
&mut self,
376+
name: &str,
377+
data_type: TableDataType,
378+
column_id: ColumnId,
379+
) {
380+
let field = TableField::new_from_column_id(name, data_type, column_id);
381+
self.fields.push(field);
382+
}
383+
373384
pub fn drop_column(&mut self, column: &str) -> Result<()> {
374385
if self.fields.len() == 1 {
375386
return Err(ErrorCode::DropColumnEmptyError(

src/query/service/src/interpreters/interpreter_copy.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ impl CopyInterpreter {
298298
let stage_table = StageTable::try_create(stage_table_info.clone())?;
299299
let read_source_plan = {
300300
stage_table
301-
.read_plan_with_catalog(ctx.clone(), catalog_name.to_string(), None)
301+
.read_plan_with_catalog(ctx.clone(), catalog_name.to_string(), None, None)
302302
.await?
303303
};
304304

src/query/service/src/interpreters/interpreter_insert.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ impl InsertInterpreter {
248248
let stage_table = StageTable::try_create(stage_table_info.clone())?;
249249
let read_source_plan = {
250250
stage_table
251-
.read_plan_with_catalog(ctx.clone(), catalog_name, None)
251+
.read_plan_with_catalog(ctx.clone(), catalog_name, None, None)
252252
.await?
253253
};
254254

src/query/service/src/interpreters/interpreter_table_add_column.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use common_exception::Result;
1919
use common_meta_app::schema::DatabaseType;
2020
use common_meta_app::schema::UpdateTableMetaReq;
2121
use common_meta_types::MatchSeq;
22+
use common_sql::binder::INTERNAL_COLUMN_FACTORY;
2223
use common_sql::plans::AddTableColumnPlan;
2324
use common_storages_view::view_table::VIEW_ENGINE;
2425

@@ -81,6 +82,14 @@ impl Interpreter for AddTableColumnInterpreter {
8182
} else {
8283
field
8384
};
85+
86+
if INTERNAL_COLUMN_FACTORY.exist(field.name()) {
87+
return Err(ErrorCode::TableWithInternalColumnName(format!(
88+
"Cannot alter table to add a column with the same name as internal column: {}",
89+
field.name()
90+
)));
91+
}
92+
8493
fields.push(field)
8594
}
8695
new_table_meta.add_columns(&fields, &self.plan.field_comments)?;

0 commit comments

Comments
 (0)