Skip to content

Commit 5332f02

Browse files
authored
feat(query): support unset quota for workload group (#17924)
* feat(query): support unset quota for workload group * feat(query): support unset quota for workload group
1 parent a872034 commit 5332f02

File tree

18 files changed

+235
-45
lines changed

18 files changed

+235
-45
lines changed

src/common/exception/src/exception_code.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,7 @@ build_exceptions! {
409409
AlreadyExistsWorkload(3141),
410410
UnknownWorkload(3142),
411411
WorkloadOperateConflict(3143),
412+
UnknownWorkloadQuotas(3144),
412413
}
413414

414415
// Storage errors [3001, 4000].

src/query/ast/src/ast/statements/statement.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@ use crate::ast::statements::pipe::CreatePipeStmt;
3030
use crate::ast::statements::settings::Settings;
3131
use crate::ast::statements::task::CreateTaskStmt;
3232
use crate::ast::statements::warehouse::ShowWarehousesStmt;
33-
use crate::ast::statements::workload::AlterWorkloadGroupStmt;
3433
use crate::ast::statements::workload::CreateWorkloadGroupStmt;
3534
use crate::ast::statements::workload::DropWorkloadGroupStmt;
3635
use crate::ast::statements::workload::RenameWorkloadGroupStmt;
36+
use crate::ast::statements::workload::SetWorkloadGroupQuotasStmt;
3737
use crate::ast::statements::workload::ShowWorkloadGroupsStmt;
3838
use crate::ast::write_comma_separated_list;
3939
use crate::ast::CreateOption;
@@ -156,7 +156,8 @@ pub enum Statement {
156156
CreateWorkloadGroup(CreateWorkloadGroupStmt),
157157
DropWorkloadGroup(DropWorkloadGroupStmt),
158158
RenameWorkloadGroup(RenameWorkloadGroupStmt),
159-
AlterWorkloadGroup(AlterWorkloadGroupStmt),
159+
SetWorkloadQuotasGroup(SetWorkloadGroupQuotasStmt),
160+
UnsetWorkloadQuotasGroup(UnsetWorkloadGroupQuotasStmt),
160161

161162
// Databases
162163
ShowDatabases(ShowDatabasesStmt),
@@ -589,7 +590,8 @@ impl Statement {
589590
| Statement::CreateWorkloadGroup(..)
590591
| Statement::DropWorkloadGroup(..)
591592
| Statement::RenameWorkloadGroup(..)
592-
| Statement::AlterWorkloadGroup(..) => false,
593+
| Statement::SetWorkloadQuotasGroup(..)
594+
| Statement::UnsetWorkloadQuotasGroup(..) => false,
593595
Statement::StatementWithSettings { stmt, settings: _ } => {
594596
stmt.allowed_in_multi_statement()
595597
}
@@ -1034,7 +1036,8 @@ impl Display for Statement {
10341036
Statement::CreateWorkloadGroup(stmt) => write!(f, "{stmt}")?,
10351037
Statement::DropWorkloadGroup(stmt) => write!(f, "{stmt}")?,
10361038
Statement::RenameWorkloadGroup(stmt) => write!(f, "{stmt}")?,
1037-
Statement::AlterWorkloadGroup(stmt) => write!(f, "{stmt}")?,
1039+
Statement::SetWorkloadQuotasGroup(stmt) => write!(f, "{stmt}")?,
1040+
Statement::UnsetWorkloadQuotasGroup(stmt) => write!(f, "{stmt}")?,
10381041
}
10391042
Ok(())
10401043
}

src/query/ast/src/ast/statements/workload.rs

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -218,18 +218,18 @@ impl Display for RenameWorkloadGroupStmt {
218218
}
219219

220220
#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
221-
pub struct AlterWorkloadGroupStmt {
221+
pub struct SetWorkloadGroupQuotasStmt {
222222
pub name: Identifier,
223223
#[drive(skip)]
224224
pub quotas: BTreeMap<String, QuotaValueStmt>,
225225
}
226226

227-
impl Display for AlterWorkloadGroupStmt {
227+
impl Display for SetWorkloadGroupQuotasStmt {
228228
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
229229
write!(f, "ALTER WORKLOAD GROUP {}", self.name)?;
230230

231231
if !self.quotas.is_empty() {
232-
write!(f, " SET ")?;
232+
write!(f, " SET")?;
233233

234234
for (idx, (key, value)) in self.quotas.iter().enumerate() {
235235
if idx != 0 {
@@ -244,6 +244,33 @@ impl Display for AlterWorkloadGroupStmt {
244244
}
245245
}
246246

247+
#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
248+
pub struct UnsetWorkloadGroupQuotasStmt {
249+
pub name: Identifier,
250+
#[drive(skip)]
251+
pub quotas: Vec<Identifier>,
252+
}
253+
254+
impl Display for UnsetWorkloadGroupQuotasStmt {
255+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
256+
write!(f, "ALTER WORKLOAD GROUP {}", self.name)?;
257+
258+
if !self.quotas.is_empty() {
259+
write!(f, " UNSET")?;
260+
261+
for (idx, name) in self.quotas.iter().enumerate() {
262+
if idx != 0 {
263+
write!(f, ",")?;
264+
}
265+
266+
write!(f, " {}", name)?;
267+
}
268+
}
269+
270+
Ok(())
271+
}
272+
}
273+
247274
#[cfg(test)]
248275
mod tests {
249276
use std::time::Duration;

src/query/ast/src/parser/statement.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -712,12 +712,21 @@ pub fn statement_body(i: Input) -> IResult<Statement> {
712712
},
713713
);
714714

715-
let alter_workload_group = map(
715+
let set_workload_group_quotas = map(
716716
rule! {
717717
ALTER ~ WORKLOAD ~ GROUP ~ #ident ~ SET ~ #workload_quotas
718718
},
719719
|(_, _, _, name, _, quotas)| {
720-
Statement::AlterWorkloadGroup(AlterWorkloadGroupStmt { name, quotas })
720+
Statement::SetWorkloadQuotasGroup(SetWorkloadGroupQuotasStmt { name, quotas })
721+
},
722+
);
723+
724+
let unset_workload_group_quotas = map(
725+
rule! {
726+
ALTER ~ WORKLOAD ~ GROUP ~ #ident ~ UNSET ~ #unset_source
727+
},
728+
|(_, _, _, name, _, quotas)| {
729+
Statement::UnsetWorkloadQuotasGroup(UnsetWorkloadGroupQuotasStmt { name, quotas })
721730
},
722731
);
723732

@@ -2556,7 +2565,8 @@ pub fn statement_body(i: Input) -> IResult<Statement> {
25562565
| #create_workload_group: "`CREATE WORKLOAD GROUP [IF NOT EXISTS] <name> WITH [<workload_group_quotas>]`"
25572566
| #drop_workload_group: "`DROP WORKLOAD GROUP [IF EXISTS] <name>`"
25582567
| #rename_workload_group: "`RENAME WORKLOAD GROUP <old_name> TO <new_name>`"
2559-
| #alter_workload_group: "`ALTER WORKLOAD GROUP <name> set [<workload_group_quotas>]`"
2568+
| #set_workload_group_quotas: "`ALTER WORKLOAD GROUP <name> SET [<workload_group_quotas>]`"
2569+
| #unset_workload_group_quotas: "`ALTER WORKLOAD GROUP <name> UNSET {<name> | (<name>, ...)}`"
25602570
),
25612571
// database
25622572
rule!(

src/query/management/src/workload/workload_api.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,9 @@ pub trait WorkloadApi: Sync + Send {
6565

6666
async fn rename(&self, old_name: String, new_name: String) -> Result<()>;
6767

68-
async fn alter_quotas(&self, name: String, quotas: HashMap<String, QuotaValue>) -> Result<()>;
68+
async fn set_quotas(&self, name: String, quotas: HashMap<String, QuotaValue>) -> Result<()>;
69+
70+
async fn unset_quotas(&self, name: String, quotas: Vec<String>) -> Result<()>;
6971

7072
async fn get_all(&self) -> Result<Vec<WorkloadGroup>>;
7173

src/query/management/src/workload/workload_mgr.rs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ impl WorkloadApi for WorkloadMgr {
201201
}
202202
}
203203

204-
async fn alter_quotas(&self, name: String, quotas: HashMap<String, QuotaValue>) -> Result<()> {
204+
async fn set_quotas(&self, name: String, quotas: HashMap<String, QuotaValue>) -> Result<()> {
205205
for _index in 0..5 {
206206
let workload = self.get_seq_by_name(&name).await?;
207207
let seq = workload.seq;
@@ -230,6 +230,40 @@ impl WorkloadApi for WorkloadMgr {
230230
))
231231
}
232232

233+
async fn unset_quotas(&self, name: String, quotas: Vec<String>) -> Result<()> {
234+
for _index in 0..5 {
235+
let workload = self.get_seq_by_name(&name).await?;
236+
let seq = workload.seq;
237+
let mut workload = workload.into_value().unwrap();
238+
239+
for quota_name in &quotas {
240+
if workload.quotas.remove(quota_name).is_none() {
241+
return Err(ErrorCode::UnknownWorkloadQuotas(format!(
242+
"Unknown workload group quota name {} in {}",
243+
quota_name, name
244+
)));
245+
}
246+
}
247+
248+
let workload_key = format!("{}/{}", self.workload_key_prefix, workload.id);
249+
let mut alter_workload = TxnRequest::default();
250+
alter_workload
251+
.condition
252+
.push(TxnCondition::eq_seq(workload_key.clone(), seq));
253+
alter_workload
254+
.if_then
255+
.push(TxnOp::put(workload_key, serde_json::to_vec(&workload)?));
256+
257+
if self.metastore.transaction(alter_workload).await?.success {
258+
return Ok(());
259+
}
260+
}
261+
262+
Err(ErrorCode::WorkloadOperateConflict(
263+
"Workload operate conflict(tried 5 times).",
264+
))
265+
}
266+
233267
async fn get_all(&self) -> Result<Vec<WorkloadGroup>> {
234268
let list_reply = self
235269
.metastore

src/query/management/tests/it/workload.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,7 @@ async fn test_alter_quotas() -> Result<()> {
189189
QuotaValue::Duration(Duration::from_secs(10)),
190190
);
191191

192-
mgr.alter_quotas("test_group".to_string(), new_quotas)
193-
.await?;
192+
mgr.set_quotas("test_group".to_string(), new_quotas).await?;
194193

195194
let updated = mgr.get_by_name("test_group").await?;
196195
assert_eq!(updated.quotas.len(), 1);
@@ -206,7 +205,7 @@ async fn test_alter_quotas() -> Result<()> {
206205
QuotaValue::Duration(Duration::from_secs(20)),
207206
);
208207

209-
mgr.alter_quotas("test_group".to_string(), update_quotas)
208+
mgr.set_quotas("test_group".to_string(), update_quotas)
210209
.await?;
211210

212211
let updated = mgr.get_by_name("test_group").await?;

src/query/service/src/interpreters/access/privilege_access.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1373,7 +1373,8 @@ impl AccessChecker for PrivilegeAccess {
13731373
Plan::CreateWorkloadGroup(_) => {}
13741374
Plan::DropWorkloadGroup(_) => {}
13751375
Plan::RenameWorkloadGroup(_) => {}
1376-
Plan::AlterWorkloadGroup(_) => {}
1376+
Plan::SetWorkloadGroupQuotas(_) => {}
1377+
Plan::UnsetWorkloadGroupQuotas(_) => {}
13771378
}
13781379

13791380
Ok(())

src/query/service/src/interpreters/interpreter_factory.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ use super::interpreter_user_stage_drop::DropUserStageInterpreter;
3939
use super::*;
4040
use crate::interpreters::access::Accessor;
4141
use crate::interpreters::interpreter_add_warehouse_cluster::AddWarehouseClusterInterpreter;
42-
use crate::interpreters::interpreter_alter_workload_group::AlterWorkloadGroupInterpreter;
4342
use crate::interpreters::interpreter_assign_warehouse_nodes::AssignWarehouseNodesInterpreter;
4443
use crate::interpreters::interpreter_catalog_drop::DropCatalogInterpreter;
4544
use crate::interpreters::interpreter_connection_create::CreateConnectionInterpreter;
@@ -72,6 +71,7 @@ use crate::interpreters::interpreter_rename_warehouse_cluster::RenameWarehouseCl
7271
use crate::interpreters::interpreter_rename_workload_group::RenameWorkloadGroupInterpreter;
7372
use crate::interpreters::interpreter_resume_warehouse::ResumeWarehouseInterpreter;
7473
use crate::interpreters::interpreter_set_priority::SetPriorityInterpreter;
74+
use crate::interpreters::interpreter_set_workload_group_quotas::SetWorkloadGroupQuotasInterpreter;
7575
use crate::interpreters::interpreter_show_online_nodes::ShowOnlineNodesInterpreter;
7676
use crate::interpreters::interpreter_show_warehouses::ShowWarehousesInterpreter;
7777
use crate::interpreters::interpreter_show_workload_groups::ShowWorkloadGroupsInterpreter;
@@ -90,6 +90,7 @@ use crate::interpreters::interpreter_txn_abort::AbortInterpreter;
9090
use crate::interpreters::interpreter_txn_begin::BeginInterpreter;
9191
use crate::interpreters::interpreter_txn_commit::CommitInterpreter;
9292
use crate::interpreters::interpreter_unassign_warehouse_nodes::UnassignWarehouseNodesInterpreter;
93+
use crate::interpreters::interpreter_unset_workload_group_quotas::UnsetWorkloadGroupQuotasInterpreter;
9394
use crate::interpreters::interpreter_use_warehouse::UseWarehouseInterpreter;
9495
use crate::interpreters::interpreter_view_describe::DescribeViewInterpreter;
9596
use crate::interpreters::AlterUserInterpreter;
@@ -734,10 +735,12 @@ impl InterpreterFactory {
734735
Plan::RenameWorkloadGroup(p) => Ok(Arc::new(
735736
RenameWorkloadGroupInterpreter::try_create(ctx, *p.clone())?,
736737
)),
737-
Plan::AlterWorkloadGroup(p) => Ok(Arc::new(AlterWorkloadGroupInterpreter::try_create(
738-
ctx,
739-
*p.clone(),
740-
)?)),
738+
Plan::SetWorkloadGroupQuotas(p) => Ok(Arc::new(
739+
SetWorkloadGroupQuotasInterpreter::try_create(ctx, *p.clone())?,
740+
)),
741+
Plan::UnsetWorkloadGroupQuotas(p) => Ok(Arc::new(
742+
UnsetWorkloadGroupQuotasInterpreter::try_create(ctx, *p.clone())?,
743+
)),
741744
// Plan::ShowCreateProcedure(_) => {}
742745
//
743746
// Plan::RenameProcedure(p) => Ok(Arc::new(RenameProcedureInterpreter::try_create(

src/query/service/src/interpreters/interpreter_alter_workload_group.rs renamed to src/query/service/src/interpreters/interpreter_set_workload_group_quotas.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,29 +22,29 @@ use databend_common_license::license::Feature;
2222
use databend_common_license::license_manager::LicenseManagerSwitch;
2323
use databend_common_management::WorkloadApi;
2424
use databend_common_management::WorkloadMgr;
25-
use databend_common_sql::plans::AlterWorkloadGroupPlan;
25+
use databend_common_sql::plans::SetWorkloadGroupQuotasPlan;
2626

2727
use crate::interpreters::interpreter_create_workload_group::to_quota_value;
2828
use crate::interpreters::util::AuditElement;
2929
use crate::interpreters::Interpreter;
3030
use crate::pipelines::PipelineBuildResult;
3131
use crate::sessions::QueryContext;
3232

33-
pub struct AlterWorkloadGroupInterpreter {
33+
pub struct SetWorkloadGroupQuotasInterpreter {
3434
ctx: Arc<QueryContext>,
35-
plan: AlterWorkloadGroupPlan,
35+
plan: SetWorkloadGroupQuotasPlan,
3636
}
3737

38-
impl AlterWorkloadGroupInterpreter {
39-
pub fn try_create(ctx: Arc<QueryContext>, plan: AlterWorkloadGroupPlan) -> Result<Self> {
40-
Ok(AlterWorkloadGroupInterpreter { ctx, plan })
38+
impl SetWorkloadGroupQuotasInterpreter {
39+
pub fn try_create(ctx: Arc<QueryContext>, plan: SetWorkloadGroupQuotasPlan) -> Result<Self> {
40+
Ok(SetWorkloadGroupQuotasInterpreter { ctx, plan })
4141
}
4242
}
4343

4444
#[async_trait::async_trait]
45-
impl Interpreter for AlterWorkloadGroupInterpreter {
45+
impl Interpreter for SetWorkloadGroupQuotasInterpreter {
4646
fn name(&self) -> &str {
47-
"AlterWorkloadGroupInterpreter"
47+
"SetWorkloadGroupQuotasInterpreter"
4848
}
4949

5050
fn is_ddl(&self) -> bool {
@@ -64,14 +64,14 @@ impl Interpreter for AlterWorkloadGroupInterpreter {
6464

6565
let workload_manager = GlobalInstance::get::<Arc<WorkloadMgr>>();
6666
workload_manager
67-
.alter_quotas(self.plan.name.clone(), workload_quotas)
67+
.set_quotas(self.plan.name.clone(), workload_quotas)
6868
.await?;
6969

7070
let user_info = self.ctx.get_current_user()?;
7171
log::info!(
7272
target: "databend::log::audit",
7373
"{}",
74-
serde_json::to_string(&AuditElement::create(&user_info, "alter_workload", &self.plan))?
74+
serde_json::to_string(&AuditElement::create(&user_info, "set_workload_quotas", &self.plan))?
7575
);
7676
Ok(PipelineBuildResult::create())
7777
}

0 commit comments

Comments
 (0)