
Commit cd45b2d

Authored by Xuanwo and BohuTANG
feat: Implement Modify Table Connection (#18034)
* feat: Implement alter table connection
* feat: Implement Modify Table Connection
* Fix typo
* Add new test cases
* Make clippy happy
* Fix tests

Signed-off-by: Xuanwo <[email protected]>
Co-authored-by: Bohu <[email protected]>
1 parent: 9b05067 · commit: cd45b2d

File tree

18 files changed: +393 −4 lines changed


src/common/storage/src/operator.rs

Lines changed: 2 additions & 2 deletions
@@ -615,7 +615,7 @@ pub async fn check_operator(

     GlobalIORuntime::instance()
         .spawn(async move {
-            let res = op.stat("/").await;
+            let res = op.stat("databend_storage_checker").await;
             match res {
                 Ok(_) => Ok(()),
                 Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(()),
@@ -626,7 +626,7 @@ pub async fn check_operator(
         .expect("join must succeed")
         .map_err(|cause| {
             ErrorCode::StorageUnavailable(format!(
-                "current configured storage is not available: config: {:?}, cause: {cause}",
+                "current configured storage is not valid: config: {:?}, cause: {cause}",
                 params
             ))
         })

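For context on the hunk above: the health check only needs to prove that the endpoint and credentials work, so it stats a key that almost certainly does not exist and treats NotFound as success. A minimal sketch of the same pattern with opendal (the probe key is the one used above; the helper name is hypothetical):

    use opendal::{ErrorKind, Operator};

    /// Probe an operator by stat-ing a dummy key. A NotFound error still
    /// proves the backend is reachable with valid credentials, so it counts
    /// as healthy; any other error is surfaced to the caller.
    async fn probe(op: &Operator) -> Result<(), opendal::Error> {
        match op.stat("databend_storage_checker").await {
            Ok(_) => Ok(()),
            Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
            Err(e) => Err(e),
        }
    }
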
src/meta/app/src/storage/storage_params.rs

Lines changed: 53 additions & 0 deletions
@@ -58,6 +58,28 @@ impl Default for StorageParams {
 }
 
 impl StorageParams {
+    /// Get the storage type as a string.
+    pub fn storage_type(&self) -> String {
+        match self {
+            StorageParams::Azblob(_) => "azblob".to_string(),
+            StorageParams::Fs(_) => "fs".to_string(),
+            StorageParams::Ftp(_) => "ftp".to_string(),
+            StorageParams::Gcs(_) => "gcs".to_string(),
+            StorageParams::Hdfs(_) => "hdfs".to_string(),
+            StorageParams::Http(_) => "http".to_string(),
+            StorageParams::Ipfs(_) => "ipfs".to_string(),
+            StorageParams::Memory => "memory".to_string(),
+            StorageParams::Moka(_) => "moka".to_string(),
+            StorageParams::Obs(_) => "obs".to_string(),
+            StorageParams::Oss(_) => "oss".to_string(),
+            StorageParams::S3(_) => "s3".to_string(),
+            StorageParams::Webhdfs(_) => "webhdfs".to_string(),
+            StorageParams::Cos(_) => "cos".to_string(),
+            StorageParams::Huggingface(_) => "huggingface".to_string(),
+            StorageParams::None => "none".to_string(),
+        }
+    }
+
     /// Whether this storage params is secure.
     ///
     /// Query will forbid this storage config unless `allow_insecure` has been enabled.
@@ -157,6 +179,37 @@ impl StorageParams {
 
         Ok(sp)
     }
+
+    /// Apply the update from another StorageParams.
+    ///
+    /// Only specific storage params like `credential` can be updated.
+    pub fn apply_update(self, other: Self) -> Result<Self> {
+        match (self, other) {
+            (StorageParams::Azblob(mut s1), StorageParams::Azblob(s2)) => {
+                s1.account_name = s2.account_name;
+                s1.account_key = s2.account_key;
+                s1.network_config = s2.network_config;
+                Ok(Self::Azblob(s1))
+            }
+            (StorageParams::Gcs(mut s1), StorageParams::Gcs(s2)) => {
+                s1.credential = s2.credential;
+                s1.network_config = s2.network_config;
+                Ok(Self::Gcs(s1))
+            }
+            (StorageParams::S3(mut s1), StorageParams::S3(s2)) => {
+                s1.access_key_id = s2.access_key_id;
+                s1.secret_access_key = s2.secret_access_key;
+                s1.security_token = s2.security_token;
+                s1.master_key = s2.master_key;
+                s1.network_config = s2.network_config;
+                Ok(Self::S3(s1))
+            }
+            (s1, s2) => Err(ErrorCode::StorageOther(format!(
+                "Cannot apply update from {:?} to {:?}",
+                &s1, &s2
+            ))),
+        }
+    }
 }
 
 /// StorageParams will be displayed by `{protocol}://{key1=value1},{key2=value2}`

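A hedged sketch of how `apply_update` is meant to be used (it assumes `StorageS3Config` and its `Default` impl from the same crate; the field values are made up): the old params keep their bucket, endpoint, and other layout settings, and only the credential-style fields are taken from the update.

    use databend_common_meta_app::storage::{StorageParams, StorageS3Config};

    let old = StorageParams::S3(StorageS3Config {
        bucket: "my-bucket".to_string(),
        access_key_id: "old-ak".to_string(),
        secret_access_key: "old-sk".to_string(),
        ..Default::default()
    });
    // Params parsed from the ALTER TABLE ... CONNECTION=(...) clause.
    let update = StorageParams::S3(StorageS3Config {
        access_key_id: "new-ak".to_string(),
        secret_access_key: "new-sk".to_string(),
        ..Default::default()
    });
    // Same variant, so the merge succeeds: bucket stays "my-bucket" and the
    // credentials are replaced. Mismatched variants return StorageOther.
    let merged = old.apply_update(update).expect("same storage type");
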
src/query/ast/src/ast/statements/table.rs

Lines changed: 8 additions & 0 deletions
@@ -466,6 +466,9 @@ pub enum AlterTableAction {
         targets: Vec<Identifier>,
     },
     RefreshTableCache,
+    ModifyConnection {
+        new_connection: BTreeMap<String, String>,
+    },
 }
 
 impl Display for AlterTableAction {
@@ -538,6 +541,11 @@ impl Display for AlterTableAction {
             AlterTableAction::RefreshTableCache => {
                 write!(f, "REFRESH CACHE")?;
             }
+            AlterTableAction::ModifyConnection { new_connection } => {
+                write!(f, "CONNECTION=(")?;
+                write_space_separated_string_map(f, new_connection)?;
+                write!(f, ")")?;
+            }
         };
         Ok(())
     }

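For illustration, constructing the new variant and formatting it should reproduce the CONNECTION clause; the exact quoting comes from write_space_separated_string_map, and the rendered form can be compared against the golden output further down. This assumes AlterTableAction is re-exported at databend_common_ast::ast like the other statement types.

    use std::collections::BTreeMap;
    use databend_common_ast::ast::AlterTableAction;

    let action = AlterTableAction::ModifyConnection {
        new_connection: BTreeMap::from([
            ("connection_name".to_string(), "test".to_string()),
        ]),
    };
    // Renders as: CONNECTION=(connection_name = 'test')
    println!("{action}");
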
src/query/ast/src/parser/statement.rs

Lines changed: 10 additions & 0 deletions
@@ -3991,6 +3991,15 @@ pub fn alter_table_action(i: Input) -> IResult<AlterTableAction> {
         |(_, _)| AlterTableAction::RefreshTableCache,
     );
 
+    let modify_table_connection = map(
+        rule! {
+            CONNECTION ~ ^"=" ~ #connection_options
+        },
+        |(_, _, connection_options)| AlterTableAction::ModifyConnection {
+            new_connection: connection_options,
+        },
+    );
+
     rule!(
         #alter_table_cluster_key
         | #drop_table_cluster_key
@@ -4005,6 +4014,7 @@ pub fn alter_table_action(i: Input) -> IResult<AlterTableAction> {
         | #set_table_options
         | #unset_table_options
         | #refresh_cache
+        | #modify_table_connection
     )(i)
 }

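A sketch of exercising the new rule end to end through the public parser entry points; this mirrors how the test harness below drives its cases, so treat the exact function and dialect names as assumptions if the API has moved.

    use databend_common_ast::parser::{parse_sql, tokenize_sql, Dialect};

    fn parse_modify_connection() {
        let sql = "ALTER TABLE t CONNECTION=(access_key_id='x' secret_access_key='y')";
        let tokens = tokenize_sql(sql).unwrap();
        let (stmt, _fmt) = parse_sql(&tokens, Dialect::PostgreSQL).unwrap();
        // Display of the parsed statement normalizes the clause, e.g.
        // ALTER TABLE t CONNECTION=(access_key_id = 'x' secret_access_key = 'y')
        println!("{stmt}");
    }
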
src/query/ast/tests/it/parser.rs

Lines changed: 2 additions & 0 deletions
@@ -925,6 +925,8 @@ SELECT * from s;"#,
         r#"CREATE SEQUENCE seq comment='test'"#,
         r#"DESCRIBE SEQUENCE seq"#,
         r#"SHOW SEQUENCES LIKE '%seq%'"#,
+        r#"ALTER TABLE p1 CONNECTION=(CONNECTION_NAME='test')"#,
+        r#"ALTER table t connection=(access_key_id ='x' secret_access_key ='y' endpoint_url='http://127.0.0.1:9900')"#,
     ];
 
     for case in cases {

src/query/ast/tests/it/testdata/stmt.txt

Lines changed: 78 additions & 0 deletions
@@ -26341,3 +26341,81 @@ ShowSequences {
 }
 
 
+---------- Input ----------
+ALTER TABLE p1 CONNECTION=(CONNECTION_NAME='test')
+---------- Output ---------
+ALTER TABLE p1 CONNECTION=(connection_name = 'test')
+---------- AST ------------
+AlterTable(
+    AlterTableStmt {
+        if_exists: false,
+        table_reference: Table {
+            span: Some(
+                12..14,
+            ),
+            catalog: None,
+            database: None,
+            table: Identifier {
+                span: Some(
+                    12..14,
+                ),
+                name: "p1",
+                quote: None,
+                ident_type: None,
+            },
+            alias: None,
+            temporal: None,
+            with_options: None,
+            pivot: None,
+            unpivot: None,
+            sample: None,
+        },
+        action: ModifyConnection {
+            new_connection: {
+                "connection_name": "test",
+            },
+        },
+    },
+)
+
+
+---------- Input ----------
+ALTER table t connection=(access_key_id ='x' secret_access_key ='y' endpoint_url='http://127.0.0.1:9900')
+---------- Output ---------
+ALTER TABLE t CONNECTION=(access_key_id = 'x' endpoint_url = 'http://127.0.0.1:9900' secret_access_key = 'y')
+---------- AST ------------
+AlterTable(
+    AlterTableStmt {
+        if_exists: false,
+        table_reference: Table {
+            span: Some(
+                12..13,
+            ),
+            catalog: None,
+            database: None,
+            table: Identifier {
+                span: Some(
+                    12..13,
+                ),
+                name: "t",
+                quote: None,
+                ident_type: None,
+            },
+            alias: None,
+            temporal: None,
+            with_options: None,
+            pivot: None,
+            unpivot: None,
+            sample: None,
+        },
+        action: ModifyConnection {
+            new_connection: {
+                "access_key_id": "x",
+                "endpoint_url": "http://127.0.0.1:9900",
+                "secret_access_key": "y",
+            },
+        },
+    },
+)
+

src/query/service/src/interpreters/access/privilege_access.rs

Lines changed: 3 additions & 0 deletions
@@ -1008,6 +1008,9 @@ impl AccessChecker for PrivilegeAccess {
             Plan::ModifyTableComment(plan) => {
                 self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Alter, false, false).await?
             }
+            Plan::ModifyTableConnection(plan) => {
+                self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Alter, false, false).await?
+            }
             Plan::DropTableColumn(plan) => {
                 self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Alter, false, false).await?
             }

src/query/service/src/interpreters/interpreter_factory.rs

Lines changed: 4 additions & 0 deletions
@@ -34,6 +34,7 @@ use super::interpreter_mutation::MutationInterpreter;
 use super::interpreter_table_index_create::CreateTableIndexInterpreter;
 use super::interpreter_table_index_drop::DropTableIndexInterpreter;
 use super::interpreter_table_index_refresh::RefreshTableIndexInterpreter;
+use super::interpreter_table_modify_connection::ModifyTableConnectionInterpreter;
 use super::interpreter_table_set_options::SetOptionsInterpreter;
 use super::interpreter_user_stage_drop::DropUserStageInterpreter;
 use super::*;
@@ -334,6 +335,9 @@ impl InterpreterFactory {
             Plan::ModifyTableComment(new_comment) => Ok(Arc::new(
                 ModifyTableCommentInterpreter::try_create(ctx, *new_comment.clone())?,
             )),
+            Plan::ModifyTableConnection(new_connection) => Ok(Arc::new(
+                ModifyTableConnectionInterpreter::try_create(ctx, *new_connection.clone())?,
+            )),
             Plan::RenameTableColumn(rename_table_column) => Ok(Arc::new(
                 RenameTableColumnInterpreter::try_create(ctx, *rename_table_column.clone())?,
             )),
src/query/service/src/interpreters/interpreter_table_modify_connection.rs

Lines changed: 143 additions & 0 deletions
@@ -0,0 +1,143 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use databend_common_ast::ast::UriLocation;
+use databend_common_catalog::table::TableExt;
+use databend_common_exception::ErrorCode;
+use databend_common_exception::Result;
+use databend_common_meta_app::schema::DatabaseType;
+use databend_common_sql::binder::parse_storage_params_from_uri;
+use databend_common_sql::plans::ModifyTableConnectionPlan;
+use databend_common_storage::check_operator;
+use databend_common_storage::init_operator;
+use databend_common_storages_stream::stream_table::STREAM_ENGINE;
+use databend_common_storages_view::view_table::VIEW_ENGINE;
+use log::debug;
+
+use crate::interpreters::interpreter_table_add_column::commit_table_meta;
+use crate::interpreters::Interpreter;
+use crate::pipelines::PipelineBuildResult;
+use crate::sessions::QueryContext;
+use crate::sessions::TableContext;
+
+pub struct ModifyTableConnectionInterpreter {
+    ctx: Arc<QueryContext>,
+    plan: ModifyTableConnectionPlan,
+}
+
+impl ModifyTableConnectionInterpreter {
+    pub fn try_create(ctx: Arc<QueryContext>, plan: ModifyTableConnectionPlan) -> Result<Self> {
+        Ok(ModifyTableConnectionInterpreter { ctx, plan })
+    }
+}
+
+#[async_trait::async_trait]
+impl Interpreter for ModifyTableConnectionInterpreter {
+    fn name(&self) -> &str {
+        "ModifyTableConnectionInterpreter"
+    }
+
+    fn is_ddl(&self) -> bool {
+        true
+    }
+
+    #[async_backtrace::framed]
+    async fn execute2(&self) -> Result<PipelineBuildResult> {
+        let catalog_name = self.plan.catalog.as_str();
+        let db_name = self.plan.database.as_str();
+        let tbl_name = self.plan.table.as_str();
+
+        let table = self
+            .ctx
+            .get_catalog(catalog_name)
+            .await?
+            .get_table(&self.ctx.get_tenant(), db_name, tbl_name)
+            .await?;
+
+        // check mutability
+        table.check_mutable()?;
+
+        let table_info = table.get_table_info();
+        let engine = table.engine();
+        if matches!(engine, VIEW_ENGINE | STREAM_ENGINE) {
+            return Err(ErrorCode::TableEngineNotSupported(format!(
+                "{}.{} engine is {} that doesn't support alter",
+                &self.plan.database, &self.plan.table, engine
+            )));
+        }
+        if table_info.db_type != DatabaseType::NormalDB {
+            return Err(ErrorCode::TableEngineNotSupported(format!(
+                "{}.{} doesn't support alter",
+                &self.plan.database, &self.plan.table
+            )));
+        }
+        let Some(old_sp) = table_info.meta.storage_params.clone() else {
+            return Err(ErrorCode::TableEngineNotSupported(format!(
+                "{}.{} is not an external table, cannot alter connection",
+                &self.plan.database, &self.plan.table
+            )));
+        };
+
+        debug!("old storage params before update: {old_sp:?}");
+
+        // This location is used to parse the storage parameters from the URI.
+        //
+        // We don't really use this location to replace the old one; we just parse it out and change the storage parameters as needed.
+        let mut location = UriLocation::new(
+            // The storage type is not changeable, we just use the old one.
+            old_sp.storage_type(),
+            // name is not changeable, we just use a dummy value here.
+            "test".to_string(),
+            // root is not changeable, we just use a dummy value here.
+            "/".to_string(),
+            self.plan.new_connection.clone(),
+        );
+        // NOTE: never use this storage params directly.
+        let updated_sp = parse_storage_params_from_uri(
+            &mut location,
+            Some(self.ctx.as_ref() as _),
+            "when ALTER TABLE CONNECTION",
+        )
+        .await?;
+
+        debug!("storage params used for update: {updated_sp:?}");
+        let new_sp = old_sp.apply_update(updated_sp)?;
+        debug!("new storage params after update: {new_sp:?}");
+
+        // Check the storage params via init operator.
+        let op = init_operator(&new_sp).map_err(|err| {
+            ErrorCode::InvalidConfig(format!(
+                "Input storage config for stage is invalid: {err:?}"
+            ))
+        })?;
+        check_operator(&op, &new_sp).await?;
+
+        let catalog = self.ctx.get_catalog(self.plan.catalog.as_str()).await?;
+        let mut new_table_meta = table_info.meta.clone();
+        new_table_meta.storage_params = Some(new_sp);
+
+        commit_table_meta(
+            &self.ctx,
+            table.as_ref(),
+            table_info,
+            new_table_meta,
+            catalog,
+        )
+        .await?;
+
+        Ok(PipelineBuildResult::create())
+    }
+}

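Taken together, the changes above let a user rotate the credentials of an external table without recreating it: the interpreter keeps the table's storage type, bucket, and root, merges only the credential fields via apply_update, validates the merged params with check_operator, and then commits the new table meta. A hedged end-to-end example, using the same syntax as the test cases above (the table and credential values are made up):

    -- assumes `t` was created as an external S3 table
    ALTER TABLE t CONNECTION=(access_key_id='new-ak' secret_access_key='new-sk');
    -- or point the table at a named connection object
    ALTER TABLE t CONNECTION=(CONNECTION_NAME='my_connection');
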