Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use apollo_network_benchmark::node_args::NetworkProtocol;
use futures::StreamExt;
use libp2p::gossipsub::{Sha256Topic, Topic};
use libp2p::PeerId;
use tracing::error;
use tracing::{error, info, trace};

// ================================
// Types and Constants
Expand Down Expand Up @@ -77,6 +77,48 @@ pub fn register_protocol_channels(
pub enum MessageSender {
Gossipsub(BroadcastTopicClient<TopicType>),
Sqmr(SqmrClientSender<TopicType, TopicType>),
ReveresedSqmr(ReveresedSqmrSender),
}

/// Wrapper for ReveresedSqmr that maintains the last active query
pub struct ReveresedSqmrSender {
server: SqmrServerReceiver<TopicType, TopicType>,
active_query: Option<apollo_network::network_manager::ServerQueryManager<TopicType, TopicType>>,
}

impl ReveresedSqmrSender {
pub fn new(server: SqmrServerReceiver<TopicType, TopicType>) -> Self {
Self { server, active_query: None }
}

async fn collect_new_queries(&mut self) {
// Non-blocking check for new queries, keeping only the last one
while let Ok(query) =
tokio::time::timeout(tokio::time::Duration::from_millis(1), self.server.next()).await
{
if let Some(query) = query {
info!("ReveresedSqmr: Received new query, replacing previous query");
self.active_query = Some(query);
} else {
break;
}
}
}

async fn broadcast_to_queries(&mut self, message: TopicType) {
if let Some(query) = &mut self.active_query {
match query.send_response(message).await {
Ok(()) => {
trace!("ReveresedSqmr: Sent response to active query");
}
Err(e) => {
// Query failed, remove it
error!("ReveresedSqmr: Active query failed, removing it, error: {:?}", e);
self.active_query = None;
}
}
}
}
}

impl MessageSender {
Expand All @@ -95,6 +137,12 @@ impl MessageSender {
error!("Failed to send SQMR query: {:?}", e);
}
},
MessageSender::ReveresedSqmr(sender) => {
// Collect any new queries first
sender.collect_new_queries().await;
// Then broadcast the message to all active queries
sender.broadcast_to_queries(message).await;
}
}
}
}
Expand Down
Loading