Skip to content

Commit d0f9482

Browse files
committed
feat: support copy into table with transform.
1 parent 1ea2e2f commit d0f9482

File tree

3 files changed

+283
-0
lines changed

3 files changed

+283
-0
lines changed

src/query/service/src/interpreters/interpreter_copy.rs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use tracing::info;
4141
use crate::interpreters::common::append2table;
4242
use crate::interpreters::Interpreter;
4343
use crate::interpreters::SelectInterpreter;
44+
use crate::pipelines::processors::TransformCastSchema;
4445
use crate::pipelines::processors::TransformLimit;
4546
use crate::pipelines::PipelineBuildResult;
4647
use crate::sessions::QueryContext;
@@ -152,6 +153,76 @@ impl CopyInterpreter {
152153
}
153154
}
154155

156+
#[allow(clippy::too_many_arguments)]
157+
async fn build_copy_into_table_with_transform_pipeline(
158+
&self,
159+
catalog_name: &str,
160+
database_name: &str,
161+
table_name: &str,
162+
query: &Plan,
163+
stage_info: StageInfo,
164+
all_source_file_infos: Vec<StageFileInfo>,
165+
need_copy_file_infos: Vec<StageFileInfo>,
166+
) -> Result<PipelineBuildResult> {
167+
let start = Instant::now();
168+
let ctx = self.ctx.clone();
169+
let (mut build_res, source_schema) = self.build_query(query).await?;
170+
let to_table = ctx
171+
.get_table(catalog_name, database_name, table_name)
172+
.await?;
173+
174+
let dst_schema = Arc::new(to_table.schema().into());
175+
if source_schema != dst_schema {
176+
let func_ctx = ctx.get_function_context()?;
177+
build_res.main_pipeline.add_transform(
178+
|transform_input_port, transform_output_port| {
179+
TransformCastSchema::try_create(
180+
transform_input_port,
181+
transform_output_port,
182+
source_schema.clone(),
183+
dst_schema.clone(),
184+
func_ctx,
185+
)
186+
},
187+
)?;
188+
}
189+
190+
// Build append data pipeline.
191+
to_table.append_data(
192+
ctx.clone(),
193+
&mut build_res.main_pipeline,
194+
AppendMode::Copy,
195+
false,
196+
)?;
197+
198+
let database_name = database_name.to_string();
199+
let catalog_name = catalog_name.to_string();
200+
let table_name = table_name.to_string();
201+
build_res.main_pipeline.set_on_finished(move |may_error| {
202+
if may_error.is_none() {
203+
CopyInterpreter::commit_copy_into_table(
204+
ctx.clone(),
205+
to_table,
206+
stage_info,
207+
all_source_file_infos,
208+
need_copy_file_infos,
209+
catalog_name,
210+
database_name,
211+
table_name,
212+
)?;
213+
// Status.
214+
{
215+
info!("all copy finished, elapsed:{}", start.elapsed().as_secs());
216+
}
217+
Ok(())
218+
} else {
219+
Err(may_error.as_ref().unwrap().clone())
220+
}
221+
});
222+
223+
Ok(build_res)
224+
}
225+
155226
#[allow(clippy::too_many_arguments)]
156227
async fn build_copy_into_table_pipeline(
157228
&self,
@@ -422,6 +493,27 @@ impl Interpreter for CopyInterpreter {
422493
other
423494
))),
424495
},
496+
CopyPlan::IntoTableWithTransform {
497+
catalog_name,
498+
database_name,
499+
table_name,
500+
stage_info,
501+
from,
502+
all_source_file_infos,
503+
need_copy_file_infos,
504+
..
505+
} => {
506+
self.build_copy_into_table_with_transform_pipeline(
507+
catalog_name,
508+
database_name,
509+
table_name,
510+
from,
511+
*stage_info.clone(),
512+
all_source_file_infos.clone(),
513+
need_copy_file_infos.clone(),
514+
)
515+
.await
516+
}
425517
CopyPlan::IntoStage {
426518
stage, from, path, ..
427519
} => self.build_copy_into_stage_pipeline(stage, path, from).await,

src/query/sql/src/planner/binder/copy.rs

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use std::collections::BTreeMap;
1616
use std::str::FromStr;
1717
use std::sync::Arc;
18+
use std::time::Instant;
1819

1920
use common_ast::ast::CopyStmt;
2021
use common_ast::ast::CopyUnit;
@@ -40,9 +41,13 @@ use common_exception::ErrorCode;
4041
use common_exception::Result;
4142
use common_meta_app::principal::OnErrorMode;
4243
use common_meta_app::principal::StageInfo;
44+
use common_storage::init_stage_operator;
45+
use common_storage::StageFileInfo;
46+
use common_storage::StageFileStatus;
4347
use common_storage::StageFilesInfo;
4448
use common_users::UserApiProvider;
4549
use tracing::debug;
50+
use tracing::info;
4651

4752
use crate::binder::location::parse_uri_location;
4853
use crate::binder::Binder;
@@ -181,6 +186,26 @@ impl<'a> Binder {
181186
self.bind_copy_from_query_into_uri(bind_context, stmt, query, &mut ul)
182187
.await
183188
}
189+
(
190+
CopyUnit::Query(query),
191+
CopyUnit::Table {
192+
catalog,
193+
database,
194+
table,
195+
},
196+
) => {
197+
let (catalog_name, database_name, table_name) =
198+
self.normalize_object_identifier_triple(catalog, database, table);
199+
self.bind_copy_from_query_into_table(
200+
bind_context,
201+
stmt,
202+
query,
203+
&catalog_name,
204+
&database_name,
205+
&table_name,
206+
)
207+
.await
208+
}
184209
(src, dst) => Err(ErrorCode::SyntaxException(format!(
185210
"COPY INTO <{}> FROM <{}> is invalid",
186211
dst.target(),
@@ -470,6 +495,148 @@ impl<'a> Binder {
470495
})))
471496
}
472497

498+
/// Bind COPY INFO <table> FROM <query>
499+
async fn bind_copy_from_query_into_table(
500+
&mut self,
501+
bind_context: &BindContext,
502+
stmt: &CopyStmt,
503+
src_query: &Query,
504+
dst_catalog_name: &str,
505+
dst_database_name: &str,
506+
dst_table_name: &str,
507+
) -> Result<Plan> {
508+
// Validation mode.
509+
let validation_mode = ValidationMode::from_str(stmt.validation_mode.as_str())
510+
.map_err(ErrorCode::SyntaxException)?;
511+
512+
// dst
513+
let dst_table = self
514+
.ctx
515+
.get_table(dst_catalog_name, dst_database_name, dst_table_name)
516+
.await?;
517+
518+
// src
519+
let (select_list, location, alias) = check_transform_query(src_query)?;
520+
if matches!(location, FileLocation::Uri(_)) {
521+
// todo!(youngsofun): need to refactor parser
522+
return Err(ErrorCode::SyntaxException(
523+
"copy into table from uri with transform not supported yet",
524+
));
525+
}
526+
527+
let (mut stage_info, path) =
528+
parse_file_location(&self.ctx, location, BTreeMap::new()).await?;
529+
self.apply_stage_options(stmt, &mut stage_info).await?;
530+
let files_info = StageFilesInfo {
531+
path,
532+
pattern: stmt.pattern.clone(),
533+
files: stmt.files.clone(),
534+
};
535+
536+
let start = Instant::now();
537+
{
538+
let status = "begin to list files";
539+
self.ctx.set_status_info(status);
540+
info!(status);
541+
}
542+
543+
let operator = init_stage_operator(&stage_info)?;
544+
let files = if operator.info().can_blocking() {
545+
files_info.blocking_list(&operator, false)
546+
} else {
547+
files_info.list(&operator, false).await
548+
}?;
549+
550+
let mut all_source_file_infos = files
551+
.into_iter()
552+
.map(|file_with_meta| StageFileInfo::new(file_with_meta.path, &file_with_meta.metadata))
553+
.collect::<Vec<_>>();
554+
555+
info!("end to list files: {}", all_source_file_infos.len());
556+
557+
if !stmt.force {
558+
// Status.
559+
{
560+
let status = "begin to color copied files";
561+
self.ctx.set_status_info(status);
562+
info!(status);
563+
}
564+
565+
all_source_file_infos = self
566+
.ctx
567+
.color_copied_files(
568+
dst_catalog_name,
569+
dst_database_name,
570+
dst_table_name,
571+
all_source_file_infos,
572+
)
573+
.await?;
574+
575+
info!("end to color copied files: {}", all_source_file_infos.len());
576+
}
577+
578+
let mut need_copy_file_infos = vec![];
579+
for file in &all_source_file_infos {
580+
if file.status == StageFileStatus::NeedCopy {
581+
need_copy_file_infos.push(file.clone());
582+
}
583+
}
584+
585+
info!(
586+
"copy: read all files finished, all:{}, need copy:{}, elapsed:{}",
587+
all_source_file_infos.len(),
588+
need_copy_file_infos.len(),
589+
start.elapsed().as_secs()
590+
);
591+
592+
if need_copy_file_infos.is_empty() {
593+
return Err(ErrorCode::EmptyData("no file need to copy"));
594+
}
595+
596+
let (s_expr, mut from_context) = self
597+
.bind_stage_table(
598+
bind_context,
599+
stage_info.clone(),
600+
files_info,
601+
alias,
602+
Some(need_copy_file_infos.clone()),
603+
)
604+
.await?;
605+
606+
// Generate a analyzed select list with from context
607+
let select_list = self
608+
.normalize_select_list(&from_context, select_list)
609+
.await?;
610+
let (scalar_items, projections) = self.analyze_projection(&select_list)?;
611+
let s_expr =
612+
self.bind_projection(&mut from_context, &projections, &scalar_items, s_expr)?;
613+
let mut output_context = BindContext::new();
614+
output_context.parent = from_context.parent;
615+
output_context.columns = from_context.columns;
616+
617+
let query_plan = Plan::Query {
618+
s_expr: Box::new(s_expr),
619+
metadata: self.metadata.clone(),
620+
bind_context: Box::new(output_context),
621+
rewrite_kind: None,
622+
ignore_result: false,
623+
formatted_ast: None,
624+
};
625+
626+
Ok(Plan::Copy(Box::new(CopyPlan::IntoTableWithTransform {
627+
catalog_name: dst_catalog_name.to_string(),
628+
database_name: dst_database_name.to_string(),
629+
table_name: dst_table_name.to_string(),
630+
table_id: dst_table.get_id(),
631+
schema: dst_table.schema(),
632+
from: Box::new(query_plan),
633+
stage_info: Box::new(stage_info),
634+
all_source_file_infos,
635+
need_copy_file_infos,
636+
validation_mode,
637+
})))
638+
}
639+
473640
async fn apply_stage_options(&mut self, stmt: &CopyStmt, stage: &mut StageInfo) -> Result<()> {
474641
if !stmt.file_format.is_empty() {
475642
stage.file_format_options = self.try_resolve_file_format(&stmt.file_format).await?;

src/query/sql/src/planner/plans/copy.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use common_catalog::plan::DataSourcePlan;
2020
use common_expression::TableSchemaRef;
2121
use common_meta_app::principal::StageInfo;
2222
use common_meta_types::MetaId;
23+
use common_storage::StageFileInfo;
2324

2425
use crate::plans::Plan;
2526

@@ -65,6 +66,18 @@ pub enum CopyPlan {
6566
from: Box<DataSourcePlan>,
6667
force: bool,
6768
},
69+
IntoTableWithTransform {
70+
catalog_name: String,
71+
database_name: String,
72+
table_name: String,
73+
table_id: MetaId,
74+
schema: TableSchemaRef,
75+
stage_info: Box<StageInfo>,
76+
validation_mode: ValidationMode,
77+
from: Box<Plan>,
78+
all_source_file_infos: Vec<StageFileInfo>,
79+
need_copy_file_infos: Vec<StageFileInfo>,
80+
},
6881
IntoStage {
6982
stage: Box<StageInfo>,
7083
path: String,
@@ -90,6 +103,17 @@ impl Debug for CopyPlan {
90103
write!(f, ", from: {from:?}")?;
91104
write!(f, " force: {force}")?;
92105
}
106+
CopyPlan::IntoTableWithTransform {
107+
database_name,
108+
table_name,
109+
from,
110+
validation_mode,
111+
..
112+
} => {
113+
write!(f, "Copy into {database_name:}.{table_name:}")?;
114+
write!(f, ", validation_mode: {validation_mode:?}")?;
115+
write!(f, ", from: {from:?}")?;
116+
}
93117
CopyPlan::IntoStage {
94118
stage,
95119
path,

0 commit comments

Comments
 (0)