Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,22 @@ impl MessageSender {
client.broadcast_message(message).await.unwrap();
}
MessageSender::Sqmr(client) => {
// Send query and properly handle the response manager to avoid session warnings
match client.send_new_query(message).await {
Ok(mut response_manager) => {
// Consume the response manager to properly close the session
// This prevents the "finished with no messages" warning
tokio::spawn(async move {
while let Some(_response) = response_manager.next().await {
// Process any responses if they come, but don't block the sender
}
});
}
Err(e) => {
error!("Failed to send SQMR query: {:?}", e);
// Send query to all specified peers
for peer_id in _peers {
match client.send_new_query_to_peer(message.clone(), *peer_id).await {
Ok(mut response_manager) => {
// Consume the response manager to properly close the session
// This prevents the "finished with no messages" warning
tokio::spawn(async move {
while let Some(_response) = response_manager.next().await {
// Process any responses if they come, but don't block the
// sender
}
});
}
Err(e) => {
error!("Failed to send SQMR query to peer {:?}: {:?}", peer_id, e);
}
}
}
}
Expand Down
27 changes: 24 additions & 3 deletions crates/apollo_network/src/network_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -706,8 +706,10 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
protocol: StreamProtocol,
client_payload: SqmrClientPayload,
) {
let SqmrClientPayload { query, report_receiver, responses_sender } = client_payload;
let outbound_session_id = self.swarm.send_query(query, protocol.clone());
let SqmrClientPayload { query, report_receiver, responses_sender, peer_id } =
client_payload;
// Use the peer_id from the payload (None for round-robin, Some for specific peer)
let outbound_session_id = self.swarm.send_query(query, protocol.clone(), peer_id);
if let Some(sqmr_metrics) =
self.metrics.as_ref().and_then(|metrics| metrics.sqmr_metrics.as_ref())
{
Expand Down Expand Up @@ -945,7 +947,25 @@ where
let query = Bytes::from(query);
let responses_sender =
Box::new(responses_sender.with(|response| ready(Ok(Response::try_from(response)))));
let payload = SqmrClientPayload { query, report_receiver, responses_sender };
let payload = SqmrClientPayload { query, report_receiver, responses_sender, peer_id: None };
self.sender.send(payload).await?;
Ok(ClientResponsesManager { report_sender, responses_receiver })
}

pub async fn send_new_query_to_peer(
&mut self,
query: Query,
peer_id: PeerId,
) -> Result<ClientResponsesManager<Response>, SendError> {
let (report_sender, report_receiver) = oneshot::channel::<()>();
let (responses_sender, responses_receiver) =
futures::channel::mpsc::channel(self.buffer_size);
let responses_receiver = Box::new(responses_receiver);
let query = Bytes::from(query);
let responses_sender =
Box::new(responses_sender.with(|response| ready(Ok(Response::try_from(response)))));
let payload =
SqmrClientPayload { query, report_receiver, responses_sender, peer_id: Some(peer_id) };
self.sender.send(payload).await?;
Ok(ClientResponsesManager { report_sender, responses_receiver })
}
Expand Down Expand Up @@ -980,6 +1000,7 @@ pub struct SqmrClientPayload {
query: Bytes,
report_receiver: ReportReceiver,
responses_sender: ResponsesSender,
peer_id: Option<PeerId>,
}

pub struct SqmrServerReceiver<Query, Response>
Expand Down
16 changes: 13 additions & 3 deletions crates/apollo_network/src/network_manager/swarm_trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ pub trait SwarmTrait: Stream<Item = Event> + Unpin {
inbound_session_id: InboundSessionId,
) -> Result<(), SessionIdNotFoundError>;

fn send_query(&mut self, query: Vec<u8>, protocol: StreamProtocol) -> OutboundSessionId;
fn send_query(
&mut self,
query: Vec<u8>,
protocol: StreamProtocol,
peer_id: Option<PeerId>,
) -> OutboundSessionId;

fn dial(&mut self, peer_multiaddr: Multiaddr) -> Result<(), DialError>;

Expand Down Expand Up @@ -74,8 +79,13 @@ impl SwarmTrait for Swarm<mixed_behaviour::MixedBehaviour> {
self.behaviour_mut().sqmr.send_response(response, inbound_session_id)
}

fn send_query(&mut self, query: Vec<u8>, protocol: StreamProtocol) -> OutboundSessionId {
self.behaviour_mut().sqmr.start_query(query, protocol)
fn send_query(
&mut self,
query: Vec<u8>,
protocol: StreamProtocol,
peer_id: Option<PeerId>,
) -> OutboundSessionId {
self.behaviour_mut().sqmr.start_query(query, protocol, peer_id)
}

fn dial(&mut self, peer_multiaddr: Multiaddr) -> Result<(), DialError> {
Expand Down
7 changes: 6 additions & 1 deletion crates/apollo_network/src/network_manager/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,12 @@ impl SwarmTrait for MockSwarm {
Ok(())
}

fn send_query(&mut self, query: Vec<u8>, _protocol: StreamProtocol) -> OutboundSessionId {
fn send_query(
&mut self,
query: Vec<u8>,
_protocol: StreamProtocol,
_peer_id: Option<PeerId>,
) -> OutboundSessionId {
let outbound_session_id = OutboundSessionId { value: self.next_outbound_session_id };
self.create_response_events_for_query_each_num_becomes_response(
query,
Expand Down
2 changes: 1 addition & 1 deletion crates/apollo_network/src/network_manager/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ where
Bytes: From<Response>,
{
fn from(payload: SqmrClientPayload) -> Self {
let SqmrClientPayload { query, report_receiver, responses_sender } = payload;
let SqmrClientPayload { query, report_receiver, responses_sender, peer_id: _ } = payload;
let query = Query::try_from(query);
let responses_sender =
Box::new(responses_sender.with(|response: Response| ready(Ok(Bytes::from(response)))));
Expand Down
51 changes: 41 additions & 10 deletions crates/apollo_network/src/sqmr/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,24 +129,55 @@ impl Behaviour {
}
}

/// Assign some peer and start a query. Return the id of the new session.
/// Start a query and return the id of the new session.
///
/// If `peer_id` is `Some`, the query will be sent to that specific peer using any available
/// connection. If `peer_id` is `None`, a peer will be assigned via the peer manager (typically
/// using round-robin selection).
pub fn start_query(
&mut self,
query: Bytes,
protocol_name: StreamProtocol,
peer_id: Option<PeerId>,
) -> OutboundSessionId {
let outbound_session_id = self.next_outbound_session_id;
self.next_outbound_session_id.value += 1;

self.outbound_sessions_pending_peer_assignment
.insert(outbound_session_id, (query, protocol_name));
debug!(
"Network received new outbound query. Requesting peer assignment for {:?}.",
outbound_session_id
);
self.add_event_to_queue(ToSwarm::GenerateEvent(Event::ToOtherBehaviourEvent(
ToOtherBehaviourEvent::RequestPeerAssignment { outbound_session_id },
)));
if let Some(peer_id) = peer_id {
// Peer specified directly - send the query immediately without peer assignment
debug!(
"Network received new outbound query for peer {:?}. Sending directly. {:?}",
peer_id, outbound_session_id
);

// We don't have a specific connection ID, so we use NotifyHandler::Any to send to
// any connection with this peer
self.add_event_to_queue(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
event: RequestFromBehaviourEvent::CreateOutboundSession {
query,
outbound_session_id,
protocol_name,
},
});

// Store the mapping without a specific connection_id (using a dummy value)
// The actual connection will be determined by NotifyHandler::Any
self.session_id_to_peer_id_and_connection_id
.insert(outbound_session_id.into(), (peer_id, ConnectionId::new_unchecked(0)));
} else {
// No peer specified - use peer assignment (round-robin)
self.outbound_sessions_pending_peer_assignment
.insert(outbound_session_id, (query, protocol_name));
debug!(
"Network received new outbound query. Requesting peer assignment for {:?}.",
outbound_session_id
);
self.add_event_to_queue(ToSwarm::GenerateEvent(Event::ToOtherBehaviourEvent(
ToOtherBehaviourEvent::RequestPeerAssignment { outbound_session_id },
)));
}

outbound_session_id
}
Expand Down
52 changes: 48 additions & 4 deletions crates/apollo_network/src/sqmr/behaviour_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@ use futures::{FutureExt, Stream, StreamExt};
use lazy_static::lazy_static;
use libp2p::core::transport::PortUse;
use libp2p::core::{ConnectedPoint, Endpoint};
use libp2p::swarm::{ConnectionClosed, ConnectionId, FromSwarm, NetworkBehaviour, ToSwarm};
use libp2p::swarm::{
ConnectionClosed,
ConnectionId,
FromSwarm,
NetworkBehaviour,
NotifyHandler,
ToSwarm,
};
use libp2p::{Multiaddr, PeerId, StreamProtocol};

use super::super::handler::{RequestFromBehaviourEvent, RequestToBehaviourEvent};
Expand Down Expand Up @@ -319,7 +326,7 @@ async fn create_and_process_outbound_session() {

let peer_id = *DUMMY_PEER_ID;

let outbound_session_id = behaviour.start_query(QUERY.clone(), PROTOCOL_NAME.clone());
let outbound_session_id = behaviour.start_query(QUERY.clone(), PROTOCOL_NAME.clone(), None);

validate_request_peer_assignment_event(&mut behaviour, outbound_session_id).await;
validate_no_events(&mut behaviour);
Expand Down Expand Up @@ -354,7 +361,7 @@ async fn connection_closed() {
let peer_id = *DUMMY_PEER_ID;

// Add an outbound session on the connection.
let outbound_session_id = behaviour.start_query(QUERY.clone(), PROTOCOL_NAME.clone());
let outbound_session_id = behaviour.start_query(QUERY.clone(), PROTOCOL_NAME.clone(), None);
// Consume the event to request peer assignment.
behaviour.next().await.unwrap();
simulate_peer_assigned(&mut behaviour, peer_id, outbound_session_id);
Expand Down Expand Up @@ -400,7 +407,7 @@ async fn drop_outbound_session() {

let peer_id = *DUMMY_PEER_ID;

let outbound_session_id = behaviour.start_query(QUERY.clone(), PROTOCOL_NAME.clone());
let outbound_session_id = behaviour.start_query(QUERY.clone(), PROTOCOL_NAME.clone(), None);
// Consume the event to request peer assignment.
behaviour.next().await.unwrap();
simulate_peer_assigned(&mut behaviour, peer_id, outbound_session_id);
Expand Down Expand Up @@ -464,3 +471,40 @@ fn send_response_non_existing_session_fails() {
behaviour.send_response(response, InboundSessionId::default()).unwrap_err();
}
}

#[tokio::test]
async fn create_outbound_session_with_specified_peer() {
let mut behaviour = Behaviour::new(Config::get_test_config());

let peer_id = *DUMMY_PEER_ID;

// Start a query with a specific peer ID
let outbound_session_id =
behaviour.start_query(QUERY.clone(), PROTOCOL_NAME.clone(), Some(peer_id));

// Should not generate a RequestPeerAssignment event since peer is specified
// Instead, should directly generate a NotifyHandler event
let event = behaviour.next().await.unwrap();
assert_matches!(
event,
ToSwarm::NotifyHandler {
peer_id: event_peer_id,
handler: NotifyHandler::Any,
event: RequestFromBehaviourEvent::CreateOutboundSession {
query: event_query,
outbound_session_id: event_outbound_session_id,
protocol_name,
},
} if event_peer_id == peer_id
&& event_outbound_session_id == outbound_session_id
&& event_query == QUERY.clone()
&& protocol_name == PROTOCOL_NAME.clone()
);
validate_no_events(&mut behaviour);

// Verify the session is tracked
let (tracked_peer_id, _connection_id) = behaviour
.get_peer_id_and_connection_id_from_session_id(outbound_session_id.into())
.unwrap();
assert_eq!(tracked_peer_id, peer_id);
}
1 change: 1 addition & 0 deletions crates/apollo_network/src/sqmr/flow_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ fn start_query_and_update_map(
let outbound_session_id = outbound_swarm.behaviour_mut().start_query(
get_bytes_from_query_indices(outbound_peer_id, inbound_peer_id),
PROTOCOL_NAME,
None,
);
outbound_session_id_to_peer_id.insert((outbound_peer_id, outbound_session_id), inbound_peer_id);
}
Expand Down
Loading