Skip to content

Commit a354cd2

Browse files
apollo_network_benchmark: add ReveresedSqmrSender struct and implementation
1 parent 55e1b2d commit a354cd2

File tree

1 file changed

+49
-1
lines changed
  • crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node

1 file changed

+49
-1
lines changed

crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/protocol.rs

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use apollo_network_benchmark::node_args::NetworkProtocol;
1414
use futures::StreamExt;
1515
use libp2p::gossipsub::{Sha256Topic, Topic};
1616
use libp2p::PeerId;
17-
use tracing::error;
17+
use tracing::{error, info, trace};
1818

1919
// ================================
2020
// Types and Constants
@@ -77,6 +77,48 @@ pub fn register_protocol_channels(
7777
pub enum MessageSender {
7878
Gossipsub(BroadcastTopicClient<TopicType>),
7979
Sqmr(SqmrClientSender<TopicType, TopicType>),
80+
ReveresedSqmr(ReveresedSqmrSender),
81+
}
82+
83+
/// Wrapper for ReveresedSqmr that maintains the last active query
84+
pub struct ReveresedSqmrSender {
85+
server: SqmrServerReceiver<TopicType, TopicType>,
86+
active_query: Option<apollo_network::network_manager::ServerQueryManager<TopicType, TopicType>>,
87+
}
88+
89+
impl ReveresedSqmrSender {
90+
pub fn new(server: SqmrServerReceiver<TopicType, TopicType>) -> Self {
91+
Self { server, active_query: None }
92+
}
93+
94+
async fn collect_new_queries(&mut self) {
95+
// Non-blocking check for new queries, keeping only the last one
96+
while let Ok(query) =
97+
tokio::time::timeout(tokio::time::Duration::from_millis(1), self.server.next()).await
98+
{
99+
if let Some(query) = query {
100+
info!("ReveresedSqmr: Received new query, replacing previous query");
101+
self.active_query = Some(query);
102+
} else {
103+
break;
104+
}
105+
}
106+
}
107+
108+
async fn broadcast_to_queries(&mut self, message: TopicType) {
109+
if let Some(query) = &mut self.active_query {
110+
match query.send_response(message).await {
111+
Ok(()) => {
112+
trace!("ReveresedSqmr: Sent response to active query");
113+
}
114+
Err(e) => {
115+
// Query failed, remove it
116+
error!("ReveresedSqmr: Active query failed, removing it, error: {:?}", e);
117+
self.active_query = None;
118+
}
119+
}
120+
}
121+
}
80122
}
81123

82124
impl MessageSender {
@@ -95,6 +137,12 @@ impl MessageSender {
95137
error!("Failed to send SQMR query: {:?}", e);
96138
}
97139
},
140+
MessageSender::ReveresedSqmr(sender) => {
141+
// Collect any new queries first
142+
sender.collect_new_queries().await;
143+
// Then broadcast the message to all active queries
144+
sender.broadcast_to_queries(message).await;
145+
}
98146
}
99147
}
100148
}

0 commit comments

Comments
 (0)