1818// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
1919// DEALINGS IN THE SOFTWARE.
2020
21- use std:: { collections:: HashMap , convert:: Infallible , pin:: Pin } ;
21+ use std:: {
22+ collections:: HashMap ,
23+ convert:: Infallible ,
24+ future:: Future ,
25+ pin:: Pin ,
26+ sync:: Arc ,
27+ task:: { Context , Poll } ,
28+ } ;
2229
2330use asynchronous_codec:: { Decoder , Encoder , Framed } ;
2431use byteorder:: { BigEndian , ByteOrder } ;
@@ -62,7 +69,7 @@ pub(crate) const FLOODSUB_PROTOCOL: ProtocolId = ProtocolId {
6269} ;
6370
6471/// Implementation of [`InboundUpgrade`] and [`OutboundUpgrade`] for the Gossipsub protocol.
65- #[ derive( Debug , Clone ) ]
72+ #[ derive( Clone ) ]
6673pub struct ProtocolConfig {
6774 /// The Gossipsub protocol id to listen on.
6875 pub ( crate ) protocol_ids : Vec < ProtocolId > ,
@@ -72,6 +79,17 @@ pub struct ProtocolConfig {
7279 pub ( crate ) default_max_transmit_size : usize ,
7380 /// The max transmit sizes for a topic.
7481 pub ( crate ) max_transmit_sizes : HashMap < TopicHash , usize > ,
82+ /// Optional spawner for message verification.
83+ pub ( crate ) message_verification_spawner : Option <
84+ Arc <
85+ dyn Fn (
86+ Box < dyn FnOnce ( ) -> ValidationResult + Send > ,
87+ ) -> Pin < Box < dyn Future < Output = ValidationResult > + Send > >
88+ + Send
89+ + Sync
90+ + ' static ,
91+ > ,
92+ > ,
7593}
7694
7795impl Default for ProtocolConfig {
@@ -85,10 +103,26 @@ impl Default for ProtocolConfig {
85103 ] ,
86104 default_max_transmit_size : 65536 ,
87105 max_transmit_sizes : HashMap :: new ( ) ,
106+ message_verification_spawner : None ,
88107 }
89108 }
90109}
91110
111+ impl std:: fmt:: Debug for ProtocolConfig {
112+ fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
113+ f. debug_struct ( "ProtocolConfig" )
114+ . field ( "protocol_ids" , & self . protocol_ids )
115+ . field ( "validation_mode" , & self . validation_mode )
116+ . field ( "default_max_transmit_size" , & self . default_max_transmit_size )
117+ . field ( "max_transmit_sizes" , & self . max_transmit_sizes )
118+ . field (
119+ "message_verification_spawner" ,
120+ & self . message_verification_spawner . is_some ( ) ,
121+ )
122+ . finish ( )
123+ }
124+ }
125+
92126impl ProtocolConfig {
93127 /// Get the max transmit size for a given topic, falling back to the default.
94128 pub fn max_transmit_size_for_topic ( & self , topic : & TopicHash ) -> usize {
@@ -139,6 +173,7 @@ where
139173 self . default_max_transmit_size ,
140174 self . validation_mode ,
141175 self . max_transmit_sizes ,
176+ self . message_verification_spawner ,
142177 ) ,
143178 ) ,
144179 protocol_id. kind ,
@@ -162,6 +197,7 @@ where
162197 self . default_max_transmit_size ,
163198 self . validation_mode ,
164199 self . max_transmit_sizes ,
200+ self . message_verification_spawner ,
165201 ) ,
166202 ) ,
167203 protocol_id. kind ,
@@ -178,11 +214,22 @@ pub struct GossipsubCodec {
178214 codec : quick_protobuf_codec:: Codec < proto:: RPC > ,
179215 /// Maximum transmit sizes per topic, with a default if not specified.
180216 max_transmit_sizes : HashMap < TopicHash , usize > ,
217+ /// Optional spawner for message verification.
218+ message_verification_spawner : Option <
219+ Arc <
220+ dyn Fn (
221+ Box < dyn FnOnce ( ) -> ValidationResult + Send > ,
222+ ) -> Pin < Box < dyn Future < Output = ValidationResult > + Send > >
223+ + Send
224+ + Sync
225+ + ' static ,
226+ > ,
227+ > ,
181228}
182229
183230/// Result of message validation
184231#[ derive( Debug ) ]
185- enum ValidationResult {
232+ pub enum ValidationResult {
186233 Valid {
187234 source : Option < PeerId > ,
188235 sequence_number : Option < u64 > ,
@@ -195,12 +242,24 @@ impl GossipsubCodec {
195242 max_length : usize ,
196243 validation_mode : ValidationMode ,
197244 max_transmit_sizes : HashMap < TopicHash , usize > ,
245+ message_verification_spawner : Option <
246+ Arc <
247+ dyn Fn (
248+ Box < dyn FnOnce ( ) -> ValidationResult + Send > ,
249+ )
250+ -> Pin < Box < dyn Future < Output = ValidationResult > + Send > >
251+ + Send
252+ + Sync
253+ + ' static ,
254+ > ,
255+ > ,
198256 ) -> GossipsubCodec {
199257 let codec = quick_protobuf_codec:: Codec :: new ( max_length) ;
200258 GossipsubCodec {
201259 validation_mode,
202260 codec,
203261 max_transmit_sizes,
262+ message_verification_spawner,
204263 }
205264 }
206265
@@ -408,13 +467,70 @@ impl Decoder for GossipsubCodec {
408467 // Store any invalid messages.
409468 let mut invalid_messages = Vec :: new ( ) ;
410469
411- for message in rpc. publish . into_iter ( ) {
412- // Validate the message using the extracted validation function
413- match Self :: validate_message (
414- self . validation_mode . clone ( ) ,
415- & self . max_transmit_sizes ,
416- & message,
417- ) {
470+ // Collect validation results and corresponding messages
471+ let mut validation_results = Vec :: new ( ) ;
472+ let mut validation_futures = Vec :: new ( ) ;
473+ let messages_to_process: Vec < _ > = rpc. publish . into_iter ( ) . collect ( ) ;
474+
475+ for message in & messages_to_process {
476+ if let Some ( spawner) = & self . message_verification_spawner {
477+ // Use spawner - defer validation
478+ let validation_mode = self . validation_mode . clone ( ) ;
479+ let max_transmit_sizes = self . max_transmit_sizes . clone ( ) ;
480+ let message_clone = message. clone ( ) ;
481+ let future = spawner ( Box :: new ( move || {
482+ Self :: validate_message ( validation_mode, & max_transmit_sizes, & message_clone)
483+ } ) ) ;
484+ validation_futures. push ( future) ;
485+ } else {
486+ // No spawner - validate immediately
487+ let result = Self :: validate_message (
488+ self . validation_mode . clone ( ) ,
489+ & self . max_transmit_sizes ,
490+ message,
491+ ) ;
492+ validation_results. push ( result) ;
493+ }
494+ }
495+
496+ // Poll all validation futures until completion
497+ if !validation_futures. is_empty ( ) {
498+ let waker = futures:: task:: noop_waker ( ) ;
499+ let mut context = Context :: from_waker ( & waker) ;
500+ let mut future_results: Vec < Option < ValidationResult > > =
501+ ( 0 ..validation_futures. len ( ) ) . map ( |_| None ) . collect ( ) ;
502+
503+ // Poll until all futures are ready
504+ loop {
505+ let mut all_ready = true ;
506+ for ( i, future) in validation_futures. iter_mut ( ) . enumerate ( ) {
507+ if future_results[ i] . is_none ( ) {
508+ match future. as_mut ( ) . poll ( & mut context) {
509+ Poll :: Ready ( result) => {
510+ future_results[ i] = Some ( result) ;
511+ }
512+ Poll :: Pending => {
513+ all_ready = false ;
514+ std:: thread:: yield_now ( ) ;
515+ }
516+ }
517+ }
518+ }
519+ if all_ready {
520+ break ;
521+ }
522+ }
523+
524+ // Move future results into the main validation_results vector
525+ validation_results. extend ( future_results. into_iter ( ) . map ( |r| r. unwrap ( ) ) ) ;
526+ }
527+
528+ // Process all validation results uniformly
529+ for ( message, validation_result) in messages_to_process
530+ . into_iter ( )
531+ . zip ( validation_results. into_iter ( ) )
532+ {
533+ match validation_result {
418534 ValidationResult :: Valid {
419535 source,
420536 sequence_number,
@@ -455,8 +571,6 @@ impl Decoder for GossipsubCodec {
455571 validated : false ,
456572 } ;
457573 invalid_messages. push ( ( raw_message, validation_error) ) ;
458- // proceed to the next message
459- continue ;
460574 }
461575 }
462576 }
@@ -650,8 +764,12 @@ mod tests {
650764 timeout : Delay :: new ( Duration :: from_secs ( 1 ) ) ,
651765 } ;
652766
653- let mut codec =
654- GossipsubCodec :: new ( u32:: MAX as usize , ValidationMode :: Strict , HashMap :: new ( ) ) ;
767+ let mut codec = GossipsubCodec :: new (
768+ u32:: MAX as usize ,
769+ ValidationMode :: Strict ,
770+ HashMap :: new ( ) ,
771+ None ,
772+ ) ;
655773 let mut buf = BytesMut :: new ( ) ;
656774 codec. encode ( rpc. into_protobuf ( ) , & mut buf) . unwrap ( ) ;
657775 let decoded_rpc = codec. decode ( & mut buf) . unwrap ( ) . unwrap ( ) ;
0 commit comments