
Commit 0a6673b

feat: streaming load support parquet. (#18017)
* chore: ParquetPart does not need to impl Eq.
* refactor: simplify ParquetCopySource.
* refactor: rename fn.
* refactor: read_blocks_from_binary returns an iter instead of a vector.
* feat: streaming load supports parquet.
1 parent: fd73cb8 · commit: 0a6673b

36 files changed: +590 / -88 lines

src/meta/app/src/principal/file_format.rs
Lines changed: 10 additions & 0 deletions

```diff
@@ -78,6 +78,16 @@ impl FileFormatParams {
         }
     }
 
+    pub fn support_streaming_load(&self) -> bool {
+        matches!(
+            self,
+            FileFormatParams::Csv(_)
+                | FileFormatParams::Tsv(_)
+                | FileFormatParams::NdJson(_)
+                | FileFormatParams::Parquet(_)
+        )
+    }
+
     pub fn default_by_type(format_type: StageFileFormatType) -> Result<Self> {
         match format_type {
             StageFileFormatType::Parquet => {
```
src/query/service/src/pipelines/builders/builder_copy_into_table.rs
Lines changed: 2 additions & 2 deletions

```diff
@@ -35,12 +35,12 @@ use databend_common_sql::executor::physical_plans::CopyIntoTable;
 use databend_common_sql::executor::physical_plans::CopyIntoTableSource;
 use databend_common_sql::plans::CopyIntoTableMode;
 use databend_common_storage::StageFileInfo;
+use databend_common_storages_stage::TransformNullIf;
 use log::debug;
 use log::info;
 
 use crate::pipelines::processors::transforms::TransformAddConstColumns;
 use crate::pipelines::processors::transforms::TransformCastSchema;
-use crate::pipelines::processors::transforms::TransformNullIf;
 use crate::pipelines::PipelineBuilder;
 use crate::sessions::QueryContext;
 
@@ -135,7 +135,7 @@ impl PipelineBuilder {
         let source_schema = if let Some(null_if) =
             Self::need_null_if_processor(plan, &source_schema, plan_required_source_schema)
         {
-            let func_ctx = ctx.get_function_context()?;
+            let func_ctx = Arc::new(ctx.get_function_context()?);
             main_pipeline.try_add_transformer(|| {
                 TransformNullIf::try_new(
                     source_schema.clone(),
```
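TransformNullIf moves into databend_common_storages_stage so the stage/parquet code can reuse it, and it now takes an Arc<FunctionContext>: try_add_transformer calls its closure once per pipe, so every transformer instance needs its own cheap handle to the context. A generic, self-contained sketch of that pattern (FuncCtx and try_add_transformer are stand-ins, not the Databend API):

```rust
use std::sync::Arc;

// Stand-in for FunctionContext: shared, read-only evaluation state.
struct FuncCtx {
    timezone: String,
}

// Stand-in for Pipeline::try_add_transformer: builds one transformer per pipe.
fn try_add_transformer<T>(pipes: usize, mut make: impl FnMut() -> T) -> Vec<T> {
    (0..pipes).map(|_| make()).collect()
}

fn main() {
    let func_ctx = Arc::new(FuncCtx { timezone: "UTC".to_string() });
    // Each closure call clones the Arc (a pointer bump), not the context itself.
    let transformers = try_add_transformer(4, || Arc::clone(&func_ctx));
    assert_eq!(transformers.len(), 4);
    assert_eq!(transformers[0].timezone, "UTC");
}
```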

src/query/service/src/pipelines/processors/transforms/mod.rs
Lines changed: 0 additions & 2 deletions

```diff
@@ -34,7 +34,6 @@ mod transform_filter;
 mod transform_limit;
 mod transform_merge_block;
 mod transform_merge_sort;
-mod transform_null_if;
 mod transform_recursive_cte_scan;
 mod transform_recursive_cte_source;
 mod transform_resort_addon;
@@ -67,7 +66,6 @@ pub use transform_filter::TransformFilter;
 pub use transform_limit::TransformLimit;
 pub use transform_merge_block::TransformMergeBlock;
 pub use transform_merge_sort::*;
-pub use transform_null_if::TransformNullIf;
 pub use transform_recursive_cte_scan::TransformRecursiveCteScan;
 pub use transform_recursive_cte_source::TransformRecursiveCteSource;
 pub use transform_resort_addon::TransformResortAddOn;
```

src/query/service/src/servers/http/v1/streaming_load.rs
Lines changed: 12 additions & 7 deletions

```diff
@@ -170,7 +170,7 @@ pub async fn streaming_load_handler_inner(
             receiver,
             ..
         } => {
-            if !matches!(&**format, FileFormatParams::Csv(_) | FileFormatParams::Tsv(_) | FileFormatParams::NdJson(_)) {
+            if !format.support_streaming_load() {
                 return Err(poem::Error::from_string( format!( "[HTTP-STREAMING-LOAD] Unsupported file format: {}", format.get_type() ), StatusCode::BAD_REQUEST));
             }
             let (tx, rx) = tokio::sync::mpsc::channel(1);
@@ -185,17 +185,22 @@ pub async fn streaming_load_handler_inner(
                     id: http_context.query_id.clone(),
                     stats: query_context.get_write_progress().get_values(),
                 })),
-                Ok(Err(cause)) => Err(poem::Error::from_string(
+                Ok(Err(cause)) => {
+                    info!("[HTTP-STREAMING-LOAD] Query execution failed: {:?}", cause);
+                    Err(poem::Error::from_string(
                     format!(
                         "[HTTP-STREAMING-LOAD] Query execution failed: {}",
                         cause.display_with_sql(sql).message()
                     ),
                     StatusCode::BAD_REQUEST,
-                )),
-                Err(_) => Err(poem::Error::from_string(
-                    "[HTTP-STREAMING-LOAD] Internal server error: execution thread panicked",
-                    StatusCode::INTERNAL_SERVER_ERROR,
-                )),
+                ))},
+                Err(err) => {
+                    info!("[HTTP-STREAMING-LOAD] Internal server error: {:?}", err);
+                    Err(poem::Error::from_string(
+                        "[HTTP-STREAMING-LOAD] Internal server error: execution thread panicked",
+                        StatusCode::INTERNAL_SERVER_ERROR,
+                    ))
+                },
             }
         }
         _non_supported_source => Err(poem::Error::from_string(
```
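The rewritten arms separate the two failure layers of awaiting the spawned execution task: the task can complete with a query error (now logged, returned as 400), or the await itself can fail because the task panicked (now logged, returned as 500). A self-contained sketch of that nested-result shape, assuming only tokio:

```rust
use tokio::task;

#[tokio::main]
async fn main() {
    // Stand-in for the spawned query execution: completes with a query error.
    let handle = task::spawn(async { Err::<(), String>("bad parquet".into()) });

    // Awaiting a JoinHandle yields Result<Result<(), E>, JoinError>,
    // hence the three arms in the handler.
    match handle.await {
        // Outer Ok: the task ran to completion; the inner result is the query outcome.
        Ok(Ok(())) => println!("200: load finished"),
        Ok(Err(cause)) => println!("400: query execution failed: {cause}"),
        // Outer Err: the task panicked or was cancelled before finishing.
        Err(join_err) => println!("500: execution task panicked: {join_err}"),
    }
}
```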

src/query/storages/delta/src/partition.rs
Lines changed: 1 addition & 1 deletion

```diff
@@ -22,7 +22,7 @@ use databend_common_expression::Scalar;
 use databend_common_storages_parquet::ParquetPart;
 
 /// only support parquet for now: https://github.com/delta-io/delta/issues/87
-#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq, Debug, Clone)]
+#[derive(serde::Serialize, serde::Deserialize, PartialEq, Debug, Clone)]
 pub struct DeltaPartInfo {
     pub data: ParquetPart,
     pub partition_values: Vec<Scalar>,
```
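The Eq derive is dropped here because derived Eq requires every field to be Eq, and this commit removes Eq from the embedded ParquetPart (per the commit message it never needed one; a float-bearing field, for example, can only be PartialEq). A minimal illustration of the constraint; the field name is hypothetical:

```rust
// f64 implements PartialEq but not Eq (NaN != NaN breaks reflexivity),
// so a struct containing one can derive at most PartialEq.
#[derive(PartialEq, Debug, Clone)]
struct Part {
    compressed_ratio: f64, // hypothetical field, for illustration only
}

// Adding Eq to this derive would fail to compile: #[derive(Eq)] generates
// a `Part: Eq` bound that Part cannot satisfy.
#[derive(PartialEq, Debug, Clone)]
struct PartInfo {
    data: Part,
}

fn main() {
    let a = PartInfo { data: Part { compressed_ratio: 0.5 } };
    assert!(a == a.clone());
}
```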

src/query/storages/parquet/src/copy_into_table/mod.rs
Lines changed: 2 additions & 0 deletions

```diff
@@ -12,8 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+mod projection;
 mod reader;
 mod source;
 mod table;
 
+pub(crate) use projection::CopyProjectionEvaluator;
 pub use table::ParquetTableForCopy;
```
src/query/storages/parquet/src/copy_into_table/projection.rs (new file; path inferred from the mod.rs hunk above)
Lines changed: 45 additions & 0 deletions

```diff
@@ -0,0 +1,45 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use databend_common_exception::Result;
+use databend_common_expression::BlockEntry;
+use databend_common_expression::DataBlock;
+use databend_common_expression::DataSchemaRef;
+use databend_common_expression::Evaluator;
+use databend_common_expression::Expr;
+use databend_common_expression::FunctionContext;
+use databend_common_functions::BUILTIN_FUNCTIONS;
+
+pub(crate) struct CopyProjectionEvaluator {
+    schema: DataSchemaRef,
+    func_ctx: Arc<FunctionContext>,
+}
+
+impl CopyProjectionEvaluator {
+    pub(crate) fn new(schema: DataSchemaRef, func_ctx: Arc<FunctionContext>) -> Self {
+        Self { schema, func_ctx }
+    }
+
+    pub(crate) fn project(&self, block: &DataBlock, projection: &[Expr]) -> Result<DataBlock> {
+        let evaluator = Evaluator::new(block, &self.func_ctx, &BUILTIN_FUNCTIONS);
+        let mut columns = Vec::with_capacity(projection.len());
+        for (field, expr) in self.schema.fields().iter().zip(projection.iter()) {
+            let value = evaluator.run(expr)?;
+            let column = BlockEntry::new(field.data_type().clone(), value);
+            columns.push(column);
+        }
+        Ok(DataBlock::new(columns, block.num_rows()))
+    }
+}
```
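project evaluates one expression per target field against the decoded source block and assembles the results into a block in the target schema; this is how COPY adapts parquet files whose columns are reordered or need casts. A toy, Databend-free analogue of the same idea (ToyExpr is a stand-in for the real Expr):

```rust
// Each "expression" maps a source row to one target column value.
type ToyExpr = Box<dyn Fn(&[i64]) -> i64>;

fn project(row: &[i64], projection: &[ToyExpr]) -> Vec<i64> {
    projection.iter().map(|expr| expr(row)).collect()
}

fn main() {
    let projection: Vec<ToyExpr> = vec![
        Box::new(|row: &[i64]| row[1]),     // target col 0: reorder from source col 1
        Box::new(|row: &[i64]| row[0] * 2), // target col 1: computed from source col 0
    ];
    assert_eq!(project(&[3, 7], &projection), vec![7, 6]);
}
```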

src/query/storages/parquet/src/copy_into_table/reader.rs
Lines changed: 1 addition & 1 deletion

```diff
@@ -54,7 +54,7 @@ impl RowGroupReaderForCopy {
             InMemoryRowGroup::new(&part.location, op.clone(), &part.meta, None, *read_settings);
         let mut _sorter = None;
         self.row_group_reader_builder
-            .build(row_group, None, &mut _sorter, None, batch_size)
+            .fetch_and_build(row_group, None, &mut _sorter, None, batch_size)
             .await
     }
```
src/query/storages/parquet/src/copy_into_table/source.rs
Lines changed: 15 additions & 31 deletions

```diff
@@ -22,28 +22,24 @@ use databend_common_base::runtime::profile::Profile;
 use databend_common_base::runtime::profile::ProfileStatisticsName;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
-use databend_common_expression::BlockEntry;
 use databend_common_expression::DataBlock;
 use databend_common_expression::DataSchemaRef;
-use databend_common_expression::Evaluator;
-use databend_common_expression::FunctionContext;
-use databend_common_functions::BUILTIN_FUNCTIONS;
+use databend_common_expression::Expr;
 use databend_common_pipeline_core::processors::Event;
 use databend_common_pipeline_core::processors::OutputPort;
 use databend_common_pipeline_core::processors::Processor;
 use databend_common_pipeline_core::processors::ProcessorPtr;
 use opendal::Operator;
 
+use crate::copy_into_table::projection::CopyProjectionEvaluator;
 use crate::copy_into_table::reader::RowGroupReaderForCopy;
 use crate::parquet_reader::policy::ReadPolicyImpl;
 use crate::read_settings::ReadSettings;
 use crate::ParquetPart;
 
-type SchemaIndex = usize;
-
 enum State {
     Init,
-    ReadRowGroup((SchemaIndex, ReadPolicyImpl)),
+    ReadRowGroup((Vec<Expr>, ReadPolicyImpl)),
 }
 
 pub struct ParquetCopySource {
@@ -59,9 +55,7 @@ pub struct ParquetCopySource {
     // Used to read parquet.
     row_group_readers: Arc<HashMap<usize, RowGroupReaderForCopy>>,
     operator: Operator,
-    schema: DataSchemaRef,
-    func_ctx: FunctionContext,
-
+    copy_projection_evaluator: CopyProjectionEvaluator,
     state: State,
     batch_size: usize,
 }
@@ -76,20 +70,20 @@ impl ParquetCopySource {
     ) -> Result<ProcessorPtr> {
         let scan_progress = ctx.get_scan_progress();
         let batch_size = ctx.get_settings().get_parquet_max_block_size()? as usize;
-        let func_ctx = ctx.get_function_context()?;
+        let func_ctx = Arc::new(ctx.get_function_context()?);
+        let copy_projection_evaluator = CopyProjectionEvaluator::new(schema, func_ctx);
 
         Ok(ProcessorPtr::create(Box::new(Self {
             output,
             scan_progress,
             ctx,
             operator,
             row_group_readers,
-            func_ctx,
             batch_size,
             generated_data: None,
             is_finished: false,
             state: State::Init,
-            schema,
+            copy_projection_evaluator,
         })))
     }
 }
@@ -141,23 +135,13 @@ impl Processor for ParquetCopySource {
 
     fn process(&mut self) -> Result<()> {
         match std::mem::replace(&mut self.state, State::Init) {
-            State::ReadRowGroup((schema_index, mut reader)) => {
+            State::ReadRowGroup((projection, mut reader)) => {
                 if let Some(block) = reader.as_mut().read_block()? {
-                    let projection = self
-                        .row_group_readers
-                        .get(&schema_index)
-                        .unwrap()
-                        .output_projection();
-                    let evaluator = Evaluator::new(&block, &self.func_ctx, &BUILTIN_FUNCTIONS);
-                    let mut columns = Vec::with_capacity(projection.len());
-                    for (field, expr) in self.schema.fields().iter().zip(projection.iter()) {
-                        let value = evaluator.run(expr)?;
-                        let column = BlockEntry::new(field.data_type().clone(), value);
-                        columns.push(column);
-                    }
-                    let block = DataBlock::new(columns, block.num_rows());
-                    self.generated_data = Some(block);
-                    self.state = State::ReadRowGroup((schema_index, reader));
+                    self.generated_data = Some(
+                        self.copy_projection_evaluator
+                            .project(&block, &projection)?,
+                    );
+                    self.state = State::ReadRowGroup((projection, reader));
                 }
                 // Else: The reader is finished. We should try to build another reader.
             }
@@ -178,6 +162,7 @@ impl Processor for ParquetCopySource {
                     .row_group_readers
                     .get(&schema_index)
                     .expect("schema index must exist");
+                let projection = builder.output_projection().to_vec();
                 let reader = builder
                     .build_reader(
                         part,
@@ -188,7 +173,7 @@ impl Processor for ParquetCopySource {
                     .await?
                     .expect("reader must exist");
                 {
-                    self.state = State::ReadRowGroup((schema_index, reader));
+                    self.state = State::ReadRowGroup((projection, reader));
                 }
                 // Else: keep in init state.
             }
@@ -200,7 +185,6 @@ impl Processor for ParquetCopySource {
             }
             _ => unreachable!(),
         }
-
         Ok(())
     }
 }
```
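Two things change in this file: the output projection is captured once when the reader is built (output_projection().to_vec()) and carried inside State, so process() no longer looks it up in the row_group_readers map for every block; and the projection itself is delegated to CopyProjectionEvaluator. A generic sketch of the std::mem::replace state-machine pattern that process() uses to own its state by value:

```rust
enum State {
    Init,
    Reading { projection: Vec<String>, remaining: usize },
}

fn process(state: &mut State) -> Option<Vec<String>> {
    // Take ownership of the current state, leaving a cheap placeholder behind.
    match std::mem::replace(state, State::Init) {
        State::Reading { projection, remaining } if remaining > 0 => {
            let block = projection.clone(); // stand-in for "project the block"
            // Put the advanced state back for the next call.
            *state = State::Reading { projection, remaining: remaining - 1 };
            Some(block)
        }
        // Reader exhausted: stay in Init and let the caller build the next reader.
        _ => None,
    }
}

fn main() {
    let mut state = State::Reading { projection: vec!["c0".into()], remaining: 2 };
    assert!(process(&mut state).is_some());
    assert!(process(&mut state).is_some());
    assert!(process(&mut state).is_none());
}
```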

src/query/storages/parquet/src/lib.rs
Lines changed: 2 additions & 0 deletions

```diff
@@ -23,6 +23,7 @@
 #![feature(core_intrinsics)]
 #![feature(int_roundings)]
 #![feature(box_patterns)]
+#![feature(result_flattening)]
 // FIXME: Remove this once the deprecated code is removed
 #![allow(deprecated)]
 
@@ -45,6 +46,7 @@ mod schema;
 pub use copy_into_table::ParquetTableForCopy;
 pub use parquet_part::ParquetFilePart;
 pub use parquet_part::ParquetPart;
+pub use parquet_reader::InmMemoryFile;
 pub use parquet_reader::ParquetFileReader;
 pub use parquet_reader::ParquetReaderBuilder;
 pub use parquet_reader::ParquetWholeFileReader;
```
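The result_flattening gate enables the nightly Result::flatten, which collapses a Result<Result<T, E>, E> into Result<T, E>; the call sites that use it are in parts of the commit not shown in this excerpt. A minimal nightly example:

```rust
#![feature(result_flattening)] // nightly-only at the time of this commit

fn main() {
    // Nested result, e.g. from awaiting a task that itself returns a Result.
    let nested: Result<Result<u32, String>, String> = Ok(Ok(42));
    assert_eq!(nested.flatten(), Ok(42));

    // The inner error surfaces unchanged after flattening.
    let inner_err: Result<Result<u32, String>, String> = Ok(Err("inner".into()));
    assert_eq!(inner_err.flatten(), Err("inner".to_string()));
}
```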
