Skip to content

Commit 466460e

Browse files
apollo_network: applying backpressure to swarm broadcasting
1 parent 2c32041 commit 466460e

File tree

3 files changed

+43
-25
lines changed

3 files changed

+43
-25
lines changed

crates/apollo_network/src/network_manager/mod.rs

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use futures::future::{ready, BoxFuture, Ready};
1818
use futures::sink::With;
1919
use futures::stream::{FuturesUnordered, Map, Stream};
2020
use futures::{pin_mut, FutureExt, Sink, SinkExt, StreamExt};
21-
use libp2p::gossipsub::{SubscriptionError, TopicHash};
21+
use libp2p::gossipsub::{MessageId, PublishError, SubscriptionError, TopicHash};
2222
use libp2p::identity::Keypair;
2323
use libp2p::swarm::SwarmEvent;
2424
use libp2p::{noise, yamux, Multiaddr, PeerId, StreamProtocol, Swarm, SwarmBuilder};
@@ -68,21 +68,30 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
6868
if let Some(metrics) = self.metrics.as_ref() {
6969
metrics.register();
7070
}
71+
72+
let mut message_to_send: Option<(Bytes, TopicHash)> = None;
7173
loop {
74+
if let Some((message, topic_hash)) = message_to_send.as_ref() {
75+
match self.broadcast_message(message.clone(), topic_hash.clone()) {
76+
Ok(_) => {
77+
message_to_send = None;
78+
}
79+
Err(e) => {
80+
warn!("Failed to broadcast message: `{e:?}` Applying Backpressure.");
81+
}
82+
};
83+
}
7284
tokio::select! {
7385
Some(event) = self.swarm.next() => self.handle_swarm_event(event)?,
7486
Some(res) = self.sqmr_inbound_response_receivers.next() => self.handle_response_for_inbound_query(res),
7587
Some((protocol, client_payload)) = self.sqmr_outbound_payload_receivers.next() => {
7688
let protocol = StreamProtocol::try_from_owned(protocol).expect("Invalid protocol should not appear");
7789
self.handle_local_sqmr_payload(protocol, client_payload.expect("An SQMR client channel should not be terminated."))
7890
}
79-
Some((topic_hash, message)) = self.messages_to_broadcast_receivers.next() => {
80-
self.broadcast_message(
81-
message.ok_or(NetworkError::BroadcastChannelsDropped {
82-
topic_hash: topic_hash.clone()
83-
})?,
84-
topic_hash,
85-
);
91+
Some((topic_hash, message)) = self.messages_to_broadcast_receivers.next(), if message_to_send.is_none() => {
92+
message_to_send = Some((message.ok_or(NetworkError::BroadcastChannelsDropped {
93+
topic_hash: topic_hash.clone(),
94+
})?, topic_hash));
8695
}
8796
Some(Some(peer_id)) = self.reported_peer_receivers.next() => self.swarm.report_peer_as_malicious(peer_id, MisconductScore::MALICIOUS),
8897
Some(peer_id) = self.reported_peers_receiver.next() => self.swarm.report_peer_as_malicious(peer_id, MisconductScore::MALICIOUS),
@@ -615,7 +624,11 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
615624
.insert(outbound_session_id, report_receiver);
616625
}
617626

618-
fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash) {
627+
fn broadcast_message(
628+
&mut self,
629+
message: Bytes,
630+
topic_hash: TopicHash,
631+
) -> Result<MessageId, PublishError> {
619632
if let Some(broadcast_metrics_by_topic) =
620633
self.metrics.as_ref().and_then(|metrics| metrics.broadcast_metrics_by_topic.as_ref())
621634
{
@@ -627,7 +640,7 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
627640
}
628641
}
629642
trace!("Sending broadcast message with topic hash: {topic_hash:?}");
630-
self.swarm.broadcast_message(message, topic_hash);
643+
self.swarm.broadcast_message(message, topic_hash)
631644
}
632645

633646
fn report_session_removed_to_metrics(&mut self, session_id: SessionId) {

crates/apollo_network/src/network_manager/swarm_trait.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use futures::stream::Stream;
2-
use libp2p::gossipsub::{SubscriptionError, TopicHash};
2+
use libp2p::gossipsub::{MessageId, PublishError, SubscriptionError, TopicHash};
33
use libp2p::swarm::dial_opts::DialOpts;
44
use libp2p::swarm::{DialError, NetworkBehaviour, SwarmEvent};
55
use libp2p::{Multiaddr, PeerId, StreamProtocol, Swarm};
6-
use tracing::{info, warn};
6+
use tracing::info;
77

88
use super::BroadcastedMessageMetadata;
99
use crate::gossipsub_impl::Topic;
@@ -41,7 +41,11 @@ pub trait SwarmTrait: Stream<Item = Event> + Unpin {
4141

4242
fn subscribe_to_topic(&mut self, topic: &Topic) -> Result<(), SubscriptionError>;
4343

44-
fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash);
44+
fn broadcast_message(
45+
&mut self,
46+
message: Bytes,
47+
topic_hash: TopicHash,
48+
) -> Result<MessageId, PublishError>;
4549

4650
fn report_peer_as_malicious(&mut self, peer_id: PeerId, misconduct_score: MisconductScore);
4751

@@ -97,16 +101,12 @@ impl SwarmTrait for Swarm<mixed_behaviour::MixedBehaviour> {
97101
self.behaviour_mut().gossipsub.subscribe(topic).map(|_| ())
98102
}
99103

100-
fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash) {
101-
let result = self.behaviour_mut().gossipsub.publish(topic_hash.clone(), message);
102-
if let Err(err) = result {
103-
// TODO(shahak): Consider reporting to the subscriber broadcast failures or retrying
104-
// upon failure.
105-
warn!(
106-
"Error occured while broadcasting a message to the topic with hash \
107-
{topic_hash:?}: {err:?}"
108-
);
109-
}
104+
fn broadcast_message(
105+
&mut self,
106+
message: Bytes,
107+
topic_hash: TopicHash,
108+
) -> Result<MessageId, PublishError> {
109+
self.behaviour_mut().gossipsub.publish(topic_hash.clone(), message)
110110
}
111111

112112
fn report_peer_as_malicious(&mut self, peer_id: PeerId, misconduct_score: MisconductScore) {

crates/apollo_network/src/network_manager/test.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use futures::{pin_mut, Future, SinkExt, StreamExt};
1515
use lazy_static::lazy_static;
1616
use libp2p::core::transport::PortUse;
1717
use libp2p::core::ConnectedPoint;
18-
use libp2p::gossipsub::{SubscriptionError, TopicHash};
18+
use libp2p::gossipsub::{MessageId, PublishError, SubscriptionError, TopicHash};
1919
use libp2p::swarm::ConnectionId;
2020
use libp2p::{Multiaddr, PeerId, StreamProtocol};
2121
use tokio::select;
@@ -175,10 +175,15 @@ impl SwarmTrait for MockSwarm {
175175
Ok(())
176176
}
177177

178-
fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash) {
178+
fn broadcast_message(
179+
&mut self,
180+
message: Bytes,
181+
topic_hash: TopicHash,
182+
) -> Result<MessageId, PublishError> {
179183
for sender in &self.broadcasted_messages_senders {
180184
sender.unbounded_send((message.clone(), topic_hash.clone())).unwrap();
181185
}
186+
Ok(MessageId::new(&message))
182187
}
183188

184189
fn report_peer_as_malicious(&mut self, peer_id: PeerId, _: MisconductScore) {

0 commit comments

Comments
 (0)