Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/spicedb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"errors"
"os"
"time"

"github.com/rs/zerolog"
"github.com/sercand/kuberesolver/v5"
Expand All @@ -22,6 +23,7 @@ func main() {

// Set up root logger
// This will typically be overwritten by the logging setup for a given command.
zerolog.TimeFieldFormat = time.RFC3339Nano
log.SetGlobalLogger(zerolog.New(os.Stderr).Level(zerolog.InfoLevel))

// Enable Kubernetes gRPC resolver
Expand Down
121 changes: 70 additions & 51 deletions internal/dispatch/remote/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,12 @@ func (dal *digestAndLock) getWaitTime(maximumHedgingDelay time.Duration) time.Du
return waitTime
}

func (dal *digestAndLock) addResultTime(duration time.Duration) {
func (dal *digestAndLock) addResultTime(ctx context.Context, duration time.Duration) {
if dal.lock.TryLock() {
err := dal.digest.Add(float64(duration.Milliseconds()))
dal.lock.Unlock()
if err != nil {
log.Warn().Err(err).Msg("error when trying to add result time to digest")
log.Ctx(ctx).Warn().Err(err).Msg("error when trying to add result time to digest")
}
}
}
Expand Down Expand Up @@ -290,16 +290,16 @@ func dispatchSyncRequest[Q requestMessage, S responseMessage](
}

// Run the dispatch expression to find the name(s) of the secondary dispatchers to use, if any.
log.Trace().Object("request", req).Msg("running dispatch expression")
log.Ctx(ctx).Trace().Object("request", req).Msg("running dispatch expression")
secondaryDispatcherNames, err := RunDispatchExpr(expr, req)
if err != nil {
log.Warn().Err(err).Msg("error when trying to evaluate the dispatch expression")
log.Ctx(ctx).Warn().Err(err).Msg("error when trying to evaluate the dispatch expression")
return handler(withTimeout, cr.clusterClient)
}

if len(secondaryDispatcherNames) == 0 {
// If no secondary dispatches are defined, just invoke directly.
log.Trace().Str("request-key", reqKey).Msg("no secondary dispatches defined, running primary dispatch")
log.Ctx(ctx).Trace().Str("request-key", reqKey).Msg("no secondary dispatches defined, running primary dispatch")
primaryDispatch.WithLabelValues("false", reqKey).Inc()
return handler(withTimeout, cr.clusterClient)
}
Expand All @@ -308,7 +308,7 @@ func dispatchSyncRequest[Q requestMessage, S responseMessage](
for _, secondaryDispatchName := range secondaryDispatcherNames {
secondary, ok := cr.secondaryDispatch[secondaryDispatchName]
if !ok {
log.Warn().Str("secondary-dispatcher-name", secondaryDispatchName).Msg("received unknown secondary dispatcher")
log.Ctx(ctx).Warn().Str("secondary-dispatcher-name", secondaryDispatchName).Msg("received unknown secondary dispatcher")
continue
}

Expand All @@ -327,16 +327,16 @@ func dispatchSyncRequest[Q requestMessage, S responseMessage](
// potentially return first.
primarySleeper.sleep(withTimeout)

log.Trace().Msg("running primary dispatch after wait")
log.Ctx(ctx).Trace().Msg("running primary dispatch after wait")
select {
case <-withTimeout.Done():
log.Trace().Str("request-key", reqKey).Msg("primary dispatch timed out or was canceled")
log.Ctx(ctx).Trace().Str("request-key", reqKey).Msg("primary dispatch timed out or was canceled")
primaryDispatch.WithLabelValues("true", reqKey).Inc()
return

default:
resp, err := handler(withTimeout, cr.clusterClient)
log.Trace().Str("request-key", reqKey).Msg("primary dispatch completed")
log.Ctx(ctx).Trace().Str("request-key", reqKey).Msg("primary dispatch completed")
primaryResultChan <- respTuple[S]{resp, err}
primaryDispatch.WithLabelValues("false", reqKey).Inc()
}
Expand All @@ -345,18 +345,18 @@ func dispatchSyncRequest[Q requestMessage, S responseMessage](
for _, secondaryDispatchName := range secondaryDispatcherNames {
secondary, ok := cr.secondaryDispatch[secondaryDispatchName]
if !ok {
log.Warn().Str("secondary-dispatcher-name", secondaryDispatchName).Msg("received unknown secondary dispatcher")
log.Ctx(ctx).Warn().Str("secondary-dispatcher-name", secondaryDispatchName).Msg("received unknown secondary dispatcher")
continue
}

go func() {
select {
case <-withTimeout.Done():
log.Trace().Str("secondary", secondary.Name).Msg("secondary dispatch timed out or was canceled")
log.Ctx(ctx).Trace().Str("secondary", secondary.Name).Msg("secondary dispatch timed out or was canceled")
return

default:
log.Trace().Str("secondary", secondary.Name).Msg("running secondary dispatch")
log.Ctx(ctx).Trace().Str("secondary", secondary.Name).Msg("running secondary dispatch")
startTime := time.Now()
resp, err := handler(withTimeout, secondary.Client)
endTime := time.Now()
Expand All @@ -365,17 +365,17 @@ func dispatchSyncRequest[Q requestMessage, S responseMessage](
if err != nil {
// For secondary dispatches, ignore any errors, as only the primary will be handled in
// that scenario.
log.Trace().Stringer("duration", handlerDuration).Str("secondary", secondary.Name).Err(err).Msg("got ignored secondary dispatch error")
log.Ctx(ctx).Trace().Stringer("duration", handlerDuration).Str("secondary", secondary.Name).Err(err).Msg("got ignored secondary dispatch error")
cr.supportedResourceSubjectTracker.updateForError(err)

// Cancel the primary's sleep if it is still sleeping, so it can immediately being its own work.
primarySleeper.cancelSleep()
return
}

log.Trace().Stringer("duration", handlerDuration).Str("secondary", secondary.Name).Msg("secondary dispatch completed")
log.Ctx(ctx).Trace().Stringer("duration", handlerDuration).Str("secondary", secondary.Name).Msg("secondary dispatch completed")
go cr.supportedResourceSubjectTracker.updateForSuccess(resourceTypeAndRelation, subjectTypeAndRelation)
cr.secondaryInitialResponseDigests[reqKey].addResultTime(handlerDuration)
cr.secondaryInitialResponseDigests[reqKey].addResultTime(ctx, handlerDuration)
secondaryResultChan <- secondaryRespTuple[S]{resp: resp, handlerName: secondary.Name}
}
}()
Expand Down Expand Up @@ -423,6 +423,7 @@ type receiver[S any] interface {
const (
secondaryCursorPrefix = "$$secondary:"
primaryDispatcher = "$primary"
noDispatcherResults = "$$no_dispatcher_results"
)

func publishClient[R any](ctx context.Context, client receiver[R], reqKey string, stream dispatch.Stream[R], secondaryDispatchName string) error {
Expand Down Expand Up @@ -481,16 +482,16 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any](
stream dispatch.Stream[R],
handler func(context.Context, ClusterClient) (receiver[R], error),
) error {
withTimeout, cancelFn := context.WithTimeout(ctx, cr.dispatchOverallTimeout)
ctxWithTimeout, cancelFn := context.WithTimeout(ctx, cr.dispatchOverallTimeout)
defer cancelFn()

// If no secondary dispatches are defined, just invoke directly.
if len(cr.secondaryDispatchExprs) == 0 || len(cr.secondaryDispatch) == 0 {
client, err := handler(withTimeout, cr.clusterClient)
client, err := handler(ctxWithTimeout, cr.clusterClient)
if err != nil {
return err
}
return publishClient(withTimeout, client, reqKey, stream, primaryDispatcher)
return publishClient(ctxWithTimeout, client, reqKey, stream, primaryDispatcher)
}

// Check the cursor to see if the dispatch went to one of the secondary endpoints.
Expand All @@ -517,7 +518,7 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any](
} else if expr, ok := cr.secondaryDispatchExprs[reqKey]; ok {
dispatcherNames, err := RunDispatchExpr(expr, req)
if err != nil {
log.Warn().Err(err).Msg("error when trying to evaluate the dispatch expression")
log.Ctx(ctxWithTimeout).Warn().Err(err).Msg("error when trying to evaluate the dispatch expression")
} else {
for _, secondaryDispatchName := range dispatcherNames {
if sd, ok := cr.secondaryDispatch[secondaryDispatchName]; ok {
Expand All @@ -533,11 +534,11 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any](
return errors.New("cursor locked to unknown secondary dispatcher")
}

client, err := handler(withTimeout, cr.clusterClient)
client, err := handler(ctxWithTimeout, cr.clusterClient)
if err != nil {
return err
}
return publishClient(withTimeout, client, reqKey, stream, primaryDispatcher)
return publishClient(ctxWithTimeout, client, reqKey, stream, primaryDispatcher)
}

var maximumHedgingDelay time.Duration
Expand All @@ -547,10 +548,14 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any](

contexts := make(map[string]ctxAndCancel, len(validSecondaryDispatchers)+1)

primaryCtx, primaryCancelFn := context.WithCancel(withTimeout)
primaryCtx, primaryCancelFn := context.WithCancel(ctxWithTimeout)
primaryDeadline, _ := primaryCtx.Deadline()
primaryCtx = log.Ctx(primaryCtx).With().Time("deadline", primaryDeadline).Logger().WithContext(primaryCtx)
contexts[primaryDispatcher] = ctxAndCancel{primaryCtx, primaryCancelFn}
for _, secondary := range validSecondaryDispatchers {
secondaryCtx, secondaryCancelFn := context.WithCancel(withTimeout)
secondaryCtx, secondaryCancelFn := context.WithCancel(ctxWithTimeout)
secondaryDeadline, _ := secondaryCtx.Deadline()
secondaryCtx = log.Ctx(secondaryCtx).With().Time("deadline", secondaryDeadline).Logger().WithContext(secondaryCtx)
contexts[secondary.Name] = ctxAndCancel{secondaryCtx, secondaryCancelFn}
}

Expand All @@ -565,31 +570,35 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any](

wg.Add(len(validSecondaryDispatchers))

returnedResultsDispatcherName := atomic.NewString("")
returnedResultsDispatcherName := atomic.NewString(noDispatcherResults)

primarySleeper := cr.newPrimarySleeper(reqKey,
tuple.FromCoreRelationReference(req.GetResourceRelation()),
tuple.FromCoreRelationReference(req.GetSubjectRelation()),
maximumHedgingDelay,
)

runHandler := func(name string, clusterClient ClusterClient) {
ctx := contexts[name].ctx
log.Debug().Str("dispatcher", name).Msg("running secondary dispatcher")
runHandler := func(handlerContext context.Context, name string, clusterClient ClusterClient) {
defer wg.Done()
if name == "" {
log.Ctx(handlerContext).Warn().Msg("attempting to run a dispatch handler with an empty name, skipping")
return
}

log.Ctx(handlerContext).Debug().Str("dispatcher", name).Msg("preparing to run streaming dispatcher")

var startTime time.Time
isPrimary := name == primaryDispatcher
if isPrimary {
// Have the primary wait a bit to ensure the secondaries have a chance to return first.
primarySleeper.sleep(ctx)
primarySleeper.sleep(handlerContext)
} else {
startTime = time.Now()
}

select {
case <-ctx.Done():
log.Trace().Str("dispatcher", name).Msg("dispatcher context canceled")
case <-handlerContext.Done():
log.Ctx(handlerContext).Trace().Str("dispatcher", name).Msg("dispatcher context canceled")
if isPrimary {
primaryDispatch.WithLabelValues("true", reqKey).Inc()
}
Expand All @@ -599,9 +608,9 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any](
// Do the rest of the work in a function
}

log.Trace().Str("dispatcher", name).Msg("running streaming dispatcher")
client, err := handler(ctx, clusterClient)
log.Trace().Str("dispatcher", name).Msg("streaming dispatcher completed initial request")
log.Ctx(handlerContext).Trace().Str("dispatcher", name).Msg("creating streaming dispatcher stream client")
client, err := handler(handlerContext, clusterClient)
log.Ctx(handlerContext).Trace().Str("dispatcher", name).Msg("created streaming dispatcher stream client")
if isPrimary {
primaryDispatch.WithLabelValues("false", reqKey).Inc()
}
Expand All @@ -610,21 +619,23 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any](
cr.supportedResourceSubjectTracker.updateForError(err)
}

log.Warn().Err(err).Str("dispatcher", name).Msg("error when trying to run secondary dispatcher")
log.Ctx(handlerContext).Warn().Err(err).Str("dispatcher", name).Msg("error when trying to run secondary dispatcher")
errorsByDispatcherName.Store(name, err)
return
}

var hasPublishedFirstResult bool
for {
select {
case <-ctx.Done():
log.Trace().Str("dispatcher", name).Msg("dispatcher context canceled, in results loop")
case <-handlerContext.Done():
log.Ctx(handlerContext).Trace().Str("dispatcher", name).Msg("dispatcher context canceled, in results loop")
return

default:
result, err := client.Recv()
log.Trace().Str("dispatcher", name).Err(err).Any("result", result).Msg("dispatcher recv")
if err != nil {
log.Ctx(handlerContext).Trace().Str("dispatcher", name).Err(err).Msg("dispatcher recv error")
}

// isResult is true if the result is not an error or we received an EOF, which is considered
// a "result" (just the end of the stream).
Expand All @@ -635,7 +646,7 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any](
if !isPrimary {
finishTime := time.Now()
duration := finishTime.Sub(startTime)
cr.secondaryInitialResponseDigests[reqKey].addResultTime(duration)
cr.secondaryInitialResponseDigests[reqKey].addResultTime(handlerContext, duration)
go cr.supportedResourceSubjectTracker.updateForSuccess(
tuple.FromCoreRelationReference(req.GetResourceRelation()),
tuple.FromCoreRelationReference(req.GetSubjectRelation()),
Expand All @@ -644,21 +655,23 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any](

// If a valid result, and we have not yet returned any results, try take a "lock" on
// returning results to ensure only a single dispatcher returns results.
swapped := returnedResultsDispatcherName.CompareAndSwap("", name)
swapped := returnedResultsDispatcherName.CompareAndSwap(noDispatcherResults, name)
if !swapped {
// Another dispatcher has started returning results, so terminate.
log.Trace().Str("dispatcher", name).Msg("another dispatcher has already returned results")
log.Ctx(handlerContext).Trace().Str("dispatcher", name).Msg("another dispatcher has already returned results")
return
}

log.Ctx(handlerContext).Trace().Str("dispatcher", name).Msg("this dispatcher is the first to return results, will publish them and cancel the others")
dispatchCounter.WithLabelValues(reqKey, name).Add(1)

// Cancel all other contexts to prevent them from running, or stop them
// from running if started.
log.Trace().Str("dispatcher", name).Msg("canceling other dispatchers")
for key, ctxAndCancel := range contexts {
log.Ctx(handlerContext).Trace().Str("dispatcher", name).Msg("canceling other dispatchers")
for key, otherCtx := range contexts {
if key != name {
ctxAndCancel.cancel()
log.Ctx(handlerContext).Trace().Str("canceling-dispatcher", key).Msg("canceling dispatcher context")
otherCtx.cancel()
}
}
}
Expand Down Expand Up @@ -693,21 +706,27 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any](

// Run the primary.
if allowPrimary {
go runHandler(primaryDispatcher, cr.clusterClient)
go runHandler(contexts[primaryDispatcher].ctx, primaryDispatcher, cr.clusterClient)
}

// Run each of the secondary dispatches.
for _, secondary := range validSecondaryDispatchers {
go runHandler(secondary.Name, secondary.Client)
go runHandler(contexts[secondary.Name].ctx, secondary.Name, secondary.Client)
}

// Wait for all the handlers to finish.
wg.Wait()

// Check for the first dispatcher that returned results and return its error, if any.
resultHandlerName := returnedResultsDispatcherName.Load()
if resultHandlerName != "" {
if resultHandlerName == "" {
log.Ctx(ctxWithTimeout).Error().Msg("got empty result handler name; this should never happen")
return spiceerrors.MustBugf("got empty result handler name")
}

if resultHandlerName != noDispatcherResults {
if err, ok := errorsByDispatcherName.Load(resultHandlerName); ok {
log.Ctx(ctxWithTimeout).Warn().Err(err).Str("dispatcher", resultHandlerName).Msg("dispatcher that returned results encountered an error during streaming")
return err
}
return nil
Expand All @@ -720,7 +739,7 @@ func dispatchStreamingRequest[Q streamingRequestMessage, R any](
allErrors = append(allErrors, value)
return true
})
log.Warn().Err(primaryErr).Errs("all-errors", allErrors).Msg("returning primary dispatcher error as no dispatchers returned results")
log.Ctx(ctxWithTimeout).Warn().Err(primaryErr).Errs("all-errors", allErrors).Msg("returning primary dispatcher error as no dispatchers returned results")
return primaryErr
}

Expand Down Expand Up @@ -994,24 +1013,24 @@ func (s *primarySleeper) sleep(parentCtx context.Context) {
s.lock.Unlock()

hedgeWaitHistogram.WithLabelValues(s.reqKey).Observe(s.waitTime.Seconds())
log.Trace().Str("request-key", s.reqKey).Dur("wait-time", s.waitTime).Msg("primary dispatch waiting before running")
log.Ctx(parentCtx).Trace().Str("request-key", s.reqKey).Stringer("wait-time", s.waitTime).Msg("primary dispatch waiting before running")

startTime := time.Now()
timer := time.NewTimer(s.waitTime)
defer timer.Stop()

select {
case <-parentCtx.Done():
log.Trace().Str("request-key", s.reqKey).Msg("primary dispatch canceled before waiting completed")
log.Ctx(parentCtx).Trace().Str("request-key", s.reqKey).Msg("primary dispatch canceled before waiting completed")
return

case <-sleepCtx.Done():
log.Trace().Str("request-key", s.reqKey).Msg("primary dispatch early terminated waiting")
log.Ctx(parentCtx).Trace().Str("request-key", s.reqKey).Msg("primary dispatch early terminated waiting")
hedgeActualWaitHistogram.WithLabelValues(s.reqKey).Observe(time.Since(startTime).Seconds())
return

case <-timer.C:
log.Trace().Str("request-key", s.reqKey).Msg("primary dispatch finished waiting")
log.Ctx(parentCtx).Trace().Str("request-key", s.reqKey).Msg("primary dispatch finished waiting")
hedgeActualWaitHistogram.WithLabelValues(s.reqKey).Observe(s.waitTime.Seconds())
return
}
Expand Down
Loading
Loading