Skip to content

Commit c64a941

Browse files
committed
add information about active/standby to proxy
1 parent e736889 commit c64a941

File tree

6 files changed

+163
-21
lines changed

6 files changed

+163
-21
lines changed

edgeless_orc/src/bin/proxy_cli.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ fn main() -> anyhow::Result<()> {
142142
println!(
143143
"{} -> {}",
144144
function,
145-
nodes.iter().map(|x| x.to_string()).collect::<Vec<String>>().join(",")
145+
nodes.iter().map(|x| format!("{}({})", x.0, if x.1 { "active" } else { "hot-standby" })).collect::<Vec<String>>().join(",")
146146
);
147147
}
148148
}
@@ -156,7 +156,7 @@ fn main() -> anyhow::Result<()> {
156156
println!(
157157
"{} -> {}",
158158
logical,
159-
physical.iter().map(|x| x.to_string()).collect::<Vec<String>>().join(",")
159+
physical.iter().map(|x| format!("{}({})", x.0, if x.1 { "active" } else { "standby" })).collect::<Vec<String>>().join(",")
160160
);
161161
}
162162
}

edgeless_orc/src/orchestrator/proxy_test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,12 @@ impl crate::proxy::Proxy for ProxyTest {
5757
}
5858
fn fetch_function_instances_to_nodes(
5959
&mut self,
60-
) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, Vec<edgeless_api::function_instance::NodeId>> {
60+
) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, Vec<(edgeless_api::function_instance::NodeId, bool)>> {
6161
std::collections::HashMap::new()
6262
}
6363
fn fetch_instances_to_physical_ids(
6464
&mut self,
65-
) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, Vec<edgeless_api::function_instance::ComponentId>> {
65+
) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, Vec<(edgeless_api::function_instance::ComponentId, bool)>> {
6666
std::collections::HashMap::new()
6767
}
6868
fn fetch_resource_instances_to_nodes(

edgeless_orc/src/proxy.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,15 +84,19 @@ pub trait Proxy: Sync + Send {
8484
) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, edgeless_api::resource_configuration::ResourceInstanceSpecification>;
8585

8686
/// Fetch the mapping between active function instances and nodes.
87+
/// Returns a map from logical function ID to a vector of (node_id, is_active) tuples.
88+
/// is_active=true means the instance is actively used, false means it's a hot-standby.
8789
fn fetch_function_instances_to_nodes(
8890
&mut self,
89-
) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, Vec<edgeless_api::function_instance::NodeId>>;
91+
) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, Vec<(edgeless_api::function_instance::NodeId, bool)>>;
9092

9193
/// Fetch the mapping between active function/resource instances and their
9294
/// physical identifiers.
95+
/// For functions, returns a map from logical ID to a vector of (physical_id, is_active) tuples.
96+
/// For resources, returns a single physical ID with is_active=true.
9397
fn fetch_instances_to_physical_ids(
9498
&mut self,
95-
) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, Vec<edgeless_api::function_instance::ComponentId>>;
99+
) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, Vec<(edgeless_api::function_instance::ComponentId, bool)>>;
96100

97101
/// Fetch the mapping between active resources instances and nodes.
98102
fn fetch_resource_instances_to_nodes(

edgeless_orc/src/proxy_none.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,12 @@ impl super::proxy::Proxy for ProxyNone {
5151
}
5252
fn fetch_function_instances_to_nodes(
5353
&mut self,
54-
) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, Vec<edgeless_api::function_instance::NodeId>> {
54+
) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, Vec<(edgeless_api::function_instance::NodeId, bool)>> {
5555
std::collections::HashMap::new()
5656
}
5757
fn fetch_instances_to_physical_ids(
5858
&mut self,
59-
) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, Vec<edgeless_api::function_instance::ComponentId>> {
59+
) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, Vec<(edgeless_api::function_instance::ComponentId, bool)>> {
6060
std::collections::HashMap::new()
6161
}
6262
fn fetch_resource_instances_to_nodes(

edgeless_orc/src/proxy_redis.rs

Lines changed: 145 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -698,27 +698,27 @@ impl super::proxy::Proxy for ProxyRedis {
698698

699699
fn fetch_function_instances_to_nodes(
700700
&mut self,
701-
) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, Vec<edgeless_api::function_instance::NodeId>> {
701+
) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, Vec<(edgeless_api::function_instance::NodeId, bool)>> {
702702
let mut instances = std::collections::HashMap::new();
703703
for (logical_id, instance) in self.fetch_instances() {
704704
if let crate::active_instance::ActiveInstance::Function(_, instance_ids) = instance {
705-
instances.insert(logical_id, instance_ids.iter().map(|x| x.0.node_id).collect());
705+
instances.insert(logical_id, instance_ids.iter().map(|x| (x.0.node_id, x.1)).collect());
706706
}
707707
}
708708
instances
709709
}
710710

711711
fn fetch_instances_to_physical_ids(
712712
&mut self,
713-
) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, Vec<edgeless_api::function_instance::ComponentId>> {
713+
) -> std::collections::HashMap<edgeless_api::function_instance::ComponentId, Vec<(edgeless_api::function_instance::ComponentId, bool)>> {
714714
let mut instances = std::collections::HashMap::new();
715715
for (logical_id, instance) in self.fetch_instances() {
716716
match instance {
717717
crate::active_instance::ActiveInstance::Function(_, instance_ids) => {
718-
instances.insert(logical_id, instance_ids.iter().map(|x| x.0.function_id).collect());
718+
instances.insert(logical_id, instance_ids.iter().map(|x| (x.0.function_id, x.1)).collect());
719719
}
720720
crate::active_instance::ActiveInstance::Resource(_, instance_id) => {
721-
instances.insert(logical_id, vec![instance_id.function_id]);
721+
instances.insert(logical_id, vec![(instance_id.function_id, true)]);
722722
}
723723
}
724724
}
@@ -939,7 +939,8 @@ mod test {
939939
for (_instance, nodes) in function_instances {
940940
assert_eq!(nodes.len(), 1);
941941
assert!(!nodes.is_empty());
942-
assert!(nodes.first().unwrap() == &node1_id);
942+
assert_eq!(nodes.first().unwrap().0, node1_id);
943+
assert!(nodes.first().unwrap().1); // all instances are active in this test
943944
}
944945

945946
let resources_instances = redis_proxy.fetch_resource_instances_to_nodes();
@@ -962,11 +963,12 @@ mod test {
962963
for mapping in logical_to_physical {
963964
let logical = mapping.0;
964965
assert_eq!(1, mapping.1.len());
965-
let physical = mapping.1.first().unwrap();
966+
let (physical, is_active) = mapping.1.first().unwrap();
966967

967968
let elem = logical_physical_ids.iter().find(|x| x.0 == logical).unwrap();
968969
assert_eq!(logical, elem.0);
969970
assert_eq!(*physical, elem.1);
971+
assert!(*is_active); // all instances are active in this test
970972
}
971973
}
972974

@@ -1269,4 +1271,140 @@ mod test {
12691271
}
12701272
}
12711273
}
1274+
1275+
#[serial_test::serial]
1276+
#[test]
1277+
fn test_redis_proxy_hot_standby_instances() {
1278+
let mut redis_proxy = match get_proxy() {
1279+
Some(redis_proxy) => redis_proxy,
1280+
None => return,
1281+
};
1282+
1283+
// Create active instances with hot-standby replicas
1284+
let mut active_instances = std::collections::HashMap::new();
1285+
let node1_id = uuid::Uuid::new_v4();
1286+
let node2_id = uuid::Uuid::new_v4();
1287+
let node3_id = uuid::Uuid::new_v4();
1288+
1289+
// Function with one active instance and one hot-standby
1290+
let logical_id_1 = uuid::Uuid::new_v4();
1291+
let physical_id_1a = uuid::Uuid::new_v4();
1292+
let physical_id_1b = uuid::Uuid::new_v4();
1293+
active_instances.insert(
1294+
logical_id_1,
1295+
crate::active_instance::ActiveInstance::Function(
1296+
SpawnFunctionRequest {
1297+
spec: edgeless_api::function_instance::FunctionClassSpecification {
1298+
id: "fun1".to_string(),
1299+
function_type: "class".to_string(),
1300+
version: "1.0".to_string(),
1301+
binary: None,
1302+
code: None,
1303+
outputs: vec!["out".to_string()],
1304+
},
1305+
annotations: std::collections::HashMap::new(),
1306+
state_specification: edgeless_api::function_instance::StateSpecification {
1307+
state_id: uuid::Uuid::new_v4(),
1308+
state_policy: edgeless_api::function_instance::StatePolicy::NodeLocal,
1309+
},
1310+
workflow_id: "workflow_1".to_string(),
1311+
replication_factor: Some(2),
1312+
},
1313+
vec![
1314+
(edgeless_api::function_instance::InstanceId {
1315+
node_id: node1_id,
1316+
function_id: physical_id_1a,
1317+
}, true), // active instance
1318+
(edgeless_api::function_instance::InstanceId {
1319+
node_id: node2_id,
1320+
function_id: physical_id_1b,
1321+
}, false), // hot-standby instance
1322+
],
1323+
),
1324+
);
1325+
1326+
// Function with three replicas: one active, two hot-standby
1327+
let logical_id_2 = uuid::Uuid::new_v4();
1328+
let physical_id_2a = uuid::Uuid::new_v4();
1329+
let physical_id_2b = uuid::Uuid::new_v4();
1330+
let physical_id_2c = uuid::Uuid::new_v4();
1331+
active_instances.insert(
1332+
logical_id_2,
1333+
crate::active_instance::ActiveInstance::Function(
1334+
SpawnFunctionRequest {
1335+
spec: edgeless_api::function_instance::FunctionClassSpecification {
1336+
id: "fun2".to_string(),
1337+
function_type: "class".to_string(),
1338+
version: "1.0".to_string(),
1339+
binary: None,
1340+
code: None,
1341+
outputs: vec!["out".to_string()],
1342+
},
1343+
annotations: std::collections::HashMap::new(),
1344+
state_specification: edgeless_api::function_instance::StateSpecification {
1345+
state_id: uuid::Uuid::new_v4(),
1346+
state_policy: edgeless_api::function_instance::StatePolicy::NodeLocal,
1347+
},
1348+
workflow_id: "workflow_1".to_string(),
1349+
replication_factor: Some(3),
1350+
},
1351+
vec![
1352+
(edgeless_api::function_instance::InstanceId {
1353+
node_id: node1_id,
1354+
function_id: physical_id_2a,
1355+
}, true), // active instance
1356+
(edgeless_api::function_instance::InstanceId {
1357+
node_id: node2_id,
1358+
function_id: physical_id_2b,
1359+
}, false), // hot-standby instance
1360+
(edgeless_api::function_instance::InstanceId {
1361+
node_id: node3_id,
1362+
function_id: physical_id_2c,
1363+
}, false), // hot-standby instance
1364+
],
1365+
),
1366+
);
1367+
1368+
redis_proxy.update_active_instances(&active_instances);
1369+
1370+
// Fetch function instances to nodes and verify active/standby status
1371+
let function_instances = redis_proxy.fetch_function_instances_to_nodes();
1372+
assert_eq!(function_instances.len(), 2);
1373+
1374+
let nodes_1 = function_instances.get(&logical_id_1).unwrap();
1375+
assert_eq!(nodes_1.len(), 2);
1376+
assert_eq!(nodes_1[0].0, node1_id);
1377+
assert!(nodes_1[0].1); // active
1378+
assert_eq!(nodes_1[1].0, node2_id);
1379+
assert!(!nodes_1[1].1); // standby
1380+
1381+
let nodes_2 = function_instances.get(&logical_id_2).unwrap();
1382+
assert_eq!(nodes_2.len(), 3);
1383+
assert_eq!(nodes_2[0].0, node1_id);
1384+
assert!(nodes_2[0].1); // active
1385+
assert_eq!(nodes_2[1].0, node2_id);
1386+
assert!(!nodes_2[1].1); // standby
1387+
assert_eq!(nodes_2[2].0, node3_id);
1388+
assert!(!nodes_2[2].1); // standby
1389+
1390+
// Fetch instances to physical IDs and verify active/standby status
1391+
let logical_to_physical = redis_proxy.fetch_instances_to_physical_ids();
1392+
assert_eq!(logical_to_physical.len(), 2);
1393+
1394+
let physical_1 = logical_to_physical.get(&logical_id_1).unwrap();
1395+
assert_eq!(physical_1.len(), 2);
1396+
assert_eq!(physical_1[0].0, physical_id_1a);
1397+
assert!(physical_1[0].1); // active
1398+
assert_eq!(physical_1[1].0, physical_id_1b);
1399+
assert!(!physical_1[1].1); // standby
1400+
1401+
let physical_2 = logical_to_physical.get(&logical_id_2).unwrap();
1402+
assert_eq!(physical_2.len(), 3);
1403+
assert_eq!(physical_2[0].0, physical_id_2a);
1404+
assert!(physical_2[0].1); // active
1405+
assert_eq!(physical_2[1].0, physical_id_2b);
1406+
assert!(!physical_2[1].1); // standby
1407+
assert_eq!(physical_2[2].0, physical_id_2c);
1408+
assert!(!physical_2[2].1); // standby
1409+
}
12721410
}

edgeless_systemtests/src/lib.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -793,8 +793,8 @@ mod system_tests {
793793
let mut nodes_with_functions = std::collections::HashSet::new();
794794
for (logical_fid, nodes) in &instances {
795795
assert!(nodes.len() == 1);
796-
println!("before function {} -> node {}", logical_fid, nodes.first().unwrap());
797-
nodes_with_functions.insert(*nodes.first().unwrap());
796+
println!("before function {} -> node {} ({})", logical_fid, nodes.first().unwrap().0, if nodes.first().unwrap().1 { "active" } else { "hot-standby" });
797+
nodes_with_functions.insert(nodes.first().unwrap().0);
798798
}
799799

800800
// Add intents to migrate the function instances.
@@ -811,7 +811,7 @@ mod system_tests {
811811
assert!(nodes.len() == 1);
812812
intents.push(edgeless_orc::deploy_intent::DeployIntent::Migrate(
813813
*logical_fid,
814-
vec![other(nodes.first().unwrap())],
814+
vec![other(&nodes.first().unwrap().0)],
815815
));
816816
}
817817
redis_proxy.add_deploy_intents(intents);
@@ -825,8 +825,8 @@ mod system_tests {
825825
for (logical_fid, nodes) in &instances {
826826
if let Some(new_nodes) = new_instances.get(logical_fid) {
827827
assert!(new_nodes.len() == 1);
828-
let new_node = new_nodes.first().unwrap();
829-
let node = nodes.first().unwrap();
828+
let new_node = &new_nodes.first().unwrap().0;
829+
let node = &nodes.first().unwrap().0;
830830
if new_node != &other(node) {
831831
not_done = true;
832832
break;
@@ -845,7 +845,7 @@ mod system_tests {
845845
instances = redis_proxy.fetch_function_instances_to_nodes();
846846
for (logical_fid, nodes) in instances {
847847
assert!(nodes.len() == 1);
848-
println!("after function {} -> node {}", logical_fid, nodes.first().unwrap());
848+
println!("after function {} -> node {} ({})", logical_fid, nodes.first().unwrap().0, if nodes.first().unwrap().1 { "active" } else { "hot-standby" });
849849
}
850850

851851
// Check that the intents have been cleared.

0 commit comments

Comments
 (0)