Skip to content

Commit 29cdba5

Browse files
committed
Added error handling to pubsub
1 parent 4f7efe4 commit 29cdba5

File tree

5 files changed

+45
-17
lines changed

5 files changed

+45
-17
lines changed

node/src/action_kind.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,8 +310,10 @@ pub enum ActionKind {
310310
P2pNetworkPubsubNewStream,
311311
P2pNetworkPubsubOutgoingData,
312312
P2pNetworkPubsubOutgoingMessage,
313+
P2pNetworkPubsubOutgoingMessageError,
313314
P2pNetworkPubsubPrune,
314315
P2pNetworkPubsubSign,
316+
P2pNetworkPubsubSignError,
315317
P2pNetworkRpcHeartbeatSend,
316318
P2pNetworkRpcIncomingData,
317319
P2pNetworkRpcIncomingMessage,
@@ -538,7 +540,7 @@ pub enum ActionKind {
538540
}
539541

540542
impl ActionKind {
541-
pub const COUNT: u16 = 446;
543+
pub const COUNT: u16 = 448;
542544
}
543545

544546
impl std::fmt::Display for ActionKind {
@@ -1466,8 +1468,10 @@ impl ActionKindGet for P2pNetworkPubsubAction {
14661468
Self::Prune { .. } => ActionKind::P2pNetworkPubsubPrune,
14671469
Self::Broadcast { .. } => ActionKind::P2pNetworkPubsubBroadcast,
14681470
Self::Sign { .. } => ActionKind::P2pNetworkPubsubSign,
1471+
Self::SignError { .. } => ActionKind::P2pNetworkPubsubSignError,
14691472
Self::BroadcastSigned { .. } => ActionKind::P2pNetworkPubsubBroadcastSigned,
14701473
Self::OutgoingMessage { .. } => ActionKind::P2pNetworkPubsubOutgoingMessage,
1474+
Self::OutgoingMessageError { .. } => ActionKind::P2pNetworkPubsubOutgoingMessageError,
14711475
Self::OutgoingData { .. } => ActionKind::P2pNetworkPubsubOutgoingData,
14721476
}
14731477
}

p2p/src/network/pubsub/p2p_network_pubsub_actions.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,23 @@ pub enum P2pNetworkPubsubAction {
3737
data: Data,
3838
topic: String,
3939
},
40+
#[action_event(level = warn, fields(display(author), display(topic)))]
41+
SignError {
42+
author: PeerId,
43+
topic: String,
44+
},
4045
BroadcastSigned {
4146
signature: Data,
4247
},
4348
OutgoingMessage {
4449
msg: pb::Rpc,
4550
peer_id: PeerId,
4651
},
52+
#[action_event(level = warn, fields(display(peer_id), debug(msg)))]
53+
OutgoingMessageError {
54+
msg: pb::Rpc,
55+
peer_id: PeerId,
56+
},
4757
OutgoingData {
4858
data: Data,
4959
peer_id: PeerId,

p2p/src/network/pubsub/p2p_network_pubsub_effects.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,9 @@ impl P2pNetworkPubsubAction {
100100
P2pNetworkPubsubAction::Broadcast { message } => {
101101
// println!("(pubsub) {this} broadcast");
102102
let mut buffer = vec![0; 8];
103-
binprot::BinProtWrite::binprot_write(&message, &mut buffer).expect("msg");
103+
if binprot::BinProtWrite::binprot_write(&message, &mut buffer).is_err() {
104+
return;
105+
}
104106
let len = buffer.len() - 8;
105107
buffer[..8].clone_from_slice(&(len as u64).to_le_bytes());
106108

@@ -111,12 +113,11 @@ impl P2pNetworkPubsubAction {
111113
topic: TOPIC.to_owned(),
112114
});
113115
}
114-
P2pNetworkPubsubAction::Sign { .. } => {
116+
P2pNetworkPubsubAction::Sign { author, topic, .. } => {
115117
if let Some(to_sign) = state.to_sign.front() {
116118
let mut publication = vec![];
117-
if let Err(err) = prost::Message::encode(to_sign, &mut publication) {
118-
// TODO: dispatch action for logging
119-
let _ = err;
119+
if prost::Message::encode(to_sign, &mut publication).is_err() {
120+
store.dispatch(P2pNetworkPubsubAction::SignError { author, topic });
120121
} else {
121122
let signature = store.service().sign_publication(&publication).into();
122123
store.dispatch(P2pNetworkPubsubAction::BroadcastSigned { signature });
@@ -183,17 +184,20 @@ impl P2pNetworkPubsubAction {
183184
// println!("{}", std::str::from_utf8(&id).unwrap());
184185
// }
185186
let mut data = vec![];
186-
if let Err(err) = prost::Message::encode_length_delimited(&msg, &mut data) {
187-
// TODO: dispatch action for logging
188-
let _ = err;
187+
if prost::Message::encode_length_delimited(&msg, &mut data).is_err() {
188+
store.dispatch(P2pNetworkPubsubAction::OutgoingMessageError {
189+
msg,
190+
peer_id,
191+
});
189192
} else {
190193
store.dispatch(P2pNetworkPubsubAction::OutgoingData {
191-
data: data.clone().into(),
194+
data: data.into(),
192195
peer_id,
193196
});
194197
}
195198
}
196199
}
200+
P2pNetworkPubsubAction::OutgoingMessageError { .. } => {}
197201
P2pNetworkPubsubAction::OutgoingData { mut data, peer_id } => {
198202
let Some(state) = store
199203
.state()
@@ -217,6 +221,7 @@ impl P2pNetworkPubsubAction {
217221
});
218222
}
219223
}
224+
P2pNetworkPubsubAction::SignError { .. } => (),
220225
}
221226
}
222227
}

p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,10 @@ impl P2pNetworkPubsubState {
106106
}
107107
}
108108
for message in v.publish {
109-
let message_id = self.mcache.put(message.clone());
109+
let Some(message_id) = self.mcache.put(message.clone()) else {
110+
continue;
111+
};
112+
110113
let topic = self.topics.entry(message.topic.clone()).or_default();
111114
if let Some(signature) = &message.signature {
112115
// skip recently seen message
@@ -258,6 +261,7 @@ impl P2pNetworkPubsubState {
258261
v.message.control = None;
259262
}
260263
}
264+
P2pNetworkPubsubAction::OutgoingMessageError { .. } => {}
261265
P2pNetworkPubsubAction::Broadcast { .. } => {}
262266
P2pNetworkPubsubAction::Sign {
263267
seqno,
@@ -277,6 +281,9 @@ impl P2pNetworkPubsubState {
277281
key: None,
278282
});
279283
}
284+
P2pNetworkPubsubAction::SignError { .. } => {
285+
let _ = self.to_sign.pop_front();
286+
}
280287
P2pNetworkPubsubAction::BroadcastSigned { signature } => {
281288
if let Some(mut message) = self.to_sign.pop_front() {
282289
message.signature = Some(signature.clone().0.to_vec());

p2p/src/network/pubsub/p2p_network_pubsub_state.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,38 +43,40 @@ pub struct P2pNetworkPubsubMessageCache {
4343
impl P2pNetworkPubsubMessageCache {
4444
const CAPACITY: usize = 100;
4545

46-
pub fn put(&mut self, message: pb::Message) -> Vec<u8> {
47-
let id = compute_message_id(&message);
46+
pub fn put(&mut self, message: pb::Message) -> Option<Vec<u8>> {
47+
let id = compute_message_id(&message)?;
4848
self.map.insert(id.clone(), message);
4949
self.queue.push_back(id.clone());
5050
if self.queue.len() > Self::CAPACITY {
5151
if let Some(id) = self.queue.pop_back() {
5252
self.map.remove(&id);
5353
}
5454
}
55-
id
55+
Some(id)
5656
}
5757
}
5858

5959
// TODO: what if wasm32?
6060
// How to test it?
61-
pub fn compute_message_id(message: &pb::Message) -> Vec<u8> {
61+
pub fn compute_message_id(message: &pb::Message) -> Option<Vec<u8>> {
6262
let source_bytes = message
6363
.from
6464
.as_ref()
6565
.map(AsRef::as_ref)
6666
.unwrap_or(&[0, 1, 0][..]);
67+
6768
let mut source_string = libp2p_identity::PeerId::from_bytes(source_bytes)
68-
.expect("Valid peer id")
69+
.ok()?
6970
.to_base58();
71+
7072
let sequence_number = message
7173
.seqno
7274
.as_ref()
7375
.and_then(|b| <[u8; 8]>::try_from(b.as_slice()).ok())
7476
.map(u64::from_be_bytes)
7577
.unwrap_or_default();
7678
source_string.push_str(&sequence_number.to_string());
77-
source_string.into_bytes()
79+
Some(source_string.into_bytes())
7880
}
7981

8082
#[derive(Default, Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]

0 commit comments

Comments
 (0)