Skip to content

Commit c42d5b6

Browse files
apollo_network: applying backpressure to swarm broadcasting
1 parent 36d2167 commit c42d5b6

File tree

3 files changed

+110
-27
lines changed

3 files changed

+110
-27
lines changed

crates/apollo_network/src/network_manager/mod.rs

Lines changed: 90 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@ use futures::future::{ready, BoxFuture, Ready};
1919
use futures::sink::With;
2020
use futures::stream::{FuturesUnordered, Map, Stream};
2121
use futures::{pin_mut, FutureExt, Sink, SinkExt, StreamExt};
22-
use tokio::time::{sleep_until, Instant};
23-
use libp2p::gossipsub::{SubscriptionError, TopicHash};
22+
use libp2p::gossipsub::{MessageId, PublishError, SubscriptionError, TopicHash};
2423
use libp2p::identity::Keypair;
2524
use libp2p::swarm::SwarmEvent;
2625
use libp2p::{noise, yamux, Multiaddr, PeerId, StreamProtocol, Swarm, SwarmBuilder};
2726
use metrics::NetworkMetrics;
27+
use tokio::time::{sleep_until, Instant};
28+
use tokio_retry::strategy::ExponentialBackoff;
2829
use tracing::{debug, error, trace, warn};
2930

3031
use self::swarm_trait::SwarmTrait;
@@ -43,6 +44,20 @@ pub enum NetworkError {
4344
#[error("Channels for broadcast topic with hash {topic_hash:?} were dropped.")]
4445
BroadcastChannelsDropped { topic_hash: TopicHash },
4546
}
47+
48+
struct BroadcastDetails {
49+
/// Instant of next broadcast
50+
time: Instant,
51+
/// The number of broadcast tries preformed
52+
count: u64,
53+
/// The message to broadcast
54+
message: Bytes,
55+
/// The topic to broadcast on
56+
topic: TopicHash,
57+
/// exponential backoff strategy for broadcasting the next message.
58+
broadcast_retry_strategy: ExponentialBackoff,
59+
}
60+
4661
pub struct GenericNetworkManager<SwarmT: SwarmTrait> {
4762
swarm: SwarmT,
4863
inbound_protocol_to_buffer_size: HashMap<StreamProtocol, usize>,
@@ -64,29 +79,33 @@ pub struct GenericNetworkManager<SwarmT: SwarmTrait> {
6479
continue_propagation_receiver: Receiver<BroadcastedMessageMetadata>,
6580
metrics: Option<NetworkMetrics>,
6681
next_metrics_update: Instant,
82+
/// Next message to broadcast
83+
next_broadcast: Option<BroadcastDetails>,
6784
}
6885

6986
impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
7087
pub async fn run(mut self) -> Result<(), NetworkError> {
7188
if let Some(metrics) = self.metrics.as_ref() {
7289
metrics.register();
7390
}
91+
7492
loop {
93+
let should_broadcast = self.next_broadcast.is_some();
94+
let broadcast_time =
95+
self.next_broadcast.as_ref().map(|x| x.time).unwrap_or(Instant::now());
7596
tokio::select! {
7697
Some(event) = self.swarm.next() => self.handle_swarm_event(event)?,
7798
Some(res) = self.sqmr_inbound_response_receivers.next() => self.handle_response_for_inbound_query(res),
7899
Some((protocol, client_payload)) = self.sqmr_outbound_payload_receivers.next() => {
79100
let protocol = StreamProtocol::try_from_owned(protocol).expect("Invalid protocol should not appear");
80101
self.handle_local_sqmr_payload(protocol, client_payload.expect("An SQMR client channel should not be terminated."))
81102
}
82-
Some((topic_hash, message)) = self.messages_to_broadcast_receivers.next() => {
83-
self.broadcast_message(
84-
message.ok_or(NetworkError::BroadcastChannelsDropped {
85-
topic_hash: topic_hash.clone()
86-
})?,
87-
topic_hash,
88-
);
89-
}
103+
Some((topic_hash, message)) = self.messages_to_broadcast_receivers.next(), if !should_broadcast => {
104+
self.setup_broadcast(topic_hash, message)?;
105+
},
106+
_ = sleep_until(broadcast_time), if should_broadcast => {
107+
self.do_broadcast();
108+
},
90109
Some(Some(peer_id)) = self.reported_peer_receivers.next() => self.swarm.report_peer_as_malicious(peer_id, MisconductScore::MALICIOUS),
91110
Some(peer_id) = self.reported_peers_receiver.next() => self.swarm.report_peer_as_malicious(peer_id, MisconductScore::MALICIOUS),
92111
Some(broadcasted_message_metadata) = self.continue_propagation_receiver.next() => {
@@ -138,6 +157,7 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
138157
continue_propagation_receiver,
139158
metrics,
140159
next_metrics_update: Instant::now() + Duration::from_secs(1),
160+
next_broadcast: None,
141161
}
142162
}
143163

@@ -273,6 +293,60 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
273293
})
274294
}
275295

296+
fn setup_broadcast(
297+
&mut self,
298+
topic_hash: TopicHash,
299+
message: Option<Bytes>,
300+
) -> Result<(), NetworkError> {
301+
let message = message
302+
.ok_or(NetworkError::BroadcastChannelsDropped { topic_hash: topic_hash.clone() })?;
303+
self.next_broadcast = Some(BroadcastDetails {
304+
time: Instant::now(),
305+
count: 0,
306+
message,
307+
topic: topic_hash,
308+
broadcast_retry_strategy: ExponentialBackoff::from_millis(2)
309+
.max_delay(Duration::from_secs(1)),
310+
});
311+
Ok(())
312+
}
313+
314+
fn do_broadcast(&mut self) {
315+
let mut details =
316+
self.next_broadcast.take().expect("Broadcasting when next broadcast is None");
317+
details.count += 1;
318+
match self.broadcast_message(details.message.clone(), details.topic.clone()) {
319+
Ok(_) => {}
320+
Err(e) => match &e {
321+
PublishError::Duplicate
322+
| PublishError::SigningError(_)
323+
| PublishError::MessageTooLarge => {
324+
error!(
325+
"Failed to broadcast message: `{e:?}` after {} tries Dropping message.",
326+
details.count
327+
);
328+
}
329+
PublishError::AllQueuesFull(_)
330+
| PublishError::NoPeersSubscribedToTopic
331+
| PublishError::TransformFailed(_) => {
332+
let wait_duration = details.broadcast_retry_strategy.next().expect(
333+
"Broadcast retry strategy ended even though it's an infinite iterator.",
334+
);
335+
warn!(
336+
"Failed to broadcast message: `{e:?}` after {} tries. Trying again in {} \
337+
milliseconds. Not reading more messages until then (Applying \
338+
backpressure).",
339+
details.count,
340+
wait_duration.as_millis()
341+
);
342+
343+
details.time = Instant::now() + wait_duration;
344+
self.next_broadcast = Some(details)
345+
}
346+
},
347+
}
348+
}
349+
276350
fn handle_swarm_event(
277351
&mut self,
278352
event: SwarmEvent<mixed_behaviour::Event>,
@@ -660,7 +734,11 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
660734
.insert(outbound_session_id, report_receiver);
661735
}
662736

663-
fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash) {
737+
fn broadcast_message(
738+
&mut self,
739+
message: Bytes,
740+
topic_hash: TopicHash,
741+
) -> Result<MessageId, PublishError> {
664742
if let Some(broadcast_metrics_by_topic) =
665743
self.metrics.as_ref().and_then(|metrics| metrics.broadcast_metrics_by_topic.as_ref())
666744
{
@@ -672,7 +750,7 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
672750
}
673751
}
674752
trace!("Sending broadcast message with topic hash: {topic_hash:?}");
675-
self.swarm.broadcast_message(message, topic_hash);
753+
self.swarm.broadcast_message(message, topic_hash)
676754
}
677755

678756
fn report_session_removed_to_metrics(&mut self, session_id: SessionId) {

crates/apollo_network/src/network_manager/swarm_trait.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use futures::stream::Stream;
2-
use libp2p::gossipsub::{SubscriptionError, TopicHash};
2+
use libp2p::gossipsub::{MessageId, PublishError, SubscriptionError, TopicHash};
33
use libp2p::swarm::dial_opts::DialOpts;
44
use libp2p::swarm::{DialError, NetworkBehaviour, SwarmEvent};
55
use libp2p::{Multiaddr, PeerId, StreamProtocol, Swarm};
6-
use tracing::{info, warn};
6+
use tracing::info;
77

88
use super::BroadcastedMessageMetadata;
99
use crate::gossipsub_impl::Topic;
@@ -42,7 +42,11 @@ pub trait SwarmTrait: Stream<Item = Event> + Unpin {
4242

4343
fn subscribe_to_topic(&mut self, topic: &Topic) -> Result<(), SubscriptionError>;
4444

45-
fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash);
45+
fn broadcast_message(
46+
&mut self,
47+
message: Bytes,
48+
topic_hash: TopicHash,
49+
) -> Result<MessageId, PublishError>;
4650

4751
fn report_peer_as_malicious(&mut self, peer_id: PeerId, misconduct_score: MisconductScore);
4852

@@ -100,16 +104,12 @@ impl SwarmTrait for Swarm<mixed_behaviour::MixedBehaviour> {
100104
self.behaviour_mut().gossipsub.subscribe(topic).map(|_| ())
101105
}
102106

103-
fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash) {
104-
let result = self.behaviour_mut().gossipsub.publish(topic_hash.clone(), message);
105-
if let Err(err) = result {
106-
// TODO(shahak): Consider reporting to the subscriber broadcast failures or retrying
107-
// upon failure.
108-
warn!(
109-
"Error occured while broadcasting a message to the topic with hash \
110-
{topic_hash:?}: {err:?}"
111-
);
112-
}
107+
fn broadcast_message(
108+
&mut self,
109+
message: Bytes,
110+
topic_hash: TopicHash,
111+
) -> Result<MessageId, PublishError> {
112+
self.behaviour_mut().gossipsub.publish(topic_hash.clone(), message)
113113
}
114114

115115
fn report_peer_as_malicious(&mut self, peer_id: PeerId, misconduct_score: MisconductScore) {

crates/apollo_network/src/network_manager/test.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use futures::{pin_mut, Future, SinkExt, StreamExt};
1515
use lazy_static::lazy_static;
1616
use libp2p::core::transport::PortUse;
1717
use libp2p::core::ConnectedPoint;
18-
use libp2p::gossipsub::{SubscriptionError, TopicHash};
18+
use libp2p::gossipsub::{MessageId, PublishError, SubscriptionError, TopicHash};
1919
use libp2p::swarm::ConnectionId;
2020
use libp2p::{Multiaddr, PeerId, StreamProtocol};
2121
use tokio::select;
@@ -175,10 +175,15 @@ impl SwarmTrait for MockSwarm {
175175
Ok(())
176176
}
177177

178-
fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash) {
178+
fn broadcast_message(
179+
&mut self,
180+
message: Bytes,
181+
topic_hash: TopicHash,
182+
) -> Result<MessageId, PublishError> {
179183
for sender in &self.broadcasted_messages_senders {
180184
sender.unbounded_send((message.clone(), topic_hash.clone())).unwrap();
181185
}
186+
Ok(MessageId::new(&message))
182187
}
183188

184189
fn report_peer_as_malicious(&mut self, peer_id: PeerId, _: MisconductScore) {

0 commit comments

Comments
 (0)