@@ -549,9 +549,13 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any](
549549 contexts := make (map [string ]ctxAndCancel , len (validSecondaryDispatchers )+ 1 )
550550
551551 primaryCtx , primaryCancelFn := context .WithCancel (ctxWithTimeout )
552+ primaryDeadline , _ := primaryCtx .Deadline ()
553+ primaryCtx = log .Ctx (primaryCtx ).With ().Time ("deadline" , primaryDeadline ).Logger ().WithContext (primaryCtx )
552554 contexts [primaryDispatcher ] = ctxAndCancel {primaryCtx , primaryCancelFn }
553555 for _ , secondary := range validSecondaryDispatchers {
554556 secondaryCtx , secondaryCancelFn := context .WithCancel (ctxWithTimeout )
557+ secondaryDeadline , _ := secondaryCtx .Deadline ()
558+ secondaryCtx = log .Ctx (secondaryCtx ).With ().Time ("deadline" , secondaryDeadline ).Logger ().WithContext (secondaryCtx )
555559 contexts [secondary .Name ] = ctxAndCancel {secondaryCtx , secondaryCancelFn }
556560 }
557561
@@ -575,14 +579,13 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any](
575579 )
576580
577581 runHandler := func (handlerContext context.Context , name string , clusterClient ClusterClient ) {
582+ defer wg .Done ()
578583 if name == "" {
579584 log .Ctx (handlerContext ).Warn ().Msg ("attempting to run a dispatch handler with an empty name, skipping" )
580- wg .Done ()
581585 return
582586 }
583587
584- log .Ctx (handlerContext ).Debug ().Str ("dispatcher" , name ).Msg ("running secondary dispatcher" )
585- defer wg .Done ()
588+ log .Ctx (handlerContext ).Debug ().Str ("dispatcher" , name ).Msg ("preparing to run streaming dispatcher" )
586589
587590 var startTime time.Time
588591 isPrimary := name == primaryDispatcher
@@ -605,9 +608,9 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any](
605608 // Do the rest of the work in a function
606609 }
607610
608- log .Ctx (handlerContext ).Trace ().Str ("dispatcher" , name ).Msg ("running streaming dispatcher" )
611+ log .Ctx (handlerContext ).Trace ().Str ("dispatcher" , name ).Msg ("creating streaming dispatcher stream client " )
609612 client , err := handler (handlerContext , clusterClient )
610- log .Ctx (handlerContext ).Trace ().Str ("dispatcher" , name ).Msg ("streaming dispatcher completed initial request " )
613+ log .Ctx (handlerContext ).Trace ().Str ("dispatcher" , name ).Msg ("created streaming dispatcher stream client " )
611614 if isPrimary {
612615 primaryDispatch .WithLabelValues ("false" , reqKey ).Inc ()
613616 }
@@ -1010,7 +1013,7 @@ func (s *primarySleeper) sleep(parentCtx context.Context) {
10101013 s .lock .Unlock ()
10111014
10121015 hedgeWaitHistogram .WithLabelValues (s .reqKey ).Observe (s .waitTime .Seconds ())
1013- log .Ctx (parentCtx ).Trace ().Str ("request-key" , s .reqKey ).Dur ("wait-time" , s .waitTime ).Msg ("primary dispatch waiting before running" )
1016+ log .Ctx (parentCtx ).Trace ().Str ("request-key" , s .reqKey ).Stringer ("wait-time" , s .waitTime ).Msg ("primary dispatch waiting before running" )
10141017
10151018 startTime := time .Now ()
10161019 timer := time .NewTimer (s .waitTime )
0 commit comments