@@ -192,12 +192,12 @@ func (dal *digestAndLock) getWaitTime(maximumHedgingDelay time.Duration) time.Du
192192 return waitTime
193193}
194194
195- func (dal * digestAndLock ) addResultTime (duration time.Duration ) {
195+ func (dal * digestAndLock ) addResultTime (ctx context. Context , duration time.Duration ) {
196196 if dal .lock .TryLock () {
197197 err := dal .digest .Add (float64 (duration .Milliseconds ()))
198198 dal .lock .Unlock ()
199199 if err != nil {
200- log .Warn ().Err (err ).Msg ("error when trying to add result time to digest" )
200+ log .Ctx ( ctx ). Warn ().Err (err ).Msg ("error when trying to add result time to digest" )
201201 }
202202 }
203203}
@@ -290,16 +290,16 @@ func dispatchSyncRequest[Q requestMessage, S responseMessage](
290290 }
291291
292292 // Run the dispatch expression to find the name(s) of the secondary dispatchers to use, if any.
293- log .Trace ().Object ("request" , req ).Msg ("running dispatch expression" )
293+ log .Ctx ( ctx ). Trace ().Object ("request" , req ).Msg ("running dispatch expression" )
294294 secondaryDispatcherNames , err := RunDispatchExpr (expr , req )
295295 if err != nil {
296- log .Warn ().Err (err ).Msg ("error when trying to evaluate the dispatch expression" )
296+ log .Ctx ( ctx ). Warn ().Err (err ).Msg ("error when trying to evaluate the dispatch expression" )
297297 return handler (withTimeout , cr .clusterClient )
298298 }
299299
300300 if len (secondaryDispatcherNames ) == 0 {
301301 // If no secondary dispatches are defined, just invoke directly.
302- log .Trace ().Str ("request-key" , reqKey ).Msg ("no secondary dispatches defined, running primary dispatch" )
302+ log .Ctx ( ctx ). Trace ().Str ("request-key" , reqKey ).Msg ("no secondary dispatches defined, running primary dispatch" )
303303 primaryDispatch .WithLabelValues ("false" , reqKey ).Inc ()
304304 return handler (withTimeout , cr .clusterClient )
305305 }
@@ -308,7 +308,7 @@ func dispatchSyncRequest[Q requestMessage, S responseMessage](
308308 for _ , secondaryDispatchName := range secondaryDispatcherNames {
309309 secondary , ok := cr .secondaryDispatch [secondaryDispatchName ]
310310 if ! ok {
311- log .Warn ().Str ("secondary-dispatcher-name" , secondaryDispatchName ).Msg ("received unknown secondary dispatcher" )
311+ log .Ctx ( ctx ). Warn ().Str ("secondary-dispatcher-name" , secondaryDispatchName ).Msg ("received unknown secondary dispatcher" )
312312 continue
313313 }
314314
@@ -327,16 +327,16 @@ func dispatchSyncRequest[Q requestMessage, S responseMessage](
327327 // potentially return first.
328328 primarySleeper .sleep (withTimeout )
329329
330- log .Trace ().Msg ("running primary dispatch after wait" )
330+ log .Ctx ( ctx ). Trace ().Msg ("running primary dispatch after wait" )
331331 select {
332332 case <- withTimeout .Done ():
333- log .Trace ().Str ("request-key" , reqKey ).Msg ("primary dispatch timed out or was canceled" )
333+ log .Ctx ( ctx ). Trace ().Str ("request-key" , reqKey ).Msg ("primary dispatch timed out or was canceled" )
334334 primaryDispatch .WithLabelValues ("true" , reqKey ).Inc ()
335335 return
336336
337337 default :
338338 resp , err := handler (withTimeout , cr .clusterClient )
339- log .Trace ().Str ("request-key" , reqKey ).Msg ("primary dispatch completed" )
339+ log .Ctx ( ctx ). Trace ().Str ("request-key" , reqKey ).Msg ("primary dispatch completed" )
340340 primaryResultChan <- respTuple [S ]{resp , err }
341341 primaryDispatch .WithLabelValues ("false" , reqKey ).Inc ()
342342 }
@@ -345,18 +345,18 @@ func dispatchSyncRequest[Q requestMessage, S responseMessage](
345345 for _ , secondaryDispatchName := range secondaryDispatcherNames {
346346 secondary , ok := cr .secondaryDispatch [secondaryDispatchName ]
347347 if ! ok {
348- log .Warn ().Str ("secondary-dispatcher-name" , secondaryDispatchName ).Msg ("received unknown secondary dispatcher" )
348+ log .Ctx ( ctx ). Warn ().Str ("secondary-dispatcher-name" , secondaryDispatchName ).Msg ("received unknown secondary dispatcher" )
349349 continue
350350 }
351351
352352 go func () {
353353 select {
354354 case <- withTimeout .Done ():
355- log .Trace ().Str ("secondary" , secondary .Name ).Msg ("secondary dispatch timed out or was canceled" )
355+ log .Ctx ( ctx ). Trace ().Str ("secondary" , secondary .Name ).Msg ("secondary dispatch timed out or was canceled" )
356356 return
357357
358358 default :
359- log .Trace ().Str ("secondary" , secondary .Name ).Msg ("running secondary dispatch" )
359+ log .Ctx ( ctx ). Trace ().Str ("secondary" , secondary .Name ).Msg ("running secondary dispatch" )
360360 startTime := time .Now ()
361361 resp , err := handler (withTimeout , secondary .Client )
362362 endTime := time .Now ()
@@ -365,17 +365,17 @@ func dispatchSyncRequest[Q requestMessage, S responseMessage](
365365 if err != nil {
366366 // For secondary dispatches, ignore any errors, as only the primary will be handled in
367367 // that scenario.
368- log .Trace ().Stringer ("duration" , handlerDuration ).Str ("secondary" , secondary .Name ).Err (err ).Msg ("got ignored secondary dispatch error" )
368+ log .Ctx ( ctx ). Trace ().Stringer ("duration" , handlerDuration ).Str ("secondary" , secondary .Name ).Err (err ).Msg ("got ignored secondary dispatch error" )
369369 cr .supportedResourceSubjectTracker .updateForError (err )
370370
371371 // Cancel the primary's sleep if it is still sleeping, so it can immediately being its own work.
372372 primarySleeper .cancelSleep ()
373373 return
374374 }
375375
376- log .Trace ().Stringer ("duration" , handlerDuration ).Str ("secondary" , secondary .Name ).Msg ("secondary dispatch completed" )
376+ log .Ctx ( ctx ). Trace ().Stringer ("duration" , handlerDuration ).Str ("secondary" , secondary .Name ).Msg ("secondary dispatch completed" )
377377 go cr .supportedResourceSubjectTracker .updateForSuccess (resourceTypeAndRelation , subjectTypeAndRelation )
378- cr .secondaryInitialResponseDigests [reqKey ].addResultTime (handlerDuration )
378+ cr .secondaryInitialResponseDigests [reqKey ].addResultTime (ctx , handlerDuration )
379379 secondaryResultChan <- secondaryRespTuple [S ]{resp : resp , handlerName : secondary .Name }
380380 }
381381 }()
@@ -423,6 +423,7 @@ type receiver[S any] interface {
423423const (
424424 secondaryCursorPrefix = "$$secondary:"
425425 primaryDispatcher = "$primary"
426+ noDispatcherResults = "$$no_dispatcher_results"
426427)
427428
428429func publishClient [R any ](ctx context.Context , client receiver [R ], reqKey string , stream dispatch.Stream [R ], secondaryDispatchName string ) error {
@@ -481,16 +482,16 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any](
481482 stream dispatch.Stream [R ],
482483 handler func (context.Context , ClusterClient ) (receiver [R ], error ),
483484) error {
484- withTimeout , cancelFn := context .WithTimeout (ctx , cr .dispatchOverallTimeout )
485+ ctxWithTimeout , cancelFn := context .WithTimeout (ctx , cr .dispatchOverallTimeout )
485486 defer cancelFn ()
486487
487488 // If no secondary dispatches are defined, just invoke directly.
488489 if len (cr .secondaryDispatchExprs ) == 0 || len (cr .secondaryDispatch ) == 0 {
489- client , err := handler (withTimeout , cr .clusterClient )
490+ client , err := handler (ctxWithTimeout , cr .clusterClient )
490491 if err != nil {
491492 return err
492493 }
493- return publishClient (withTimeout , client , reqKey , stream , primaryDispatcher )
494+ return publishClient (ctxWithTimeout , client , reqKey , stream , primaryDispatcher )
494495 }
495496
496497 // Check the cursor to see if the dispatch went to one of the secondary endpoints.
@@ -517,7 +518,7 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any](
517518 } else if expr , ok := cr .secondaryDispatchExprs [reqKey ]; ok {
518519 dispatcherNames , err := RunDispatchExpr (expr , req )
519520 if err != nil {
520- log .Warn ().Err (err ).Msg ("error when trying to evaluate the dispatch expression" )
521+ log .Ctx ( ctxWithTimeout ). Warn ().Err (err ).Msg ("error when trying to evaluate the dispatch expression" )
521522 } else {
522523 for _ , secondaryDispatchName := range dispatcherNames {
523524 if sd , ok := cr .secondaryDispatch [secondaryDispatchName ]; ok {
@@ -533,11 +534,11 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any](
533534 return errors .New ("cursor locked to unknown secondary dispatcher" )
534535 }
535536
536- client , err := handler (withTimeout , cr .clusterClient )
537+ client , err := handler (ctxWithTimeout , cr .clusterClient )
537538 if err != nil {
538539 return err
539540 }
540- return publishClient (withTimeout , client , reqKey , stream , primaryDispatcher )
541+ return publishClient (ctxWithTimeout , client , reqKey , stream , primaryDispatcher )
541542 }
542543
543544 var maximumHedgingDelay time.Duration
@@ -547,10 +548,14 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any](
547548
548549 contexts := make (map [string ]ctxAndCancel , len (validSecondaryDispatchers )+ 1 )
549550
550- primaryCtx , primaryCancelFn := context .WithCancel (withTimeout )
551+ primaryCtx , primaryCancelFn := context .WithCancel (ctxWithTimeout )
552+ primaryDeadline , _ := primaryCtx .Deadline ()
553+ primaryCtx = log .Ctx (primaryCtx ).With ().Time ("deadline" , primaryDeadline ).Logger ().WithContext (primaryCtx )
551554 contexts [primaryDispatcher ] = ctxAndCancel {primaryCtx , primaryCancelFn }
552555 for _ , secondary := range validSecondaryDispatchers {
553- secondaryCtx , secondaryCancelFn := context .WithCancel (withTimeout )
556+ secondaryCtx , secondaryCancelFn := context .WithCancel (ctxWithTimeout )
557+ secondaryDeadline , _ := secondaryCtx .Deadline ()
558+ secondaryCtx = log .Ctx (secondaryCtx ).With ().Time ("deadline" , secondaryDeadline ).Logger ().WithContext (secondaryCtx )
554559 contexts [secondary .Name ] = ctxAndCancel {secondaryCtx , secondaryCancelFn }
555560 }
556561
@@ -565,31 +570,35 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any](
565570
566571 wg .Add (len (validSecondaryDispatchers ))
567572
568- returnedResultsDispatcherName := atomic .NewString ("" )
573+ returnedResultsDispatcherName := atomic .NewString (noDispatcherResults )
569574
570575 primarySleeper := cr .newPrimarySleeper (reqKey ,
571576 tuple .FromCoreRelationReference (req .GetResourceRelation ()),
572577 tuple .FromCoreRelationReference (req .GetSubjectRelation ()),
573578 maximumHedgingDelay ,
574579 )
575580
576- runHandler := func (name string , clusterClient ClusterClient ) {
577- ctx := contexts [name ].ctx
578- log .Debug ().Str ("dispatcher" , name ).Msg ("running secondary dispatcher" )
581+ runHandler := func (handlerContext context.Context , name string , clusterClient ClusterClient ) {
579582 defer wg .Done ()
583+ if name == "" {
584+ log .Ctx (handlerContext ).Warn ().Msg ("attempting to run a dispatch handler with an empty name, skipping" )
585+ return
586+ }
587+
588+ log .Ctx (handlerContext ).Debug ().Str ("dispatcher" , name ).Msg ("preparing to run streaming dispatcher" )
580589
581590 var startTime time.Time
582591 isPrimary := name == primaryDispatcher
583592 if isPrimary {
584593 // Have the primary wait a bit to ensure the secondaries have a chance to return first.
585- primarySleeper .sleep (ctx )
594+ primarySleeper .sleep (handlerContext )
586595 } else {
587596 startTime = time .Now ()
588597 }
589598
590599 select {
591- case <- ctx .Done ():
592- log .Trace ().Str ("dispatcher" , name ).Msg ("dispatcher context canceled" )
600+ case <- handlerContext .Done ():
601+ log .Ctx ( handlerContext ). Trace ().Str ("dispatcher" , name ).Msg ("dispatcher context canceled" )
593602 if isPrimary {
594603 primaryDispatch .WithLabelValues ("true" , reqKey ).Inc ()
595604 }
@@ -599,9 +608,9 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any](
599608 // Do the rest of the work in a function
600609 }
601610
602- log .Trace ().Str ("dispatcher" , name ).Msg ("running streaming dispatcher" )
603- client , err := handler (ctx , clusterClient )
604- log .Trace ().Str ("dispatcher" , name ).Msg ("streaming dispatcher completed initial request " )
611+ log .Ctx ( handlerContext ). Trace ().Str ("dispatcher" , name ).Msg ("creating streaming dispatcher stream client " )
612+ client , err := handler (handlerContext , clusterClient )
613+ log .Ctx ( handlerContext ). Trace ().Str ("dispatcher" , name ).Msg ("created streaming dispatcher stream client " )
605614 if isPrimary {
606615 primaryDispatch .WithLabelValues ("false" , reqKey ).Inc ()
607616 }
@@ -610,21 +619,23 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any](
610619 cr .supportedResourceSubjectTracker .updateForError (err )
611620 }
612621
613- log .Warn ().Err (err ).Str ("dispatcher" , name ).Msg ("error when trying to run secondary dispatcher" )
622+ log .Ctx ( handlerContext ). Warn ().Err (err ).Str ("dispatcher" , name ).Msg ("error when trying to run secondary dispatcher" )
614623 errorsByDispatcherName .Store (name , err )
615624 return
616625 }
617626
618627 var hasPublishedFirstResult bool
619628 for {
620629 select {
621- case <- ctx .Done ():
622- log .Trace ().Str ("dispatcher" , name ).Msg ("dispatcher context canceled, in results loop" )
630+ case <- handlerContext .Done ():
631+ log .Ctx ( handlerContext ). Trace ().Str ("dispatcher" , name ).Msg ("dispatcher context canceled, in results loop" )
623632 return
624633
625634 default :
626635 result , err := client .Recv ()
627- log .Trace ().Str ("dispatcher" , name ).Err (err ).Any ("result" , result ).Msg ("dispatcher recv" )
636+ if err != nil {
637+ log .Ctx (handlerContext ).Trace ().Str ("dispatcher" , name ).Err (err ).Msg ("dispatcher recv error" )
638+ }
628639
629640 // isResult is true if the result is not an error or we received an EOF, which is considered
630641 // a "result" (just the end of the stream).
@@ -635,7 +646,7 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any](
635646 if ! isPrimary {
636647 finishTime := time .Now ()
637648 duration := finishTime .Sub (startTime )
638- cr .secondaryInitialResponseDigests [reqKey ].addResultTime (duration )
649+ cr .secondaryInitialResponseDigests [reqKey ].addResultTime (handlerContext , duration )
639650 go cr .supportedResourceSubjectTracker .updateForSuccess (
640651 tuple .FromCoreRelationReference (req .GetResourceRelation ()),
641652 tuple .FromCoreRelationReference (req .GetSubjectRelation ()),
@@ -644,21 +655,23 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any](
644655
645656 // If a valid result, and we have not yet returned any results, try take a "lock" on
646657 // returning results to ensure only a single dispatcher returns results.
647- swapped := returnedResultsDispatcherName .CompareAndSwap ("" , name )
658+ swapped := returnedResultsDispatcherName .CompareAndSwap (noDispatcherResults , name )
648659 if ! swapped {
649660 // Another dispatcher has started returning results, so terminate.
650- log .Trace ().Str ("dispatcher" , name ).Msg ("another dispatcher has already returned results" )
661+ log .Ctx ( handlerContext ). Trace ().Str ("dispatcher" , name ).Msg ("another dispatcher has already returned results" )
651662 return
652663 }
653664
665+ log .Ctx (handlerContext ).Trace ().Str ("dispatcher" , name ).Msg ("this dispatcher is the first to return results, will publish them and cancel the others" )
654666 dispatchCounter .WithLabelValues (reqKey , name ).Add (1 )
655667
656668 // Cancel all other contexts to prevent them from running, or stop them
657669 // from running if started.
658- log .Trace ().Str ("dispatcher" , name ).Msg ("canceling other dispatchers" )
659- for key , ctxAndCancel := range contexts {
670+ log .Ctx ( handlerContext ). Trace ().Str ("dispatcher" , name ).Msg ("canceling other dispatchers" )
671+ for key , otherCtx := range contexts {
660672 if key != name {
661- ctxAndCancel .cancel ()
673+ log .Ctx (handlerContext ).Trace ().Str ("canceling-dispatcher" , key ).Msg ("canceling dispatcher context" )
674+ otherCtx .cancel ()
662675 }
663676 }
664677 }
@@ -693,21 +706,27 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any](
693706
694707 // Run the primary.
695708 if allowPrimary {
696- go runHandler (primaryDispatcher , cr .clusterClient )
709+ go runHandler (contexts [ primaryDispatcher ]. ctx , primaryDispatcher , cr .clusterClient )
697710 }
698711
699712 // Run each of the secondary dispatches.
700713 for _ , secondary := range validSecondaryDispatchers {
701- go runHandler (secondary .Name , secondary .Client )
714+ go runHandler (contexts [ secondary . Name ]. ctx , secondary .Name , secondary .Client )
702715 }
703716
704717 // Wait for all the handlers to finish.
705718 wg .Wait ()
706719
707720 // Check for the first dispatcher that returned results and return its error, if any.
708721 resultHandlerName := returnedResultsDispatcherName .Load ()
709- if resultHandlerName != "" {
722+ if resultHandlerName == "" {
723+ log .Ctx (ctxWithTimeout ).Error ().Msg ("got empty result handler name; this should never happen" )
724+ return spiceerrors .MustBugf ("got empty result handler name" )
725+ }
726+
727+ if resultHandlerName != noDispatcherResults {
710728 if err , ok := errorsByDispatcherName .Load (resultHandlerName ); ok {
729+ log .Ctx (ctxWithTimeout ).Warn ().Err (err ).Str ("dispatcher" , resultHandlerName ).Msg ("dispatcher that returned results encountered an error during streaming" )
711730 return err
712731 }
713732 return nil
@@ -720,7 +739,7 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any](
720739 allErrors = append (allErrors , value )
721740 return true
722741 })
723- log .Warn ().Err (primaryErr ).Errs ("all-errors" , allErrors ).Msg ("returning primary dispatcher error as no dispatchers returned results" )
742+ log .Ctx ( ctxWithTimeout ). Warn ().Err (primaryErr ).Errs ("all-errors" , allErrors ).Msg ("returning primary dispatcher error as no dispatchers returned results" )
724743 return primaryErr
725744 }
726745
@@ -994,24 +1013,24 @@ func (s *primarySleeper) sleep(parentCtx context.Context) {
9941013 s .lock .Unlock ()
9951014
9961015 hedgeWaitHistogram .WithLabelValues (s .reqKey ).Observe (s .waitTime .Seconds ())
997- log .Trace ().Str ("request-key" , s .reqKey ).Dur ("wait-time" , s .waitTime ).Msg ("primary dispatch waiting before running" )
1016+ log .Ctx ( parentCtx ). Trace ().Str ("request-key" , s .reqKey ).Stringer ("wait-time" , s .waitTime ).Msg ("primary dispatch waiting before running" )
9981017
9991018 startTime := time .Now ()
10001019 timer := time .NewTimer (s .waitTime )
10011020 defer timer .Stop ()
10021021
10031022 select {
10041023 case <- parentCtx .Done ():
1005- log .Trace ().Str ("request-key" , s .reqKey ).Msg ("primary dispatch canceled before waiting completed" )
1024+ log .Ctx ( parentCtx ). Trace ().Str ("request-key" , s .reqKey ).Msg ("primary dispatch canceled before waiting completed" )
10061025 return
10071026
10081027 case <- sleepCtx .Done ():
1009- log .Trace ().Str ("request-key" , s .reqKey ).Msg ("primary dispatch early terminated waiting" )
1028+ log .Ctx ( parentCtx ). Trace ().Str ("request-key" , s .reqKey ).Msg ("primary dispatch early terminated waiting" )
10101029 hedgeActualWaitHistogram .WithLabelValues (s .reqKey ).Observe (time .Since (startTime ).Seconds ())
10111030 return
10121031
10131032 case <- timer .C :
1014- log .Trace ().Str ("request-key" , s .reqKey ).Msg ("primary dispatch finished waiting" )
1033+ log .Ctx ( parentCtx ). Trace ().Str ("request-key" , s .reqKey ).Msg ("primary dispatch finished waiting" )
10151034 hedgeActualWaitHistogram .WithLabelValues (s .reqKey ).Observe (s .waitTime .Seconds ())
10161035 return
10171036 }
0 commit comments