Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions internal/server/spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"log/slog"
"sync"
"sync/atomic"
"time"

"cloud.google.com/go/spanner"
pb "github.com/datacommonsorg/mixer/internal/proto"
Expand Down Expand Up @@ -76,10 +75,8 @@ func newSpannerDatabaseClient(client *spanner.Client, useStaleReads bool) (*span
sc.ticker = NewTimestampTicker()
sc.stopCh = make(chan struct{})
sc.updateTimestamp = sc.fetchAndUpdateTimestamp
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := sc.updateTimestamp(ctx); err != nil {
slog.Error("Error initializing Spanner staleness timestamp")
if err := sc.updateTimestamp(context.Background()); err != nil {
slog.Error("Error initializing Spanner staleness timestamp", "error", err.Error())
return nil, err
}
return sc, nil
Expand Down
44 changes: 40 additions & 4 deletions internal/server/spanner/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package spanner

import (
"context"
"errors"
"fmt"
"log/slog"
"time"
Expand Down Expand Up @@ -263,14 +264,23 @@ func (sc *spannerDatabaseClient) GetVariableMetadata(ctx context.Context, variab

// fetchAndUpdateTimestamp queries Spanner and updates the timestamp.
func (sc *spannerDatabaseClient) fetchAndUpdateTimestamp(ctx context.Context) error {
iter := sc.client.Single().Query(ctx, *GetCompletionTimestampQuery())
queryCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

iter := sc.client.Single().Query(queryCtx, *GetCompletionTimestampQuery())
defer iter.Stop()

row, err := iter.Next()
if err == iterator.Done {
return fmt.Errorf("no valid rows found in IngestionHistory")
}
if err != nil {
if spanner.ErrCode(err) == codes.DeadlineExceeded || errors.Is(err, context.DeadlineExceeded) {
slog.ErrorContext(queryCtx, "Spanner timestamp polling timed out",
"timeout_duration", "10s",
"error", err.Error(),
)
}
return fmt.Errorf("failed to fetch row: %w", err)
}

Expand All @@ -297,11 +307,37 @@ func (sc *spannerDatabaseClient) executeQuery(
stmt spanner.Statement,
handleRows func(*spanner.RowIterator) error,
) error {
var queryCtx context.Context
var cancel context.CancelFunc
var timeout time.Duration

if deadline, ok := ctx.Deadline(); ok {
timeout = time.Until(deadline)
} else {
// Fallback if the parent context surprisingly has no deadline.
// Using the default API timeout of 60 seconds.
slog.Warn("Parent context has no deadline; using default API timeout of 60 seconds")
timeout = 60 * time.Second
}
queryCtx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()

runQuery := func(tb spanner.TimestampBound) error {
metrics.RecordSpannerQuery(ctx)
iter := sc.client.Single().WithTimestampBound(tb).Query(ctx, stmt)
metrics.RecordSpannerQuery(queryCtx)
iter := sc.client.Single().WithTimestampBound(tb).Query(queryCtx, stmt)
defer iter.Stop()
return handleRows(iter)
err := handleRows(iter)

// Log slow Spanner queries that timed out.
if err != nil && (spanner.ErrCode(err) == codes.DeadlineExceeded || errors.Is(err, context.DeadlineExceeded)) {
slog.ErrorContext(queryCtx, "Spanner query timed out",
"sql", stmt.SQL,
"timeout", timeout.String(),
"error", err.Error(),
)
}

return err
}

if sc.useStaleReads {
Expand Down
Loading