Skip to content

Commit e570d28

Browse files
authored
feat(query): add system.zero table && support empty result for empty stage (#18453)
* feat(query): add zero table * feat(query): add zero table * feat(query): add zero table * feat(query): add zero table * feat(query): add zero table * feat(query): add zero table
1 parent 6b15099 commit e570d28

File tree

15 files changed

+157
-38
lines changed

15 files changed

+157
-38
lines changed

src/common/storage/src/stage.rs

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -201,19 +201,10 @@ impl StageFilesInfo {
201201
}
202202

203203
#[async_backtrace::framed]
204-
pub async fn first_file(&self, operator: &Operator) -> Result<StageFileInfo> {
204+
pub async fn first_file(&self, operator: &Operator) -> Result<Option<StageFileInfo>> {
205205
// We only fetch first file.
206206
let mut files = self.list(operator, 1, Some(1)).await?;
207-
files
208-
.pop()
209-
.ok_or_else(|| ErrorCode::BadArguments("no file found"))
210-
}
211-
212-
pub fn blocking_first_file(&self, operator: &Operator) -> Result<StageFileInfo> {
213-
let mut files = self.blocking_list(operator, Some(1))?;
214-
files
215-
.pop()
216-
.ok_or_else(|| ErrorCode::BadArguments("no file found"))
207+
Ok(files.pop())
217208
}
218209

219210
pub fn blocking_list(

src/query/catalog/src/table_context.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,13 @@ pub trait TableContext: Send + Sync {
275275
async fn get_table(&self, catalog: &str, database: &str, table: &str)
276276
-> Result<Arc<dyn Table>>;
277277

278+
async fn get_zero_table(&self) -> Result<Arc<dyn Table>> {
279+
let catalog = self.get_catalog("default").await?;
280+
catalog
281+
.get_table(&self.get_tenant(), "system", "zero")
282+
.await
283+
}
284+
278285
fn evict_table_from_cache(&self, catalog: &str, database: &str, table: &str) -> Result<()>;
279286

280287
async fn get_table_with_batch(

src/query/service/src/databases/system/system_database.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ use databend_common_storages_system::UsersTable;
7070
use databend_common_storages_system::ViewsTableWithHistory;
7171
use databend_common_storages_system::ViewsTableWithoutHistory;
7272
use databend_common_storages_system::VirtualColumnsTable;
73+
use databend_common_storages_system::ZeroTable;
7374
use databend_common_version::DATABEND_CARGO_CFG_TARGET_FEATURE;
7475
use databend_common_version::DATABEND_COMMIT_AUTHORS;
7576
use databend_common_version::DATABEND_CREDITS_LICENSES;
@@ -105,6 +106,7 @@ impl SystemDatabase {
105106
) -> Self {
106107
let mut table_list = vec![
107108
OneTable::create(sys_db_meta.next_table_id()),
109+
ZeroTable::create(sys_db_meta.next_table_id()),
108110
FunctionsTable::create(sys_db_meta.next_table_id()),
109111
ContributorsTable::create(sys_db_meta.next_table_id(), DATABEND_COMMIT_AUTHORS),
110112
CreditsTable::create(

src/query/service/src/sessions/query_ctx.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1771,6 +1771,7 @@ impl TableContext for QueryContext {
17711771
read_options = read_options.with_do_prewhere(false);
17721772
}
17731773
ParquetTable::create(
1774+
self,
17741775
stage_info.clone(),
17751776
files_info,
17761777
read_options,
@@ -1823,7 +1824,7 @@ impl TableContext for QueryContext {
18231824
stage_root,
18241825
is_variant,
18251826
};
1826-
OrcTable::try_create(info).await
1827+
OrcTable::try_create(self, info).await
18271828
}
18281829
FileFormatParams::NdJson(..) | FileFormatParams::Avro(..) => {
18291830
let schema = Arc::new(TableSchema::new(vec![TableField::new(

src/query/service/src/table_functions/infer_schema/parquet.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,9 @@ impl AsyncSource for ParquetInferSchemaSource {
127127
Some(f) => self.ctx.get_file_format(f).await?,
128128
None => stage_info.file_format_params.clone(),
129129
};
130-
let schema = match file_format_params.get_type() {
131-
StageFileFormatType::Parquet => {
130+
let schema = match (first_file.as_ref(), file_format_params.get_type()) {
131+
(None, _) => return Ok(None),
132+
(Some(first_file), StageFileFormatType::Parquet) => {
132133
let arrow_schema = read_parquet_schema_async_rs(
133134
&operator,
134135
&first_file.path,

src/query/service/src/table_functions/inspect_parquet/inspect_parquet_table.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,9 +237,12 @@ impl AsyncSource for InspectParquetSource {
237237
};
238238

239239
let first_file = file_info.first_file(&operator).await?;
240-
240+
let Some(first_file) = first_file else {
241+
return Ok(None);
242+
};
241243
let parquet_schema =
242244
read_metadata_async(&first_file.path, &operator, Some(first_file.size)).await?;
245+
243246
let created = match parquet_schema.file_metadata().created_by() {
244247
Some(user) => user.to_owned(),
245248
None => String::from("NULL"),

src/query/service/tests/it/storages/testdata/columns_table.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo
156156
| 'dropped_on' | 'system' | 'views' | 'Nullable(Timestamp)' | 'TIMESTAMP' | '' | '' | 'YES' | '' | NULL | NULL | NULL |
157157
| 'dropped_on' | 'system' | 'views_with_history' | 'Nullable(Timestamp)' | 'TIMESTAMP' | '' | '' | 'YES' | '' | NULL | NULL | NULL |
158158
| 'dummy' | 'system' | 'one' | 'UInt8' | 'TINYINT UNSIGNED' | '' | '' | 'NO' | '' | NULL | NULL | NULL |
159+
| 'dummy' | 'system' | 'zero' | 'UInt8' | 'TINYINT UNSIGNED' | '' | '' | 'NO' | '' | NULL | NULL | NULL |
159160
| 'enabled' | 'system' | 'notifications' | 'Boolean' | 'BOOLEAN' | '' | '' | 'NO' | '' | NULL | NULL | NULL |
160161
| 'end_time' | 'system' | 'clustering_history' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' | NULL | NULL | NULL |
161162
| 'engine' | 'information_schema' | 'tables' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | NULL | NULL | NULL |

src/query/storages/orc/src/table.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,10 @@ impl OrcTable {
7171
}
7272

7373
#[async_backtrace::framed]
74-
pub async fn try_create(mut stage_table_info: StageTableInfo) -> Result<Arc<dyn Table>> {
74+
pub async fn try_create(
75+
ctx: &dyn TableContext,
76+
mut stage_table_info: StageTableInfo,
77+
) -> Result<Arc<dyn Table>> {
7578
let stage_info = &stage_table_info.stage_info;
7679
if stage_table_info.is_variant {
7780
let schema = Arc::new(TableSchema::new(vec![TableField::new(
@@ -88,18 +91,16 @@ impl OrcTable {
8891
let files_to_read = &stage_table_info.files_to_copy;
8992
let operator = init_stage_operator(stage_info)?;
9093
let first_file = match &files_to_read {
91-
Some(files) => files[0].clone(),
92-
None => stage_table_info
93-
.files_info
94-
.first_file(&operator)
95-
.await?
96-
.clone(),
94+
Some(files) => Some(files[0].clone()),
95+
None => stage_table_info.files_info.first_file(&operator).await?,
9796
};
9897

99-
let schema_from = first_file.path.clone();
98+
let Some(first_file) = first_file else {
99+
return ctx.get_zero_table().await;
100+
};
100101

102+
let schema_from = first_file.path.clone();
101103
let arrow_schema = Self::prepare_metas(first_file, operator.clone()).await?;
102-
103104
let table_schema = Arc::new(
104105
TableSchema::try_from(arrow_schema.as_ref()).map_err(ErrorCode::from_std_error)?,
105106
);

src/query/storages/parquet/src/parquet_table/table.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ impl ParquetTable {
108108

109109
#[async_backtrace::framed]
110110
pub async fn create(
111+
ctx: &dyn TableContext,
111112
stage_info: StageInfo,
112113
files_info: StageFilesInfo,
113114
read_options: ParquetReadOptions,
@@ -119,13 +120,18 @@ impl ParquetTable {
119120
) -> Result<Arc<dyn Table>> {
120121
let operator = init_stage_operator(&stage_info)?;
121122
let first_file = match &files_to_read {
122-
Some(files) => files[0].path.clone(),
123-
None => files_info.first_file(&operator).await?.path.clone(),
123+
Some(files) => Some(files[0].clone()),
124+
None => files_info.first_file(&operator).await?,
124125
};
125126

127+
let Some(first_file) = first_file else {
128+
return ctx.get_zero_table().await;
129+
};
130+
131+
let first_file = first_file.path;
132+
126133
let (arrow_schema, schema_descr, compression_ratio) =
127134
Self::prepare_metas(&first_file, operator.clone()).await?;
128-
129135
let schema =
130136
arrow_to_table_schema(&arrow_schema, case_sensitive, fmt.use_logic_type)?.into();
131137
let table_info = create_parquet_table_info(schema, &stage_info)?;

src/query/storages/system/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ mod user_functions_table;
6767
mod users_table;
6868
mod util;
6969
mod virtual_columns_table;
70+
mod zero_table;
7071

7172
pub use backtrace_table::BacktraceTable;
7273
pub use build_options_table::BuildOptionsTable;
@@ -132,3 +133,4 @@ pub use user_functions_table::UserFunctionsTable;
132133
pub use users_table::UsersTable;
133134
pub use util::generate_catalog_meta;
134135
pub use virtual_columns_table::VirtualColumnsTable;
136+
pub use zero_table::ZeroTable;

0 commit comments

Comments (0)