Skip to content

Commit 51f8017

Browse files
authored
feat: csv/tsv/ndjson support querying file metadata. (#17512)
* feat: csv/tsv/ndjson support querying file metadata.
1 parent 48febfb commit 51f8017

File tree

23 files changed: 257 additions and 71 deletions.

src/query/catalog/src/plan/datasource/datasource_info/stage.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ pub struct StageTableInfo {
4444
pub is_select: bool,
4545
pub copy_into_location_options: CopyIntoLocationOptions,
4646
pub copy_into_table_options: CopyIntoTableOptions,
47+
pub stage_root: String,
4748
}
4849

4950
impl StageTableInfo {

src/query/catalog/src/plan/datasource/datasource_plan.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use databend_common_expression::Scalar;
2020
use databend_common_expression::TableSchemaRef;
2121

2222
use crate::plan::datasource::datasource_info::DataSourceInfo;
23+
use crate::plan::InternalColumn;
2324
use crate::plan::PartStatistics;
2425
use crate::plan::Partitions;
2526
use crate::plan::PushDownInfo;
@@ -38,7 +39,7 @@ pub struct DataSourcePlan {
3839

3940
pub tbl_args: Option<TableArgs>,
4041
pub push_downs: Option<PushDownInfo>,
41-
pub query_internal_columns: bool,
42+
pub internal_columns: Option<BTreeMap<FieldIndex, InternalColumn>>,
4243
pub base_block_ids: Option<Scalar>,
4344
// used for recluster to update stream columns
4445
pub update_stream_columns: bool,

src/query/catalog/src/plan/internal_column.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ use databend_common_expression::Value;
3737
use databend_common_expression::BASE_BLOCK_IDS_COLUMN_ID;
3838
use databend_common_expression::BASE_ROW_ID_COLUMN_ID;
3939
use databend_common_expression::BLOCK_NAME_COLUMN_ID;
40+
use databend_common_expression::FILENAME_COLUMN_ID;
41+
use databend_common_expression::FILE_ROW_NUMBER_COLUMN_ID;
4042
use databend_common_expression::ROW_ID_COLUMN_ID;
4143
use databend_common_expression::SEARCH_MATCHED_COLUMN_ID;
4244
use databend_common_expression::SEARCH_SCORE_COLUMN_ID;
@@ -141,6 +143,9 @@ pub enum InternalColumnType {
141143
// search columns
142144
SearchMatched,
143145
SearchScore,
146+
147+
FileName,
148+
FileRowNumber,
144149
}
145150

146151
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
@@ -176,6 +181,8 @@ impl InternalColumn {
176181
)),
177182
InternalColumnType::SearchMatched => TableDataType::Boolean,
178183
InternalColumnType::SearchScore => TableDataType::Number(NumberDataType::Float32),
184+
InternalColumnType::FileName => TableDataType::String,
185+
InternalColumnType::FileRowNumber => TableDataType::Number(NumberDataType::UInt64),
179186
}
180187
}
181188

@@ -198,6 +205,8 @@ impl InternalColumn {
198205
InternalColumnType::BaseBlockIds => BASE_BLOCK_IDS_COLUMN_ID,
199206
InternalColumnType::SearchMatched => SEARCH_MATCHED_COLUMN_ID,
200207
InternalColumnType::SearchScore => SEARCH_SCORE_COLUMN_ID,
208+
InternalColumnType::FileName => FILENAME_COLUMN_ID,
209+
InternalColumnType::FileRowNumber => FILE_ROW_NUMBER_COLUMN_ID,
201210
}
202211
}
203212

@@ -315,6 +324,9 @@ impl InternalColumn {
315324
Value::Column(Float32Type::from_data(scores)),
316325
)
317326
}
327+
InternalColumnType::FileName | InternalColumnType::FileRowNumber => {
328+
todo!("generate_column_values not support for file related")
329+
}
318330
}
319331
}
320332
}

src/query/expression/src/schema.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,16 @@ pub const CHANGE_ROW_ID_COL_NAME: &str = "change$row_id";
6767

6868
pub const PREDICATE_COLUMN_NAME: &str = "_predicate";
6969

70+
pub const FILENAME_COLUMN_NAME: &str = "metadata$filename";
71+
pub const FILE_ROW_NUMBER_COLUMN_NAME: &str = "metadata$file_row_number";
72+
7073
// stream column id.
7174
pub const ORIGIN_BLOCK_ROW_NUM_COLUMN_ID: u32 = u32::MAX - 10;
7275
pub const ORIGIN_BLOCK_ID_COLUMN_ID: u32 = u32::MAX - 11;
7376
pub const ORIGIN_VERSION_COLUMN_ID: u32 = u32::MAX - 12;
7477
pub const ROW_VERSION_COLUMN_ID: u32 = u32::MAX - 13;
78+
pub const FILENAME_COLUMN_ID: u32 = u32::MAX - 14;
79+
pub const FILE_ROW_NUMBER_COLUMN_ID: u32 = u32::MAX - 15;
7580
// stream column name.
7681
pub const ORIGIN_VERSION_COL_NAME: &str = "_origin_version";
7782
pub const ORIGIN_BLOCK_ID_COL_NAME: &str = "_origin_block_id";
@@ -98,12 +103,15 @@ pub static INTERNAL_COLUMNS: LazyLock<HashSet<&'static str>> = LazyLock::new(||
98103
ORIGIN_BLOCK_ID_COL_NAME,
99104
ORIGIN_BLOCK_ROW_NUM_COL_NAME,
100105
ROW_VERSION_COL_NAME,
106+
FILENAME_COLUMN_NAME,
107+
FILE_ROW_NUMBER_COLUMN_NAME,
101108
])
102109
});
103110

104111
#[inline]
105112
pub fn is_internal_column_id(column_id: ColumnId) -> bool {
106113
column_id >= SEARCH_SCORE_COLUMN_ID
114+
|| (FILE_ROW_NUMBER_COLUMN_ID..=FILENAME_COLUMN_ID).contains(&column_id)
107115
}
108116

109117
#[inline]

src/query/service/src/interpreters/interpreter_copy_into_location.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ impl CopyIntoLocationInterpreter {
113113
default_values: None,
114114
copy_into_location_options: options.clone(),
115115
copy_into_table_options: Default::default(),
116+
stage_root: "".to_string(),
116117
},
117118
}));
118119

src/query/service/src/pipelines/builders/builder_recluster.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ impl PipelineBuilder {
8383
description,
8484
tbl_args: table.table_args(),
8585
push_downs: None,
86-
query_internal_columns: false,
86+
internal_columns: None,
8787
base_block_ids: None,
8888
update_stream_columns: table.change_tracking_enabled(),
8989
data_mask_policy: None,

src/query/service/src/sessions/query_ctx.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ use databend_common_pipeline_core::InputError;
9191
use databend_common_pipeline_core::LockGuard;
9292
use databend_common_settings::Settings;
9393
use databend_common_sql::IndexType;
94+
use databend_common_storage::init_stage_operator;
9495
use databend_common_storage::CopyStatus;
9596
use databend_common_storage::DataOperator;
9697
use databend_common_storage::FileStatus;
@@ -1514,6 +1515,14 @@ impl TableContext for QueryContext {
15141515
max_column_position: usize,
15151516
case_sensitive: bool,
15161517
) -> Result<Arc<dyn Table>> {
1518+
let operator = init_stage_operator(&stage_info)?;
1519+
let info = operator.info();
1520+
let stage_root = format!("{}{}", info.name(), info.root());
1521+
let stage_root = if stage_root.ends_with('/') {
1522+
stage_root
1523+
} else {
1524+
format!("{}/", stage_root)
1525+
};
15171526
match stage_info.file_format_params {
15181527
FileFormatParams::Parquet(..) => {
15191528
let mut read_options = ParquetReadOptions::default();
@@ -1553,6 +1562,7 @@ impl TableContext for QueryContext {
15531562
default_values: None,
15541563
copy_into_location_options: Default::default(),
15551564
copy_into_table_options: Default::default(),
1565+
stage_root,
15561566
};
15571567
OrcTable::try_create(info).await
15581568
}
@@ -1571,6 +1581,7 @@ impl TableContext for QueryContext {
15711581
default_values: None,
15721582
copy_into_location_options: Default::default(),
15731583
copy_into_table_options: Default::default(),
1584+
stage_root,
15741585
};
15751586
StageTable::try_create(info)
15761587
}
@@ -1607,6 +1618,7 @@ impl TableContext for QueryContext {
16071618
default_values: None,
16081619
copy_into_location_options: Default::default(),
16091620
copy_into_table_options: Default::default(),
1621+
stage_root,
16101622
};
16111623
StageTable::try_create(info)
16121624
}

src/query/sql/src/executor/table_read_plan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ impl ToReadDataSourcePlan for dyn Table {
272272
description,
273273
tbl_args: self.table_args(),
274274
push_downs,
275-
query_internal_columns: internal_columns.is_some(),
275+
internal_columns,
276276
base_block_ids,
277277
update_stream_columns,
278278
data_mask_policy,

src/query/sql/src/planner/binder/copy_into_table.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ impl Binder {
225225
default_values,
226226
copy_into_location_options: Default::default(),
227227
copy_into_table_options: stmt.options.clone(),
228+
stage_root: "".to_string(),
228229
},
229230
values_consts: vec![],
230231
required_source_schema: required_values_schema.clone(),
@@ -405,6 +406,7 @@ impl Binder {
405406
default_values: Some(default_values),
406407
copy_into_location_options: Default::default(),
407408
copy_into_table_options: options,
409+
stage_root: "".to_string(),
408410
},
409411
write_mode,
410412
query: None,
@@ -482,6 +484,7 @@ impl Binder {
482484

483485
// rewrite async function and udf
484486
s_expr = self.rewrite_udf(&mut from_context, s_expr)?;
487+
s_expr = self.add_internal_column_into_expr(&mut from_context, s_expr)?;
485488

486489
let mut output_context = BindContext::new();
487490
output_context.parent = from_context.parent;

src/query/sql/src/planner/binder/internal_column_factory.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ use databend_common_catalog::plan::InternalColumnType;
2020
use databend_common_expression::BASE_BLOCK_IDS_COL_NAME;
2121
use databend_common_expression::BASE_ROW_ID_COL_NAME;
2222
use databend_common_expression::BLOCK_NAME_COL_NAME;
23+
use databend_common_expression::FILENAME_COLUMN_NAME;
24+
use databend_common_expression::FILE_ROW_NUMBER_COLUMN_NAME;
2325
use databend_common_expression::ROW_ID_COL_NAME;
2426
use databend_common_expression::SEARCH_MATCHED_COL_NAME;
2527
use databend_common_expression::SEARCH_SCORE_COL_NAME;
@@ -77,6 +79,19 @@ impl InternalColumnFactory {
7779
InternalColumn::new(SEARCH_SCORE_COL_NAME, InternalColumnType::SearchScore),
7880
);
7981

82+
internal_columns.insert(
83+
FILENAME_COLUMN_NAME.to_string(),
84+
InternalColumn::new(FILENAME_COLUMN_NAME, InternalColumnType::FileName),
85+
);
86+
87+
internal_columns.insert(
88+
FILE_ROW_NUMBER_COLUMN_NAME.to_string(),
89+
InternalColumn::new(
90+
FILE_ROW_NUMBER_COLUMN_NAME,
91+
InternalColumnType::FileRowNumber,
92+
),
93+
);
94+
8095
InternalColumnFactory { internal_columns }
8196
}
8297

0 commit comments

Comments
 (0)