@@ -423,38 +423,77 @@ func (pf *pipeFactory) NewFuncRequest(f func(context.Context) (interface{}, erro
423
423
}
424
424
425
425
func debugSchedulerPreUnpark (e * edge , inc []pipe.Sender , updates , allPipes []pipe.Receiver ) {
426
- log := bklog .G (context .TODO ())
427
-
428
- log .Debugf (">> unpark %s req=%d upt=%d out=%d state=%s %s" , e .edge .Vertex .Name (), len (inc ), len (updates ), len (allPipes ), e .state , e .edge .Vertex .Digest ())
426
+ log := bklog .G (context .TODO ()).
427
+ WithField ("edge_vertex_name" , e .edge .Vertex .Name ()).
428
+ WithField ("edge_vertex_digest" , e .edge .Vertex .Digest ()).
429
+ WithField ("edge_index" , e .edge .Index )
430
+
431
+ log .
432
+ WithField ("edge_state" , e .state ).
433
+ WithField ("req" , len (inc )).
434
+ WithField ("upt" , len (updates )).
435
+ WithField ("out" , len (allPipes )).
436
+ Debug (">> unpark" )
429
437
430
438
for i , dep := range e .deps {
431
439
des := edgeStatusInitial
432
440
if dep .req != nil {
433
441
des = dep .req .Request ().(* edgeRequest ).desiredState
434
442
}
435
- log .Debugf (":: dep%d %s state=%s des=%s keys=%d hasslowcache=%v preprocessfunc=%v" , i , e .edge .Vertex .Inputs ()[i ].Vertex .Name (), dep .state , des , len (dep .keys ), e .slowCacheFunc (dep ) != nil , e .preprocessFunc (dep ) != nil )
443
+ log .
444
+ WithField ("dep_index" , i ).
445
+ WithField ("dep_vertex_name" , e .edge .Vertex .Inputs ()[i ].Vertex .Name ()).
446
+ WithField ("dep_vertex_digest" , e .edge .Vertex .Inputs ()[i ].Vertex .Digest ()).
447
+ WithField ("dep_state" , dep .state ).
448
+ WithField ("dep_desired_state" , des ).
449
+ WithField ("dep_keys" , len (dep .keys )).
450
+ WithField ("dep_has_slow_cache" , e .slowCacheFunc (dep ) != nil ).
451
+ WithField ("dep_preprocess_func" , e .preprocessFunc (dep ) != nil ).
452
+ Debug (":: dep" )
436
453
}
437
454
438
455
for i , in := range inc {
439
456
req := in .Request ()
440
- log .Debugf ("> incoming-%d: %p dstate=%s canceled=%v" , i , in , req .Payload .(* edgeRequest ).desiredState , req .Canceled )
457
+ log .
458
+ WithField ("incoming_index" , i ).
459
+ WithField ("incoming_pointer" , in ).
460
+ WithField ("incoming_desired_state" , req .Payload .(* edgeRequest ).desiredState ).
461
+ WithField ("incoming_canceled" , req .Canceled ).
462
+ Debug ("> incoming" )
441
463
}
442
464
443
465
for i , up := range updates {
444
466
if up == e .cacheMapReq {
445
- log .Debugf ("> update-%d: %p cacheMapReq complete=%v" , i , up , up .Status ().Completed )
467
+ log .
468
+ WithField ("update_index" , i ).
469
+ WithField ("update_pointer" , up ).
470
+ WithField ("update_complete" , up .Status ().Completed ).
471
+ Debug ("> update cacheMapReq" )
446
472
} else if up == e .execReq {
447
- log .Debugf ("> update-%d: %p execReq complete=%v" , i , up , up .Status ().Completed )
473
+ log .
474
+ WithField ("update_index" , i ).
475
+ WithField ("update_pointer" , up ).
476
+ WithField ("update_complete" , up .Status ().Completed ).
477
+ Debug ("> update execReq" )
448
478
} else {
449
479
st , ok := up .Status ().Value .(* edgeState )
450
480
if ok {
451
481
index := - 1
452
482
if dep , ok := e .depRequests [up ]; ok {
453
483
index = int (dep .index )
454
484
}
455
- log .Debugf ("> update-%d: %p input-%d keys=%d state=%s" , i , up , index , len (st .keys ), st .state )
485
+ log .
486
+ WithField ("update_index" , i ).
487
+ WithField ("update_pointer" , up ).
488
+ WithField ("update_complete" , up .Status ().Completed ).
489
+ WithField ("update_input_index" , index ).
490
+ WithField ("update_keys" , len (st .keys )).
491
+ WithField ("update_state" , st .state ).
492
+ Debugf ("> update edgeState" )
456
493
} else {
457
- log .Debugf ("> update-%d: unknown" , i )
494
+ log .
495
+ WithField ("update_index" , i ).
496
+ Debug ("> update unknown" )
458
497
}
459
498
}
460
499
}
@@ -463,7 +502,16 @@ func debugSchedulerPreUnpark(e *edge, inc []pipe.Sender, updates, allPipes []pip
463
502
func debugSchedulerPostUnpark (e * edge , inc []pipe.Sender ) {
464
503
log := bklog .G (context .TODO ())
465
504
for i , in := range inc {
466
- log .Debugf ("< incoming-%d: %p completed=%v" , i , in , in .Status ().Completed )
467
- }
468
- log .Debugf ("<< unpark %s\n " , e .edge .Vertex .Name ())
505
+ log .
506
+ WithField ("incoming_index" , i ).
507
+ WithField ("incoming_pointer" , in ).
508
+ WithField ("incoming_complete" , in .Status ().Completed ).
509
+ Debug ("< incoming" )
510
+ }
511
+ log .
512
+ WithField ("edge_vertex_name" , e .edge .Vertex .Name ()).
513
+ WithField ("edge_vertex_digest" , e .edge .Vertex .Digest ()).
514
+ WithField ("edge_index" , e .edge .Index ).
515
+ WithField ("edge_state" , e .state ).
516
+ Debug ("<< unpark" )
469
517
}
0 commit comments