diff --git a/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/protocol.rs b/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/protocol.rs index 768bda0334e..d86192c3749 100644 --- a/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/protocol.rs +++ b/crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/protocol.rs @@ -8,6 +8,7 @@ use apollo_network::network_manager::{ BroadcastTopicServer, NetworkManager, SqmrClientSender, + SqmrServerReceiver, }; use apollo_network_benchmark::node_args::NetworkProtocol; use futures::StreamExt; @@ -48,8 +49,18 @@ pub fn register_protocol_channels( ) } NetworkProtocol::Sqmr => { - // TODO(AndrewL): Implement SQMR protocol registration - todo!("SQMR protocol registration not yet implemented") + let sqmr_client = network_manager + .register_sqmr_protocol_client::( + SQMR_PROTOCOL_NAME.to_string(), + buffer_size, + ); + let sqmr_server = network_manager + .register_sqmr_protocol_server::( + SQMR_PROTOCOL_NAME.to_string(), + buffer_size, + ); + + (MessageSender::Sqmr(sqmr_client), MessageReceiver::Sqmr(sqmr_server)) } } } @@ -90,6 +101,7 @@ impl MessageSender { pub enum MessageReceiver { Gossipsub(BroadcastTopicServer), + Sqmr(SqmrServerReceiver), } impl MessageReceiver { @@ -109,6 +121,13 @@ impl MessageReceiver { }) .await } + MessageReceiver::Sqmr(receiver) => { + receiver + .for_each(|x| async move { + f(x.query().as_ref().unwrap().to_vec(), None); + }) + .await + } } } }