diff --git a/Cargo.lock b/Cargo.lock index 38ec0baf..43c5bbb4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2108,6 +2108,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", + "tokio-util", "tonic", "tracing", ] diff --git a/crates/hamgrd/src/main.rs b/crates/hamgrd/src/main.rs index c9aea264..738087bb 100644 --- a/crates/hamgrd/src/main.rs +++ b/crates/hamgrd/src/main.rs @@ -68,9 +68,9 @@ async fn main() { let runtime_data = RuntimeData::new(args.slot_id, swbus_config.npu_ipv4, swbus_config.npu_ipv6); - // Setup swbus and actor runtime + // Setup swbus and actor runtime to the first endpoint of swbusd let mut swbus_edge = SwbusEdgeRuntime::new( - format!("http://{}", swbus_config.endpoint), + format!("http://{}", swbus_config.endpoints.first().unwrap()), swbus_sp.clone(), ConnectionType::InNode, ); diff --git a/crates/swbus-cli/src/main.rs b/crates/swbus-cli/src/main.rs index 2504390f..2f0ade3f 100644 --- a/crates/swbus-cli/src/main.rs +++ b/crates/swbus-cli/src/main.rs @@ -159,7 +159,7 @@ async fn main() { sp.service_type = "swbus-cli".to_string(); sp.service_id = Uuid::new_v4().to_string(); let mut runtime = SwbusEdgeRuntime::new( - format!("http://{}", swbus_config.endpoint), + format!("http://{}", swbus_config.endpoints.first().unwrap()), sp.clone(), ConnectionType::Client, ); @@ -269,7 +269,10 @@ mod tests { std::env::set_var("DEV", format!("dpu{slot}")); let config = get_swbus_config(None).unwrap(); - assert_eq!(config.endpoint.to_string(), format!("{}:{}", "10.0.1.0", 23606 + slot)); + assert_eq!( + config.endpoints.first().unwrap().to_string(), + format!("{}:{}", "10.0.1.0", 23606 + slot) + ); let expected_sp = ServicePath::with_node("region-a", "cluster-a", &format!("host1-dpu{slot}"), "", "", "", ""); assert!(config .routes diff --git a/crates/swbus-config/src/lib.rs b/crates/swbus-config/src/lib.rs index 50f6e782..847935a1 100644 --- a/crates/swbus-config/src/lib.rs +++ b/crates/swbus-config/src/lib.rs @@ -15,7 +15,7 @@ const CONFIG_DB: &str = "CONFIG_DB"; #[derive(Debug, Clone, Deserialize, PartialEq, Eq)] pub struct SwbusConfig { - pub endpoint: SocketAddr, + pub endpoints: Vec, pub routes: Vec, pub peers: Vec, pub npu_ipv4: Option, @@ -174,7 +174,6 @@ fn peer_config_from_dpu_entry( "swbusd_port is not found in dpu {key} is not found" )))?; - // dual stack is not supported. Either all ipv4 or all ipv6. if let Some(npu_ipv4) = dpu_entry.npu_ipv4 { let npu_ipv4 = npu_ipv4 .parse::() @@ -184,7 +183,9 @@ fn peer_config_from_dpu_entry( endpoint: SocketAddr::new(IpAddr::V4(npu_ipv4), swbusd_port), conn_type: ConnectionType::InCluster, }); - } else if let Some(npu_ipv6) = dpu_entry.npu_ipv6 { + } + + if let Some(npu_ipv6) = dpu_entry.npu_ipv6 { let npu_ipv6 = npu_ipv6 .parse::() .map_err(|_| SwbusConfigError::InvalidConfig(format!("Invalid IPv6 address: {npu_ipv6}")))?; @@ -272,7 +273,7 @@ fn get_loopback_address(lb_index: u32) -> Result<(Option, Option Result { let mut peers = Vec::new(); let mut myroutes: Option> = None; - let mut myendpoint: Option = None; + let mut myendpoints: Vec = Vec::new(); let (region, cluster, hostname) = get_device_info()?; @@ -303,9 +304,10 @@ pub fn swbus_config_from_db(dpu_id: u32) -> Result { myroutes = Some(route_config_from_dpu_entry(&dpu, ®ion, &cluster, &hostname)); if let Some(npu_ipv4) = dpu.npu_ipv4 { - myendpoint = Some(SocketAddr::new(std::net::IpAddr::V4(npu_ipv4), swbusd_port)); - } else if let Some(npu_ipv6) = dpu.npu_ipv6 { - myendpoint = Some(SocketAddr::new(std::net::IpAddr::V6(npu_ipv6), swbusd_port)); + myendpoints.push(SocketAddr::new(std::net::IpAddr::V4(npu_ipv4), swbusd_port)); + } + if let Some(npu_ipv6) = dpu.npu_ipv6 { + myendpoints.push(SocketAddr::new(std::net::IpAddr::V6(npu_ipv6), swbusd_port)); } continue; } @@ -344,7 +346,7 @@ pub fn swbus_config_from_db(dpu_id: u32) -> Result { info!("successfully load swbus config from configdb for dpu {}", dpu_id); Ok(SwbusConfig { - endpoint: myendpoint.unwrap(), + endpoints: myendpoints, routes: myroutes.unwrap(), peers, npu_ipv4: my_ipv4, @@ -359,14 +361,17 @@ pub fn swbus_config_from_yaml(yaml_file: &str) -> Result { // Parse the YAML data let mut swbus_config: SwbusConfig = serde_yaml::from_reader(reader) .map_err(|e| SwbusConfigError::InvalidConfig(format!("Failed to parse YAML file: {e}")))?; - let ip = swbus_config.endpoint.ip(); - match ip { - IpAddr::V4(ipv4) => { - swbus_config.npu_ipv4 = Some(ipv4); - } - IpAddr::V6(ipv6) => { - swbus_config.npu_ipv6 = Some(ipv6); + let ips = swbus_config.endpoints.iter().map(|addr| addr.ip()).collect::>(); + + for ip in ips { + match ip { + IpAddr::V4(ipv4) => { + swbus_config.npu_ipv4 = Some(ipv4); + } + IpAddr::V6(ipv6) => { + swbus_config.npu_ipv6 = Some(ipv6); + } } } @@ -474,25 +479,35 @@ mod tests { let mut config_fromdb = swbus_config_from_db(0).unwrap(); assert_eq!(config_fromdb.routes.len(), 1); - assert_eq!(config_fromdb.peers.len(), 5); + assert_eq!(config_fromdb.peers.len(), 10); // create equivalent config in yaml let yaml_content = r#" - endpoint: "10.0.1.0:23606" + endpoints: ["10.0.1.0:23606", "[2001:db8:1::]:23606"] routes: - key: "region-a.cluster-a.host1-dpu0" scope: "InCluster" peers: - endpoint: "10.0.1.0:23607" conn_type: "InCluster" + - endpoint: "[2001:db8:1::]:23607" + conn_type: "InCluster" - endpoint: "10.0.1.1:23606" conn_type: "InCluster" + - endpoint: "[2001:db8:1::1]:23606" + conn_type: "InCluster" - endpoint: "10.0.1.1:23607" conn_type: "InCluster" + - endpoint: "[2001:db8:1::1]:23607" + conn_type: "InCluster" - endpoint: "10.0.1.2:23606" conn_type: "InCluster" + - endpoint: "[2001:db8:1::2]:23606" + conn_type: "InCluster" - endpoint: "10.0.1.2:23607" conn_type: "InCluster" + - endpoint: "[2001:db8:1::2]:23607" + conn_type: "InCluster" "#; let dir = tempdir().unwrap(); @@ -505,9 +520,10 @@ mod tests { expected.npu_ipv6 = Some(Ipv6Addr::from_str("2001:db8:1::").unwrap()); // sort before compare config_fromdb.routes.sort_by(|a, b| a.key.cmp(&b.key)); - config_fromdb.peers.sort_by(|a, b| a.endpoint.cmp(&b.endpoint)); + config_fromdb.peers.sort_by_key(|p| p.endpoint.to_string()); expected.routes.sort_by(|a, b| a.key.cmp(&b.key)); - expected.peers.sort_by(|a, b| a.endpoint.cmp(&b.endpoint)); + expected.peers.sort_by_key(|p| p.endpoint.to_string()); + assert_eq!(config_fromdb, expected); cleanup_configdb_for_test(); @@ -516,14 +532,16 @@ mod tests { #[test] fn test_load_from_yaml() { let yaml_content = r#" - endpoint: 10.0.0.1:8000 + endpoints: ["10.0.0.1:8000"] routes: - key: "region-a.cluster-a.10.0.0.1-dpu0" scope: "InCluster" peers: - - endpoint: "10.0.0.2:8000" + - id: "region-a.cluster-a.10.0.0.2-dpu0" + endpoint: "10.0.0.2:8000" conn_type: "InCluster" - - endpoint: "10.0.0.3:8000" + - id: "region-a.cluster-a.10.0.0.3-dpu0" + endpoint: "10.0.0.3:8000" conn_type: "InCluster" "#; diff --git a/crates/swbus-core/src/mux/multiplexer.rs b/crates/swbus-core/src/mux/multiplexer.rs index b0fa535f..db6e699a 100644 --- a/crates/swbus-core/src/mux/multiplexer.rs +++ b/crates/swbus-core/src/mux/multiplexer.rs @@ -381,6 +381,8 @@ impl SwbusMultiplexer { // get old routes, or create an empty one, and hold lock on the entry let mut old_routes = self.routes_by_conn.entry(conn_info.clone()).or_default(); + debug!("Old routes from conn {:?}: {:?}", conn_info.id(), *old_routes); + debug!("New routes from conn {:?}: {:?}", conn_info.id(), new_routes); let routes_to_remove: BTreeSet = old_routes.difference(&new_routes).cloned().collect(); let routes_to_add: BTreeSet = new_routes.difference(&old_routes).cloned().collect(); diff --git a/crates/swbus-core/src/mux/service.rs b/crates/swbus-core/src/mux/service.rs index 07b4d9d8..340d5c2c 100644 --- a/crates/swbus-core/src/mux/service.rs +++ b/crates/swbus-core/src/mux/service.rs @@ -11,10 +11,7 @@ use swbus_config::SwbusConfig; use swbus_proto::result::*; use swbus_proto::swbus::swbus_service_server::{SwbusService, SwbusServiceServer}; use swbus_proto::swbus::*; -use tokio::sync::{ - mpsc, - oneshot::{self, Receiver, Sender}, -}; +use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::Stream; use tokio_util::sync::CancellationToken; @@ -22,41 +19,49 @@ use tonic::{transport::Server, Request, Response, Status, Streaming}; use tracing::*; pub struct SwbusServiceHost { - swbus_server_addr: SocketAddr, + swbus_server_addrs: Vec, mux: Option>, conn_store: Option>, - shutdown_tx: Option>, - shutdown_rx: Option>, + shutdown_ct: CancellationToken, } type SwbusMessageResult = Result, Status>; type SwbusMessageStream = Pin> + Send>>; +// Separate implementation struct to allow cloning for multiple servers +struct SwbusServiceImpl { + mux: Arc, + conn_store: Arc, +} + impl SwbusServiceHost { - pub fn new(swbus_server_addr: &SocketAddr) -> Self { - let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); + pub fn new(swbus_server_addrs: Vec) -> Self { Self { - swbus_server_addr: *swbus_server_addr, + swbus_server_addrs, mux: None, conn_store: None, - shutdown_tx: Some(shutdown_tx), - shutdown_rx: Some(shutdown_rx), + shutdown_ct: CancellationToken::new(), } } - pub fn take_shutdown_sender(&mut self) -> Option> { - self.shutdown_tx.take() + pub fn get_shutdown_token(&self) -> CancellationToken { + self.shutdown_ct.clone() } - pub async fn shutdown(&mut self) { - if let Some(shutdown_tx) = self.shutdown_tx.take() { - let _ = shutdown_tx.send(()); - } + pub async fn shutdown(&self) { + info!("SwbusServiceServer shutting down"); + self.shutdown_ct.cancel(); } pub async fn start(mut self, config: SwbusConfig) -> Result<()> { - debug!("SwbusServiceServer starting at {}", self.swbus_server_addr); - let addr = self.swbus_server_addr; + if self.swbus_server_addrs.is_empty() { + return Err(SwbusError::input( + SwbusErrorCode::InvalidArgs, + "No server addresses provided.".to_string(), + )); + } + + debug!("SwbusServiceServer starting at {:?}", self.swbus_server_addrs); if config.routes.is_empty() { return Err(SwbusError::input( @@ -90,29 +95,54 @@ impl SwbusServiceHost { self.mux = Some(mux); let conn_store_clone = conn_store.clone(); self.conn_store = Some(conn_store); - let shutdown_rx = self.shutdown_rx.take().unwrap(); - - Server::builder() - .add_service(SwbusServiceServer::new(self)) - .serve_with_shutdown(addr, async { - shutdown_rx.await.ok(); - info!("SwbusServiceServer received shutdown signal"); - conn_store_clone.shutdown().await; - }) - .await - .map_err(|e| { - SwbusError::connection( - SwbusErrorCode::ConnectionError, - io::Error::other(format!("Failed to listen at {addr}: {e}")), - ) - })?; - debug!("SwbusServiceServer terminated"); + + // Start multiple servers, one for each address + let mut server_handles = Vec::new(); + let addrs = self.swbus_server_addrs.clone(); + + for addr in addrs.into_iter() { + let service = SwbusServiceServer::new(SwbusServiceImpl { + mux: self.mux.clone().unwrap(), + conn_store: self.conn_store.clone().unwrap(), + }); + + let shutdown_ct_for_server = self.shutdown_ct.clone(); + let conn_store_clone = conn_store_clone.clone(); + let server_handle = tokio::spawn(async move { + info!("Starting SwbusServiceServer on {}", addr); + Server::builder() + .add_service(service) + .serve_with_shutdown(addr, async move { + shutdown_ct_for_server.cancelled().await; + info!("SwbusServiceServer on {} shutting down", addr); + conn_store_clone.shutdown().await; + }) + .await + .map_err(|e| { + SwbusError::connection( + SwbusErrorCode::ConnectionError, + io::Error::other(format!("Failed to listen at {addr}: {e}")), + ) + }) + }); + + server_handles.push(server_handle); + } + + // Wait for all servers to complete + for handle in server_handles { + handle + .await + .map_err(|e| SwbusError::internal(SwbusErrorCode::Fail, format!("Server task panicked: {e}")))??; + } + debug!("All SwbusServiceServers terminated"); + Ok(()) } } #[tonic::async_trait] -impl SwbusService for SwbusServiceHost { +impl SwbusService for SwbusServiceImpl { type StreamMessagesStream = SwbusMessageStream; #[instrument(name="connection_received", level="info", skip_all, fields(addr=%request.remote_addr().unwrap()))] @@ -163,20 +193,15 @@ impl SwbusService for SwbusServiceHost { let (out_tx, out_rx) = mpsc::channel(16); let conn_info = Arc::new(SwbusConnInfo::new_server(conn_type, client_addr, service_path)); - let conn = SwbusConn::from_incoming_stream( - conn_info, - in_stream, - out_tx, - self.mux.as_ref().unwrap().clone(), - self.conn_store.as_ref().unwrap().clone(), - ) - .await; - self.conn_store.as_ref().unwrap().conn_established(conn); + let conn = + SwbusConn::from_incoming_stream(conn_info, in_stream, out_tx, self.mux.clone(), self.conn_store.clone()) + .await; + self.conn_store.conn_established(conn); let out_stream = ReceiverStream::new(out_rx); // Send server service path in response metadata let mut response = Response::new(Box::pin(out_stream) as Self::StreamMessagesStream); - let server_service_path = self.mux.as_ref().unwrap().get_my_service_path().to_string(); + let server_service_path = self.mux.get_my_service_path().to_string(); response .metadata_mut() .insert(SWBUS_SERVER_SERVICE_PATH, server_service_path.parse().unwrap()); diff --git a/crates/swbus-core/tests/common/test_executor.rs b/crates/swbus-core/tests/common/test_executor.rs index 67b19a8e..c5b11a13 100644 --- a/crates/swbus-core/tests/common/test_executor.rs +++ b/crates/swbus-core/tests/common/test_executor.rs @@ -97,7 +97,7 @@ impl TopoRuntime { .unwrap_or_else(|| panic!("Failed to find topo swbusd {}", client.swbusd)); self.start_client( name, - &server.endpoint, + &server.endpoints.first().unwrap(), ServicePath::from_string(&client.client_sp).unwrap(), ) .await; @@ -107,7 +107,7 @@ impl TopoRuntime { } async fn start_server(&mut self, name: &str, route_config: &SwbusConfig) { - let service_host = SwbusServiceHost::new(&route_config.endpoint); + let service_host = SwbusServiceHost::new(route_config.endpoints.clone()); let config_clone = route_config.clone(); let server_task = tokio::spawn(async move { service_host.start(config_clone).await.unwrap(); @@ -115,7 +115,7 @@ impl TopoRuntime { self.server_jobs.push(server_task); - println!("Server {} started at {}", name, &route_config.endpoint); + println!("Server {} started at {:?}", name, route_config.endpoints); } async fn start_client(&mut self, name: &str, node_addr: &SocketAddr, client_sp: ServicePath) { diff --git a/crates/swbus-core/tests/data/b2b/topo.json b/crates/swbus-core/tests/data/b2b/topo.json index 27ef3308..f43aa7c9 100644 --- a/crates/swbus-core/tests/data/b2b/topo.json +++ b/crates/swbus-core/tests/data/b2b/topo.json @@ -2,7 +2,7 @@ "description": "Simple topo with 2 swbusd and 1 client: client <-> swbusd1 <-> swbusd2", "servers": { "swbusd1": { - "endpoint": "127.0.0.1:60001", + "endpoints": ["127.0.0.1:60001"], "routes": [ { "key": "region-a.cluster-a.10.0.0.1-dpu0", @@ -17,7 +17,7 @@ ] }, "swbusd2": { - "endpoint": "127.0.0.1:60002", + "endpoints": ["127.0.0.1:60002"], "routes": [ { "key": "region-a.cluster-a.10.0.0.2-dpu0", diff --git a/crates/swbus-core/tests/data/dual-stack/test_ping.json b/crates/swbus-core/tests/data/dual-stack/test_ping.json new file mode 100644 index 00000000..1ec0d690 --- /dev/null +++ b/crates/swbus-core/tests/data/dual-stack/test_ping.json @@ -0,0 +1,49 @@ +[ + { + "name": "inter_cluster_ping", + "description": "verify ping and response", + "steps": [ + { + "requests": [ + { + "client": "swbusd_cluster-a.node1_client", + "message": { + "header": { + "version": 1, + "flag": 0, + "ttl": 64, + "source": "region-a.cluster-a.node1/testsvc/0", + "destination": "region-a.cluster-b.node1" + }, + "body": { + "PingRequest": {} + } + } + } + ], + "responses": [ + { + "client": "swbusd_cluster-a.node1_client", + "message": { + "header": { + "version": 1, + "flag": 0, + "ttl": 60, + "source": "region-a.cluster-b.node1", + "destination": "region-a.cluster-a.node1/testsvc/0" + }, + "body": { + "Response": { + "request_id": 0, + "error_code": 1, + "error_message": "", + "response_body": null + } + } + } + } + ] + } + ] + } +] \ No newline at end of file diff --git a/crates/swbus-core/tests/data/dual-stack/topo.json b/crates/swbus-core/tests/data/dual-stack/topo.json new file mode 100644 index 00000000..dafa48f7 --- /dev/null +++ b/crates/swbus-core/tests/data/dual-stack/topo.json @@ -0,0 +1,83 @@ +{ + "description": "Simple topo with 3 swbusd including 1 gateways and 1 client: client <-> swbusd_cluster-a.node1 <-> swbusd_cluster-a.gw <-> swbusd_cluster-b.node1. cluster-a runs ipv4 and cluster-b runs ipv6. cluster-a.gw supports both ipv4 and ipv6", + "servers": { + "swbusd_cluster-a.node1": { + "endpoints": ["127.0.0.1:60001"], + "routes": [ + { + "key": "region-a.cluster-a.node1", + "scope": "InCluster" + } + ], + "peers": [ + { + "endpoint": "127.0.0.1:60002", + "conn_type": "InCluster" + } + ] + }, + "swbusd_cluster-a.gw": { + "endpoints": ["127.0.0.1:60002", "[::1]:60002"], + "routes": [ + { + "key": "region-a.cluster-a.gw", + "scope": "InCluster" + }, + { + "key": "region-a.cluster-a", + "scope": "InRegion" + } + ], + "peers": [ + { + "endpoint": "127.0.0.1:60001", + "conn_type": "InCluster" + } + ] + }, + "swbusd_cluster-b.gw": { + "endpoints": ["[::1]:60003"], + "routes": [ + { + "key": "region-a.cluster-b.gw", + "scope": "InCluster" + }, + { + "key": "region-a.cluster-b", + "scope": "InRegion" + } + ], + "peers": [ + { + "endpoint": "[::1]:60002", + "conn_type": "InRegion" + }, + { + "endpoint": "[::1]:60004", + "conn_type": "InCluster" + } + ] + }, + "swbusd_cluster-b.node1": { + "endpoints": ["[::1]:60004"], + "routes": [ + { + "key": "region-a.cluster-b.node1", + "scope": "InCluster" + } + ], + "peers": [ + { + "endpoint": "[::1]:60003", + "conn_type": "InCluster" + } + ] + } + }, + "clients": { + "swbusd_cluster-a.node1_client": { + "swbusd": "swbusd_cluster-a.node1", + "client_sp": "region-a.cluster-a.node1/testsvc/0" + } + } +} \ No newline at end of file diff --git a/crates/swbus-core/tests/data/inter-cluster/test_show_route.json b/crates/swbus-core/tests/data/inter-cluster/test_show_route.json index 9da3f9f8..0cdbc6d7 100644 --- a/crates/swbus-core/tests/data/inter-cluster/test_show_route.json +++ b/crates/swbus-core/tests/data/inter-cluster/test_show_route.json @@ -55,12 +55,6 @@ "route_scope": 2, "hop_count": 1 }, - { - "service_path": "region-a.cluster-a.gw", - "nh_service_path": "region-a.cluster-a.gw", - "route_scope": 2, - "hop_count": 1 - }, { "service_path": "region-a.cluster-a.node1/testsvc/0", "nh_service_path": "region-a.cluster-a.node1/testsvc/0", diff --git a/crates/swbus-core/tests/data/inter-cluster/topo.json b/crates/swbus-core/tests/data/inter-cluster/topo.json index c167ddf8..6aadcdf6 100644 --- a/crates/swbus-core/tests/data/inter-cluster/topo.json +++ b/crates/swbus-core/tests/data/inter-cluster/topo.json @@ -2,7 +2,7 @@ "description": "Simple topo with 4 swbusd including 2 gateways and 1 client: client <-> swbusd_cluster-a.node1 <-> swbusd_cluster-a.gw <-> swbusd_cluster-b.gw <-> swbusd_cluster-b.node1", "servers": { "swbusd_cluster-a.node1": { - "endpoint": "127.0.0.1:60001", + "endpoints": ["127.0.0.1:60001"], "routes": [ { "key": "region-a.cluster-a.node1", @@ -17,7 +17,7 @@ ] }, "swbusd_cluster-a.gw": { - "endpoint": "127.0.0.1:60002", + "endpoints": ["127.0.0.1:60002"], "routes": [ { "key": "region-a.cluster-a.gw", @@ -29,10 +29,6 @@ } ], "peers": [ - { - "endpoint": "127.0.0.1:60001", - "conn_type": "InCluster" - }, { "endpoint": "127.0.0.1:60003", "conn_type": "InRegion" @@ -40,7 +36,7 @@ ] }, "swbusd_cluster-b.gw": { - "endpoint": "127.0.0.1:60003", + "endpoints": ["127.0.0.1:60003"], "routes": [ { "key": "region-a.cluster-b.gw", @@ -63,7 +59,7 @@ ] }, "swbusd_cluster-b.node1": { - "endpoint": "127.0.0.1:60004", + "endpoints": ["127.0.0.1:60004"], "routes": [ { "key": "region-a.cluster-b.node1", diff --git a/crates/swbus-core/tests/dual_stack.rs b/crates/swbus-core/tests/dual_stack.rs new file mode 100644 index 00000000..35ae918d --- /dev/null +++ b/crates/swbus-core/tests/dual_stack.rs @@ -0,0 +1,20 @@ +mod common; +use common::test_executor::{run_tests, TopoRuntime}; +use sonic_common::log::init_logger_for_test; +#[tokio::test] +async fn test_dual_stack() { + init_logger_for_test(); + + let mut topo = TopoRuntime::new("tests/data/dual-stack/topo.json"); + topo.bring_up().await; + + // Wait for topology to stabilize + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + + // can't split run_tests into multiple test cases. Each tokio::test creates a new runtime, from which bring_up_topo runs. + // when a test case is done, the runtime is dropped and the topology is torn down. It can't be reused to run another test case. + // If move bring_up_topo outside of test cases to a setup function and create a single shared runtime, test case cannot + // use the shared runtime. It will panic with "fatal runtime error: thread::set_current should only be called once per thread". + run_tests(&mut topo, "tests/data/dual-stack/test_ping.json", None).await; + //run_tests(&mut topo, "tests/data/inter-cluster/test_show_route.json", None).await; +} diff --git a/crates/swbus-core/tests/inter_cluster_tests.rs b/crates/swbus-core/tests/inter_cluster_tests.rs index 5a367cad..a4e4fe64 100644 --- a/crates/swbus-core/tests/inter_cluster_tests.rs +++ b/crates/swbus-core/tests/inter_cluster_tests.rs @@ -7,6 +7,10 @@ async fn test_inter_cluster() { let mut topo = TopoRuntime::new("tests/data/inter-cluster/topo.json"); topo.bring_up().await; + + // Wait for topology to stabilize + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + // can't split run_tests into multiple test cases. Each tokio::test creates a new runtime, from which bring_up_topo runs. // when a test case is done, the runtime is dropped and the topology is torn down. It can't be reused to run another test case. // If move bring_up_topo outside of test cases to a setup function and create a single shared runtime, test case cannot diff --git a/crates/swbus-edge/Cargo.toml b/crates/swbus-edge/Cargo.toml index 6744b34f..851b107b 100644 --- a/crates/swbus-edge/Cargo.toml +++ b/crates/swbus-edge/Cargo.toml @@ -15,6 +15,7 @@ workspace = true # Async framework tokio.workspace = true tokio-stream.workspace = true +tokio-util.workspace = true # gRPC tonic.workspace = true diff --git a/crates/swbus-edge/src/edge_runtime.rs b/crates/swbus-edge/src/edge_runtime.rs index e6f3f27a..ceeb5ce2 100644 --- a/crates/swbus-edge/src/edge_runtime.rs +++ b/crates/swbus-edge/src/edge_runtime.rs @@ -138,8 +138,8 @@ mod tests { use swbus_core::mux::service::SwbusServiceHost; use swbus_proto::swbus::*; use tokio::sync::mpsc::{self, Receiver, Sender}; - use tokio::sync::oneshot; use tokio::time::{self, timeout, Duration, Instant}; + use tokio_util::sync::CancellationToken; fn make_swbusd_config() -> SwbusConfig { // generate a random port @@ -148,7 +148,7 @@ mod tests { let config = format!( r#" - endpoint: "127.0.0.1:{port}" + endpoints: ["127.0.0.1:{port}"] routes: - key: "region-a.cluster-a.10.0.1.0-dpu0" scope: "InCluster" @@ -158,14 +158,14 @@ mod tests { serde_yaml::from_str(&config).unwrap() } - fn start_standalone_swbusd(swbus_cfg: SwbusConfig) -> oneshot::Sender<()> { + fn start_standalone_swbusd(swbus_cfg: SwbusConfig) -> CancellationToken { // start a standalone swbusd - let mut service_host = SwbusServiceHost::new(&swbus_cfg.endpoint); - let shutdown_handle = service_host.take_shutdown_sender().unwrap(); + let service_host = SwbusServiceHost::new(swbus_cfg.endpoints.clone()); + let shutdown_ct = service_host.get_shutdown_token(); tokio::spawn(async move { let _ = service_host.start(swbus_cfg).await; }); - shutdown_handle + shutdown_ct } async fn wait_runtime_until<'a, F, Fut>( @@ -185,7 +185,7 @@ mod tests { return Ok(()); } if start.elapsed() > Duration::from_secs(timeout as u64) { - return Err("swbusd is not connected".to_string()); + return Err("timed out waiting for the condition to match".to_string()); } time::sleep(Duration::from_millis(100)).await; } @@ -211,7 +211,7 @@ mod tests { sp.service_type = "swbus-edge".to_string(); sp.service_id = "test".to_string(); let mut runtime = SwbusEdgeRuntime::new( - format!("http://{}", swbus_config.endpoint), + format!("http://{}", swbus_config.endpoints.first().unwrap()), sp.clone(), ConnectionType::InNode, ); @@ -225,17 +225,17 @@ mod tests { .expect("swbusd is not connected"); // shut swbusd and wait until connection is lost - shut_hdl.send(()).expect("Failed to send shutdown signal"); + shut_hdl.cancel(); wait_runtime_until(runtime.clone(), |x| async move { !x.swbusd_connected().await }, 10) .await - .expect("swbusd is still connected"); + .expect("swbusd connection did not drop after shutdown"); // restart swbusd and wait until reconnected let shut_hdl = start_standalone_swbusd(swbus_config.clone()); wait_runtime_until(runtime.clone(), |x| async move { x.swbusd_connected().await }, 10) .await .expect("swbusd is not reconnected"); - shut_hdl.send(()).expect("Failed to send shutdown signal"); + shut_hdl.cancel(); } #[tokio::test] @@ -249,8 +249,11 @@ mod tests { sp.service_type = "swbus-edge".to_string(); sp.service_id = "test".to_string(); - let mut runtime = - SwbusEdgeRuntime::new(format!("http://{}", swbus_config.endpoint), sp, ConnectionType::InNode); + let mut runtime = SwbusEdgeRuntime::new( + format!("http://{}", swbus_config.endpoints.first().unwrap()), + sp, + ConnectionType::InNode, + ); runtime.start().await.unwrap(); let base_sp = swbus_config.routes[0].key.to_swbusd_service_path().to_longest_path(); diff --git a/crates/swbusd/src/main.rs b/crates/swbusd/src/main.rs index 2eeb06b4..055a1dee 100644 --- a/crates/swbusd/src/main.rs +++ b/crates/swbusd/src/main.rs @@ -33,6 +33,6 @@ async fn main() { } }; - let server = SwbusServiceHost::new(&swbusd_config.endpoint); + let server = SwbusServiceHost::new(swbusd_config.endpoints.clone()); server.start(swbusd_config).await.unwrap(); }