@@ -160,51 +160,55 @@ impl P2pNetworkPubsubState {
160160 message,
161161 seen_limit,
162162 } => {
163- pubsub_state. reduce_incoming_message ( peer_id, message, seen_limit) ?;
163+ // Check result later to ensure we always dispatch the cleanup action
164+ let reduce_incoming_result =
165+ pubsub_state. reduce_incoming_message ( peer_id, message, seen_limit) ;
164166
165167 let ( dispatcher, global_state) = state_context. into_dispatcher_and_state ( ) ;
166168
167169 dispatcher. push ( P2pNetworkPubsubAction :: IncomingMessageCleanup { peer_id } ) ;
168170
171+ reduce_incoming_result?;
172+
169173 let state: & Self = global_state. substate ( ) ?;
170174 let config: & P2pConfig = global_state. substate ( ) ?;
171175
172- let incoming_block = state. incoming_block . as_ref ( ) . cloned ( ) ;
173- let incoming_transactions = state. incoming_transactions . clone ( ) ;
174- let incoming_snarks = state. incoming_snarks . clone ( ) ;
175- let topics = state. topics . clone ( ) ;
176-
177- for ( topic_id, map) in topics {
176+ for ( topic_id, map) in & state. topics {
178177 let mesh_size = map. values ( ) . filter ( |s| s. on_mesh ( ) ) . count ( ) ;
179178 let could_accept = mesh_size < config. meshsub . outbound_degree_high ;
180179
181180 if !could_accept {
182181 if let Some ( topic_state) = map. get ( & peer_id) {
183182 if topic_state. on_mesh ( ) {
183+ let topic_id = topic_id. clone ( ) ;
184184 dispatcher. push ( P2pNetworkPubsubAction :: Prune { peer_id, topic_id } )
185185 }
186186 }
187187 }
188188 }
189189
190- broadcast ( dispatcher, global_state) ?;
190+ if let Err ( error) = broadcast ( dispatcher, global_state) {
191+ bug_condition ! (
192+ "Failure when trying to broadcast incoming pubsub message: {error}"
193+ ) ;
194+ } ;
191195
192- if let Some ( ( _, block) ) = incoming_block {
193- let best_tip = BlockWithHash :: try_new ( block) ?;
196+ if let Some ( ( _, block) ) = state . incoming_block . as_ref ( ) {
197+ let best_tip = BlockWithHash :: try_new ( block. clone ( ) ) ?;
194198 dispatcher. push ( P2pPeerAction :: BestTipUpdate { peer_id, best_tip } ) ;
195199 }
196- for ( transaction, nonce) in incoming_transactions {
200+ for ( transaction, nonce) in & state . incoming_transactions {
197201 dispatcher. push ( P2pChannelsTransactionAction :: Libp2pReceived {
198202 peer_id,
199- transaction : Box :: new ( transaction) ,
200- nonce,
203+ transaction : Box :: new ( transaction. clone ( ) ) ,
204+ nonce : * nonce ,
201205 } ) ;
202206 }
203- for ( snark, nonce) in incoming_snarks {
207+ for ( snark, nonce) in & state . incoming_snarks {
204208 dispatcher. push ( P2pChannelsSnarkAction :: Libp2pReceived {
205209 peer_id,
206- snark : Box :: new ( snark) ,
207- nonce,
210+ snark : Box :: new ( snark. clone ( ) ) ,
211+ nonce : * nonce ,
208212 } ) ;
209213 }
210214 Ok ( ( ) )
0 commit comments