Skip to content

Commit 55a8c13

Browse files
apollo_network: applying backpressure to swarm broadcasting
1 parent 1be21a6 commit 55a8c13

File tree

3 files changed

+111
-26
lines changed

3 files changed

+111
-26
lines changed

crates/apollo_network/src/network_manager/mod.rs

Lines changed: 91 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::collections::{BTreeMap, HashMap};
99
use std::net::Ipv4Addr;
1010
use std::pin::Pin;
1111
use std::task::{Context, Poll};
12+
use std::time::Duration;
1213

1314
use apollo_network_types::network_types::{BroadcastedMessageMetadata, OpaquePeerId};
1415
use async_trait::async_trait;
@@ -18,11 +19,13 @@ use futures::future::{ready, BoxFuture, Ready};
1819
use futures::sink::With;
1920
use futures::stream::{FuturesUnordered, Map, Stream};
2021
use futures::{pin_mut, FutureExt, Sink, SinkExt, StreamExt};
21-
use libp2p::gossipsub::{SubscriptionError, TopicHash};
22+
use libp2p::gossipsub::{MessageId, PublishError, SubscriptionError, TopicHash};
2223
use libp2p::identity::Keypair;
2324
use libp2p::swarm::SwarmEvent;
2425
use libp2p::{noise, yamux, Multiaddr, PeerId, StreamProtocol, Swarm, SwarmBuilder};
2526
use metrics::NetworkMetrics;
27+
use tokio::time::{sleep_until, Instant};
28+
use tokio_retry::strategy::ExponentialBackoff;
2629
use tracing::{debug, error, trace, warn};
2730

2831
use self::swarm_trait::SwarmTrait;
@@ -41,6 +44,20 @@ pub enum NetworkError {
4144
#[error("Channels for broadcast topic with hash {topic_hash:?} were dropped.")]
4245
BroadcastChannelsDropped { topic_hash: TopicHash },
4346
}
47+
48+
struct BroadcastDetails {
49+
/// Instant of next broadcast
50+
time: Instant,
51+
/// The number of broadcast tries preformed
52+
count: u64,
53+
/// The message to broadcast
54+
message: Bytes,
55+
/// The topic to broadcast on
56+
topic: TopicHash,
57+
/// exponential backoff strategy for broadcasting the next message.
58+
broadcast_retry_strategy: ExponentialBackoff,
59+
}
60+
4461
pub struct GenericNetworkManager<SwarmT: SwarmTrait> {
4562
swarm: SwarmT,
4663
inbound_protocol_to_buffer_size: HashMap<StreamProtocol, usize>,
@@ -61,29 +78,33 @@ pub struct GenericNetworkManager<SwarmT: SwarmTrait> {
6178
continue_propagation_sender: Sender<BroadcastedMessageMetadata>,
6279
continue_propagation_receiver: Receiver<BroadcastedMessageMetadata>,
6380
metrics: Option<NetworkMetrics>,
81+
/// Next message to broadcast
82+
next_broadcast: Option<BroadcastDetails>,
6483
}
6584

6685
impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
6786
pub async fn run(mut self) -> Result<(), NetworkError> {
6887
if let Some(metrics) = self.metrics.as_ref() {
6988
metrics.register();
7089
}
90+
7191
loop {
92+
let should_broadcast = self.next_broadcast.is_some();
93+
let broadcast_time =
94+
self.next_broadcast.as_ref().map(|x| x.time).unwrap_or(Instant::now());
7295
tokio::select! {
7396
Some(event) = self.swarm.next() => self.handle_swarm_event(event)?,
7497
Some(res) = self.sqmr_inbound_response_receivers.next() => self.handle_response_for_inbound_query(res),
7598
Some((protocol, client_payload)) = self.sqmr_outbound_payload_receivers.next() => {
7699
let protocol = StreamProtocol::try_from_owned(protocol).expect("Invalid protocol should not appear");
77100
self.handle_local_sqmr_payload(protocol, client_payload.expect("An SQMR client channel should not be terminated."))
78101
}
79-
Some((topic_hash, message)) = self.messages_to_broadcast_receivers.next() => {
80-
self.broadcast_message(
81-
message.ok_or(NetworkError::BroadcastChannelsDropped {
82-
topic_hash: topic_hash.clone()
83-
})?,
84-
topic_hash,
85-
);
86-
}
102+
Some((topic_hash, message)) = self.messages_to_broadcast_receivers.next(), if !should_broadcast => {
103+
self.setup_broadcast(topic_hash, message)?;
104+
},
105+
_ = sleep_until(broadcast_time), if should_broadcast => {
106+
self.do_broadcast();
107+
},
87108
Some(Some(peer_id)) = self.reported_peer_receivers.next() => self.swarm.report_peer_as_malicious(peer_id, MisconductScore::MALICIOUS),
88109
Some(peer_id) = self.reported_peers_receiver.next() => self.swarm.report_peer_as_malicious(peer_id, MisconductScore::MALICIOUS),
89110
Some(broadcasted_message_metadata) = self.continue_propagation_receiver.next() => {
@@ -128,6 +149,7 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
128149
continue_propagation_sender,
129150
continue_propagation_receiver,
130151
metrics,
152+
next_broadcast: None,
131153
}
132154
}
133155

@@ -263,6 +285,60 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
263285
})
264286
}
265287

288+
fn setup_broadcast(
289+
&mut self,
290+
topic_hash: TopicHash,
291+
message: Option<Bytes>,
292+
) -> Result<(), NetworkError> {
293+
let message = message
294+
.ok_or(NetworkError::BroadcastChannelsDropped { topic_hash: topic_hash.clone() })?;
295+
self.next_broadcast = Some(BroadcastDetails {
296+
time: Instant::now(),
297+
count: 0,
298+
message,
299+
topic: topic_hash,
300+
broadcast_retry_strategy: ExponentialBackoff::from_millis(2)
301+
.max_delay(Duration::from_secs(1)),
302+
});
303+
Ok(())
304+
}
305+
306+
fn do_broadcast(&mut self) {
307+
let mut details =
308+
self.next_broadcast.take().expect("Broadcasting when next broadcast is None");
309+
details.count += 1;
310+
match self.broadcast_message(details.message.clone(), details.topic.clone()) {
311+
Ok(_) => {}
312+
Err(e) => match &e {
313+
PublishError::Duplicate
314+
| PublishError::SigningError(_)
315+
| PublishError::MessageTooLarge => {
316+
error!(
317+
"Failed to broadcast message: `{e:?}` after {} tries Dropping message.",
318+
details.count
319+
);
320+
}
321+
PublishError::AllQueuesFull(_)
322+
| PublishError::NoPeersSubscribedToTopic
323+
| PublishError::TransformFailed(_) => {
324+
let wait_duration = details.broadcast_retry_strategy.next().expect(
325+
"Broadcast retry strategy ended even though it's an infinite iterator.",
326+
);
327+
warn!(
328+
"Failed to broadcast message: `{e:?}` after {} tries. Trying again in {} \
329+
milliseconds. Not reading more messages until then (Applying \
330+
backpressure).",
331+
details.count,
332+
wait_duration.as_millis()
333+
);
334+
335+
details.time = Instant::now() + wait_duration;
336+
self.next_broadcast = Some(details)
337+
}
338+
},
339+
}
340+
}
341+
266342
fn handle_swarm_event(
267343
&mut self,
268344
event: SwarmEvent<mixed_behaviour::Event>,
@@ -650,7 +726,11 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
650726
.insert(outbound_session_id, report_receiver);
651727
}
652728

653-
fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash) {
729+
fn broadcast_message(
730+
&mut self,
731+
message: Bytes,
732+
topic_hash: TopicHash,
733+
) -> Result<MessageId, PublishError> {
654734
if let Some(broadcast_metrics_by_topic) =
655735
self.metrics.as_ref().and_then(|metrics| metrics.broadcast_metrics_by_topic.as_ref())
656736
{
@@ -662,7 +742,7 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
662742
}
663743
}
664744
trace!("Sending broadcast message with topic hash: {topic_hash:?}");
665-
self.swarm.broadcast_message(message, topic_hash);
745+
self.swarm.broadcast_message(message, topic_hash)
666746
}
667747

668748
fn report_session_removed_to_metrics(&mut self, session_id: SessionId) {

crates/apollo_network/src/network_manager/swarm_trait.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use futures::stream::Stream;
2-
use libp2p::gossipsub::{SubscriptionError, TopicHash};
2+
use libp2p::gossipsub::{MessageId, PublishError, SubscriptionError, TopicHash};
33
use libp2p::swarm::dial_opts::DialOpts;
44
use libp2p::swarm::{DialError, NetworkBehaviour, SwarmEvent};
55
use libp2p::{Multiaddr, PeerId, StreamProtocol, Swarm};
6-
use tracing::{info, warn};
6+
use tracing::info;
77

88
use super::BroadcastedMessageMetadata;
99
use crate::gossipsub_impl::Topic;
@@ -41,7 +41,11 @@ pub trait SwarmTrait: Stream<Item = Event> + Unpin {
4141

4242
fn subscribe_to_topic(&mut self, topic: &Topic) -> Result<(), SubscriptionError>;
4343

44-
fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash);
44+
fn broadcast_message(
45+
&mut self,
46+
message: Bytes,
47+
topic_hash: TopicHash,
48+
) -> Result<MessageId, PublishError>;
4549

4650
fn report_peer_as_malicious(&mut self, peer_id: PeerId, misconduct_score: MisconductScore);
4751

@@ -97,16 +101,12 @@ impl SwarmTrait for Swarm<mixed_behaviour::MixedBehaviour> {
97101
self.behaviour_mut().gossipsub.subscribe(topic).map(|_| ())
98102
}
99103

100-
fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash) {
101-
let result = self.behaviour_mut().gossipsub.publish(topic_hash.clone(), message);
102-
if let Err(err) = result {
103-
// TODO(shahak): Consider reporting to the subscriber broadcast failures or retrying
104-
// upon failure.
105-
warn!(
106-
"Error occured while broadcasting a message to the topic with hash \
107-
{topic_hash:?}: {err:?}"
108-
);
109-
}
104+
fn broadcast_message(
105+
&mut self,
106+
message: Bytes,
107+
topic_hash: TopicHash,
108+
) -> Result<MessageId, PublishError> {
109+
self.behaviour_mut().gossipsub.publish(topic_hash.clone(), message)
110110
}
111111

112112
fn report_peer_as_malicious(&mut self, peer_id: PeerId, misconduct_score: MisconductScore) {

crates/apollo_network/src/network_manager/test.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use futures::{pin_mut, Future, SinkExt, StreamExt};
1515
use lazy_static::lazy_static;
1616
use libp2p::core::transport::PortUse;
1717
use libp2p::core::ConnectedPoint;
18-
use libp2p::gossipsub::{SubscriptionError, TopicHash};
18+
use libp2p::gossipsub::{MessageId, PublishError, SubscriptionError, TopicHash};
1919
use libp2p::swarm::ConnectionId;
2020
use libp2p::{Multiaddr, PeerId, StreamProtocol};
2121
use tokio::select;
@@ -175,10 +175,15 @@ impl SwarmTrait for MockSwarm {
175175
Ok(())
176176
}
177177

178-
fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash) {
178+
fn broadcast_message(
179+
&mut self,
180+
message: Bytes,
181+
topic_hash: TopicHash,
182+
) -> Result<MessageId, PublishError> {
179183
for sender in &self.broadcasted_messages_senders {
180184
sender.unbounded_send((message.clone(), topic_hash.clone())).unwrap();
181185
}
186+
Ok(MessageId::new(&message))
182187
}
183188

184189
fn report_peer_as_malicious(&mut self, peer_id: PeerId, _: MisconductScore) {

0 commit comments

Comments
 (0)