@@ -604,18 +604,36 @@ impl P2pNetworkPubsubState {
604604 Ok ( ( ) )
605605 }
606606 P2pNetworkPubsubAction :: BroadcastValidatedMessage { message_id } => {
607- let Some ( ( mcache_message_id , _) ) =
607+ let Some ( ( message_id , _) ) =
608608 pubsub_state. mcache . get_message_id_and_message ( & message_id)
609609 else {
610610 bug_condition ! ( "Message with id: {:?} not found" , message_id) ;
611611 return Ok ( ( ) ) ;
612612 } ;
613613
614- let dispatcher = state_context. into_dispatcher ( ) ;
615- dispatcher. push ( P2pNetworkPubsubAction :: BroadcastMessage {
616- message_id : mcache_message_id,
617- } ) ;
618- Ok ( ( ) )
614+ let Some ( message) = pubsub_state. mcache . map . get ( & message_id) else {
615+ bug_condition ! ( "Message with id: {:?} not found" , message_id) ;
616+ return Ok ( ( ) ) ;
617+ } ;
618+
619+ let raw_message = message. message ( ) . clone ( ) ;
620+ let peer_id = * message. peer_id ( ) ;
621+
622+ pubsub_state. reduce_incoming_validated_message ( message_id, peer_id, & raw_message) ;
623+
624+ let Some ( message) = pubsub_state. mcache . map . get_mut ( & message_id) else {
625+ bug_condition ! ( "Message with id: {:?} not found" , message_id) ;
626+ return Ok ( ( ) ) ;
627+ } ;
628+
629+ * message = P2pNetworkPubsubMessageCacheMessage :: Validated {
630+ message : raw_message,
631+ peer_id,
632+ time : * message. time ( ) ,
633+ } ;
634+
635+ let ( dispatcher, state) = state_context. into_dispatcher_and_state ( ) ;
636+ Self :: broadcast ( dispatcher, state)
619637 }
620638 P2pNetworkPubsubAction :: PruneMessages { } => {
621639 let messages = pubsub_state
@@ -672,31 +690,6 @@ impl P2pNetworkPubsubState {
672690 Ok ( ( ) )
673691 }
674692 P2pNetworkPubsubAction :: IgnoreMessage { .. } => Ok ( ( ) ) ,
675- P2pNetworkPubsubAction :: BroadcastMessage { message_id } => {
676- let Some ( message) = pubsub_state. mcache . map . get ( & message_id) else {
677- bug_condition ! ( "Message with id: {:?} not found" , message_id) ;
678- return Ok ( ( ) ) ;
679- } ;
680-
681- let raw_message = message. message ( ) . clone ( ) ;
682- let peer_id = * message. peer_id ( ) ;
683-
684- pubsub_state. reduce_incoming_validated_message ( message_id, peer_id, & raw_message) ;
685-
686- let Some ( message) = pubsub_state. mcache . map . get_mut ( & message_id) else {
687- bug_condition ! ( "Message with id: {:?} not found" , message_id) ;
688- return Ok ( ( ) ) ;
689- } ;
690-
691- * message = P2pNetworkPubsubMessageCacheMessage :: Validated {
692- message : raw_message,
693- peer_id,
694- time : * message. time ( ) ,
695- } ;
696-
697- let ( dispatcher, state) = state_context. into_dispatcher_and_state ( ) ;
698- Self :: broadcast ( dispatcher, state)
699- }
700693 }
701694 }
702695
@@ -725,6 +718,10 @@ impl P2pNetworkPubsubState {
725718 Ok ( ( ) )
726719 }
727720
721+ /// Queues a validated message for propagation to other peers in the pubsub network.
722+ /// For peers that are "on mesh" for the message's topic, queues the full message.
723+ /// For other peers, queues an IHAVE control message to notify about message availability.
724+ /// The original sender is excluded from propagation.
728725 fn reduce_incoming_validated_message (
729726 & mut self ,
730727 message_id : P2pNetworkPubsubMessageCacheId ,
0 commit comments