Skip to content

Commit 6f15c6e

Browse files
committed
feat(p2p): Refactor incoming data reducer to make it more readable
1 parent 16c8ae6 commit 6f15c6e

File tree

1 file changed

+92
-61
lines changed

1 file changed

+92
-61
lines changed

p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs

Lines changed: 92 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ use crate::{
1414
use super::{
1515
p2p_network_pubsub_state::P2pNetworkPubsubClientMeshAddingState,
1616
pb::{self, Message},
17-
P2pNetworkPubsubAction, P2pNetworkPubsubClientState, P2pNetworkPubsubClientTopicState,
18-
P2pNetworkPubsubEffectfulAction, P2pNetworkPubsubState, TOPIC,
17+
P2pNetworkPubsubAction, P2pNetworkPubsubClientState, P2pNetworkPubsubEffectfulAction,
18+
P2pNetworkPubsubState, TOPIC,
1919
};
2020

2121
impl P2pNetworkPubsubState {
@@ -187,7 +187,7 @@ impl P2pNetworkPubsubState {
187187
}
188188
}
189189

190-
if let Err(error) = broadcast(dispatcher, global_state) {
190+
if let Err(error) = Self::broadcast(dispatcher, global_state) {
191191
bug_condition!(
192192
"Failure when trying to broadcast incoming pubsub message: {error}"
193193
);
@@ -408,7 +408,7 @@ impl P2pNetworkPubsubState {
408408
}
409409

410410
let (dispatcher, state) = state_context.into_dispatcher_and_state();
411-
broadcast(dispatcher, state)
411+
Self::broadcast(dispatcher, state)
412412
}
413413
P2pNetworkPubsubAction::OutgoingData { mut data, peer_id } => {
414414
let (dispatcher, state) = state_context.into_dispatcher_and_state();
@@ -537,66 +537,81 @@ impl P2pNetworkPubsubState {
537537
Ok(())
538538
}
539539

540+
fn combined_with_pending_buffer<'a>(buffer: &'a mut Vec<u8>, data: &'a [u8]) -> &'a [u8] {
541+
if buffer.is_empty() {
542+
// Nothing pending, we can use the data directly
543+
data
544+
} else {
545+
buffer.extend_from_slice(data);
546+
buffer.as_slice()
547+
}
548+
}
549+
550+
/// Processes incoming data from a peer, handling subscriptions, control messages,
551+
/// and message dissemination within the P2P pubsub system.
540552
fn reduce_incoming_data(
541553
&mut self,
542554
peer_id: &PeerId,
543555
data: Data,
544556
timestamp: Timestamp,
545557
) -> Result<(), String> {
546-
let Some(state) = self.clients.get_mut(peer_id) else {
558+
let Some(client_state) = self.clients.get_mut(peer_id) else {
547559
// TODO: investigate, cannot reproduce this
548560
// bug_condition!("State not found for action: P2pNetworkPubsubAction::IncomingData");
549561
return Ok(());
550562
};
551-
let slice = if state.buffer.is_empty() {
552-
&*data
553-
} else {
554-
state.buffer.extend_from_slice(&data);
555-
&state.buffer
556-
};
557-
let mut subscriptions = vec![];
558-
let mut control = pb::ControlMessage::default();
563+
564+
// Data may be part of a partial message we received before.
565+
let slice = Self::combined_with_pending_buffer(&mut client_state.buffer, &data);
566+
559567
match <pb::Rpc as prost::Message>::decode_length_delimited(slice) {
560-
Ok(v) => {
561-
state.buffer.clear();
562-
state.buffer.shrink_to(0x2000);
563-
// println!(
564-
// "(pubsub) this <- {peer_id}, {:?}, {:?}, {}",
565-
// v.subscriptions,
566-
// v.control,
567-
// v.publish.len()
568-
// );
569-
570-
subscriptions.extend_from_slice(&v.subscriptions);
571-
state.incoming_messages.extend_from_slice(&v.publish);
572-
if let Some(v) = v.control {
573-
control.graft.extend_from_slice(&v.graft);
574-
control.prune.extend_from_slice(&v.prune);
575-
control.ihave.extend_from_slice(&v.ihave);
576-
control.iwant.extend_from_slice(&v.iwant);
577-
}
568+
Ok(decoded) => {
569+
client_state.buffer.clear();
570+
client_state.buffer.shrink_to(0x2000);
571+
572+
client_state.incoming_messages.extend(decoded.publish);
573+
574+
let subscriptions = decoded.subscriptions;
575+
let control = decoded.control.unwrap_or_default();
576+
577+
self.update_subscriptions(peer_id, subscriptions);
578+
self.apply_control_commands(peer_id, &control);
579+
self.respond_to_iwant_requests(peer_id, &control.iwant);
580+
self.process_ihave_messages(peer_id, control.ihave, timestamp);
578581
}
579582
Err(err) => {
580-
// bad way to check the error, but `prost` doesn't provide better
581-
if err.to_string().contains("buffer underflow") && state.buffer.is_empty() {
582-
state.buffer = data.to_vec();
583+
// NOTE: not the ideal way to check for errors, but `prost` doesn't provide
584+
// a better alternative, so we must check the message contents.
585+
if err.to_string().contains("buffer underflow") && client_state.buffer.is_empty() {
586+
// Incomplete data, keep in buffer, should be completed later
587+
client_state.buffer = data.to_vec();
583588
}
589+
590+
// TODO: handle other errors
591+
// TODO: if the error is not a buffer underflow, buffer needs to be cleared.
584592
}
585593
}
586594

595+
Ok(())
596+
}
597+
598+
fn update_subscriptions(&mut self, peer_id: &PeerId, subscriptions: Vec<pb::rpc::SubOpts>) {
599+
// Update subscription status based on incoming subscription requests.
587600
for subscription in &subscriptions {
588601
let topic_id = subscription.topic_id().to_owned();
589602
let topic = self.topics.entry(topic_id).or_default();
590603

591604
if subscription.subscribe() {
592-
if let Entry::Vacant(v) = topic.entry(*peer_id) {
593-
v.insert(P2pNetworkPubsubClientTopicState::default());
594-
}
605+
topic.entry(*peer_id).or_default();
595606
} else {
596607
topic.remove(peer_id);
597608
}
598609
}
610+
}
599611

612+
/// Applies control commands (`graft` and `prune`) to manage the peer's mesh states within topics.
613+
fn apply_control_commands(&mut self, peer_id: &PeerId, control: &pb::ControlMessage) {
614+
// Apply graft commands to add the peer to specific topic meshes.
600615
for graft in &control.graft {
601616
if let Some(mesh_state) = self
602617
.topics
@@ -606,6 +621,8 @@ impl P2pNetworkPubsubState {
606621
mesh_state.mesh = P2pNetworkPubsubClientMeshAddingState::Added;
607622
}
608623
}
624+
625+
// Apply prune commands to remove the peer from specific topic meshes.
609626
for prune in &control.prune {
610627
if let Some(mesh_state) = self
611628
.topics
@@ -615,7 +632,11 @@ impl P2pNetworkPubsubState {
615632
mesh_state.mesh = P2pNetworkPubsubClientMeshAddingState::TheyRefused;
616633
}
617634
}
618-
for iwant in &control.iwant {
635+
}
636+
637+
fn respond_to_iwant_requests(&mut self, peer_id: &PeerId, iwant_requests: &[pb::ControlIWant]) {
638+
// Respond to iwant requests by publishing available messages from the cache.
639+
for iwant in iwant_requests {
619640
for msg_id in &iwant.message_ids {
620641
if let Some(msg) = self.mcache.map.get(msg_id) {
621642
if let Some(client) = self.clients.get_mut(peer_id) {
@@ -624,8 +645,16 @@ impl P2pNetworkPubsubState {
624645
}
625646
}
626647
}
648+
}
627649

628-
for ihave in control.ihave {
650+
fn process_ihave_messages(
651+
&mut self,
652+
peer_id: &PeerId,
653+
ihave_messages: Vec<pb::ControlIHave>,
654+
timestamp: Timestamp,
655+
) {
656+
// Process ihave messages by determining which available messages the client wants.
657+
for ihave in ihave_messages {
629658
if self.clients.contains_key(peer_id) {
630659
let message_ids = ihave
631660
.message_ids
@@ -634,34 +663,36 @@ impl P2pNetworkPubsubState {
634663
.collect::<Vec<_>>();
635664

636665
let Some(client) = self.clients.get_mut(peer_id) else {
637-
bug_condition!("State not found for {}", peer_id);
638-
return Ok(());
666+
bug_condition!("process_ihave_messages: State not found for {}", peer_id);
667+
return;
639668
};
640669

670+
// Queue the desired message IDs for the client to request.
641671
let ctr = client.message.control.get_or_insert_with(Default::default);
642672
ctr.iwant.push(pb::ControlIWant { message_ids })
643673
}
644674
}
645-
Ok(())
646675
}
647-
}
648676

649-
fn broadcast<Action, State>(
650-
dispatcher: &mut Dispatcher<Action, State>,
651-
state: &State,
652-
) -> Result<(), String>
653-
where
654-
State: crate::P2pStateTrait,
655-
Action: crate::P2pActionTrait<State>,
656-
{
657-
let state: &P2pNetworkPubsubState = state.substate()?;
658-
659-
state
660-
.clients
661-
.iter()
662-
.filter(|(_, s)| !s.message_is_empty())
663-
.map(|(peer_id, _)| P2pNetworkPubsubAction::OutgoingMessage { peer_id: *peer_id })
664-
.for_each(|action| dispatcher.push(action));
665-
666-
Ok(())
677+
fn broadcast<Action, State>(
678+
dispatcher: &mut Dispatcher<Action, State>,
679+
state: &State,
680+
) -> Result<(), String>
681+
where
682+
State: crate::P2pStateTrait,
683+
Action: crate::P2pActionTrait<State>,
684+
{
685+
let state: &P2pNetworkPubsubState = state.substate()?;
686+
687+
for peer_id in state
688+
.clients
689+
.iter()
690+
.filter(|(_, s)| !s.message_is_empty())
691+
.map(|(peer_id, _)| *peer_id)
692+
{
693+
dispatcher.push(P2pNetworkPubsubAction::OutgoingMessage { peer_id });
694+
}
695+
696+
Ok(())
697+
}
667698
}

0 commit comments

Comments
 (0)