Skip to content

Commit 7ad8c21

Browse files
authored
Merge pull request #10630 from sundy-li/copy-transform
2 parents f477280 + cab5ca6 commit 7ad8c21

File tree

5 files changed

+55
-15
lines changed

5 files changed

+55
-15
lines changed

src/meta/app/src/principal/user_stage.rs

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -106,6 +106,12 @@ pub enum StageFileCompression {
106106
None,
107107
}
108108

109+
impl StageFileFormatType {
110+
pub fn has_inner_schema(&self) -> bool {
111+
matches!(self, StageFileFormatType::Parquet)
112+
}
113+
}
114+
109115
impl Default for StageFileCompression {
110116
fn default() -> Self {
111117
Self::None

src/query/service/src/interpreters/interpreter_copy.rs

Lines changed: 22 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@ use common_exception::ErrorCode;
2424
use common_exception::Result;
2525
use common_expression::infer_table_schema;
2626
use common_expression::DataField;
27+
use common_expression::DataSchema;
2728
use common_expression::DataSchemaRef;
2829
use common_expression::DataSchemaRefExt;
2930
use common_meta_app::principal::StageInfo;
@@ -41,6 +42,7 @@ use tracing::info;
4142
use crate::interpreters::common::append2table;
4243
use crate::interpreters::Interpreter;
4344
use crate::interpreters::SelectInterpreter;
45+
use crate::pipelines::processors::transforms::TransformRuntimeCastSchema;
4446
use crate::pipelines::processors::TransformCastSchema;
4547
use crate::pipelines::processors::TransformLimit;
4648
use crate::pipelines::PipelineBuildResult;
@@ -334,6 +336,26 @@ impl CopyInterpreter {
334336
)?;
335337
}
336338

339+
if stage_table_info
340+
.stage_info
341+
.file_format_options
342+
.format
343+
.has_inner_schema()
344+
{
345+
let dst_schema: Arc<DataSchema> = Arc::new(to_table.schema().into());
346+
let func_ctx = self.ctx.get_function_context()?;
347+
build_res.main_pipeline.add_transform(
348+
|transform_input_port, transform_output_port| {
349+
TransformRuntimeCastSchema::try_create(
350+
transform_input_port,
351+
transform_output_port,
352+
dst_schema.clone(),
353+
func_ctx,
354+
)
355+
},
356+
)?;
357+
}
358+
337359
// Build append data pipeline.
338360
to_table.append_data(
339361
ctx.clone(),

src/query/service/src/interpreters/interpreter_insert.rs

Lines changed: 18 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -378,20 +378,23 @@ impl Interpreter for InsertInterpreter {
378378
.format
379379
.exec_stream(input_context.clone(), &mut build_res.main_pipeline)?;
380380

381-
if Ok(StageFileFormatType::Parquet) == StageFileFormatType::from_str(format) {
382-
let dest_schema = plan.schema();
383-
let func_ctx = self.ctx.get_function_context()?;
384-
385-
build_res.main_pipeline.add_transform(
386-
|transform_input_port, transform_output_port| {
387-
TransformRuntimeCastSchema::try_create(
388-
transform_input_port,
389-
transform_output_port,
390-
dest_schema.clone(),
391-
func_ctx,
392-
)
393-
},
394-
)?;
381+
match StageFileFormatType::from_str(format) {
382+
Ok(f) if f.has_inner_schema() => {
383+
let dest_schema = plan.schema();
384+
let func_ctx = self.ctx.get_function_context()?;
385+
386+
build_res.main_pipeline.add_transform(
387+
|transform_input_port, transform_output_port| {
388+
TransformRuntimeCastSchema::try_create(
389+
transform_input_port,
390+
transform_output_port,
391+
dest_schema.clone(),
392+
func_ctx,
393+
)
394+
},
395+
)?;
396+
}
397+
_ => {}
395398
}
396399
}
397400
InsertInputSource::StreamingWithFileFormat(format_options, _, input_context) => {
@@ -400,7 +403,7 @@ impl Interpreter for InsertInterpreter {
400403
.format
401404
.exec_stream(input_context.clone(), &mut build_res.main_pipeline)?;
402405

403-
if StageFileFormatType::Parquet == format_options.format {
406+
if format_options.format.has_inner_schema() {
404407
let dest_schema = plan.schema();
405408
let func_ctx = self.ctx.get_function_context()?;
406409

tests/suites/1_stateful/00_copy/00_0002_copy_from_fs_location.result

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -6,3 +6,4 @@
66
199 2020.0 769
77
199 2020.0 769
88
199 2020.0 769
9+
199 2020.0000 769.00

tests/suites/1_stateful/00_copy/00_0002_copy_from_fs_location.sh

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -39,5 +39,13 @@ for i in "${copy_from_location_cases[@]}"; do
3939
echo "truncate table ontime200" | $MYSQL_CLIENT_CONNECT
4040
done
4141

42+
43+
## test copy from parquet with auto cast schema
44+
echo "create table test_decimal(Year Decimal(15,2), DayOfWeek Decimal(15,2), OriginCityMarketID Decimal(15,2) )" | $MYSQL_CLIENT_CONNECT
45+
echo "copy into test_decimal from 'fs://${DATADIR}/' PATTERN = 'ontime.*parquet' FILE_FORMAT = (type = PARQUET)" | $MYSQL_CLIENT_CONNECT
46+
echo "select count(1), avg(Year), sum(DayOfWeek) from test_decimal" | $MYSQL_CLIENT_CONNECT
47+
48+
4249
## Drop table
4350
echo "drop table if exists ontime200;" | $MYSQL_CLIENT_CONNECT
51+
echo "drop table if exists test_decimal;" | $MYSQL_CLIENT_CONNECT

0 commit comments

Comments (0)