Skip to content

Commit 06824b3

Browse files
authored
refactor: Separate the code logic for COPY INTO LOCATION/TABLE. (#18060)
refactor: Separate the code for COPY INTO LOCATION and TABLE.
1 parent f835a85 commit 06824b3

File tree

27 files changed

+241
-220
lines changed

27 files changed

+241
-220
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/catalog/src/plan/datasource/datasource_info/stage.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use std::fmt::Display;
1717
use std::fmt::Formatter;
1818
use std::sync::Arc;
1919

20-
use databend_common_ast::ast::CopyIntoLocationOptions;
2120
use databend_common_ast::ast::CopyIntoTableOptions;
2221
use databend_common_exception::Result;
2322
use databend_common_expression::RemoteDefaultExpr;
@@ -48,10 +47,6 @@ pub struct StageTableInfo {
4847
pub is_select: bool,
4948
pub copy_into_table_options: CopyIntoTableOptions,
5049
pub is_variant: bool,
51-
52-
// copy into location only
53-
pub copy_into_location_ordered: bool,
54-
pub copy_into_location_options: CopyIntoLocationOptions,
5550
}
5651

5752
impl StageTableInfo {

src/query/service/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ databend-storages-common-cache = { workspace = true }
116116
databend-storages-common-index = { workspace = true }
117117
databend-storages-common-io = { workspace = true }
118118
databend-storages-common-session = { workspace = true }
119+
databend-storages-common-stage = { workspace = true }
119120
databend-storages-common-table-meta = { workspace = true }
120121
derive-visitor = { workspace = true }
121122
ethnum = { workspace = true }

src/query/service/src/interpreters/access/privilege_access.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1244,7 +1244,7 @@ impl AccessChecker for PrivilegeAccess {
12441244
}
12451245
}
12461246
Plan::CopyIntoLocation(plan) => {
1247-
self.validate_stage_access(&plan.stage, UserPrivilegeType::Write).await?;
1247+
self.validate_stage_access(&plan.info.stage, UserPrivilegeType::Write).await?;
12481248
let from = plan.from.clone();
12491249
return self.check(ctx, &from).await;
12501250
}

src/query/service/src/interpreters/interpreter_copy_into_location.rs

Lines changed: 6 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,14 @@
1414

1515
use std::sync::Arc;
1616

17-
use databend_common_ast::ast::CopyIntoLocationOptions;
1817
use databend_common_base::runtime::GlobalIORuntime;
19-
use databend_common_catalog::plan::StageTableInfo;
2018
use databend_common_exception::Result;
2119
use databend_common_expression::infer_table_schema;
22-
use databend_common_meta_app::principal::StageInfo;
2320
use databend_common_meta_app::schema::UpdateStreamMetaReq;
2421
use databend_common_pipeline_core::ExecutionInfo;
2522
use databend_common_sql::executor::physical_plans::CopyIntoLocation;
2623
use databend_common_sql::executor::PhysicalPlan;
27-
use databend_common_storage::StageFilesInfo;
24+
use databend_storages_common_stage::CopyIntoLocationInfo;
2825
use log::debug;
2926
use log::info;
3027

@@ -84,10 +81,8 @@ impl CopyIntoLocationInterpreter {
8481
#[async_backtrace::framed]
8582
async fn build_local_copy_into_stage_pipeline(
8683
&self,
87-
stage: &StageInfo,
88-
path: &str,
8984
query: &Plan,
90-
options: &CopyIntoLocationOptions,
85+
info: &CopyIntoLocationInfo,
9186
) -> Result<(PipelineBuildResult, Vec<UpdateStreamMetaReq>)> {
9287
let (query_interpreter, update_stream_meta_req) = self.build_query(query).await?;
9388
let query_physical_plan = query_interpreter.build_physical_plan().await?;
@@ -98,25 +93,9 @@ impl CopyIntoLocationInterpreter {
9893
plan_id: 0,
9994
input: Box::new(query_physical_plan),
10095
project_columns: query_interpreter.get_result_columns(),
101-
input_schema: query_result_schema,
102-
to_stage_info: StageTableInfo {
103-
schema: table_schema,
104-
stage_info: stage.clone(),
105-
files_info: StageFilesInfo {
106-
path: path.to_string(),
107-
files: None,
108-
pattern: None,
109-
},
110-
files_to_copy: None,
111-
duplicated_files_detected: vec![],
112-
is_select: false,
113-
default_exprs: None,
114-
copy_into_location_options: options.clone(),
115-
copy_into_table_options: Default::default(),
116-
stage_root: "".to_string(),
117-
copy_into_location_ordered: self.plan.is_ordered,
118-
is_variant: false,
119-
},
96+
input_data_schema: query_result_schema,
97+
input_table_schema: table_schema,
98+
info: info.clone(),
12099
}));
121100

122101
let mut next_plan_id = 0;
@@ -148,12 +127,7 @@ impl Interpreter for CopyIntoLocationInterpreter {
148127
}
149128

150129
let (mut pipeline_build_result, update_stream_reqs) = self
151-
.build_local_copy_into_stage_pipeline(
152-
&self.plan.stage,
153-
&self.plan.path,
154-
&self.plan.from,
155-
&self.plan.options,
156-
)
130+
.build_local_copy_into_stage_pipeline(&self.plan.from, &self.plan.info)
157131
.await?;
158132

159133
// We are going to consuming streams, which are all of the default catalog

src/query/service/src/pipelines/builders/builder_copy_into_location.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
use databend_common_catalog::table_context::TableContext;
1616
use databend_common_exception::Result;
1717
use databend_common_sql::executor::physical_plans::CopyIntoLocation;
18-
use databend_common_storages_stage::StageTable;
18+
use databend_common_storages_stage::StageSinkTable;
1919

2020
use crate::pipelines::PipelineBuilder;
2121

@@ -32,12 +32,12 @@ impl PipelineBuilder {
3232
false,
3333
)?;
3434

35-
let to_table = StageTable::try_create(copy.to_stage_info.clone())?;
35+
let to_table = StageSinkTable::create(copy.info.clone(), copy.input_table_schema.clone())?;
3636
PipelineBuilder::build_append2table_with_commit_pipeline(
3737
self.ctx.clone(),
3838
&mut self.main_pipeline,
3939
to_table,
40-
copy.input_schema.clone(),
40+
copy.input_data_schema.clone(),
4141
None,
4242
vec![],
4343
false,

src/query/service/src/sessions/query_ctx.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1701,10 +1701,8 @@ impl TableContext for QueryContext {
17011701
duplicated_files_detected: vec![],
17021702
is_select: true,
17031703
default_exprs: None,
1704-
copy_into_location_options: Default::default(),
17051704
copy_into_table_options: Default::default(),
17061705
stage_root,
1707-
copy_into_location_ordered: false,
17081706
is_variant: true,
17091707
};
17101708
StageTable::try_create(info)
@@ -1720,10 +1718,8 @@ impl TableContext for QueryContext {
17201718
duplicated_files_detected: vec![],
17211719
is_select: true,
17221720
default_exprs: None,
1723-
copy_into_location_options: Default::default(),
17241721
copy_into_table_options: Default::default(),
17251722
stage_root,
1726-
copy_into_location_ordered: false,
17271723
is_variant: false,
17281724
};
17291725
OrcTable::try_create(info).await
@@ -1741,10 +1737,8 @@ impl TableContext for QueryContext {
17411737
duplicated_files_detected: vec![],
17421738
is_select: true,
17431739
default_exprs: None,
1744-
copy_into_location_options: Default::default(),
17451740
copy_into_table_options: Default::default(),
17461741
stage_root,
1747-
copy_into_location_ordered: false,
17481742
is_variant: true,
17491743
};
17501744
StageTable::try_create(info)
@@ -1780,10 +1774,8 @@ impl TableContext for QueryContext {
17801774
duplicated_files_detected: vec![],
17811775
is_select: true,
17821776
default_exprs: None,
1783-
copy_into_location_options: Default::default(),
17841777
copy_into_table_options: Default::default(),
17851778
stage_root,
1786-
copy_into_location_ordered: false,
17871779
is_variant: false,
17881780
};
17891781
StageTable::try_create(info)

src/query/sql/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ databend-common-users = { workspace = true }
3434
databend-common-version = { workspace = true }
3535
databend-enterprise-data-mask-feature = { workspace = true }
3636
databend-storages-common-cache = { workspace = true }
37+
databend-storages-common-stage = { workspace = true }
3738
databend-storages-common-table-meta = { workspace = true }
3839

3940
ahash = { workspace = true, features = ["no-rng"] }

src/query/sql/src/executor/physical_plan_visitor.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -425,8 +425,9 @@ pub trait PhysicalPlanReplacer {
425425
plan_id: plan.plan_id,
426426
input: Box::new(input),
427427
project_columns: plan.project_columns.clone(),
428-
input_schema: plan.input_schema.clone(),
429-
to_stage_info: plan.to_stage_info.clone(),
428+
input_data_schema: plan.input_data_schema.clone(),
429+
input_table_schema: plan.input_table_schema.clone(),
430+
info: plan.info.clone(),
430431
})))
431432
}
432433

src/query/sql/src/executor/physical_plans/physical_copy_into_location.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,14 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use databend_common_catalog::plan::StageTableInfo;
1615
use databend_common_exception::Result;
1716
use databend_common_expression::types::DataType;
1817
use databend_common_expression::types::NumberDataType;
1918
use databend_common_expression::DataField;
2019
use databend_common_expression::DataSchemaRef;
2120
use databend_common_expression::DataSchemaRefExt;
21+
use databend_common_expression::TableSchemaRef;
22+
use databend_storages_common_stage::CopyIntoLocationInfo;
2223

2324
use crate::executor::PhysicalPlan;
2425
use crate::ColumnBinding;
@@ -28,8 +29,9 @@ pub struct CopyIntoLocation {
2829
pub plan_id: u32,
2930
pub input: Box<PhysicalPlan>,
3031
pub project_columns: Vec<ColumnBinding>,
31-
pub input_schema: DataSchemaRef,
32-
pub to_stage_info: StageTableInfo,
32+
pub input_data_schema: DataSchemaRef,
33+
pub input_table_schema: TableSchemaRef,
34+
pub info: CopyIntoLocationInfo,
3335
}
3436

3537
impl CopyIntoLocation {

0 commit comments

Comments
 (0)