Commit d143d1e

feat: support query metadata of ORC file. (#18428)

* chore: polish error message.
* feat: support query metadata of ORC file.

1 parent: 07498be
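
The net effect: ORC stage reads can now serve the per-file internal metadata columns (file name and file row number) that other formats already expose. The SQL surface is not shown in this diff, but given the InternalColumnType::FileName and InternalColumnType::FileRowNumber variants handled below, this presumably enables queries along the lines of SELECT metadata$filename, metadata$file_row_number FROM @my_stage (the exact column names are an assumption here, carried over from the existing internal-column support for other formats).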

File tree: 17 files changed, +235 −94 lines changed

src/query/service/src/servers/http/v1/streaming_load.rs

Lines changed: 1 addition & 1 deletion

@@ -136,7 +136,7 @@ async fn streaming_load_handler_inner(
         .headers()
         .get(HEADER_SQL)
         .ok_or(poem::Error::from_string(
-            "[HTTP-STREAMING-LOAD] Missing required 'sql' header in request",
+            format!("[HTTP-STREAMING-LOAD] Missing required header {HEADER_SQL} in request"),
             StatusCode::BAD_REQUEST,
         ))?;
     let sql = sql.to_str().map_err(|e| {

src/query/sql/src/planner/binder/copy_into_table.rs

Lines changed: 1 addition & 1 deletion

@@ -691,7 +691,7 @@ pub async fn resolve_stage_location(
     // my_named_stage/abc/
     let names: Vec<&str> = location.splitn(2, '/').filter(|v| !v.is_empty()).collect();
     if names[0] == STAGE_PLACEHOLDER {
-        return Err(ErrorCode::BadArguments("placeholder @_databend_upload should be used in streaming_load handler or replaced in client."));
+        return Err(ErrorCode::BadArguments("placeholder @_databend_upload as location: should be used in streaming_load handler or replaced in client."));
     }

     let stage = if names[0] == "~" {

src/query/sql/src/planner/binder/insert.rs

Lines changed: 1 addition & 1 deletion

@@ -194,7 +194,7 @@ impl Binder {
         match location.as_str() {
             STAGE_PLACEHOLDER => {
                 if self.ctx.get_session_type() != SessionType::HTTPStreamingLoad {
-                    return Err(ErrorCode::BadArguments("placeholder @_databend_upload should be used in streaming_load handler or replaced in client."));
+                    return Err(ErrorCode::BadArguments("placeholder @_databend_upload in query handler: should be used in streaming_load handler or replaced in client."));
                 }
                 let (required_source_schema, values_consts) = if let Some(value) = value {
                     self.prepared_values(value, &schema, settings).await?

src/query/storages/common/stage/src/read/columnar/internal_columns.rs (new file; path recovered from the mod.rs change below)

Lines changed: 47 additions & 0 deletions

@@ -0,0 +1,47 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use databend_common_catalog::plan::InternalColumnType;
+use databend_common_expression::types::DataType;
+use databend_common_expression::types::NumberColumnBuilder;
+use databend_common_expression::Column;
+use databend_common_expression::DataBlock;
+use databend_common_expression::Scalar;
+
+pub fn add_internal_columns(
+    internal_columns: &[InternalColumnType],
+    path: String,
+    b: &mut DataBlock,
+    start_row: &mut u64,
+) {
+    for c in internal_columns {
+        match c {
+            InternalColumnType::FileName => {
+                b.add_const_column(Scalar::String(path.clone()), DataType::String);
+            }
+            InternalColumnType::FileRowNumber => {
+                let end_row = (*start_row) + b.num_rows() as u64;
+                b.add_column(Column::Number(
+                    NumberColumnBuilder::UInt64(((*start_row)..end_row).collect()).build(),
+                ));
+                *start_row = end_row;
+            }
+            _ => {
+                unreachable!(
+                    "expect InternalColumnType::FileName or InternalColumnType::FileRowNumber"
+                );
+            }
+        }
+    }
+}
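
The FileRowNumber arm is the subtle part: start_row is a mutable cursor owned by the caller, so row numbers stay contiguous across successive blocks of the same file. A dependency-free sketch of that bookkeeping (plain Vecs stand in for Databend's DataBlock and column builders; this illustrates the arithmetic, not the actual API):

// Stand-in for add_internal_columns: append a constant file-name column
// and a contiguous row-number column for a "block" of num_rows rows,
// advancing start_row so numbering continues across calls.
fn add_internal_columns_sketch(
    path: &str,
    num_rows: u64,
    start_row: &mut u64,
) -> (Vec<String>, Vec<u64>) {
    let file_name = vec![path.to_string(); num_rows as usize]; // FileName: constant column
    let end_row = *start_row + num_rows;
    let row_number: Vec<u64> = (*start_row..end_row).collect(); // FileRowNumber
    *start_row = end_row;
    (file_name, row_number)
}

fn main() {
    let mut start_row = 0u64;
    let (_, first) = add_internal_columns_sketch("a.orc", 3, &mut start_row);
    let (_, second) = add_internal_columns_sketch("a.orc", 2, &mut start_row);
    assert_eq!(first, vec![0, 1, 2]);
    assert_eq!(second, vec![3, 4]); // contiguous across batches of one file
}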

src/query/storages/common/stage/src/read/columnar/mod.rs

Lines changed: 2 additions & 0 deletions

@@ -13,8 +13,10 @@
 // limitations under the License.

 pub mod arrow_to_variant;
+mod internal_columns;
 mod projection;

 pub use arrow_to_variant::read_record_batch_to_variant_column;
 pub use arrow_to_variant::record_batch_to_variant_block;
+pub use internal_columns::add_internal_columns;
 pub use projection::project_columnar;

src/query/storages/iceberg/src/table.rs

Lines changed: 6 additions & 1 deletion

@@ -333,7 +333,12 @@ impl IcebergTable {
             )?;
             pipeline.try_resize(max_threads)?;
             pipeline.add_accumulating_transformer(|| {
-                StripeDecoder::new(ctx.clone(), data_schema.clone(), arrow_schema.clone())
+                StripeDecoder::new(
+                    ctx.clone(),
+                    data_schema.clone(),
+                    arrow_schema.clone(),
+                    vec![],
+                )
             });
         } else {
             let arrow_schema: Schema = table_schema.as_ref().into();
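
Note the call site passes vec![] for the new internal_columns parameter: the Iceberg ORC scan path requests no file metadata columns, so add_internal_columns loops over an empty slice and the decoder's behavior there is unchanged.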

src/query/storages/orc/src/copy_into_table/processors/source.rs

Lines changed: 28 additions & 19 deletions

@@ -30,24 +30,19 @@ use databend_common_pipeline_sources::AsyncSource;
 use databend_common_pipeline_sources::AsyncSourcer;
 use databend_storages_common_stage::SingleFilePartition;
 use opendal::Operator;
-use orc_rust::async_arrow_reader::StripeFactory;
 use orc_rust::ArrowReaderBuilder;

 use crate::chunk_reader_impl::OrcChunkReader;
 use crate::hashable_schema::HashableSchema;
+use crate::processors::source::ReadingFile;
 use crate::strip::StripeInMemory;
 use crate::utils::map_orc_error;

 pub struct ORCSourceForCopy {
     table_ctx: Arc<dyn TableContext>,
     scan_progress: Arc<Progress>,
     op: Operator,
-    reader: Option<(
-        String,
-        Box<StripeFactory<OrcChunkReader>>,
-        HashableSchema,
-        usize,
-    )>,
+    reader: Option<ReadingFile>,
 }

 impl ORCSourceForCopy {
@@ -85,10 +80,16 @@ impl ORCSourceForCopy {
             .map_err(|e| map_orc_error(e, &path))?;
         let reader = builder.build_async();
         let (factory, schema) = reader.into_parts();
-        let factory = factory.expect("factory must has been created");
+        let stripe_factory = factory.expect("factory must has been created");
         let schema = HashableSchema::try_create(schema)?;

-        self.reader = Some((path, factory, schema, size));
+        self.reader = Some(ReadingFile {
+            path: path.to_string(),
+            stripe_factory,
+            size,
+            schema: Some(schema),
+            rows: 0,
+        });
         Ok(true)
     }
 }
@@ -105,8 +106,9 @@ impl AsyncSource for ORCSourceForCopy {
             return Ok(None);
         }
         let start = Instant::now();
-        if let Some((path, factory, schema, size)) = mem::take(&mut self.reader) {
-            let (factory, stripe) = factory
+        if let Some(file) = mem::take(&mut self.reader) {
+            let (factory, stripe) = file
+                .stripe_factory
                 .read_next_stripe()
                 .await
                 .map_err(|e| ErrorCode::StorageOther(e.to_string()))?;
@@ -118,24 +120,31 @@ impl AsyncSource for ORCSourceForCopy {
                 }
                 Some(stripe) => {
                     let used = start.elapsed().as_secs_f32();
+                    let rows = stripe.number_of_rows();
+
                     let bytes = stripe.stream_map().inner.values().map(|b| b.len()).sum();
-                    let progress_values = ProgressValues {
-                        rows: stripe.number_of_rows(),
-                        bytes,
-                    };
+                    let progress_values = ProgressValues { rows, bytes };
                     Profile::record_usize_profile(ProfileStatisticsName::ScanBytes, bytes);
                     log::info!(
-                        "read new stripe of {} rows and {bytes} bytes from {path}, use {} secs",
+                        "read new stripe of {} rows and {bytes} bytes from {}, use {} secs",
                         stripe.number_of_rows(),
+                        file.path,
                         used
                     );
                     self.scan_progress.incr(&progress_values);

-                    self.reader = Some((path.clone(), Box::new(factory), schema.clone(), size));
+                    self.reader = Some(ReadingFile {
+                        path: file.path.clone(),
+                        stripe_factory: Box::new(factory),
+                        size: file.size,
+                        schema: file.schema.clone(),
+                        rows: (rows as u64) + file.rows,
+                    });
                     let meta = Box::new(StripeInMemory {
-                        path,
+                        path: file.path.clone(),
                         stripe,
-                        schema: Some(schema),
+                        schema: file.schema,
+                        start_row: file.rows,
                     });
                     return Ok(Some(DataBlock::empty_with_meta(meta)));
                 }
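
The anonymous 4-tuple state is replaced by the named ReadingFile struct (now shared with processors/source.rs), whose rows field accumulates how many rows earlier stripes contained, so each emitted StripeInMemory can carry its start_row. A stripped-down sketch of that take/advance/put-back loop, with mock types in place of orc_rust's StripeFactory (an illustration of the state threading, not the real API):

// Mock stand-ins for the real StripeFactory / StripeInMemory types.
struct ReadingFile {
    path: String,
    stripes_left: Vec<u64>, // row counts of remaining stripes (mock "factory")
    rows: u64,              // rows already emitted from this file
}

// One pull: take the state, read a stripe, put the state back with
// `rows` advanced so the next stripe knows its starting row number.
fn next_stripe(slot: &mut Option<ReadingFile>) -> Option<(String, u64, u64)> {
    let mut file = slot.take()?;
    if file.stripes_left.is_empty() {
        return None; // file exhausted; caller moves on to the next file
    }
    let stripe_rows = file.stripes_left.remove(0);
    let start_row = file.rows;
    file.rows += stripe_rows;
    let out = (file.path.clone(), start_row, stripe_rows);
    *slot = Some(file);
    Some(out)
}

fn main() {
    let mut slot = Some(ReadingFile {
        path: "a.orc".to_string(),
        stripes_left: vec![1000, 500],
        rows: 0,
    });
    assert_eq!(next_stripe(&mut slot), Some(("a.orc".into(), 0, 1000)));
    assert_eq!(next_stripe(&mut slot), Some(("a.orc".into(), 1000, 500)));
    assert_eq!(next_stripe(&mut slot), None);
}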

src/query/storages/orc/src/processors/decoder.rs

Lines changed: 14 additions & 1 deletion

@@ -15,6 +15,7 @@
 use std::sync::Arc;

 use arrow_array::RecordBatch;
+use databend_common_catalog::plan::InternalColumnType;
 use databend_common_catalog::query_kind::QueryKind;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
@@ -24,6 +25,7 @@ use databend_common_expression::DataSchema;
 use databend_common_pipeline_transforms::processors::AccumulatingTransform;
 use databend_common_storage::CopyStatus;
 use databend_common_storage::FileStatus;
+use databend_storages_common_stage::add_internal_columns;
 use orc_rust::array_decoder::NaiveStripeDecoder;

 use crate::strip::StripeInMemory;
@@ -33,13 +35,15 @@ pub struct StripeDecoder {
     data_schema: Arc<DataSchema>,
     arrow_schema: arrow_schema::SchemaRef,
     copy_status: Option<Arc<CopyStatus>>,
+    internal_columns: Vec<InternalColumnType>,
 }

 impl StripeDecoder {
     pub fn new(
         table_ctx: Arc<dyn TableContext>,
         data_schema: Arc<DataSchema>,
         arrow_schema: arrow_schema::SchemaRef,
+        internal_columns: Vec<InternalColumnType>,
     ) -> Self {
         let copy_status = if matches!(table_ctx.get_query_kind(), QueryKind::CopyIntoTable) {
             Some(table_ctx.get_copy_status())
@@ -50,6 +54,7 @@ impl StripeDecoder {
             copy_status,
             arrow_schema,
             data_schema,
+            internal_columns,
         }
     }
 }
@@ -68,14 +73,22 @@ impl AccumulatingTransform for StripeDecoder {
         let batches: std::result::Result<Vec<RecordBatch>, _> = decoder.into_iter().collect();
         let batches = batches.map_err(|e| map_orc_error(e, &stripe.path))?;
         let mut blocks = vec![];
+        let mut start_row = stripe.start_row;
+
         for batch in batches {
-            let (block, _) = DataBlock::from_record_batch(self.data_schema.as_ref(), &batch)?;
+            let (mut block, _) = DataBlock::from_record_batch(self.data_schema.as_ref(), &batch)?;
             if let Some(copy_status) = &self.copy_status {
                 copy_status.add_chunk(&stripe.path, FileStatus {
                     num_rows_loaded: block.num_rows(),
                     error: None,
                 })
             }
+            add_internal_columns(
+                &self.internal_columns,
+                stripe.path.clone(),
+                &mut block,
+                &mut start_row,
+            );
             blocks.push(block);
         }
         Ok(blocks)
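
Each stripe arrives stamped with start_row by the source; the decoder copies it into a local cursor and advances it batch by batch, so one file's row numbers are contiguous across both batches and stripes. A small sketch of the combined numbering (assumed layout: per-stripe start offsets plus per-batch row counts):

// Mock of the end-to-end numbering: the source stamps each stripe with
// start_row = rows already read from the file; the decoder then numbers
// each decoded batch from that offset, as add_internal_columns does.
fn main() {
    let stripes = [(0u64, vec![3u64, 2]), (5, vec![4])]; // (start_row, batch row counts)
    let mut all_rows = vec![];
    for (stripe_start, batch_sizes) in stripes {
        let mut start_row = stripe_start; // local cursor per stripe, as in decoder.rs
        for n in batch_sizes {
            all_rows.extend(start_row..start_row + n);
            start_row += n;
        }
    }
    // numbering is contiguous across batches and stripes of one file
    assert_eq!(all_rows, (0u64..9).collect::<Vec<u64>>());
}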

src/query/storages/orc/src/processors/source.rs

Lines changed: 10 additions & 8 deletions

@@ -65,10 +65,11 @@ impl InferredSchema {
 }

 pub struct ReadingFile {
-    path: String,
-    stripe_factory: Box<StripeFactory<OrcChunkReader>>,
-    size: usize,
-    schema: Option<HashableSchema>,
+    pub path: String,
+    pub stripe_factory: Box<StripeFactory<OrcChunkReader>>,
+    pub size: usize,
+    pub schema: Option<HashableSchema>,
+    pub rows: u64,
 }

 pub struct ORCSource {
@@ -162,6 +163,7 @@ impl ORCSource {
             stripe_factory,
             size,
             schema,
+            rows: 0,
         });
         Ok(true)
     }
@@ -197,22 +199,22 @@ impl AsyncSource for ORCSource {
                     continue;
                 }
                 Some(stripe) => {
-                    let progress_values = ProgressValues {
-                        rows: stripe.number_of_rows(),
-                        bytes: 0,
-                    };
+                    let rows = stripe.number_of_rows();
+                    let progress_values = ProgressValues { rows, bytes: 0 };
                     self.scan_progress.incr(&progress_values);

                     let meta = Box::new(StripeInMemory {
                         path: file.path.clone(),
                         stripe,
                         schema: file.schema.clone(),
+                        start_row: file.rows,
                     });
                     self.reader = Some(ReadingFile {
                         path: file.path.clone(),
                         stripe_factory: Box::new(factory),
                         size: file.size,
                         schema: file.schema.clone(),
+                        rows: (rows as u64) + file.rows,
                     });
                     return Ok(Some(DataBlock::empty_with_meta(meta)));
                 }

src/query/storages/orc/src/processors/variant_decoder.rs

Lines changed: 22 additions & 3 deletions

@@ -15,6 +15,7 @@
 use std::sync::Arc;

 use arrow_array::RecordBatch;
+use databend_common_catalog::plan::InternalColumnType;
 use databend_common_catalog::query_kind::QueryKind;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::ErrorCode;
@@ -26,6 +27,7 @@ use databend_common_expression::TableSchema;
 use databend_common_pipeline_transforms::processors::AccumulatingTransform;
 use databend_common_storage::CopyStatus;
 use databend_common_storage::FileStatus;
+use databend_storages_common_stage::add_internal_columns;
 use databend_storages_common_stage::record_batch_to_variant_block;
 use jiff::tz::TimeZone;
 use orc_rust::array_decoder::NaiveStripeDecoder;
@@ -36,16 +38,25 @@ use crate::utils::map_orc_error;
 pub struct StripeDecoderForVariantTable {
     copy_status: Option<Arc<CopyStatus>>,
     tz: TimeZone,
+    internal_columns: Vec<InternalColumnType>,
 }

 impl StripeDecoderForVariantTable {
-    pub fn new(table_ctx: Arc<dyn TableContext>, tz: TimeZone) -> Self {
+    pub fn new(
+        table_ctx: Arc<dyn TableContext>,
+        tz: TimeZone,
+        internal_columns: Vec<InternalColumnType>,
+    ) -> Self {
         let copy_status = if matches!(table_ctx.get_query_kind(), QueryKind::CopyIntoTable) {
             Some(table_ctx.get_copy_status())
         } else {
             None
         };
-        StripeDecoderForVariantTable { copy_status, tz }
+        StripeDecoderForVariantTable {
+            copy_status,
+            tz,
+            internal_columns,
+        }
     }
 }

@@ -75,14 +86,22 @@ impl AccumulatingTransform for StripeDecoderForVariantTable {
         let batches = batches.map_err(|e| map_orc_error(e, &stripe.path))?;

         let mut blocks = vec![];
+        let mut start_row = stripe.start_row;
         for batch in batches {
-            let block = record_batch_to_variant_block(batch, &self.tz, &typ, &schemas.data_schema)?;
+            let mut block =
+                record_batch_to_variant_block(batch, &self.tz, &typ, &schemas.data_schema)?;
             if let Some(copy_status) = &self.copy_status {
                 copy_status.add_chunk(&stripe.path, FileStatus {
                     num_rows_loaded: block.num_rows(),
                     error: None,
                 })
             }
+            add_internal_columns(
+                &self.internal_columns,
+                stripe.path.clone(),
+                &mut block,
+                &mut start_row,
+            );
             blocks.push(block);
         }
         Ok(blocks)
