Skip to content

Commit abb0165

Browse files
authored
fix(cluster): fix unassign warehouse nodes failure (#17195)
* fix(cluster): fix unassign warehouse nodes failure * fix(cluster): fix unassign warehouse nodes failure * chore(cluster): use millis for forward requet seed
1 parent 726d6cb commit abb0165

File tree

3 files changed

+213
-17
lines changed

3 files changed

+213
-17
lines changed

src/query/management/src/warehouse/warehouse_mgr.rs

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1829,6 +1829,16 @@ impl WarehouseApi for WarehouseMgr {
18291829
));
18301830
}
18311831

1832+
if nodes.iter().any(|(name, _)| name.is_empty()) {
1833+
return Err(ErrorCode::BadArguments("Assign cluster name is empty."));
1834+
}
1835+
1836+
if nodes.iter().any(|(_, list)| list.is_empty()) {
1837+
return Err(ErrorCode::BadArguments(
1838+
"Assign cluster nodes list is empty.",
1839+
));
1840+
}
1841+
18321842
for _idx in 0..10 {
18331843
let selected_nodes = self.pick_assign_warehouse_node(&warehouse, &nodes).await?;
18341844

@@ -1937,6 +1947,20 @@ impl WarehouseApi for WarehouseMgr {
19371947
return Err(ErrorCode::InvalidWarehouse("Warehouse name is empty."));
19381948
}
19391949

1950+
if nodes.is_empty() {
1951+
return Err(ErrorCode::BadArguments("Unassign list is empty."));
1952+
}
1953+
1954+
if nodes.iter().any(|(name, _)| name.is_empty()) {
1955+
return Err(ErrorCode::BadArguments("Unassign cluster name is empty."));
1956+
}
1957+
1958+
if nodes.iter().any(|(_, list)| list.is_empty()) {
1959+
return Err(ErrorCode::BadArguments(
1960+
"Unassign cluster nodes list is empty.",
1961+
));
1962+
}
1963+
19401964
for _idx in 0..10 {
19411965
let mut nodes = nodes.clone();
19421966
let mut drop_cluster_node_txn = TxnRequest::default();
@@ -2011,23 +2035,29 @@ impl WarehouseApi for WarehouseMgr {
20112035
));
20122036

20132037
if let Some(v) = nodes.get_mut(&node_snapshot.node_info.cluster_id) {
2014-
if let Some(remove_node) = v.pop() {
2015-
let SelectedNode::Random(node_group) = remove_node;
2016-
if node_snapshot.node_info.runtime_node_group == node_group {
2017-
let node = node_snapshot.node_info.leave_warehouse();
2018-
2019-
drop_cluster_node_txn
2020-
.if_then
2021-
.push(TxnOp::delete(cluster_node_key));
2022-
drop_cluster_node_txn.if_then.push(TxnOp::put_with_ttl(
2023-
node_key,
2024-
serde_json::to_vec(&node)?,
2025-
Some(self.lift_time * 4),
2026-
))
2027-
}
2038+
let runtime_node_group = node_snapshot.node_info.runtime_node_group.clone();
2039+
if v.remove_first(&SelectedNode::Random(runtime_node_group))
2040+
.is_some()
2041+
{
2042+
let node = node_snapshot.node_info.leave_warehouse();
2043+
2044+
drop_cluster_node_txn
2045+
.if_then
2046+
.push(TxnOp::delete(cluster_node_key));
2047+
drop_cluster_node_txn.if_then.push(TxnOp::put_with_ttl(
2048+
node_key,
2049+
serde_json::to_vec(&node)?,
2050+
Some(self.lift_time * 4),
2051+
));
20282052
}
20292053
}
20302054
}
2055+
2056+
let txn_reply = self.metastore.transaction(drop_cluster_node_txn).await?;
2057+
2058+
if txn_reply.success {
2059+
return Ok(());
2060+
}
20312061
}
20322062

20332063
Err(ErrorCode::WarehouseOperateConflict(

src/query/management/tests/it/warehouse.rs

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -613,6 +613,172 @@ async fn test_recovery_create_warehouse() -> Result<()> {
613613
Ok(())
614614
}
615615

616+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
617+
async fn test_assign_nodes_for_invalid_warehouse() -> Result<()> {
618+
let (_, warehouse_manager, _nodes) = nodes(Duration::from_mins(30), 2).await?;
619+
620+
let assign_warehouse_nodes =
621+
warehouse_manager.assign_warehouse_nodes(String::from(""), HashMap::new());
622+
623+
assert_eq!(assign_warehouse_nodes.await.unwrap_err().code(), 2403);
624+
625+
let assign_warehouse_nodes =
626+
warehouse_manager.assign_warehouse_nodes(String::from("test_warehouse"), HashMap::new());
627+
628+
assert_eq!(assign_warehouse_nodes.await.unwrap_err().code(), 2408);
629+
630+
let assign_warehouse_nodes = warehouse_manager.assign_warehouse_nodes(
631+
String::from("test_warehouse"),
632+
HashMap::from([(String::new(), vec![])]),
633+
);
634+
635+
assert_eq!(assign_warehouse_nodes.await.unwrap_err().code(), 1006);
636+
637+
let assign_warehouse_nodes = warehouse_manager.assign_warehouse_nodes(
638+
String::from("test_warehouse"),
639+
HashMap::from([(String::from("test"), vec![])]),
640+
);
641+
642+
assert_eq!(assign_warehouse_nodes.await.unwrap_err().code(), 1006);
643+
644+
let assign_warehouse_nodes = warehouse_manager.assign_warehouse_nodes(
645+
String::from("test_warehouse"),
646+
HashMap::from([(String::from("test"), vec![SelectedNode::Random(None)])]),
647+
);
648+
649+
assert_eq!(assign_warehouse_nodes.await.unwrap_err().code(), 2406);
650+
651+
Ok(())
652+
}
653+
654+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
655+
async fn test_unassign_nodes_for_invalid_warehouse() -> Result<()> {
656+
let (_, warehouse_manager, _nodes) = nodes(Duration::from_mins(30), 2).await?;
657+
658+
let unassign_warehouse_nodes = warehouse_manager.unassign_warehouse_nodes("", HashMap::new());
659+
660+
assert_eq!(unassign_warehouse_nodes.await.unwrap_err().code(), 2403);
661+
662+
let unassign_warehouse_nodes =
663+
warehouse_manager.unassign_warehouse_nodes("test_warehouse", HashMap::new());
664+
665+
assert_eq!(unassign_warehouse_nodes.await.unwrap_err().code(), 1006);
666+
667+
let unassign_warehouse_nodes = warehouse_manager
668+
.unassign_warehouse_nodes("test_warehouse", HashMap::from([(String::new(), vec![])]));
669+
670+
assert_eq!(unassign_warehouse_nodes.await.unwrap_err().code(), 1006);
671+
672+
let unassign_warehouse_nodes = warehouse_manager.unassign_warehouse_nodes(
673+
"test_warehouse",
674+
HashMap::from([(String::from("test"), vec![])]),
675+
);
676+
677+
assert_eq!(unassign_warehouse_nodes.await.unwrap_err().code(), 1006);
678+
679+
let unassign_warehouse_nodes = warehouse_manager.unassign_warehouse_nodes(
680+
"test_warehouse",
681+
HashMap::from([(String::from("test"), vec![SelectedNode::Random(None)])]),
682+
);
683+
684+
assert_eq!(unassign_warehouse_nodes.await.unwrap_err().code(), 2406);
685+
686+
warehouse_manager
687+
.create_warehouse(String::from("test_warehouse"), vec![
688+
SelectedNode::Random(None),
689+
SelectedNode::Random(None),
690+
])
691+
.await?;
692+
693+
let unassign_warehouse_nodes = warehouse_manager.unassign_warehouse_nodes(
694+
"test_warehouse",
695+
HashMap::from([(String::from("test"), vec![SelectedNode::Random(None)])]),
696+
);
697+
698+
assert_eq!(unassign_warehouse_nodes.await.unwrap_err().code(), 2410);
699+
700+
let unassign_warehouse_nodes = warehouse_manager.unassign_warehouse_nodes(
701+
"test_warehouse",
702+
HashMap::from([(String::from("default"), vec![SelectedNode::Random(Some(
703+
String::from("unknown"),
704+
))])]),
705+
);
706+
707+
assert_eq!(unassign_warehouse_nodes.await.unwrap_err().code(), 2401);
708+
709+
Ok(())
710+
}
711+
712+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
713+
async fn test_unassign_all_nodes_for_warehouse() -> Result<()> {
714+
let (_, warehouse_manager, _nodes) = nodes(Duration::from_mins(30), 2).await?;
715+
warehouse_manager
716+
.create_warehouse(String::from("test_warehouse"), vec![SelectedNode::Random(
717+
None,
718+
)])
719+
.await?;
720+
721+
let unassign_warehouse_nodes = warehouse_manager.unassign_warehouse_nodes(
722+
"test_warehouse",
723+
HashMap::from([(String::from("default"), vec![
724+
SelectedNode::Random(None),
725+
SelectedNode::Random(Some(String::from("test_node_group"))),
726+
])]),
727+
);
728+
729+
assert_eq!(unassign_warehouse_nodes.await.unwrap_err().code(), 2401);
730+
Ok(())
731+
}
732+
733+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
734+
async fn test_unassign_nodes_for_warehouse() -> Result<()> {
735+
let (_, warehouse_manager, _nodes) = nodes(Duration::from_mins(30), 2).await?;
736+
let create_warehouse = warehouse_manager
737+
.create_warehouse(String::from("test_warehouse"), vec![SelectedNode::Random(
738+
None,
739+
)]);
740+
741+
create_warehouse.await?;
742+
743+
let mut node_1 = system_managed_node(&GlobalUniqName::unique());
744+
node_1.node_group = Some(String::from("test_node_group"));
745+
warehouse_manager.start_node(node_1.clone()).await?;
746+
747+
let mut node_2 = system_managed_node(&GlobalUniqName::unique());
748+
node_2.node_group = Some(String::from("test_node_group"));
749+
warehouse_manager.start_node(node_2.clone()).await?;
750+
751+
let add_warehouse_cluster = warehouse_manager.add_warehouse_cluster(
752+
String::from("test_warehouse"),
753+
String::from("cluster_name"),
754+
vec![
755+
SelectedNode::Random(Some(String::from("test_node_group"))),
756+
SelectedNode::Random(None),
757+
SelectedNode::Random(Some(String::from("test_node_group"))),
758+
],
759+
);
760+
761+
add_warehouse_cluster.await?;
762+
763+
let unassign_warehouse_nodes = warehouse_manager.unassign_warehouse_nodes(
764+
"test_warehouse",
765+
HashMap::from([(String::from("cluster_name"), vec![
766+
SelectedNode::Random(None),
767+
SelectedNode::Random(Some(String::from("test_node_group"))),
768+
])]),
769+
);
770+
771+
unassign_warehouse_nodes.await?;
772+
773+
let nodes = warehouse_manager
774+
.list_warehouse_cluster_nodes("test_warehouse", "cluster_name")
775+
.await?;
776+
777+
assert_eq!(nodes.len(), 1);
778+
assert!(nodes[0].id == node_1.id || nodes[0].id == node_2.id);
779+
Ok(())
780+
}
781+
616782
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
617783
async fn test_concurrent_recovery_create_warehouse() -> Result<()> {
618784
let (_, warehouse_manager, nodes) = nodes(Duration::from_mins(30), 2).await?;

src/query/service/src/clusters/cluster.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -364,10 +364,10 @@ impl ClusterDiscovery {
364364
.duration_since(std::time::UNIX_EPOCH)
365365
.expect("expect time");
366366

367-
let nanos = system_timestamp.as_nanos();
368-
let cluster_idx = (nanos % warehouse_clusters_nodes_index.len() as u128) as usize;
367+
let millis = system_timestamp.as_millis();
368+
let cluster_idx = (millis % warehouse_clusters_nodes_index.len() as u128) as usize;
369369
let pick_cluster_nodes = &warehouse_clusters_nodes[cluster_idx];
370-
let nodes_idx = (nanos % pick_cluster_nodes.len() as u128) as usize;
370+
let nodes_idx = (millis % pick_cluster_nodes.len() as u128) as usize;
371371
Ok(Some(pick_cluster_nodes[nodes_idx].clone()))
372372
}
373373

0 commit comments

Comments
 (0)