Skip to content

Commit ec0a9f1

Browse files
tomakawheresaddiemxinden
authored andcommitted
sc-network: Log outgoing notifications too (paritytech#7624)
* Log outgoing notifications too * Update client/network/src/protocol/generic_proto/handler.rs Co-authored-by: Max Inden <[email protected]> Co-authored-by: Addie Wagenknecht <[email protected]> Co-authored-by: Max Inden <[email protected]>
1 parent cce716e commit ec0a9f1

File tree

3 files changed

+52
-6
lines changed

3 files changed

+52
-6
lines changed

client/network/src/protocol/generic_proto/behaviour.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -650,13 +650,17 @@ impl GenericProto {
650650
Some(sink) => sink
651651
};
652652

653+
let message = message.into();
654+
653655
trace!(
654656
target: "sub-libp2p",
655-
"External API => Notification({:?}, {:?})",
657+
"External API => Notification({:?}, {:?}, {} bytes)",
656658
target,
657659
protocol_name,
660+
message.len(),
658661
);
659-
trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target);
662+
trace!(target: "sub-libp2p", "Handler({:?}) <= Sync notification", target);
663+
660664
notifs_sink.send_sync_notification(
661665
protocol_name,
662666
message
@@ -1930,9 +1934,10 @@ impl NetworkBehaviour for GenericProto {
19301934
if self.is_open(&source) {
19311935
trace!(
19321936
target: "sub-libp2p",
1933-
"Handler({:?}) => Notification({:?})",
1937+
"Handler({:?}) => Notification({:?}, {} bytes)",
19341938
source,
19351939
protocol_name,
1940+
message.len()
19361941
);
19371942
trace!(target: "sub-libp2p", "External API <= Message({:?}, {:?})", protocol_name, source);
19381943
let event = GenericProtoOut::Notification {
@@ -1945,9 +1950,10 @@ impl NetworkBehaviour for GenericProto {
19451950
} else {
19461951
trace!(
19471952
target: "sub-libp2p",
1948-
"Handler({:?}) => Post-close notification({:?})",
1953+
"Handler({:?}) => Post-close notification({:?}, {} bytes)",
19491954
source,
19501955
protocol_name,
1956+
message.len()
19511957
);
19521958
}
19531959
}

client/network/src/protocol/generic_proto/handler.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,9 @@ pub struct NotifsHandler {
138138
/// Whether we are the connection dialer or listener.
139139
endpoint: ConnectedPoint,
140140

141+
/// Remote we are connected to.
142+
peer_id: PeerId,
143+
141144
/// State of this handler.
142145
state: State,
143146

@@ -260,12 +263,13 @@ impl IntoProtocolsHandler for NotifsHandlerProto {
260263
SelectUpgrade::new(in_protocols, self.legacy_protocol.clone())
261264
}
262265

263-
fn into_handler(self, _: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler {
266+
fn into_handler(self, peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler {
264267
let num_out_proto = self.out_protocols.len();
265268

266269
NotifsHandler {
267270
in_protocols: self.in_protocols,
268271
out_protocols: self.out_protocols,
272+
peer_id: peer_id.clone(),
269273
endpoint: connected_point.clone(),
270274
when_connection_open: Instant::now(),
271275
state: State::Closed {
@@ -365,6 +369,8 @@ pub struct NotificationsSink {
365369

366370
#[derive(Debug)]
367371
struct NotificationsSinkInner {
372+
/// Target of the sink.
373+
peer_id: PeerId,
368374
/// Sender to use in asynchronous contexts. Uses an asynchronous mutex.
369375
async_channel: FuturesMutex<mpsc::Sender<NotificationsSinkMessage>>,
370376
/// Sender to use in synchronous contexts. Uses a synchronous mutex.
@@ -390,6 +396,11 @@ enum NotificationsSinkMessage {
390396
}
391397

392398
impl NotificationsSink {
399+
/// Returns the [`PeerId`] the sink is connected to.
400+
pub fn peer_id(&self) -> &PeerId {
401+
&self.inner.peer_id
402+
}
403+
393404
/// Sends a notification to the peer.
394405
///
395406
/// If too many messages are already buffered, the notification is silently discarded and the
@@ -447,6 +458,12 @@ pub struct Ready<'a> {
447458
}
448459

449460
impl<'a> Ready<'a> {
461+
/// Returns the name of the protocol. Matches the one passed to
462+
/// [`NotificationsSink::reserve_notification`].
463+
pub fn protocol_name(&self) -> &Cow<'static, str> {
464+
&self.protocol_name
465+
}
466+
450467
/// Consumes this slots reservation and actually queues the notification.
451468
///
452469
/// Returns an error if the substream has been closed.
@@ -622,6 +639,7 @@ impl ProtocolsHandler for NotifsHandler {
622639
let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
623640
let notifications_sink = NotificationsSink {
624641
inner: Arc::new(NotificationsSinkInner {
642+
peer_id: self.peer_id.clone(),
625643
async_channel: FuturesMutex::new(async_tx),
626644
sync_channel: Mutex::new(sync_tx),
627645
}),
@@ -782,6 +800,7 @@ impl ProtocolsHandler for NotifsHandler {
782800
let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
783801
let notifications_sink = NotificationsSink {
784802
inner: Arc::new(NotificationsSinkInner {
803+
peer_id: self.peer_id.clone(),
785804
async_channel: FuturesMutex::new(async_tx),
786805
sync_channel: Mutex::new(sync_tx),
787806
}),

client/network/src/service.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -664,7 +664,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
664664
// `peers_notifications_sinks` mutex as soon as possible.
665665
let sink = {
666666
let peers_notifications_sinks = self.peers_notifications_sinks.lock();
667-
if let Some(sink) = peers_notifications_sinks.get(&(target, protocol.clone())) {
667+
if let Some(sink) = peers_notifications_sinks.get(&(target.clone(), protocol.clone())) {
668668
sink.clone()
669669
} else {
670670
// Notification silently discarded, as documented.
@@ -684,6 +684,14 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
684684
}
685685

686686
// Sending is communicated to the `NotificationsSink`.
687+
trace!(
688+
target: "sub-libp2p",
689+
"External API => Notification({:?}, {:?}, {} bytes)",
690+
target,
691+
protocol,
692+
message.len()
693+
);
694+
trace!(target: "sub-libp2p", "Handler({:?}) <= Sync notification", target);
687695
sink.send_sync_notification(protocol, message);
688696
}
689697

@@ -1139,6 +1147,7 @@ impl NotificationSender {
11391147
Ok(r) => r,
11401148
Err(()) => return Err(NotificationSenderError::Closed),
11411149
},
1150+
peer_id: self.sink.peer_id(),
11421151
notification_size_metric: self.notification_size_metric.clone(),
11431152
})
11441153
}
@@ -1149,6 +1158,9 @@ impl NotificationSender {
11491158
pub struct NotificationSenderReady<'a> {
11501159
ready: Ready<'a>,
11511160

1161+
/// Target of the notification.
1162+
peer_id: &'a PeerId,
1163+
11521164
/// Field extracted from the [`Metrics`] struct and necessary to report the
11531165
/// notifications-related metrics.
11541166
notification_size_metric: Option<Histogram>,
@@ -1163,6 +1175,15 @@ impl<'a> NotificationSenderReady<'a> {
11631175
notification_size_metric.observe(notification.len() as f64);
11641176
}
11651177

1178+
trace!(
1179+
target: "sub-libp2p",
1180+
"External API => Notification({:?}, {:?}, {} bytes)",
1181+
self.peer_id,
1182+
self.ready.protocol_name(),
1183+
notification.len()
1184+
);
1185+
trace!(target: "sub-libp2p", "Handler({:?}) <= Async notification", self.peer_id);
1186+
11661187
self.ready
11671188
.send(notification)
11681189
.map_err(|()| NotificationSenderError::Closed)

0 commit comments

Comments
 (0)