Skip to content

Commit 77422ec

Browse files
authored
rebuild groups when one node dies (#328)
* rebuild groups when one node dies
1 parent 7892822 commit 77422ec

File tree

1 file changed

+115
-13
lines changed

1 file changed

+115
-13
lines changed

crates/orchestrator/src/status_update/plugins/node_groups.rs

Lines changed: 115 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,15 @@ impl NodeGroupsPlugin {
5858
fn get_group_key(group_id: &str) -> String {
5959
format!("{}{}", GROUP_KEY_PREFIX, group_id)
6060
}
61-
62-
fn try_form_new_group(&self, node_addr: &str) -> Result<Option<NodeGroup>, Error> {
61+
fn try_form_new_group(&self, node_addr: Option<&str>) -> Result<Option<NodeGroup>, Error> {
6362
let mut conn = self.store.client.get_connection()?;
6463

65-
// Check if node is already in a group
66-
let existing_group: Option<String> = conn.hget(NODE_GROUP_MAP_KEY, node_addr)?;
67-
if existing_group.is_some() {
68-
return Ok(None);
64+
// Check if node is already in a group (if a specific node was provided)
65+
if let Some(addr) = node_addr {
66+
let existing_group: Option<String> = conn.hget(NODE_GROUP_MAP_KEY, addr)?;
67+
if existing_group.is_some() {
68+
return Ok(None);
69+
}
6970
}
7071

7172
let nodes = self.store_context.node_store.get_nodes();
@@ -74,29 +75,40 @@ impl NodeGroupsPlugin {
7475
let assigned_nodes: std::collections::HashMap<String, String> =
7576
conn.hgetall(NODE_GROUP_MAP_KEY)?;
7677

77-
let healthy_nodes = nodes
78+
let mut healthy_nodes = nodes
7879
.iter()
7980
.filter(|node| node.status == NodeStatus::Healthy)
80-
.filter(|node| node.address.to_string() != node_addr)
8181
.filter(|node| node.p2p_id.is_some())
8282
.filter(|node| !assigned_nodes.contains_key(&node.address.to_string()))
8383
.collect::<Vec<&OrchestratorNode>>();
8484

85+
// If a specific node was provided, drop it from the candidate list here so it is not counted twice; it is counted and inserted explicitly below
86+
if let Some(addr) = node_addr {
87+
healthy_nodes.retain(|node| node.address.to_string() != addr);
88+
}
89+
8590
info!(
8691
"Found {} healthy nodes for potential group formation",
8792
healthy_nodes.len()
8893
);
89-
if (healthy_nodes.len() + 1) < self.min_group_size {
94+
95+
// Calculate total available nodes (healthy nodes + the provided node if any)
96+
let total_available = healthy_nodes.len() + if node_addr.is_some() { 1 } else { 0 };
97+
98+
if total_available < self.min_group_size {
9099
info!(
91100
"Not enough healthy nodes to form a group (need {}, have {})",
92-
self.min_group_size,
93-
healthy_nodes.len() + 1
101+
self.min_group_size, total_available
94102
);
95103
return Ok(None);
96104
}
97105

98106
let mut available_nodes = BTreeSet::new();
99-
available_nodes.insert(node_addr.to_string());
107+
108+
// Add the provided node first if any
109+
if let Some(addr) = node_addr {
110+
available_nodes.insert(addr.to_string());
111+
}
100112

101113
for node in healthy_nodes {
102114
available_nodes.insert(node.address.to_string());
@@ -212,7 +224,7 @@ impl StatusUpdatePlugin for NodeGroupsPlugin {
212224
"Node {} is healthy, attempting to form new group",
213225
node_addr
214226
);
215-
if let Some(group) = self.try_form_new_group(&node_addr)? {
227+
if let Some(group) = self.try_form_new_group(Some(&node_addr))? {
216228
info!(
217229
"Successfully formed new group {} with {} nodes",
218230
group.id,
@@ -231,6 +243,7 @@ impl StatusUpdatePlugin for NodeGroupsPlugin {
231243
group.nodes.len()
232244
);
233245
self.dissolve_group(&group.id)?;
246+
self.try_form_new_group(None)?;
234247
}
235248
}
236249
_ => {
@@ -394,6 +407,10 @@ mod tests {
394407
"0x1234567890123456789012345678901234567890",
395408
NodeStatus::Dead,
396409
);
410+
plugin
411+
.store_context
412+
.node_store
413+
.update_node_status(&node1_dead.address, NodeStatus::Dead);
397414
let _ = plugin
398415
.handle_status_change(&node1_dead, &NodeStatus::Healthy)
399416
.await;
@@ -763,4 +780,89 @@ mod tests {
763780
"There should be exactly two distinct group IDs"
764781
);
765782
}
783+
784+
#[tokio::test]
785+
async fn test_reformation_on_death() {
786+
let store = Arc::new(RedisStore::new_test());
787+
let context_store = store.clone();
788+
let store_context = Arc::new(StoreContext::new(context_store));
789+
// Set max_group_size to 2, so groups can only have 2 nodes
790+
let plugin = NodeGroupsPlugin::new(2, 2, store.clone(), store_context);
791+
792+
let all_nodes = plugin.store_context.node_store.get_nodes();
793+
assert_eq!(all_nodes.len(), 0, "No nodes should be in the store");
794+
795+
// Create the first two nodes (a third node is added later, after the initial group has formed)
796+
let node1 = create_test_node(
797+
"0x9234567890123456789012345678901234567890",
798+
NodeStatus::Healthy,
799+
);
800+
let mut node2 = create_test_node(
801+
"0x8234567890123456789012345678901234567890",
802+
NodeStatus::Healthy,
803+
);
804+
805+
// Add nodes to the store
806+
plugin.store_context.node_store.add_node(node1.clone());
807+
plugin.store_context.node_store.add_node(node2.clone());
808+
809+
// Add nodes to groups through the normal flow
810+
let _ = plugin
811+
.handle_status_change(&node1, &NodeStatus::Healthy)
812+
.await;
813+
let _ = plugin
814+
.handle_status_change(&node2, &NodeStatus::Healthy)
815+
.await;
816+
817+
// Get connection to check Redis state
818+
let mut conn = plugin.store.client.get_connection().unwrap();
819+
820+
// Verify each node's group assignment
821+
let node1_group_id: Option<String> = conn
822+
.hget(NODE_GROUP_MAP_KEY, node1.address.to_string())
823+
.unwrap();
824+
let node2_group_id: Option<String> = conn
825+
.hget(NODE_GROUP_MAP_KEY, node2.address.to_string())
826+
.unwrap();
827+
828+
assert!(node1_group_id.is_some(), "Node1 should be in a group");
829+
assert!(node2_group_id.is_some(), "Node2 should be in a group");
830+
831+
let node_3 = create_test_node(
832+
"0x3234567890123456789012345678901234567890",
833+
NodeStatus::Healthy,
834+
);
835+
plugin.store_context.node_store.add_node(node_3.clone());
836+
837+
let node_3_group_id: Option<String> = conn
838+
.hget(NODE_GROUP_MAP_KEY, node_3.address.to_string())
839+
.unwrap();
840+
841+
assert!(node_3_group_id.is_none(), "Node3 should not be in a group");
842+
843+
node2.status = NodeStatus::Dead;
844+
plugin
845+
.store_context
846+
.node_store
847+
.update_node_status(&node2.address, NodeStatus::Dead);
848+
let _ = plugin
849+
.handle_status_change(&node2, &NodeStatus::Healthy)
850+
.await;
851+
let nodes = plugin.store_context.node_store.get_nodes();
852+
println!("nodes {:?}", nodes);
853+
854+
let node_2_group_id: Option<String> = conn
855+
.hget(NODE_GROUP_MAP_KEY, node2.address.to_string())
856+
.unwrap();
857+
let node_1_group_id: Option<String> = conn
858+
.hget(NODE_GROUP_MAP_KEY, node1.address.to_string())
859+
.unwrap();
860+
let node_3_group_id: Option<String> = conn
861+
.hget(NODE_GROUP_MAP_KEY, node_3.address.to_string())
862+
.unwrap();
863+
864+
assert!(node_2_group_id.is_none(), "Node2 should not be in a group");
865+
assert!(node_1_group_id.is_some(), "Node1 should be in a group");
866+
assert!(node_3_group_id.is_some(), "Node3 should be in a group");
867+
}
766868
}

0 commit comments

Comments
 (0)