@@ -3,16 +3,20 @@ use std::sync::Arc;
33use std:: time:: Duration ;
44
55use libp2p:: identity:: { PeerId , PublicKey } ;
6- use tokio :: sync :: mpsc ;
7- use tokio:: time :: sleep_until ;
6+ use rand :: seq :: SliceRandom ;
7+ use tokio:: sync :: { mpsc , oneshot } ;
88use tracing:: { debug, trace} ;
99
10+ use crate :: sharding:: reconstruct_message_from_shards;
1011use crate :: tree:: PropellerScheduleManager ;
11- use crate :: types:: { Channel , Event , MessageRoot } ;
12+ use crate :: types:: { Channel , Event , MessageRoot , ReconstructionError , ShardValidationError } ;
1213use crate :: unit:: PropellerUnit ;
13- use crate :: ShardIndex ;
14+ use crate :: unit_validator:: UnitValidator ;
15+ use crate :: { MerkleProof , ShardIndex } ;
1416
1517pub type UnitToValidate = ( PeerId , PropellerUnit ) ;
18+ type ValidationResult = ( Result < ( ) , ShardValidationError > , UnitValidator , PropellerUnit ) ;
19+ type ReconstructionResult = Result < ReconstructionSuccess , ReconstructionError > ;
1620
1721#[ derive( Debug ) ]
1822pub enum EventStateManagerToEngine {
@@ -21,6 +25,61 @@ pub enum EventStateManagerToEngine {
2125 BroadcastUnit { unit : PropellerUnit , peers : Vec < PeerId > } ,
2226}
2327
28+ #[ derive( Debug ) ]
29+ struct ReconstructionSuccess {
30+ message : Vec < u8 > ,
31+ my_shard : Vec < u8 > ,
32+ my_shard_proof : MerkleProof ,
33+ }
34+
35+ /// Tracks reconstruction progress for a single message.
36+ struct ReconstructionState {
37+ received_shards : Vec < PropellerUnit > ,
38+ received_my_index : bool ,
39+ signature : Option < Vec < u8 > > ,
40+ reconstructed_message : Option < Vec < u8 > > ,
41+ count_at_reconstruction : usize ,
42+ additional_shards_after_reconstruction : usize ,
43+ }
44+
45+ impl ReconstructionState {
46+ fn new ( ) -> Self {
47+ Self {
48+ received_shards : Vec :: new ( ) ,
49+ received_my_index : false ,
50+ signature : None ,
51+ reconstructed_message : None ,
52+ count_at_reconstruction : 0 ,
53+ additional_shards_after_reconstruction : 0 ,
54+ }
55+ }
56+
57+ fn is_reconstructed ( & self ) -> bool {
58+ self . reconstructed_message . is_some ( )
59+ }
60+
61+ fn record_shard ( & mut self , is_my_shard : bool ) {
62+ if is_my_shard {
63+ self . received_my_index = true ;
64+ } else if self . is_reconstructed ( ) {
65+ self . additional_shards_after_reconstruction += 1 ;
66+ }
67+ }
68+
69+ fn capture_signature ( & mut self , unit : & PropellerUnit ) {
70+ if self . signature . is_none ( ) {
71+ self . signature = Some ( unit. signature ( ) . to_vec ( ) ) ;
72+ }
73+ }
74+
75+ /// Total shard count used for the access-threshold check.
76+ fn access_count ( & self ) -> usize {
77+ self . count_at_reconstruction
78+ + self . additional_shards_after_reconstruction
79+ + usize:: from ( !self . received_my_index )
80+ }
81+ }
82+
2483/// Message processor that handles validation and state management for a single message.
2584pub struct MessageProcessor {
2685 pub channel : Channel ,
@@ -47,18 +106,10 @@ impl MessageProcessor {
47106 self . channel, self . publisher, self . message_root
48107 ) ;
49108
50- // Local state variables
51- let deadline = tokio:: time:: Instant :: now ( ) + self . timeout ;
52-
53- // TODO(AndrewL): remove this
54- #[ allow( clippy:: never_loop) ]
55- loop {
56- tokio:: select! {
57- _ = sleep_until( deadline) => {
58- let _ = self . emit_timeout_and_finalize( ) . await ;
59- break ;
60- }
61- }
109+ let timed_out = tokio:: time:: timeout ( self . timeout , self . process_units ( ) ) . await . is_err ( ) ;
110+
111+ if timed_out {
112+ self . emit_timeout_and_finalize ( ) ;
62113 }
63114
64115 debug ! (
@@ -67,32 +118,216 @@ impl MessageProcessor {
67118 ) ;
68119 }
69120
70- async fn emit_timeout_and_finalize ( & mut self ) -> ControlFlow < ( ) > {
121+ async fn process_units ( & mut self ) {
122+ let mut validator = UnitValidator :: new (
123+ self . channel ,
124+ self . publisher ,
125+ self . publisher_public_key . clone ( ) ,
126+ self . message_root ,
127+ Arc :: clone ( & self . tree_manager ) ,
128+ ) ;
129+ let mut state = ReconstructionState :: new ( ) ;
130+
131+ while let Some ( ( sender, unit) ) = self . unit_rx . recv ( ) . await {
132+ trace ! ( "[MSG_PROC] Validating unit from sender={:?} index={:?}" , sender, unit. index( ) ) ;
133+
134+ let ( result, returned_validator, unit) =
135+ Self :: validate_on_rayon ( validator, sender, unit) . await ;
136+ validator = returned_validator;
137+
138+ if let Err ( err) = result {
139+ trace ! ( "[MSG_PROC] Validation failed for index={:?}: {:?}" , unit. index( ) , err) ;
140+ continue ;
141+ }
142+
143+ self . maybe_broadcast_my_shard ( & unit, & state) ;
144+ state. record_shard ( unit. index ( ) == self . my_shard_index ) ;
145+ state. capture_signature ( & unit) ;
146+
147+ if self . advance_reconstruction ( unit, & mut state) . await . is_break ( ) {
148+ return ;
149+ }
150+ }
151+
71152 trace ! (
72- "[MSG_PROC] Timeout reached for channel={:?} publisher={:?} root={:?}" ,
153+ "[MSG_PROC] All channels closed for channel={:?} publisher={:?} root={:?}" ,
73154 self . channel,
74155 self . publisher,
75156 self . message_root
76157 ) ;
158+ self . finalize ( ) ;
159+ }
77160
78- self . emit_and_finalize ( Event :: MessageTimeout {
79- channel : self . channel ,
161+ // --- Validation --------------------------------------------------------
162+
163+ /// Offloads CPU-bound validation (signature verification, merkle proofs) to the rayon thread
164+ /// pool to avoid blocking the tokio runtime. Benchmarked to outperform `spawn_blocking`.
165+ async fn validate_on_rayon (
166+ mut validator : UnitValidator ,
167+ sender : PeerId ,
168+ unit : PropellerUnit ,
169+ ) -> ValidationResult {
170+ let ( tx, rx) = oneshot:: channel ( ) ;
171+ rayon:: spawn ( move || {
172+ let result = validator. validate_shard ( sender, & unit) ;
173+ let _ = tx. send ( ( result, validator, unit) ) ;
174+ } ) ;
175+ rx. await . expect ( "Validation task panicked" )
176+ }
177+
178+ // --- Broadcasting ------------------------------------------------------
179+
180+ fn maybe_broadcast_my_shard ( & self , unit : & PropellerUnit , state : & ReconstructionState ) {
181+ if unit. index ( ) == self . my_shard_index && !state. received_my_index {
182+ self . broadcast_shard ( unit) ;
183+ }
184+ }
185+
186+ fn broadcast_shard ( & self , unit : & PropellerUnit ) {
187+ let mut peers: Vec < PeerId > = self
188+ . tree_manager
189+ . get_nodes ( )
190+ . iter ( )
191+ . map ( |( p, _) | * p)
192+ . filter ( |p| * p != self . publisher && * p != self . local_peer_id )
193+ . collect ( ) ;
194+ peers. shuffle ( & mut rand:: thread_rng ( ) ) ;
195+ trace ! ( "[MSG_PROC] Broadcasting unit index={:?} to {} peers" , unit. index( ) , peers. len( ) ) ;
196+ self . engine_tx
197+ . send ( EventStateManagerToEngine :: BroadcastUnit { unit : unit. clone ( ) , peers } )
198+ . expect ( "Engine task has exited" ) ;
199+ }
200+
201+ // --- Reconstruction ----------------------------------------------------
202+
203+ async fn advance_reconstruction (
204+ & self ,
205+ unit : PropellerUnit ,
206+ state : & mut ReconstructionState ,
207+ ) -> ControlFlow < ( ) > {
208+ if state. is_reconstructed ( ) {
209+ return self . maybe_emit_message ( state) ;
210+ }
211+
212+ state. received_shards . push ( unit) ;
213+
214+ if !self . tree_manager . should_build ( state. received_shards . len ( ) ) {
215+ return ControlFlow :: Continue ( ( ) ) ;
216+ }
217+
218+ trace ! ( "[MSG_PROC] Starting reconstruction with {} shards" , state. received_shards. len( ) ) ;
219+ state. count_at_reconstruction = state. received_shards . len ( ) ;
220+
221+ match self . reconstruct_on_rayon ( state) . await {
222+ Ok ( success) => self . handle_reconstruction_success ( success, state) ,
223+ Err ( e) => {
224+ tracing:: error!( "[MSG_PROC] Reconstruction failed: {:?}" , e) ;
225+ self . emit_and_finalize ( Event :: MessageReconstructionFailed {
226+ publisher : self . publisher ,
227+ message_root : self . message_root ,
228+ error : e,
229+ } )
230+ }
231+ }
232+ }
233+
234+ /// Offloads erasure-coding reconstruction to rayon.
235+ async fn reconstruct_on_rayon ( & self , state : & mut ReconstructionState ) -> ReconstructionResult {
236+ let shards = std:: mem:: take ( & mut state. received_shards ) ;
237+ let message_root = self . message_root ;
238+ let my_index: usize = self . my_shard_index . 0 . try_into ( ) . unwrap ( ) ;
239+ let data_count = self . tree_manager . num_data_shards ( ) ;
240+ let coding_count = self . tree_manager . num_coding_shards ( ) ;
241+
242+ let ( tx, rx) = oneshot:: channel ( ) ;
243+ rayon:: spawn ( move || {
244+ let result = reconstruct_message_from_shards (
245+ shards,
246+ message_root,
247+ my_index,
248+ data_count,
249+ coding_count,
250+ )
251+ . map ( |( message, my_shard, my_shard_proof) | ReconstructionSuccess {
252+ message,
253+ my_shard,
254+ my_shard_proof,
255+ } ) ;
256+ let _ = tx. send ( result) ;
257+ } ) ;
258+ rx. await . expect ( "Reconstruction task panicked" )
259+ }
260+
261+ fn handle_reconstruction_success (
262+ & self ,
263+ success : ReconstructionSuccess ,
264+ state : & mut ReconstructionState ,
265+ ) -> ControlFlow < ( ) > {
266+ let ReconstructionSuccess { message, my_shard, my_shard_proof } = success;
267+
268+ if !state. received_my_index {
269+ let signature = state. signature . clone ( ) . expect ( "Signature must exist" ) ;
270+ let reconstructed_unit = PropellerUnit :: new (
271+ self . channel ,
272+ self . publisher ,
273+ self . message_root ,
274+ signature,
275+ self . my_shard_index ,
276+ my_shard,
277+ my_shard_proof,
278+ ) ;
279+ self . broadcast_shard ( & reconstructed_unit) ;
280+ }
281+
282+ state. reconstructed_message = Some ( message) ;
283+ self . maybe_emit_message ( state)
284+ }
285+
286+ // --- Emission / finalization -------------------------------------------
287+
288+ fn maybe_emit_message ( & self , state : & mut ReconstructionState ) -> ControlFlow < ( ) > {
289+ if !self . tree_manager . should_receive ( state. access_count ( ) ) {
290+ return ControlFlow :: Continue ( ( ) ) ;
291+ }
292+
293+ trace ! ( "[MSG_PROC] Access threshold reached, emitting message" ) ;
294+ let message = state. reconstructed_message . take ( ) . expect ( "Message must exist" ) ;
295+ self . emit_and_finalize ( Event :: MessageReceived {
80296 publisher : self . publisher ,
81297 message_root : self . message_root ,
298+ message,
82299 } )
83300 }
84301
302+ fn emit_timeout_and_finalize ( & self ) {
303+ trace ! (
304+ "[MSG_PROC] Timeout reached for channel={:?} publisher={:?} root={:?}" ,
305+ self . channel,
306+ self . publisher,
307+ self . message_root
308+ ) ;
309+ let _ = self . emit_and_finalize ( Event :: MessageTimeout {
310+ channel : self . channel ,
311+ publisher : self . publisher ,
312+ message_root : self . message_root ,
313+ } ) ;
314+ }
315+
85316 fn emit_and_finalize ( & self , event : Event ) -> ControlFlow < ( ) > {
86317 self . engine_tx
87318 . send ( EventStateManagerToEngine :: BehaviourEvent ( event) )
88319 . expect ( "Engine task has exited" ) ;
320+ self . finalize ( ) ;
321+ ControlFlow :: Break ( ( ) )
322+ }
323+
324+ fn finalize ( & self ) {
89325 self . engine_tx
90326 . send ( EventStateManagerToEngine :: Finalized {
91327 channel : self . channel ,
92328 publisher : self . publisher ,
93329 message_root : self . message_root ,
94330 } )
95331 . expect ( "Engine task has exited" ) ;
96- ControlFlow :: Break ( ( ) )
97332 }
98333}
0 commit comments