Skip to content

Commit 291703b

Browse files
authored
feat(cluster): support fuzzy match for warehouse unassign node (#17877)
* feat(cluster): support fuzzy match for warehouse unassign node * feat(cluster): support fuzzy match node for warehouse unassign node
1 parent 00f03ae commit 291703b

File tree

2 files changed

+180
-29
lines changed

2 files changed

+180
-29
lines changed

src/query/management/src/warehouse/warehouse_mgr.rs

Lines changed: 122 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use std::collections::hash_map::Entry;
1616
use std::collections::HashMap;
17+
use std::collections::HashSet;
1718
use std::time::Duration;
1819

1920
use databend_common_base::base::escape_for_key;
@@ -139,7 +140,7 @@ fn map_condition(k: &str, seq: MatchSeq) -> TxnCondition {
139140
}
140141
}
141142

142-
struct NodeInfoSnapshot {
143+
pub struct NodeInfoSnapshot {
143144
node_seq: u64,
144145
cluster_seq: u64,
145146
node_info: NodeInfo,
@@ -1998,7 +1999,7 @@ impl WarehouseApi for WarehouseMgr {
19981999
}
19992000

20002001
for _idx in 0..10 {
2001-
let mut nodes = nodes.clone();
2002+
let mut removed_nodes = HashSet::new();
20022003
let mut drop_cluster_node_txn = TxnRequest::default();
20032004

20042005
let mut warehouse_snapshot = self.warehouse_snapshot(warehouse).await?;
@@ -2009,11 +2010,11 @@ impl WarehouseApi for WarehouseMgr {
20092010
warehouse
20102011
))),
20112012
WarehouseInfo::SystemManaged(mut info) => {
2012-
for (cluster, nodes) in &nodes {
2013-
let Some(cluster) = info.clusters.get_mut(cluster) else {
2013+
for (cluster_id, nodes) in &nodes {
2014+
let Some(cluster) = info.clusters.get_mut(cluster_id) else {
20142015
return Err(ErrorCode::WarehouseClusterNotExists(format!(
20152016
"Warehouse cluster {:?}.{:?} not exists",
2016-
warehouse, cluster
2017+
warehouse, cluster_id
20172018
)));
20182019
};
20192020

@@ -2024,14 +2025,7 @@ impl WarehouseApi for WarehouseMgr {
20242025
)));
20252026
}
20262027

2027-
for remove_node in nodes {
2028-
if cluster.nodes.remove_first(remove_node).is_none() {
2029-
return Err(ErrorCode::ClusterUnknownNode(format!(
2030-
"Warehouse cluster {:?}.{:?} unknown node {:?}",
2031-
warehouse, cluster, remove_node
2032-
)));
2033-
}
2034-
}
2028+
removed_nodes = info.remove_nodes(cluster_id, nodes, &warehouse_snapshot.snapshot_nodes)?;
20352029
}
20362030

20372031

@@ -2070,22 +2064,17 @@ impl WarehouseApi for WarehouseMgr {
20702064
MatchSeq::Exact(node_snapshot.cluster_seq),
20712065
));
20722066

2073-
if let Some(v) = nodes.get_mut(&node_snapshot.node_info.cluster_id) {
2074-
let runtime_node_group = node_snapshot.node_info.runtime_node_group.clone();
2075-
if v.remove_first(&SelectedNode::Random(runtime_node_group))
2076-
.is_some()
2077-
{
2078-
let node = node_snapshot.node_info.leave_warehouse();
2079-
2080-
drop_cluster_node_txn
2081-
.if_then
2082-
.push(TxnOp::delete(cluster_node_key));
2083-
drop_cluster_node_txn.if_then.push(TxnOp::put_with_ttl(
2084-
node_key,
2085-
serde_json::to_vec(&node)?,
2086-
Some(self.lift_time * 4),
2087-
));
2088-
}
2067+
if removed_nodes.contains(&node_snapshot.node_info.id) {
2068+
let node = node_snapshot.node_info.leave_warehouse();
2069+
2070+
drop_cluster_node_txn
2071+
.if_then
2072+
.push(TxnOp::delete(cluster_node_key));
2073+
drop_cluster_node_txn.if_then.push(TxnOp::put_with_ttl(
2074+
node_key,
2075+
serde_json::to_vec(&node)?,
2076+
Some(self.lift_time * 4),
2077+
));
20892078
}
20902079
}
20912080

@@ -2211,6 +2200,110 @@ impl WarehouseApi for WarehouseMgr {
22112200
}
22122201
}
22132202

2203+
impl SystemManagedWarehouse {
2204+
pub fn remove_nodes(
2205+
&mut self,
2206+
cluster_id: &String,
2207+
unassign: &[SelectedNode],
2208+
nodes: &[NodeInfoSnapshot],
2209+
) -> Result<HashSet<String>> {
2210+
let mut final_removed_nodes = HashSet::new();
2211+
let mut match_any = Vec::with_capacity(unassign.len());
2212+
let mut match_node_group = Vec::with_capacity(unassign.len());
2213+
2214+
let Some(cluster) = self.clusters.get_mut(cluster_id) else {
2215+
unreachable!()
2216+
};
2217+
2218+
// 1. assign node group == unassign node group
2219+
for node in unassign {
2220+
if cluster.nodes.remove_first(node).is_none() {
2221+
match node {
2222+
SelectedNode::Random(None) => {
2223+
match_any.push(node);
2224+
}
2225+
SelectedNode::Random(Some(node)) => {
2226+
match_node_group.push(Some(node.clone()));
2227+
}
2228+
}
2229+
2230+
continue;
2231+
}
2232+
2233+
for node_snapshot in nodes {
2234+
if &node_snapshot.node_info.cluster_id != cluster_id {
2235+
continue;
2236+
}
2237+
2238+
match (&node, &node_snapshot.node_info.runtime_node_group) {
2239+
(SelectedNode::Random(None), None) => {
2240+
if !final_removed_nodes.contains(&node_snapshot.node_info.id) {
2241+
final_removed_nodes.insert(node_snapshot.node_info.id.clone());
2242+
break;
2243+
}
2244+
}
2245+
(SelectedNode::Random(Some(left)), Some(right)) if left == right => {
2246+
if !final_removed_nodes.contains(&node_snapshot.node_info.id) {
2247+
final_removed_nodes.insert(node_snapshot.node_info.id.clone());
2248+
break;
2249+
}
2250+
}
2251+
_ => {
2252+
continue;
2253+
}
2254+
}
2255+
}
2256+
}
2257+
2258+
// 2. unassign Some(node group), assign None
2259+
'match_node_group: for node_group in match_node_group {
2260+
for node_snapshot in nodes {
2261+
if &node_snapshot.node_info.cluster_id != cluster_id {
2262+
continue;
2263+
}
2264+
2265+
if node_snapshot.node_info.node_group == node_group
2266+
&& !final_removed_nodes.contains(&node_snapshot.node_info.id)
2267+
{
2268+
final_removed_nodes.insert(node_snapshot.node_info.id.clone());
2269+
cluster.nodes.remove_first(&SelectedNode::Random(
2270+
node_snapshot.node_info.runtime_node_group.clone(),
2271+
));
2272+
continue 'match_node_group;
2273+
}
2274+
}
2275+
2276+
return Err(ErrorCode::ClusterUnknownNode(format!(
2277+
"Cannot found {:?} node group node in {:?}",
2278+
node_group, cluster
2279+
)));
2280+
}
2281+
2282+
// 3. assign Some(node group) and unassign None
2283+
'match_any: for _index in 0..match_any.len() {
2284+
for node_snapshot in nodes {
2285+
if &node_snapshot.node_info.cluster_id != cluster_id {
2286+
continue;
2287+
}
2288+
2289+
if !final_removed_nodes.contains(&node_snapshot.node_info.id) {
2290+
final_removed_nodes.insert(node_snapshot.node_info.id.clone());
2291+
cluster.nodes.remove_first(&SelectedNode::Random(
2292+
node_snapshot.node_info.runtime_node_group.clone(),
2293+
));
2294+
continue 'match_any;
2295+
}
2296+
}
2297+
2298+
return Err(ErrorCode::ClusterUnknownNode(
2299+
"Cannot unassign empty warehouse cluster",
2300+
));
2301+
}
2302+
2303+
Ok(final_removed_nodes)
2304+
}
2305+
}
2306+
22142307
/// Build an error indicating that databend-meta responded with an unexpected response,
22152308
/// while expecting a TxnGetResponse.
22162309
fn invalid_get_resp(resp: Option<&TxnOpResponse>) -> MetaError {

src/query/management/tests/it/warehouse.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -860,6 +860,64 @@ async fn test_unassign_nodes_for_warehouse() -> Result<()> {
860860

861861
assert_eq!(nodes.len(), 1);
862862
assert!(nodes[0].id == node_1.id || nodes[0].id == node_2.id);
863+
864+
let add_warehouse_cluster = warehouse_manager.add_warehouse_cluster(
865+
String::from("test_warehouse"),
866+
String::from("cluster_name_1"),
867+
vec![SelectedNode::Random(None), SelectedNode::Random(None)],
868+
);
869+
870+
add_warehouse_cluster.await?;
871+
872+
let unassign_warehouse_nodes = warehouse_manager.unassign_warehouse_nodes(
873+
"test_warehouse",
874+
HashMap::from([(String::from("cluster_name_1"), vec![SelectedNode::Random(
875+
Some(String::from("test_node_group")),
876+
)])]),
877+
);
878+
879+
unassign_warehouse_nodes.await?;
880+
881+
let nodes = warehouse_manager
882+
.list_warehouse_cluster_nodes("test_warehouse", "cluster_name_1")
883+
.await?;
884+
885+
assert_eq!(nodes.len(), 1);
886+
887+
let mut node_3 = system_managed_node(&GlobalUniqName::unique());
888+
node_3.node_group = Some(String::from("test_node_group_1"));
889+
warehouse_manager.start_node(node_3.clone()).await?;
890+
891+
let mut node_4 = system_managed_node(&GlobalUniqName::unique());
892+
node_4.node_group = Some(String::from("test_node_group_1"));
893+
warehouse_manager.start_node(node_4.clone()).await?;
894+
895+
// eprintln!("{:?}", warehouse_manager.list_online_nodes().await?);
896+
let add_warehouse_cluster = warehouse_manager.add_warehouse_cluster(
897+
String::from("test_warehouse"),
898+
String::from("cluster_name_2"),
899+
vec![
900+
SelectedNode::Random(Some(String::from("test_node_group_1"))),
901+
SelectedNode::Random(Some(String::from("test_node_group_1"))),
902+
],
903+
);
904+
905+
add_warehouse_cluster.await?;
906+
907+
let unassign_warehouse_nodes = warehouse_manager.unassign_warehouse_nodes(
908+
"test_warehouse",
909+
HashMap::from([(String::from("cluster_name_2"), vec![SelectedNode::Random(
910+
None,
911+
)])]),
912+
);
913+
914+
unassign_warehouse_nodes.await?;
915+
let nodes = warehouse_manager
916+
.list_warehouse_cluster_nodes("test_warehouse", "cluster_name_2")
917+
.await?;
918+
919+
assert_eq!(nodes.len(), 1);
920+
863921
Ok(())
864922
}
865923

0 commit comments

Comments
 (0)