Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@

- Fix incorrect default values in ConfigBuilder
See [PR 6113](https://github.com/libp2p/rust-libp2p/pull/6113)

- Remove duplicated config `set_topic_max_transmit_size` method, prefer `max_transmit_size_for_topic`.
See [PR 6173](https://github.com/libp2p/rust-libp2p/pull/6173).

- Switch the internal `async-channel` used to dispatch messages from `NetworkBehaviour` to the `ConnectionHandler`
with an internal priority queue. See [PR 6175](https://github.com/libp2p/rust-libp2p/pull/6175).

## 0.49.2

- Relax `Behaviour::with_metrics` requirements, do not require DataTransform and TopicSubscriptionFilter to also impl Default
Expand All @@ -37,6 +40,7 @@

- Feature gate metrics related code. This changes some `Behaviour` constructor methods.
See [PR 6020](https://github.com/libp2p/rust-libp2p/pull/6020)

- Send IDONTWANT before Publishing a new message.
See [PR 6017](https://github.com/libp2p/rust-libp2p/pull/6017)

Expand Down
129 changes: 53 additions & 76 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use crate::{
mcache::MessageCache,
peer_score::{PeerScore, PeerScoreParams, PeerScoreState, PeerScoreThresholds, RejectReason},
protocol::SIGNING_PREFIX,
rpc::Sender,
queue::Queue,
rpc_proto::proto,
subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter},
time_cache::DuplicateCache,
Expand Down Expand Up @@ -751,6 +751,7 @@ where
if self.send_message(
*peer_id,
RpcOut::Publish {
message_id: msg_id.clone(),
message: raw_message.clone(),
timeout: Delay::new(self.config.publish_queue_duration()),
},
Expand Down Expand Up @@ -1341,6 +1342,7 @@ where
self.send_message(
*peer_id,
RpcOut::Forward {
message_id: id.clone(),
message: msg,
timeout: Delay::new(self.config.forward_queue_duration()),
},
Expand Down Expand Up @@ -2081,9 +2083,9 @@ where
// steady-state size of the queues.
#[cfg(feature = "metrics")]
if let Some(m) = &mut self.metrics {
for sender_queue in self.connected_peers.values().map(|v| &v.sender) {
m.observe_priority_queue_size(sender_queue.priority_queue_len());
m.observe_non_priority_queue_size(sender_queue.non_priority_queue_len());
for sender_queue in self.connected_peers.values().map(|v| &v.messages) {
m.observe_priority_queue_size(sender_queue.priority_len());
m.observe_non_priority_queue_size(sender_queue.non_priority_len());
}
}

Expand Down Expand Up @@ -2499,6 +2501,11 @@ where
// Report expired messages
for (peer_id, failed_messages) in self.failed_messages.drain() {
tracing::debug!("Peer couldn't consume messages: {:?}", failed_messages);
#[cfg(feature = "metrics")]
if let Some(metrics) = self.metrics.as_mut() {
metrics.observe_failed_priority_messages(failed_messages.priority);
metrics.observe_failed_non_priority_messages(failed_messages.non_priority);
}
self.events
.push_back(ToSwarm::GenerateEvent(Event::SlowPeer {
peer_id,
Expand Down Expand Up @@ -2746,6 +2753,7 @@ where
self.send_message(
*peer_id,
RpcOut::Forward {
message_id: msg_id.clone(),
message: message.clone(),
timeout: Delay::new(self.config.forward_queue_duration()),
},
Expand Down Expand Up @@ -2874,33 +2882,20 @@ where
return false;
}

// Try sending the message to the connection handler.
match peer.sender.send_message(rpc) {
// Try sending the message to the connection handler,
// High priority messages should not fail.
match peer.messages.try_push(rpc) {
Ok(()) => true,
Err(rpc) => {
// Sending failed because the channel is full.
tracing::warn!(peer=%peer_id, "Send Queue full. Could not send {:?}.", rpc);

// Update failed message counter.
let failed_messages = self.failed_messages.entry(peer_id).or_default();
match rpc {
RpcOut::Publish { .. } => {
failed_messages.priority += 1;
failed_messages.publish += 1;
}
RpcOut::Forward { .. } => {
failed_messages.non_priority += 1;
failed_messages.forward += 1;
}
RpcOut::IWant(_) | RpcOut::IHave(_) | RpcOut::IDontWant(_) => {
failed_messages.non_priority += 1;
}
RpcOut::Graft(_)
| RpcOut::Prune(_)
| RpcOut::Subscribe(_)
| RpcOut::Unsubscribe(_) => {
unreachable!("Channel for highpriority control messages is unbounded and should always be open.")
}
if rpc.priority() {
failed_messages.priority += 1;
} else {
failed_messages.non_priority += 1;
}

// Update peer score.
Expand Down Expand Up @@ -3125,23 +3120,22 @@ where
// The protocol negotiation occurs once a message is sent/received. Once this happens we
// update the type of peer that this is in order to determine which kind of routing should
// occur.
let connected_peer = self
.connected_peers
.entry(peer_id)
.or_insert_with(|| PeerDetails {
kind: PeerKind::Floodsub,
connections: vec![],
outbound: false,
sender: Sender::new(self.config.connection_handler_queue_len()),
topics: Default::default(),
dont_send: LinkedHashMap::new(),
});
let connected_peer = self.connected_peers.entry(peer_id).or_insert(PeerDetails {
kind: PeerKind::Floodsub,
connections: vec![],
outbound: false,
messages: Queue::new(self.config.connection_handler_queue_len()),
topics: Default::default(),
dont_send: LinkedHashMap::new(),
});
// Add the new connection
connected_peer.connections.push(connection_id);

// This clones a reference to the Queue so any new handlers reference the same underlying
// queue. No data is actually cloned here.
Ok(Handler::new(
self.config.protocol_config(),
connected_peer.sender.new_receiver(),
connected_peer.messages.clone(),
))
}

Expand All @@ -3153,25 +3147,24 @@ where
_: Endpoint,
_: PortUse,
) -> Result<THandler<Self>, ConnectionDenied> {
let connected_peer = self
.connected_peers
.entry(peer_id)
.or_insert_with(|| PeerDetails {
kind: PeerKind::Floodsub,
connections: vec![],
// Diverging from the go implementation we only want to consider a peer as outbound
// peer if its first connection is outbound.
outbound: !self.px_peers.contains(&peer_id),
sender: Sender::new(self.config.connection_handler_queue_len()),
topics: Default::default(),
dont_send: LinkedHashMap::new(),
});
let connected_peer = self.connected_peers.entry(peer_id).or_insert(PeerDetails {
kind: PeerKind::Floodsub,
connections: vec![],
// Diverging from the go implementation we only want to consider a peer as outbound peer
// if its first connection is outbound.
outbound: !self.px_peers.contains(&peer_id),
messages: Queue::new(self.config.connection_handler_queue_len()),
topics: Default::default(),
dont_send: LinkedHashMap::new(),
});
// Add the new connection
connected_peer.connections.push(connection_id);

// This clones a reference to the Queue so any new handlers reference the same underlying
// queue. No data is actually cloned here.
Ok(Handler::new(
self.config.protocol_config(),
connected_peer.sender.new_receiver(),
connected_peer.messages.clone(),
))
}

Expand Down Expand Up @@ -3213,6 +3206,8 @@ where
}
}
}
// rpc is only used for metrics code.
#[allow(unused_variables)]
HandlerEvent::MessageDropped(rpc) => {
// Account for this in the scoring logic
if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
Expand All @@ -3221,32 +3216,7 @@ where

// Keep track of expired messages for the application layer.
let failed_messages = self.failed_messages.entry(propagation_source).or_default();
failed_messages.timeout += 1;
match rpc {
RpcOut::Publish { .. } => {
failed_messages.publish += 1;
}
RpcOut::Forward { .. } => {
failed_messages.forward += 1;
}
_ => {}
}

// Record metrics on the failure.
#[cfg(feature = "metrics")]
if let Some(metrics) = self.metrics.as_mut() {
match rpc {
RpcOut::Publish { message, .. } => {
metrics.publish_msg_dropped(&message.topic);
metrics.timeout_msg_dropped(&message.topic);
}
RpcOut::Forward { message, .. } => {
metrics.forward_msg_dropped(&message.topic);
metrics.timeout_msg_dropped(&message.topic);
}
_ => {}
}
}
failed_messages.non_priority += 1;
}
HandlerEvent::Message {
rpc,
Expand Down Expand Up @@ -3345,10 +3315,17 @@ where
"Could not handle IDONTWANT, peer doesn't exist in connected peer list");
continue;
};

// Remove messages from the queue.
#[allow(unused)]
let removed = peer.messages.remove_data_messages(&message_ids);

#[cfg(feature = "metrics")]
if let Some(metrics) = self.metrics.as_mut() {
metrics.register_idontwant(message_ids.len());
metrics.register_removed_messages(removed);
}

for message_id in message_ids {
peer.dont_send.insert(message_id, Instant::now());
// Don't exceed capacity.
Expand Down
Loading
Loading