Skip to content

Commit d059d80

Browse files
committed
improve orchestration for hot-redundancy by deploying replicas on different nodes; ensure timely data update in the proxy
1 parent 0702853 commit d059d80

File tree

2 files changed

+76
-14
lines changed

2 files changed

+76
-14
lines changed

edgeless_orc/src/orchestration_logic.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,43 @@ impl OrchestrationLogic {
178178
}
179179
}
180180
}
181+
182+
pub fn next_excluding(
183+
&mut self,
184+
spawn_req: &edgeless_api::function_instance::SpawnFunctionRequest,
185+
exclude: &Vec<uuid::Uuid>,
186+
) -> Option<uuid::Uuid> {
187+
let feasible_nodes = self.feasible_nodes(spawn_req, &self.nodes);
188+
let candidates: Vec<uuid::Uuid> = feasible_nodes
189+
.into_iter()
190+
.filter(|node_id| !exclude.contains(node_id))
191+
.collect();
192+
if candidates.is_empty() {
193+
return None;
194+
}
195+
match self.orchestration_strategy {
196+
crate::OrchestrationStrategy::Random => {
197+
let rv = rand::distributions::Uniform::new(0, candidates.len());
198+
let rnd = rv.sample(&mut self.rng);
199+
Some(candidates[rnd])
200+
}
201+
crate::OrchestrationStrategy::RoundRobin => {
202+
// Prevent infinite loop: evaluate each node at most once.
203+
for _ in 0..candidates.len() {
204+
// Wrap-around if the current index is out of bounds.
205+
if self.round_robin_current_index >= candidates.len() {
206+
self.round_robin_current_index = 0;
207+
}
208+
209+
let cand_ndx = self.round_robin_current_index;
210+
self.round_robin_current_index += 1;
211+
212+
return Some(candidates[cand_ndx]);
213+
}
214+
None
215+
}
216+
}
217+
}
181218
}
182219

183220
/// Tests

edgeless_orc/src/orchestrator_task.rs

Lines changed: 39 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ impl OrchestratorTask {
8383
if let Err(err) = reply_channel.send(res) {
8484
log::error!("Orchestrator channel error in SPAWN: {:?}", err);
8585
}
86+
self.refresh().await;
8687
}
8788
crate::orchestrator::OrchestratorRequest::StopFunction(lid) => {
8889
log::debug!("Orchestrator StopFunction {:?}", lid);
@@ -484,6 +485,17 @@ impl OrchestratorTask {
484485
}
485486
}
486487

488+
fn select_node_excluding(
489+
&mut self,
490+
spawn_req: &edgeless_api::function_instance::SpawnFunctionRequest,
491+
exclude_node_ids: &Vec<edgeless_api::function_instance::NodeId>,
492+
) -> anyhow::Result<edgeless_api::function_instance::NodeId> {
493+
match self.orchestration_logic.next_excluding(spawn_req, exclude_node_ids) {
494+
Some(node_id) => Ok(node_id),
495+
None => Err(anyhow::anyhow!("no valid node found (redundancy requires that there are enough nodes)")),
496+
}
497+
}
498+
487499
/// Start a new logical function in this orchestration domain, as assigned
488500
/// by the controller
489501
async fn start_function(
@@ -495,29 +507,33 @@ impl OrchestratorTask {
495507
let mut results: Vec<anyhow::Result<edgeless_api::common::StartComponentResponse<uuid::Uuid>>> = vec![];
496508

497509
// Select the target node.
498-
let res = match self.select_node(spawn_req) {
510+
let (res, active_instance_node_id) = match self.select_node(spawn_req) {
499511
Ok(node_id) => {
500512
// Start the function instance.
501-
self.start_function_in_node(spawn_req, &lid, &node_id).await
513+
(self.start_function_in_node(spawn_req, &lid, &node_id).await, node_id)
502514
}
503-
Err(err) => Ok(edgeless_api::common::StartComponentResponse::ResponseError(
515+
Err(err) => (Ok(edgeless_api::common::StartComponentResponse::ResponseError(
504516
edgeless_api::common::ResponseError {
505517
summary: format!("Could not start function {}", spawn_req.spec.to_short_string()),
506518
detail: Some(err.to_string()),
507519
},
508-
)),
520+
)), uuid::Uuid::nil())
509521
};
510522
results.push(res);
511523

512524
// start the replicas for hot-standby redundancy, if replication factor is > 1
513525
if let Some(replication_factor) = spawn_req.replication_factor {
514526
if replication_factor > 1 {
515-
// TODO:2 start replicas on different nodes to provide good coverage and good fault tolerance
527+
// start replicas on different nodes to provide good coverage and good fault tolerance
528+
let mut used_node_ids = vec![active_instance_node_id];
516529
for _ in 1..replication_factor {
517-
let res = match self.select_node(spawn_req) {
530+
let res = match self.select_node_excluding(spawn_req, &used_node_ids) {
518531
Ok(node_id) => {
519532
// Start the function instance.
520-
self.start_function_in_node(spawn_req, &lid, &node_id).await
533+
let res = self.start_function_in_node(spawn_req, &lid, &node_id).await;
534+
// update the list of used node ids
535+
used_node_ids.push(node_id);
536+
res
521537
}
522538
Err(err) => Ok(edgeless_api::common::StartComponentResponse::ResponseError(
523539
edgeless_api::common::ResponseError {
@@ -687,6 +703,14 @@ impl OrchestratorTask {
687703
node_id: *node_id,
688704
function_id: id.function_id,
689705
}, is_active)]); // hot-standby instance (false = standby, true = active)
706+
log::info!(
707+
"Spawned {} instance number {} at node_id {}, LID {}, pid {}",
708+
if is_active { "active" } else { "hot-standby" },
709+
self.active_instances.get(lid).unwrap().instance_ids().len(),
710+
node_id,
711+
&lid,
712+
id.function_id
713+
);
690714
} else {
691715
self.active_instances.insert(
692716
*lid,
@@ -698,15 +722,15 @@ impl OrchestratorTask {
698722
}, true)], // first instance is active (true = active, false = standby)
699723
),
700724
);
725+
log::info!(
726+
"Spawned active instance number {} at node_id {}, LID {}, pid {}",
727+
self.active_instances.get(lid).unwrap().instance_ids().len(),
728+
node_id,
729+
&lid,
730+
id.function_id
731+
);
701732
}
702733
self.active_instances_changed = true;
703-
log::info!(
704-
"Spawned instance number {} at node_id {}, LID {}, pid {}",
705-
self.active_instances.len(),
706-
node_id,
707-
&lid,
708-
id.function_id
709-
);
710734

711735
Ok(edgeless_api::common::StartComponentResponse::InstanceId(*lid))
712736
}
@@ -1088,6 +1112,7 @@ impl OrchestratorTask {
10881112
log::info!("no disconnected functions, num_nodes {}", self.nodes.len());
10891113
continue;
10901114
}
1115+
self.active_instances_changed = true;
10911116

10921117
// some physical instances of this function got disconnected - need to handle this by repatching
10931118
to_be_repatched.push(*origin_lid);

0 commit comments

Comments
 (0)