@@ -115,12 +115,24 @@ impl P2pNetworkPubsubState {
115115 . filter ( |( c, _) | {
116116 // don't send back to who sent this
117117 * * c != * peer_id
118- && topic. get ( c) . map_or (
119- false ,
120- P2pNetworkPubsubClientTopicState :: on_mesh,
121- )
122118 } )
123- . for_each ( |( _, state) | state. message . publish . push ( message. clone ( ) ) ) ;
119+ . for_each ( |( c, state) | {
120+ let Some ( topic_state) = topic. get ( c) else {
121+ return ;
122+ } ;
123+ if topic_state. on_mesh ( ) {
124+ state. message . publish . push ( message. clone ( ) )
125+ } else {
126+ let ctr = state
127+ . message
128+ . control
129+ . get_or_insert_with ( Default :: default) ;
130+ ctr. ihave . push ( pb:: ControlIHave {
131+ topic_id : Some ( message. topic . clone ( ) ) ,
132+ message_ids : vec ! [ compute_message_id( & message) ] ,
133+ } )
134+ }
135+ } ) ;
124136
125137 if let Some ( data) = message. data {
126138 if data. len ( ) <= 8 {
@@ -245,3 +257,24 @@ impl P2pNetworkPubsubState {
245257 }
246258 }
247259}
260+
261+ // TODO: what if wasm32?
262+ // How to test it?
263+ fn compute_message_id ( message : & pb:: Message ) -> Vec < u8 > {
264+ let source_bytes = message
265+ . from
266+ . as_ref ( )
267+ . map ( AsRef :: as_ref)
268+ . unwrap_or ( & [ 0 , 1 , 0 ] [ ..] ) ;
269+ let mut source_string = libp2p_identity:: PeerId :: from_bytes ( source_bytes)
270+ . expect ( "Valid peer id" )
271+ . to_base58 ( ) ;
272+ let sequence_number = message
273+ . seqno
274+ . as_ref ( )
275+ . and_then ( |b| <[ u8 ; 8 ] >:: try_from ( b. as_slice ( ) ) . ok ( ) )
276+ . map ( u64:: from_be_bytes)
277+ . unwrap_or_default ( ) ;
278+ source_string. push_str ( & sequence_number. to_string ( ) ) ;
279+ source_string. into_bytes ( )
280+ }
0 commit comments