diff --git a/internal/server/spanner/client.go b/internal/server/spanner/client.go index a9df51e0d..e81f155e4 100644 --- a/internal/server/spanner/client.go +++ b/internal/server/spanner/client.go @@ -21,7 +21,6 @@ import ( "log/slog" "sync" "sync/atomic" - "time" "cloud.google.com/go/spanner" pb "github.com/datacommonsorg/mixer/internal/proto" @@ -76,10 +75,8 @@ func newSpannerDatabaseClient(client *spanner.Client, useStaleReads bool) (*span sc.ticker = NewTimestampTicker() sc.stopCh = make(chan struct{}) sc.updateTimestamp = sc.fetchAndUpdateTimestamp - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - if err := sc.updateTimestamp(ctx); err != nil { - slog.Error("Error initializing Spanner staleness timestamp") + if err := sc.updateTimestamp(context.Background()); err != nil { + slog.Error("Error initializing Spanner staleness timestamp", "error", err.Error()) return nil, err } return sc, nil diff --git a/internal/server/spanner/query.go b/internal/server/spanner/query.go index 1e0dec52b..e7320e1de 100644 --- a/internal/server/spanner/query.go +++ b/internal/server/spanner/query.go @@ -17,6 +17,7 @@ package spanner import ( "context" + "errors" "fmt" "log/slog" "time" @@ -263,7 +264,10 @@ func (sc *spannerDatabaseClient) GetVariableMetadata(ctx context.Context, variab // fetchAndUpdateTimestamp queries Spanner and updates the timestamp. func (sc *spannerDatabaseClient) fetchAndUpdateTimestamp(ctx context.Context) error { - iter := sc.client.Single().Query(ctx, *GetCompletionTimestampQuery()) + queryCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + iter := sc.client.Single().Query(queryCtx, *GetCompletionTimestampQuery()) defer iter.Stop() row, err := iter.Next() @@ -271,6 +275,12 @@ func (sc *spannerDatabaseClient) fetchAndUpdateTimestamp(ctx context.Context) er return fmt.Errorf("no valid rows found in IngestionHistory") } if err != nil { + if isTimeoutError(err) { + slog.ErrorContext(queryCtx, "Spanner timestamp polling timed out", + "timeout_duration", "10s", + "error", err.Error(), + ) + } return fmt.Errorf("failed to fetch row: %w", err) } @@ -297,11 +307,37 @@ func (sc *spannerDatabaseClient) executeQuery( stmt spanner.Statement, handleRows func(*spanner.RowIterator) error, ) error { + var queryCtx context.Context + var cancel context.CancelFunc + var timeout time.Duration + + if deadline, ok := ctx.Deadline(); ok { + timeout = time.Until(deadline) + } else { + // Fallback if the parent context surprisingly has no deadline. + // Using the default API timeout of 60 seconds. + slog.Warn("Parent context has no deadline; using default API timeout of 60 seconds") + timeout = 60 * time.Second + } + queryCtx, cancel = context.WithTimeout(ctx, timeout) + defer cancel() + runQuery := func(tb spanner.TimestampBound) error { - metrics.RecordSpannerQuery(ctx) - iter := sc.client.Single().WithTimestampBound(tb).Query(ctx, stmt) + metrics.RecordSpannerQuery(queryCtx) + iter := sc.client.Single().WithTimestampBound(tb).Query(queryCtx, stmt) defer iter.Stop() - return handleRows(iter) + err := handleRows(iter) + + // Log slow Spanner queries that timed out. + if isTimeoutError(err) { + slog.ErrorContext(queryCtx, "Spanner query timed out", + "sql", stmt.SQL, + "timeout", timeout.String(), + "error", err.Error(), + ) + } + + return err } if sc.useStaleReads { @@ -446,3 +482,8 @@ func processCacheRows[T proto.Message](iter *spanner.RowIterator, newProto func( return results, nil } + +// isTimeoutError checks if an error is a timeout error from Spanner or context. +func isTimeoutError(err error) bool { + return spanner.ErrCode(err) == codes.DeadlineExceeded || errors.Is(err, context.DeadlineExceeded) +}