Skip to content

Commit 5839812

Browse files
authored
imp(orchestrator): automatically try to increase group size from single nodes (#479)
* automatically try to increase group size from single nodes
* optimize status handling; also include status updates in discovery
1 parent ef4d09d commit 5839812

File tree

5 files changed

+1504
-58
lines changed

5 files changed

+1504
-58
lines changed

crates/orchestrator/src/discovery/monitor.rs

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::models::node::NodeStatus;
22
use crate::models::node::OrchestratorNode;
3+
use crate::plugins::StatusUpdatePlugin;
34
use crate::store::core::StoreContext;
45
use crate::utils::loop_heartbeats::LoopHeartbeats;
56
use alloy::primitives::Address;
@@ -25,9 +26,11 @@ pub struct DiscoveryMonitor {
2526
heartbeats: Arc<LoopHeartbeats>,
2627
http_client: reqwest::Client,
2728
max_healthy_nodes_with_same_endpoint: u32,
29+
status_change_handlers: Vec<Box<dyn StatusUpdatePlugin>>,
2830
}
2931

3032
impl DiscoveryMonitor {
33+
#[allow(clippy::too_many_arguments)]
3134
pub fn new(
3235
coordinator_wallet: Wallet,
3336
compute_pool_id: u32,
@@ -36,6 +39,7 @@ impl DiscoveryMonitor {
3639
store_context: Arc<StoreContext>,
3740
heartbeats: Arc<LoopHeartbeats>,
3841
max_healthy_nodes_with_same_endpoint: u32,
42+
status_change_handlers: Vec<Box<dyn StatusUpdatePlugin>>,
3943
) -> Self {
4044
Self {
4145
coordinator_wallet,
@@ -46,9 +50,43 @@ impl DiscoveryMonitor {
4650
heartbeats,
4751
http_client: reqwest::Client::new(),
4852
max_healthy_nodes_with_same_endpoint,
53+
status_change_handlers,
4954
}
5055
}
5156

57+
async fn handle_status_change(&self, node: &OrchestratorNode, old_status: NodeStatus) {
58+
for handler in &self.status_change_handlers {
59+
if let Err(e) = handler.handle_status_change(node, &old_status).await {
60+
error!("Status change handler failed: {}", e);
61+
}
62+
}
63+
}
64+
65+
async fn update_node_status(
66+
&self,
67+
node_address: &Address,
68+
new_status: NodeStatus,
69+
) -> Result<(), Error> {
70+
// Get the current node to know the old status
71+
let old_status = match self.store_context.node_store.get_node(node_address).await? {
72+
Some(node) => node.status,
73+
None => return Err(anyhow::anyhow!("Node not found: {}", node_address)),
74+
};
75+
76+
// Update the status in the store
77+
self.store_context
78+
.node_store
79+
.update_node_status(node_address, new_status.clone())
80+
.await?;
81+
82+
// Get the updated node and trigger status change handlers
83+
if let Some(updated_node) = self.store_context.node_store.get_node(node_address).await? {
84+
self.handle_status_change(&updated_node, old_status).await;
85+
}
86+
87+
Ok(())
88+
}
89+
5290
pub async fn run(&self) -> Result<(), Error> {
5391
let mut interval = interval(Duration::from_secs(self.interval_s));
5492

@@ -174,8 +212,6 @@ impl DiscoveryMonitor {
174212
node_address, discovery_node.node.ip_address, discovery_node.node.port
175213
);
176214
if let Err(e) = self
177-
.store_context
178-
.node_store
179215
.update_node_status(&node_address, NodeStatus::Dead)
180216
.await
181217
{
@@ -190,8 +226,6 @@ impl DiscoveryMonitor {
190226
node_address
191227
);
192228
if let Err(e) = self
193-
.store_context
194-
.node_store
195229
.update_node_status(&node_address, NodeStatus::Ejected)
196230
.await
197231
{
@@ -210,8 +244,6 @@ impl DiscoveryMonitor {
210244
node_address
211245
);
212246
if let Err(e) = self
213-
.store_context
214-
.node_store
215247
.update_node_status(&node_address, NodeStatus::Dead)
216248
.await
217249
{
@@ -228,16 +260,12 @@ impl DiscoveryMonitor {
228260
);
229261
if !discovery_node.is_provider_whitelisted {
230262
if let Err(e) = self
231-
.store_context
232-
.node_store
233263
.update_node_status(&node_address, NodeStatus::Ejected)
234264
.await
235265
{
236266
error!("Error updating node status: {}", e);
237267
}
238268
} else if let Err(e) = self
239-
.store_context
240-
.node_store
241269
.update_node_status(&node_address, NodeStatus::Dead)
242270
.await
243271
{
@@ -263,8 +291,6 @@ impl DiscoveryMonitor {
263291
if last_change < last_updated {
264292
info!("Node {} is dead but has been updated on discovery, marking as discovered", node_address);
265293
if let Err(e) = self
266-
.store_context
267-
.node_store
268294
.update_node_status(&node_address, NodeStatus::Discovered)
269295
.await
270296
{
@@ -398,6 +424,7 @@ mod tests {
398424
discovery_store_context,
399425
Arc::new(LoopHeartbeats::new(&mode)),
400426
1,
427+
vec![],
401428
);
402429

403430
let store_context_clone = store_context.clone();
@@ -478,6 +505,7 @@ mod tests {
478505
store_context.clone(),
479506
Arc::new(LoopHeartbeats::new(&mode)),
480507
1,
508+
vec![],
481509
);
482510

483511
let time_before = Utc::now();
@@ -626,6 +654,7 @@ mod tests {
626654
store_context.clone(),
627655
Arc::new(LoopHeartbeats::new(&mode)),
628656
1,
657+
vec![],
629658
);
630659

631660
// Try to sync the second node

crates/orchestrator/src/main.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,19 @@ async fn main() -> Result<()> {
305305
});
306306
}
307307

308+
// Create status_update_plugins for discovery monitor
309+
let mut discovery_status_update_plugins: Vec<Box<dyn StatusUpdatePlugin>> = vec![];
310+
311+
// Add webhook plugins to discovery status update plugins
312+
for plugin in &webhook_plugins {
313+
discovery_status_update_plugins.push(Box::new(plugin.clone()));
314+
}
315+
316+
// Add node groups plugin if available
317+
if let Some(group_plugin) = node_groups_plugin.clone() {
318+
discovery_status_update_plugins.push(Box::new(group_plugin.as_ref().clone()));
319+
}
320+
308321
let discovery_store_context = store_context.clone();
309322
let discovery_heartbeats = heartbeats.clone();
310323
tasks.spawn({
@@ -318,6 +331,7 @@ async fn main() -> Result<()> {
318331
discovery_store_context.clone(),
319332
discovery_heartbeats.clone(),
320333
args.max_healthy_nodes_with_same_endpoint,
334+
discovery_status_update_plugins,
321335
);
322336
monitor.run().await
323337
}
@@ -344,6 +358,19 @@ async fn main() -> Result<()> {
344358
}
345359
});
346360

361+
// Create status_update_plugins for status updater
362+
let mut status_updater_plugins: Vec<Box<dyn StatusUpdatePlugin>> = vec![];
363+
364+
// Add webhook plugins to status updater plugins
365+
for plugin in &webhook_plugins {
366+
status_updater_plugins.push(Box::new(plugin.clone()));
367+
}
368+
369+
// Add node groups plugin if available
370+
if let Some(group_plugin) = node_groups_plugin.clone() {
371+
status_updater_plugins.push(Box::new(group_plugin.as_ref().clone()));
372+
}
373+
347374
let status_update_store_context = store_context.clone();
348375
let status_update_heartbeats = heartbeats.clone();
349376
tasks.spawn({
@@ -357,7 +384,7 @@ async fn main() -> Result<()> {
357384
compute_pool_id,
358385
args.disable_ejection,
359386
status_update_heartbeats.clone(),
360-
status_update_plugins,
387+
status_updater_plugins,
361388
);
362389
status_updater.run().await
363390
}

0 commit comments

Comments
 (0)