11use crate :: {
22 config:: { DelaySchedule , RecipientCountSchedule } ,
33 dag:: { DagUnit , Request } ,
4- dissemination:: { Addressed , DisseminationMessage , ReconstructionRequest } ,
4+ dissemination:: { Addressed , DisseminationMessage , ReconstructionRequest , LOG_TARGET } ,
55 task_queue:: TaskQueue ,
66 units:: { SignedUnit , Unit , UnitCoord , UnitStore , WrappedUnit } ,
77 Data , DelayConfig , Hasher , MultiKeychain , NodeCount , NodeIndex , NodeMap , Recipient , Round ,
88 Signature ,
99} ;
1010use itertools:: Itertools ;
11+ use log:: trace;
1112use rand:: { prelude:: SliceRandom , Rng } ;
1213use std:: {
1314 collections:: HashSet ,
@@ -103,6 +104,26 @@ impl<H: Hasher> ManagerStatus<H> {
103104 long_time_pending_tasks,
104105 }
105106 }
107+
108+ fn longest_pending_tasks ( & self ) -> Vec < & RepeatableTask < H > > {
109+ const ITEMS_PRINT_LIMIT : usize = 10 ;
110+ self . long_time_pending_tasks
111+ . iter ( )
112+ . fold ( Vec :: new ( ) , |mut highest, task| {
113+ if let Some ( index) = highest
114+ . iter ( )
115+ . position ( |ancient_task| ancient_task. counter < task. counter )
116+ {
117+ highest. insert ( index, task) ;
118+ } else if highest. len ( ) < ITEMS_PRINT_LIMIT {
119+ highest. push ( task) ;
120+ }
121+ if highest. len ( ) > ITEMS_PRINT_LIMIT {
122+ highest. pop ( ) ;
123+ }
124+ highest
125+ } )
126+ }
106127}
107128
108129impl < H : Hasher > Display for ManagerStatus < H > {
@@ -113,23 +134,15 @@ impl<H: Hasher> Display for ManagerStatus<H> {
113134 "CoordRequest - {}, ParentsRequest - {}, UnitBroadcast - {}" ,
114135 self . coord_request_count, self . parent_request_count, self . rebroadcast_count,
115136 ) ?;
116- const ITEMS_PRINT_LIMIT : usize = 10 ;
117137 if !self . long_time_pending_tasks . is_empty ( ) {
118- write ! ( f, "; pending tasks with counter >= 5 -" ) ?;
119- write ! ( f, " {}" , {
120- self . long_time_pending_tasks
121- . iter( )
122- . take( ITEMS_PRINT_LIMIT )
123- . join( ", " )
138+ write ! (
139+ f,
140+ "; {} pending tasks with counter >= 5," ,
141+ self . long_time_pending_tasks. len( )
142+ ) ?;
143+ write ! ( f, "longest pending: {}" , {
144+ self . longest_pending_tasks( ) . iter( ) . join( ", " )
124145 } ) ?;
125-
126- if let Some ( remaining) = self
127- . long_time_pending_tasks
128- . len ( )
129- . checked_sub ( ITEMS_PRINT_LIMIT )
130- {
131- write ! ( f, " and {remaining} more" ) ?
132- }
133146 }
134147 Ok ( ( ) )
135148 }
@@ -265,19 +278,28 @@ impl<H: Hasher> Manager<H> {
265278 stored_units : & UnitStore < DagUnit < H , D , MK > > ,
266279 processing_units : & UnitStore < SignedUnit < H , D , MK > > ,
267280 ) -> Vec < Addressed < DisseminationMessage < H , D , MK :: Signature > > > {
281+ trace ! ( target: LOG_TARGET , "Checking for due tasks." ) ;
268282 use TaskDetails :: * ;
269283 let mut result = Vec :: new ( ) ;
270284 while let Some ( mut task) = self . task_queue . pop_due_task ( ) {
285+ trace ! ( target: LOG_TARGET , "Triggering due task: {:?}" , task) ;
271286 match self . task_details ( & task, stored_units, processing_units) {
272- Cancel => ( ) ,
273- Delay ( delay) => self . task_queue . schedule_in ( task, delay) ,
287+ Cancel => {
288+ trace ! ( target: LOG_TARGET , "Task outdated, dropped." ) ;
289+ }
290+ Delay ( delay) => {
291+ trace ! ( target: LOG_TARGET , "Task pending verification, delayed by {:?}." , delay) ;
292+ self . task_queue . schedule_in ( task, delay)
293+ }
274294 Perform { message, delay } => {
295+ trace ! ( target: LOG_TARGET , "Executing task by sending {:?}, and rescheduling it delayed by {:?}." , message, delay) ;
275296 result. push ( message) ;
276297 task. counter += 1 ;
277298 self . task_queue . schedule_in ( task, delay)
278299 }
279300 }
280301 }
302+ trace ! ( target: LOG_TARGET , "Task resulted in sending {} messages." , result. len( ) ) ;
281303 result
282304 }
283305
@@ -289,11 +311,13 @@ impl<H: Hasher> Manager<H> {
289311 stored_units : & UnitStore < DagUnit < H , D , MK > > ,
290312 processing_units : & UnitStore < SignedUnit < H , D , MK > > ,
291313 ) -> Vec < Addressed < DisseminationMessage < H , D , MK :: Signature > > > {
314+ trace ! ( target: LOG_TARGET , "Handling newly created request: {:?}" , request) ;
292315 if let Request :: Coord ( coord) = request {
293316 if !self . missing_coords . insert ( coord) {
294317 return Vec :: new ( ) ;
295318 }
296319 }
320+ trace ! ( target: LOG_TARGET , "Scheduling newly created request: {:?}" , request) ;
297321 self . task_queue
298322 . schedule_now ( RepeatableTask :: new ( DisseminationTask :: Request ( request) ) ) ;
299323 self . trigger_tasks ( stored_units, processing_units)
@@ -305,6 +329,7 @@ impl<H: Hasher> Manager<H> {
305329 & mut self ,
306330 unit : & DagUnit < H , D , MK > ,
307331 ) -> Option < Addressed < DisseminationMessage < H , D , MK :: Signature > > > {
332+ trace ! ( target: LOG_TARGET , "New unit with hash {:?} at {}." , unit. hash( ) , unit. coord( ) ) ;
308333 let hash = unit. hash ( ) ;
309334 let round = unit. round ( ) ;
310335 let creator = unit. creator ( ) ;
@@ -321,6 +346,7 @@ impl<H: Hasher> Manager<H> {
321346 RepeatableTask :: new ( DisseminationTask :: Broadcast ( hash) ) ,
322347 self . broadcast_delay ( ) ,
323348 ) ;
349+ trace ! ( target: LOG_TARGET , "Scheduled broadcast for unit with hash {:?}." , unit. hash( ) ) ;
324350 match creator == self . index ( ) {
325351 true => Some ( Addressed :: broadcast ( DisseminationMessage :: Unit (
326352 unit. clone ( ) . unpack ( ) . into ( ) ,
0 commit comments