Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 89 additions & 11 deletions crates/apollo_network/src/network_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ use futures::future::{ready, BoxFuture, Ready};
use futures::sink::With;
use futures::stream::{FuturesUnordered, Map, Stream};
use futures::{pin_mut, FutureExt, Sink, SinkExt, StreamExt};
use libp2p::gossipsub::{SubscriptionError, TopicHash};
use libp2p::gossipsub::{MessageId, PublishError, SubscriptionError, TopicHash};
use libp2p::identity::Keypair;
use libp2p::swarm::SwarmEvent;
use libp2p::{noise, yamux, Multiaddr, PeerId, StreamProtocol, Swarm, SwarmBuilder};
use metrics::NetworkMetrics;
use tokio::time::{sleep_until, Instant};
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{debug, error, trace, warn};

use self::swarm_trait::SwarmTrait;
Expand All @@ -43,6 +44,20 @@ pub enum NetworkError {
#[error("Channels for broadcast topic with hash {topic_hash:?} were dropped.")]
BroadcastChannelsDropped { topic_hash: TopicHash },
}

struct BroadcastDetails {
/// Instant of next broadcast
time: Instant,
/// The number of broadcast tries preformed
count: u64,
/// The message to broadcast
message: Bytes,
/// The topic to broadcast on
topic: TopicHash,
/// exponential backoff strategy for broadcasting the next message.
broadcast_retry_strategy: ExponentialBackoff,
}

pub struct GenericNetworkManager<SwarmT: SwarmTrait> {
swarm: SwarmT,
inbound_protocol_to_buffer_size: HashMap<StreamProtocol, usize>,
Expand All @@ -64,29 +79,33 @@ pub struct GenericNetworkManager<SwarmT: SwarmTrait> {
continue_propagation_receiver: Receiver<BroadcastedMessageMetadata>,
metrics: Option<NetworkMetrics>,
next_metrics_update: Instant,
/// Next message to broadcast
next_broadcast: Option<BroadcastDetails>,
}

impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
pub async fn run(mut self) -> Result<(), NetworkError> {
if let Some(metrics) = self.metrics.as_ref() {
metrics.register();
}

loop {
let should_broadcast = self.next_broadcast.is_some();
let broadcast_time =
self.next_broadcast.as_ref().map(|x| x.time).unwrap_or(Instant::now());
tokio::select! {
Some(event) = self.swarm.next() => self.handle_swarm_event(event)?,
Some(res) = self.sqmr_inbound_response_receivers.next() => self.handle_response_for_inbound_query(res),
Some((protocol, client_payload)) = self.sqmr_outbound_payload_receivers.next() => {
let protocol = StreamProtocol::try_from_owned(protocol).expect("Invalid protocol should not appear");
self.handle_local_sqmr_payload(protocol, client_payload.expect("An SQMR client channel should not be terminated."))
}
Some((topic_hash, message)) = self.messages_to_broadcast_receivers.next() => {
self.broadcast_message(
message.ok_or(NetworkError::BroadcastChannelsDropped {
topic_hash: topic_hash.clone()
})?,
topic_hash,
);
}
Some((topic_hash, message)) = self.messages_to_broadcast_receivers.next(), if !should_broadcast => {
self.setup_broadcast(topic_hash, message)?;
},
_ = sleep_until(broadcast_time), if should_broadcast => {
self.do_broadcast();
},
Some(Some(peer_id)) = self.reported_peer_receivers.next() => self.swarm.report_peer_as_malicious(peer_id, MisconductScore::MALICIOUS),
Some(peer_id) = self.reported_peers_receiver.next() => self.swarm.report_peer_as_malicious(peer_id, MisconductScore::MALICIOUS),
Some(broadcasted_message_metadata) = self.continue_propagation_receiver.next() => {
Expand Down Expand Up @@ -138,6 +157,7 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
continue_propagation_receiver,
metrics,
next_metrics_update: Instant::now() + Duration::from_secs(1),
next_broadcast: None,
}
}

Expand Down Expand Up @@ -274,6 +294,60 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
})
}

fn setup_broadcast(
&mut self,
topic_hash: TopicHash,
message: Option<Bytes>,
) -> Result<(), NetworkError> {
let message = message
.ok_or(NetworkError::BroadcastChannelsDropped { topic_hash: topic_hash.clone() })?;
self.next_broadcast = Some(BroadcastDetails {
time: Instant::now(),
count: 0,
message,
topic: topic_hash,
broadcast_retry_strategy: ExponentialBackoff::from_millis(2)
.max_delay(Duration::from_secs(1)),
});
Ok(())
}

fn do_broadcast(&mut self) {
let mut details =
self.next_broadcast.take().expect("Broadcasting when next broadcast is None");
details.count += 1;
match self.broadcast_message(details.message.clone(), details.topic.clone()) {
Ok(_) => {}
Err(e) => match &e {
PublishError::Duplicate
| PublishError::SigningError(_)
| PublishError::MessageTooLarge => {
error!(
"Failed to broadcast message: `{e:?}` after {} tries Dropping message.",
details.count
);
}
PublishError::AllQueuesFull(_)
| PublishError::NoPeersSubscribedToTopic
| PublishError::TransformFailed(_) => {
let wait_duration = details.broadcast_retry_strategy.next().expect(
"Broadcast retry strategy ended even though it's an infinite iterator.",
);
warn!(
"Failed to broadcast message: `{e:?}` after {} tries. Trying again in {} \
milliseconds. Not reading more messages until then (Applying \
backpressure).",
details.count,
wait_duration.as_millis()
);

details.time = Instant::now() + wait_duration;
self.next_broadcast = Some(details)
}
},
}
}

fn handle_swarm_event(
&mut self,
event: SwarmEvent<mixed_behaviour::Event>,
Expand Down Expand Up @@ -661,7 +735,11 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
.insert(outbound_session_id, report_receiver);
}

fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash) {
fn broadcast_message(
&mut self,
message: Bytes,
topic_hash: TopicHash,
) -> Result<MessageId, PublishError> {
if let Some(broadcast_metrics_by_topic) =
self.metrics.as_ref().and_then(|metrics| metrics.broadcast_metrics_by_topic.as_ref())
{
Expand All @@ -673,7 +751,7 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
}
}
trace!("Sending broadcast message with topic hash: {topic_hash:?}");
self.swarm.broadcast_message(message, topic_hash);
self.swarm.broadcast_message(message, topic_hash)
}

fn report_session_removed_to_metrics(&mut self, session_id: SessionId) {
Expand Down
26 changes: 13 additions & 13 deletions crates/apollo_network/src/network_manager/swarm_trait.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use futures::stream::Stream;
use libp2p::gossipsub::{SubscriptionError, TopicHash};
use libp2p::gossipsub::{MessageId, PublishError, SubscriptionError, TopicHash};
use libp2p::swarm::dial_opts::DialOpts;
use libp2p::swarm::{DialError, NetworkBehaviour, SwarmEvent};
use libp2p::{Multiaddr, PeerId, StreamProtocol, Swarm};
use tracing::{info, warn};
use tracing::info;

use super::BroadcastedMessageMetadata;
use crate::gossipsub_impl::Topic;
Expand Down Expand Up @@ -42,7 +42,11 @@ pub trait SwarmTrait: Stream<Item = Event> + Unpin {

fn subscribe_to_topic(&mut self, topic: &Topic) -> Result<(), SubscriptionError>;

fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash);
fn broadcast_message(
&mut self,
message: Bytes,
topic_hash: TopicHash,
) -> Result<MessageId, PublishError>;

fn report_peer_as_malicious(&mut self, peer_id: PeerId, misconduct_score: MisconductScore);

Expand Down Expand Up @@ -100,16 +104,12 @@ impl SwarmTrait for Swarm<mixed_behaviour::MixedBehaviour> {
self.behaviour_mut().gossipsub.subscribe(topic).map(|_| ())
}

fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash) {
let result = self.behaviour_mut().gossipsub.publish(topic_hash.clone(), message);
if let Err(err) = result {
// TODO(shahak): Consider reporting to the subscriber broadcast failures or retrying
// upon failure.
warn!(
"Error occured while broadcasting a message to the topic with hash \
{topic_hash:?}: {err:?}"
);
}
fn broadcast_message(
&mut self,
message: Bytes,
topic_hash: TopicHash,
) -> Result<MessageId, PublishError> {
self.behaviour_mut().gossipsub.publish(topic_hash.clone(), message)
}

fn report_peer_as_malicious(&mut self, peer_id: PeerId, misconduct_score: MisconductScore) {
Expand Down
9 changes: 7 additions & 2 deletions crates/apollo_network/src/network_manager/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use futures::{pin_mut, Future, SinkExt, StreamExt};
use lazy_static::lazy_static;
use libp2p::core::transport::PortUse;
use libp2p::core::ConnectedPoint;
use libp2p::gossipsub::{SubscriptionError, TopicHash};
use libp2p::gossipsub::{MessageId, PublishError, SubscriptionError, TopicHash};
use libp2p::swarm::ConnectionId;
use libp2p::{Multiaddr, PeerId, StreamProtocol};
use tokio::select;
Expand Down Expand Up @@ -175,10 +175,15 @@ impl SwarmTrait for MockSwarm {
Ok(())
}

fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash) {
fn broadcast_message(
&mut self,
message: Bytes,
topic_hash: TopicHash,
) -> Result<MessageId, PublishError> {
for sender in &self.broadcasted_messages_senders {
sender.unbounded_send((message.clone(), topic_hash.clone())).unwrap();
}
Ok(MessageId::new(&message))
}

fn report_peer_as_malicious(&mut self, peer_id: PeerId, _: MisconductScore) {
Expand Down
Loading