Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions internal/server/spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"log/slog"
"sync"
"sync/atomic"
"time"

"cloud.google.com/go/spanner"
pb "github.com/datacommonsorg/mixer/internal/proto"
Expand Down Expand Up @@ -76,10 +75,8 @@ func newSpannerDatabaseClient(client *spanner.Client, useStaleReads bool) (*span
sc.ticker = NewTimestampTicker()
sc.stopCh = make(chan struct{})
sc.updateTimestamp = sc.fetchAndUpdateTimestamp
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := sc.updateTimestamp(ctx); err != nil {
slog.Error("Error initializing Spanner staleness timestamp")
if err := sc.updateTimestamp(context.Background()); err != nil {
slog.Error("Error initializing Spanner staleness timestamp", "error", err.Error())
return nil, err
}
return sc, nil
Expand Down
49 changes: 45 additions & 4 deletions internal/server/spanner/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package spanner

import (
"context"
"errors"
"fmt"
"log/slog"
"time"
Expand Down Expand Up @@ -263,14 +264,23 @@ func (sc *spannerDatabaseClient) GetVariableMetadata(ctx context.Context, variab

// fetchAndUpdateTimestamp queries Spanner and updates the timestamp.
func (sc *spannerDatabaseClient) fetchAndUpdateTimestamp(ctx context.Context) error {
iter := sc.client.Single().Query(ctx, *GetCompletionTimestampQuery())
queryCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

iter := sc.client.Single().Query(queryCtx, *GetCompletionTimestampQuery())
defer iter.Stop()

row, err := iter.Next()
if err == iterator.Done {
return fmt.Errorf("no valid rows found in IngestionHistory")
}
if err != nil {
if isTimeoutError(err) {
slog.ErrorContext(queryCtx, "Spanner timestamp polling timed out",
"timeout_duration", "10s",
"error", err.Error(),
)
}
return fmt.Errorf("failed to fetch row: %w", err)
}

Expand All @@ -297,11 +307,37 @@ func (sc *spannerDatabaseClient) executeQuery(
stmt spanner.Statement,
handleRows func(*spanner.RowIterator) error,
) error {
var queryCtx context.Context
var cancel context.CancelFunc
var timeout time.Duration

if deadline, ok := ctx.Deadline(); ok {
timeout = time.Until(deadline)
} else {
// Fallback if the parent context surprisingly has no deadline.
// Using the default API timeout of 60 seconds.
slog.Warn("Parent context has no deadline; using default API timeout of 60 seconds")
timeout = 60 * time.Second
}
queryCtx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()

runQuery := func(tb spanner.TimestampBound) error {
metrics.RecordSpannerQuery(ctx)
iter := sc.client.Single().WithTimestampBound(tb).Query(ctx, stmt)
metrics.RecordSpannerQuery(queryCtx)
iter := sc.client.Single().WithTimestampBound(tb).Query(queryCtx, stmt)
defer iter.Stop()
return handleRows(iter)
err := handleRows(iter)

// Log slow Spanner queries that timed out.
if isTimeoutError(err) {
slog.ErrorContext(queryCtx, "Spanner query timed out",
"sql", stmt.SQL,
"timeout", timeout.String(),
"error", err.Error(),
)
}

return err
}

if sc.useStaleReads {
Expand Down Expand Up @@ -446,3 +482,8 @@ func processCacheRows[T proto.Message](iter *spanner.RowIterator, newProto func(

return results, nil
}

// isTimeoutError reports whether err signals an exceeded deadline.
//
// It returns true when the code obtained via spanner.ErrCode is
// codes.DeadlineExceeded, or when errors.Is matches err against
// context.DeadlineExceeded. Otherwise it returns false.
func isTimeoutError(err error) bool {
	// Check the Spanner-reported status code first.
	if spanner.ErrCode(err) == codes.DeadlineExceeded {
		return true
	}
	// Fall back to the context package's deadline sentinel.
	return errors.Is(err, context.DeadlineExceeded)
}
Loading