Skip to content

Commit 70b5d89

Browse files
authored
feat(query): atomic workload group metadata management (#17880)
feat(query): atomic workload metadata management
1 parent b098f18 commit 70b5d89

File tree

7 files changed

+625
-0
lines changed

7 files changed

+625
-0
lines changed

src/common/exception/src/exception_code.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,11 @@ build_exceptions! {
404404
UnknownProcedure(3130),
405405
ProcedureAlreadyExists(3131),
406406
IllegalProcedureFormat(3132),
407+
// Workload
408+
InvalidWorkload(3140),
409+
AlreadyExistsWorkload(3141),
410+
UnknownWorkload(3142),
411+
WorkloadOperateConflict(3143),
407412
}
408413

409414
// Storage errors [3001, 4000].

src/query/management/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ mod warehouse;
3030
mod client_session;
3131
pub mod errors;
3232
mod procedure;
33+
mod workload;
3334

3435
pub use client_session::ClientSessionMgr;
3536
pub use connection::ConnectionMgr;
@@ -55,3 +56,7 @@ pub use warehouse::SystemManagedWarehouse;
5556
pub use warehouse::WarehouseApi;
5657
pub use warehouse::WarehouseInfo;
5758
pub use warehouse::WarehouseMgr;
59+
pub use workload::QuotaValue;
60+
pub use workload::WorkloadApi;
61+
pub use workload::WorkloadGroup;
62+
pub use workload::WorkloadMgr;
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
mod workload_api;
16+
mod workload_mgr;
17+
18+
pub use workload_api::QuotaValue;
19+
pub use workload_api::WorkloadApi;
20+
pub use workload_api::WorkloadGroup;
21+
pub use workload_mgr::WorkloadMgr;
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::collections::HashMap;
16+
use std::time::Duration;
17+
18+
use databend_common_exception::Result;
19+
20+
#[derive(serde::Serialize, serde::Deserialize, Clone, Eq, PartialEq, Debug)]
21+
pub enum QuotaValue {
22+
Duration(Duration),
23+
String(String),
24+
}
25+
26+
#[derive(serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)]
27+
pub struct WorkloadGroup {
28+
pub id: String,
29+
pub name: String,
30+
pub quotas: HashMap<String, QuotaValue>,
31+
}
32+
33+
#[async_trait::async_trait]
34+
pub trait WorkloadApi: Sync + Send {
35+
async fn create(&self, group: WorkloadGroup) -> Result<WorkloadGroup>;
36+
37+
async fn drop(&self, name: String) -> Result<()>;
38+
39+
async fn rename(&self, old_name: String, new_name: String) -> Result<()>;
40+
41+
async fn alter_quotas(&self, name: String, quotas: HashMap<String, QuotaValue>) -> Result<()>;
42+
43+
async fn get_all(&self) -> Result<Vec<WorkloadGroup>>;
44+
45+
async fn get_by_id(&self, id: &str) -> Result<WorkloadGroup>;
46+
47+
async fn get_by_name(&self, name: &str) -> Result<WorkloadGroup>;
48+
}
Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::collections::HashMap;
16+
17+
use databend_common_base::base::escape_for_key;
18+
use databend_common_base::base::GlobalUniqName;
19+
use databend_common_exception::ErrorCode;
20+
use databend_common_exception::Result;
21+
use databend_common_meta_kvapi::kvapi::KVApi;
22+
use databend_common_meta_store::MetaStore;
23+
use databend_common_meta_types::SeqV;
24+
use databend_common_meta_types::SeqValue;
25+
use databend_common_meta_types::TxnCondition;
26+
use databend_common_meta_types::TxnOp;
27+
use databend_common_meta_types::TxnRequest;
28+
29+
use crate::workload::workload_api::QuotaValue;
30+
use crate::workload::workload_api::WorkloadApi;
31+
use crate::workload::workload_api::WorkloadGroup;
32+
pub static WORKLOAD_META_KEY_PREFIX: &str = "__fd_workloads";
33+
34+
pub struct WorkloadMgr {
35+
metastore: MetaStore,
36+
workload_key_prefix: String,
37+
workload_index_prefix: String,
38+
}
39+
40+
impl WorkloadMgr {
41+
pub fn create(metastore: MetaStore, tenant: &str) -> Result<Self> {
42+
Ok(WorkloadMgr {
43+
metastore,
44+
workload_key_prefix: format!(
45+
"{}/{}/workload_groups",
46+
WORKLOAD_META_KEY_PREFIX,
47+
escape_for_key(tenant)?
48+
),
49+
workload_index_prefix: format!(
50+
"{}/{}/index_workload_groups",
51+
WORKLOAD_META_KEY_PREFIX,
52+
escape_for_key(tenant)?
53+
),
54+
})
55+
}
56+
57+
pub async fn get_id_by_name(&self, name: &str) -> Result<String> {
58+
let index_key = format!("{}/{}", self.workload_index_prefix, escape_for_key(name)?);
59+
60+
let Some(seq) = self.metastore.get_kv(&index_key).await? else {
61+
return Err(ErrorCode::UnknownWorkload(format!(
62+
"Unknown workload {}",
63+
name
64+
)));
65+
};
66+
67+
Ok(unsafe { String::from_utf8_unchecked(seq.data) })
68+
}
69+
70+
async fn get_seq_by_name(&self, name: &str) -> Result<SeqV<WorkloadGroup>> {
71+
let id = self.get_id_by_name(name).await?;
72+
73+
let Some(seq) = self.get_seq_by_id(&id).await? else {
74+
return Err(ErrorCode::UnknownWorkload(format!(
75+
"Unknown workload {}",
76+
name
77+
)));
78+
};
79+
80+
Ok(seq)
81+
}
82+
83+
async fn get_seq_by_id(&self, id: &str) -> Result<Option<SeqV<WorkloadGroup>>> {
84+
let workload_key = format!("{}/{}", self.workload_key_prefix, id);
85+
86+
let Some(seq) = self.metastore.get_kv(&workload_key).await? else {
87+
return Ok(None);
88+
};
89+
90+
Ok(Some(SeqV::new(
91+
seq.seq,
92+
serde_json::from_slice::<WorkloadGroup>(&seq.data)?,
93+
)))
94+
}
95+
}
96+
97+
#[async_trait::async_trait]
98+
impl WorkloadApi for WorkloadMgr {
99+
async fn create(&self, mut group: WorkloadGroup) -> Result<WorkloadGroup> {
100+
if group.name.is_empty() {
101+
return Err(ErrorCode::InvalidWorkload("Workload group name is empty."));
102+
}
103+
104+
if !group.id.is_empty() {
105+
return Err(ErrorCode::InvalidWorkload(
106+
"Workload group id is not empty.",
107+
));
108+
}
109+
110+
let group_id = GlobalUniqName::unique();
111+
let mut create_workload = TxnRequest::default();
112+
113+
group.id = group_id.clone();
114+
let escape_name = escape_for_key(&group.name)?;
115+
let workload_key = format!("{}/{}", self.workload_key_prefix, group_id);
116+
let workload_index_key = format!("{}/{}", self.workload_index_prefix, escape_name);
117+
118+
create_workload
119+
.condition
120+
.push(TxnCondition::eq_seq(workload_index_key.clone(), 0));
121+
create_workload
122+
.if_then
123+
.push(TxnOp::put(workload_index_key, group_id.into_bytes()));
124+
create_workload
125+
.if_then
126+
.push(TxnOp::put(workload_key, serde_json::to_vec(&group)?));
127+
128+
match self.metastore.transaction(create_workload).await? {
129+
res if res.success => Ok(group),
130+
_res => Err(ErrorCode::AlreadyExistsWorkload(format!(
131+
"The workload {} already exits.",
132+
group.name
133+
))),
134+
}
135+
}
136+
137+
async fn drop(&self, name: String) -> Result<()> {
138+
let workload_id = self.get_id_by_name(&name).await?;
139+
140+
let escape_name = escape_for_key(&name)?;
141+
let workload_key = format!("{}/{}", self.workload_key_prefix, workload_id);
142+
let workload_index_key = format!("{}/{}", self.workload_index_prefix, escape_name);
143+
144+
let mut drop_workload = TxnRequest::default();
145+
drop_workload.condition.push(TxnCondition::eq_value(
146+
workload_index_key.clone(),
147+
workload_id.into_bytes(),
148+
));
149+
drop_workload.if_then.push(TxnOp::delete(workload_key));
150+
drop_workload
151+
.if_then
152+
.push(TxnOp::delete(workload_index_key));
153+
154+
match self.metastore.transaction(drop_workload).await? {
155+
res if res.success => Ok(()),
156+
_res => Err(ErrorCode::UnknownWorkload(format!(
157+
"Unknown workload {}",
158+
name
159+
))),
160+
}
161+
}
162+
163+
async fn rename(&self, old_name: String, new_name: String) -> Result<()> {
164+
let mut workload = self.get_by_name(&old_name).await?;
165+
workload.name = new_name.clone();
166+
167+
let workload_key = format!("{}/{}", self.workload_key_prefix, workload.id);
168+
169+
let escape_name = escape_for_key(&old_name)?;
170+
let old_workload_index_key = format!("{}/{}", self.workload_index_prefix, escape_name);
171+
172+
let escape_name = escape_for_key(&new_name)?;
173+
let new_workload_index_key = format!("{}/{}", self.workload_index_prefix, escape_name);
174+
175+
let mut rename_workload = TxnRequest::default();
176+
rename_workload
177+
.condition
178+
.push(TxnCondition::eq_seq(new_workload_index_key.clone(), 0));
179+
rename_workload.condition.push(TxnCondition::eq_value(
180+
old_workload_index_key.clone(),
181+
workload.id.clone().into_bytes(),
182+
));
183+
184+
rename_workload
185+
.if_then
186+
.push(TxnOp::put(workload_key, serde_json::to_vec(&workload)?));
187+
rename_workload
188+
.if_then
189+
.push(TxnOp::delete(old_workload_index_key));
190+
rename_workload.if_then.push(TxnOp::put(
191+
new_workload_index_key,
192+
workload.id.clone().into_bytes(),
193+
));
194+
195+
match self.metastore.transaction(rename_workload).await? {
196+
res if res.success => Ok(()),
197+
_res => Err(ErrorCode::InvalidWorkload(format!(
198+
"Unknown workload {} or workload {} already exists",
199+
old_name, new_name
200+
))),
201+
}
202+
}
203+
204+
async fn alter_quotas(&self, name: String, quotas: HashMap<String, QuotaValue>) -> Result<()> {
205+
for _index in 0..5 {
206+
let workload = self.get_seq_by_name(&name).await?;
207+
let seq = workload.seq;
208+
let mut workload = workload.into_value().unwrap();
209+
210+
for (key, value) in &quotas {
211+
workload.quotas.insert(key.clone(), value.clone());
212+
}
213+
214+
let workload_key = format!("{}/{}", self.workload_key_prefix, workload.id);
215+
let mut alter_workload = TxnRequest::default();
216+
alter_workload
217+
.condition
218+
.push(TxnCondition::eq_seq(workload_key.clone(), seq));
219+
alter_workload
220+
.if_then
221+
.push(TxnOp::put(workload_key, serde_json::to_vec(&workload)?));
222+
223+
if self.metastore.transaction(alter_workload).await?.success {
224+
return Ok(());
225+
}
226+
}
227+
228+
Err(ErrorCode::WorkloadOperateConflict(
229+
"Workload operate conflict(tried 5 times).",
230+
))
231+
}
232+
233+
async fn get_all(&self) -> Result<Vec<WorkloadGroup>> {
234+
let list_reply = self
235+
.metastore
236+
.prefix_list_kv(&format!("{}/", self.workload_key_prefix))
237+
.await?;
238+
239+
let mut workload_groups = Vec::with_capacity(list_reply.len());
240+
241+
for (_key, seq) in list_reply {
242+
workload_groups.push(serde_json::from_slice::<WorkloadGroup>(&seq.data)?);
243+
}
244+
245+
Ok(workload_groups)
246+
}
247+
248+
async fn get_by_id(&self, id: &str) -> Result<WorkloadGroup> {
249+
let Some(seq) = self.get_seq_by_id(id).await? else {
250+
return Err(ErrorCode::UnknownWorkload(format!(
251+
"Unknown workload id {}",
252+
id
253+
)));
254+
};
255+
256+
Ok(seq.into_value().unwrap())
257+
}
258+
259+
async fn get_by_name(&self, name: &str) -> Result<WorkloadGroup> {
260+
let seq_value = self.get_seq_by_name(name).await?;
261+
Ok(seq_value.into_value().unwrap())
262+
}
263+
}

src/query/management/tests/it/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,4 @@ mod stage;
2222
mod udf;
2323
mod user;
2424
mod warehouse;
25+
mod workload;

0 commit comments

Comments
 (0)