diff --git a/pkg/goldfish/migrations/20251007000001_add_result_storage_columns.sql b/pkg/goldfish/migrations/20251007000001_add_result_storage_columns.sql new file mode 100644 index 0000000000000..785cd1a6b928a --- /dev/null +++ b/pkg/goldfish/migrations/20251007000001_add_result_storage_columns.sql @@ -0,0 +1,21 @@ +-- +goose Up +-- Add columns to persist result storage metadata for Goldfish + +ALTER TABLE sampled_queries + ADD COLUMN cell_a_result_uri TEXT NULL AFTER cell_a_status_code, + ADD COLUMN cell_b_result_uri TEXT NULL AFTER cell_a_result_uri, + ADD COLUMN cell_a_result_size_bytes BIGINT NULL AFTER cell_b_result_uri, + ADD COLUMN cell_b_result_size_bytes BIGINT NULL AFTER cell_a_result_size_bytes, + ADD COLUMN cell_a_result_compression VARCHAR(32) NULL AFTER cell_b_result_size_bytes, + ADD COLUMN cell_b_result_compression VARCHAR(32) NULL AFTER cell_a_result_compression; + +-- +goose Down +-- Remove result storage metadata columns + +ALTER TABLE sampled_queries + DROP COLUMN cell_b_result_compression, + DROP COLUMN cell_a_result_compression, + DROP COLUMN cell_b_result_size_bytes, + DROP COLUMN cell_a_result_size_bytes, + DROP COLUMN cell_b_result_uri, + DROP COLUMN cell_a_result_uri; diff --git a/pkg/goldfish/storage_mysql.go b/pkg/goldfish/storage_mysql.go index a5f945a425e51..24e1c66ad4f79 100644 --- a/pkg/goldfish/storage_mysql.go +++ b/pkg/goldfish/storage_mysql.go @@ -92,6 +92,9 @@ func (s *MySQLStorage) StoreQuerySample(ctx context.Context, sample *QuerySample cell_a_response_hash, cell_b_response_hash, cell_a_response_size, cell_b_response_size, cell_a_status_code, cell_b_status_code, + cell_a_result_uri, cell_b_result_uri, + cell_a_result_size_bytes, cell_b_result_size_bytes, + cell_a_result_compression, cell_b_result_compression, cell_a_trace_id, cell_b_trace_id, cell_a_span_id, cell_b_span_id, cell_a_used_new_engine, cell_b_used_new_engine, @@ -108,6 +111,25 @@ func (s *MySQLStorage) StoreQuerySample(ctx context.Context, sample *QuerySample 
cellBSpanID = sample.CellBSpanID } + // Prepare nullable result storage metadata + var cellAResultURI, cellBResultURI any + var cellAResultSize, cellBResultSize any + var cellAResultCompression, cellBResultCompression any + if sample.CellAResultURI != "" { + cellAResultURI = sample.CellAResultURI + cellAResultSize = sample.CellAResultSize + if sample.CellAResultCompression != "" { + cellAResultCompression = sample.CellAResultCompression + } + } + if sample.CellBResultURI != "" { + cellBResultURI = sample.CellBResultURI + cellBResultSize = sample.CellBResultSize + if sample.CellBResultCompression != "" { + cellBResultCompression = sample.CellBResultCompression + } + } + _, err := s.db.ExecContext(ctx, query, sample.CorrelationID, sample.TenantID, @@ -142,6 +164,12 @@ func (s *MySQLStorage) StoreQuerySample(ctx context.Context, sample *QuerySample sample.CellBResponseSize, sample.CellAStatusCode, sample.CellBStatusCode, + cellAResultURI, + cellBResultURI, + cellAResultSize, + cellBResultSize, + cellAResultCompression, + cellBResultCompression, sample.CellATraceID, sample.CellBTraceID, cellASpanID, @@ -214,6 +242,9 @@ func (s *MySQLStorage) GetSampledQueries(ctx context.Context, page, pageSize int cell_a_entries_returned, cell_b_entries_returned, cell_a_splits, cell_b_splits, cell_a_shards, cell_b_shards, cell_a_response_hash, cell_b_response_hash, cell_a_response_size, cell_b_response_size, cell_a_status_code, cell_b_status_code, + cell_a_result_uri, cell_b_result_uri, + cell_a_result_size_bytes, cell_b_result_size_bytes, + cell_a_result_compression, cell_b_result_compression, cell_a_trace_id, cell_b_trace_id, cell_a_span_id, cell_b_span_id, cell_a_used_new_engine, cell_b_used_new_engine, @@ -245,6 +276,9 @@ func (s *MySQLStorage) GetSampledQueries(ctx context.Context, page, pageSize int var createdAt time.Time // Use sql.NullString for nullable span ID columns var cellASpanID, cellBSpanID sql.NullString + var cellAResultURI, cellBResultURI sql.NullString + var 
cellAResultCompression, cellBResultCompression sql.NullString + var cellAResultSize, cellBResultSize sql.NullInt64 err := rows.Scan( &q.CorrelationID, &q.TenantID, &q.User, &q.Query, &q.QueryType, &q.StartTime, &q.EndTime, &stepDurationMs, @@ -254,6 +288,9 @@ func (s *MySQLStorage) GetSampledQueries(ctx context.Context, page, pageSize int &q.CellAStats.TotalEntriesReturned, &q.CellBStats.TotalEntriesReturned, &q.CellAStats.Splits, &q.CellBStats.Splits, &q.CellAStats.Shards, &q.CellBStats.Shards, &q.CellAResponseHash, &q.CellBResponseHash, &q.CellAResponseSize, &q.CellBResponseSize, &q.CellAStatusCode, &q.CellBStatusCode, + &cellAResultURI, &cellBResultURI, + &cellAResultSize, &cellBResultSize, + &cellAResultCompression, &cellBResultCompression, &q.CellATraceID, &q.CellBTraceID, &cellASpanID, &cellBSpanID, &q.CellAUsedNewEngine, &q.CellBUsedNewEngine, @@ -270,6 +307,24 @@ func (s *MySQLStorage) GetSampledQueries(ctx context.Context, page, pageSize int if cellBSpanID.Valid { q.CellBSpanID = cellBSpanID.String } + if cellAResultURI.Valid { + q.CellAResultURI = cellAResultURI.String + } + if cellBResultURI.Valid { + q.CellBResultURI = cellBResultURI.String + } + if cellAResultSize.Valid { + q.CellAResultSize = cellAResultSize.Int64 + } + if cellBResultSize.Valid { + q.CellBResultSize = cellBResultSize.Int64 + } + if cellAResultCompression.Valid { + q.CellAResultCompression = cellAResultCompression.String + } + if cellBResultCompression.Valid { + q.CellBResultCompression = cellBResultCompression.String + } // Convert step duration from milliseconds to Duration q.Step = time.Duration(stepDurationMs) * time.Millisecond diff --git a/pkg/goldfish/types.go b/pkg/goldfish/types.go index a4e6b1d3760e0..751623571637e 100644 --- a/pkg/goldfish/types.go +++ b/pkg/goldfish/types.go @@ -32,6 +32,14 @@ type QuerySample struct { CellASpanID string `json:"cellASpanID"` CellBSpanID string `json:"cellBSpanID"` + // Result storage metadata + CellAResultURI string `json:"cellAResultURI"` 
+ CellBResultURI string `json:"cellBResultURI"` + CellAResultSize int64 `json:"cellAResultSize"` + CellBResultSize int64 `json:"cellBResultSize"` + CellAResultCompression string `json:"cellAResultCompression"` + CellBResultCompression string `json:"cellBResultCompression"` + // Query engine version tracking CellAUsedNewEngine bool `json:"cellAUsedNewEngine"` CellBUsedNewEngine bool `json:"cellBUsedNewEngine"` diff --git a/pkg/ui/goldfish.go b/pkg/ui/goldfish.go index bb69314a1a806..91ed1a5e96512 100644 --- a/pkg/ui/goldfish.go +++ b/pkg/ui/goldfish.go @@ -76,6 +76,14 @@ type SampledQuery struct { CellAStatusCode *int `json:"cellAStatusCode" db:"cell_a_status_code"` CellBStatusCode *int `json:"cellBStatusCode" db:"cell_b_status_code"` + // Result storage metadata - nullable when persistence is disabled + CellAResultURI *string `json:"cellAResultURI,omitempty" db:"cell_a_result_uri"` + CellBResultURI *string `json:"cellBResultURI,omitempty" db:"cell_b_result_uri"` + CellAResultSizeBytes *int64 `json:"cellAResultSizeBytes,omitempty" db:"cell_a_result_size_bytes"` + CellBResultSizeBytes *int64 `json:"cellBResultSizeBytes,omitempty" db:"cell_b_result_size_bytes"` + CellAResultCompression *string `json:"cellAResultCompression,omitempty" db:"cell_a_result_compression"` + CellBResultCompression *string `json:"cellBResultCompression,omitempty" db:"cell_b_result_compression"` + // Trace IDs - nullable as not all requests have traces CellATraceID *string `json:"cellATraceID" db:"cell_a_trace_id"` CellBTraceID *string `json:"cellBTraceID" db:"cell_b_trace_id"` @@ -242,6 +250,25 @@ func (s *Service) GetSampledQueriesWithContext(ctx context.Context, page, pageSi CellBUsedNewEngine: q.CellBUsedNewEngine, } + if q.CellAResultURI != "" { + uiQuery.CellAResultURI = strPtr(q.CellAResultURI) + size := q.CellAResultSize + uiQuery.CellAResultSizeBytes = &size + if q.CellAResultCompression != "" { + comp := q.CellAResultCompression + uiQuery.CellAResultCompression = &comp + } + } + if 
q.CellBResultURI != "" { + uiQuery.CellBResultURI = strPtr(q.CellBResultURI) + size := q.CellBResultSize + uiQuery.CellBResultSizeBytes = &size + if q.CellBResultCompression != "" { + comp := q.CellBResultCompression + uiQuery.CellBResultCompression = &comp + } + } + // Determine comparison status based on response codes and hashes if q.CellAStatusCode < 200 || q.CellAStatusCode >= 300 || q.CellBStatusCode < 200 || q.CellBStatusCode >= 300 { uiQuery.ComparisonStatus = string(goldfish.ComparisonStatusError) diff --git a/tools/querytee/goldfish/README.md b/tools/querytee/goldfish/README.md index 62c2dbc3d1cd2..5ff1a864fb234 100644 --- a/tools/querytee/goldfish/README.md +++ b/tools/querytee/goldfish/README.md @@ -16,7 +16,8 @@ Goldfish is a feature within QueryTee that enables sampling and comparison of qu - Query complexity metrics (splits, shards) - Performance variance detection and reporting - **Query Engine Version Tracking**: Tracks which queries used the new experimental query engine vs the old engine -- **Persistent Storage**: MySQL storage via Google Cloud SQL Proxy or Amazon RDS for storing query samples and comparison results +- **Persistent Metadata Storage**: MySQL storage via Google Cloud SQL Proxy or Amazon RDS for storing query samples and comparison results +- **Raw Result Persistence**: Optional upload of exact JSON responses to GCS or S3 for deep diffing ## Configuration @@ -55,6 +56,22 @@ export GOLDFISH_DB_PASSWORD=your-password # Performance comparison settings -goldfish.performance-tolerance=0.1 # 10% tolerance for execution time variance +# Result persistence (optional) +-goldfish.results.enabled=true # store raw responses +-goldfish.results.mode=mismatch-only # or 'all' +-goldfish.results.backend=gcs # inferred for CloudSQL +-goldfish.results.bucket.gcs.bucket-name= # required for GCS +-goldfish.results.bucket.gcs.service-account= # optional service account JSON +-goldfish.results.prefix=goldfish/results # optional prefix inside the bucket 
+-goldfish.results.compression=gzip # gzip or none + +# S3 example (RDS deployments) +-goldfish.results.backend=s3 +-goldfish.results.bucket.s3.bucket-name= +-goldfish.results.bucket.s3.region= +-goldfish.results.bucket.s3.access-key-id= # optional when using IAM roles +-goldfish.results.bucket.s3.secret-access-key= # optional when using IAM roles + +# Or run without storage (sampling and comparison only, no persistence) # Simply omit the storage configuration ``` @@ -79,6 +96,17 @@ export GOLDFISH_DB_PASSWORD=your-password └──────────┘ └──────────┘ └──────────┘ ``` +## Result Storage Layout + +When result persistence is enabled, Goldfish writes two objects per sampled query under the configured prefix using UTC date partitions and the correlation ID: + +``` +<prefix>/<tenant>/<yyyy>/<mm>/<dd>/<correlation-id>/cell-a.json.gz +<prefix>/<tenant>/<yyyy>/<mm>/<dd>/<correlation-id>/cell-b.json.gz +``` + +Metadata includes the tenant, backend name, HTTP status, and original fnv32 hash so you can correlate objects with database entries. Payloads are gzip-compressed by default. Apply lifecycle policies and IAM so raw log data is protected. + ## Database Schema Goldfish uses two main tables: diff --git a/tools/querytee/goldfish/config.go b/tools/querytee/goldfish/config.go index 54c43ff995825..7fd774cc9b3d8 100644 --- a/tools/querytee/goldfish/config.go +++ b/tools/querytee/goldfish/config.go @@ -6,8 +6,32 @@ import ( "fmt" "strconv" "strings" + + "github.com/grafana/loki/v3/pkg/storage/bucket" ) +const ( + // ResultsPersistenceModeMismatchOnly persists only mismatched comparisons. + ResultsPersistenceModeMismatchOnly ResultsPersistenceMode = "mismatch-only" + // ResultsPersistenceModeAll persists all sampled comparisons regardless of outcome. + ResultsPersistenceModeAll ResultsPersistenceMode = "all" + + // ResultsBackendGCS stores results in Google Cloud Storage. + ResultsBackendGCS = bucket.GCS + // ResultsBackendS3 stores results in Amazon S3. + ResultsBackendS3 = bucket.S3 + + // ResultsCompressionNone stores payloads without additional compression. + ResultsCompressionNone = "none" + // ResultsCompressionGzip gzip-compresses payloads before upload. + ResultsCompressionGzip = "gzip" + + defaultResultsObjectPrefix = "goldfish/results" +) + +// ResultsPersistenceMode describes how often to persist query results.
+type ResultsPersistenceMode string + // Config holds the Goldfish configuration type Config struct { Enabled bool `yaml:"enabled"` @@ -15,9 +39,12 @@ type Config struct { // Sampling configuration SamplingConfig SamplingConfig `yaml:"sampling"` - // Storage configuration + // Storage configuration (SQL backend for metadata) StorageConfig StorageConfig `yaml:"storage"` + // Result storage configuration (object storage for raw payloads) + ResultsStorage ResultsStorageConfig `yaml:"results"` + // Performance comparison tolerance (0.0-1.0, where 0.1 = 10%) PerformanceTolerance float64 `yaml:"performance_tolerance"` } @@ -50,6 +77,26 @@ type StorageConfig struct { MaxIdleTime int `yaml:"max_idle_time_seconds"` } +// ResultsStorageConfig defines configuration for storing raw query results in object storage. +type ResultsStorageConfig struct { + Enabled bool `yaml:"enabled"` + + // Mode controls when to persist query results (mismatch-only or all). + Mode ResultsPersistenceMode `yaml:"mode"` + + // Backend selects the object storage provider (gcs, s3). When omitted we attempt to infer from SQL storage type. + Backend string `yaml:"backend"` + + // ObjectPrefix is appended to generated object keys inside the bucket. + ObjectPrefix string `yaml:"object_prefix"` + + // Compression codec applied before upload (gzip or none) + Compression string `yaml:"compression"` + + // Bucket holds provider-specific configuration exposed through the shared bucket client. 
+ Bucket bucket.Config `yaml:"bucket"` +} + // RegisterFlags registers Goldfish flags func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.Enabled, "goldfish.enabled", false, "Enable Goldfish query sampling and comparison") @@ -74,6 +121,14 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.StorageConfig.MaxConnections, "goldfish.storage.max-connections", 10, "Maximum database connections") f.IntVar(&cfg.StorageConfig.MaxIdleTime, "goldfish.storage.max-idle-time", 300, "Maximum idle time in seconds") + // Result storage flags + f.BoolVar(&cfg.ResultsStorage.Enabled, "goldfish.results.enabled", false, "Enable persisting raw query results to object storage") + f.StringVar((*string)(&cfg.ResultsStorage.Mode), "goldfish.results.mode", string(ResultsPersistenceModeMismatchOnly), "Result persistence mode (mismatch-only or all)") + f.StringVar(&cfg.ResultsStorage.Backend, "goldfish.results.backend", "", "Results storage backend (gcs, s3). When empty, inferred from goldfish.storage.type") + f.StringVar(&cfg.ResultsStorage.ObjectPrefix, "goldfish.results.prefix", defaultResultsObjectPrefix, "Prefix for objects stored in the bucket") + f.StringVar(&cfg.ResultsStorage.Compression, "goldfish.results.compression", ResultsCompressionGzip, "Compression codec for stored results (gzip or none)") + cfg.ResultsStorage.Bucket.RegisterFlagsWithPrefix("goldfish.results.", f) + // Performance comparison flags f.Float64Var(&cfg.PerformanceTolerance, "goldfish.performance-tolerance", 0.1, "Performance comparison tolerance (0.0-1.0, where 0.1 = 10%)") } @@ -98,27 +153,106 @@ func (cfg *Config) Validate() error { return errors.New("performance tolerance must be between 0 and 1") } - // Only validate storage if one is configured - if cfg.StorageConfig.Type == "" { + // Validate SQL storage if configured + if err := cfg.StorageConfig.validate(); err != nil { + return err + } + + // Validate result storage when enabled + if cfg.ResultsStorage.Enabled { + if 
cfg.StorageConfig.Type == "" { + return errors.New("goldfish.results.enabled requires a SQL storage backend (cloudsql or rds)") + } + if err := cfg.ResultsStorage.validate(cfg.StorageConfig.Type); err != nil { + return err + } + } + + return nil +} + +func (cfg StorageConfig) validate() error { + if cfg.Type == "" { return nil } - switch cfg.StorageConfig.Type { + switch cfg.Type { case "cloudsql": - if cfg.StorageConfig.CloudSQLDatabase == "" || cfg.StorageConfig.CloudSQLUser == "" { + if cfg.CloudSQLDatabase == "" || cfg.CloudSQLUser == "" { return errors.New("CloudSQL database and user must be specified") } case "rds": - if cfg.StorageConfig.RDSEndpoint == "" || cfg.StorageConfig.RDSDatabase == "" || cfg.StorageConfig.RDSUser == "" { + if cfg.RDSEndpoint == "" || cfg.RDSDatabase == "" || cfg.RDSUser == "" { return errors.New("RDS endpoint, database, and user must be specified") } default: - return fmt.Errorf("unsupported storage type: %s", cfg.StorageConfig.Type) + return fmt.Errorf("unsupported storage type: %s", cfg.Type) + } + + return nil +} + +func (cfg *ResultsStorageConfig) validate(sqlStorageType string) error { + mode := strings.ToLower(strings.TrimSpace(string(cfg.Mode))) + if mode == "" { + cfg.Mode = ResultsPersistenceModeMismatchOnly + } else { + cfg.Mode = ResultsPersistenceMode(mode) + } + + switch cfg.Mode { + case ResultsPersistenceModeMismatchOnly, ResultsPersistenceModeAll: + default: + return fmt.Errorf("unsupported goldfish.results.mode: %s", cfg.Mode) + } + + backend := strings.ToLower(strings.TrimSpace(cfg.Backend)) + if backend == "" { + backend = inferResultsBackend(sqlStorageType) + cfg.Backend = backend + } + + switch backend { + case ResultsBackendGCS, ResultsBackendS3: + case "": + return errors.New("goldfish.results.backend must be specified when goldfish.results.enabled is true") + default: + return fmt.Errorf("unsupported goldfish.results.backend: %s", cfg.Backend) + } + + if cfg.ObjectPrefix == "" { + cfg.ObjectPrefix = 
defaultResultsObjectPrefix + } + + compression := strings.ToLower(strings.TrimSpace(cfg.Compression)) + if compression == "" { + compression = ResultsCompressionGzip + } + switch compression { + case ResultsCompressionGzip, ResultsCompressionNone: + cfg.Compression = compression + default: + return fmt.Errorf("unsupported goldfish.results.compression: %s", cfg.Compression) + } + + if err := cfg.Bucket.Validate(); err != nil { + return fmt.Errorf("invalid goldfish.results.bucket configuration: %w", err) } return nil } +func inferResultsBackend(sqlStorageType string) string { + switch sqlStorageType { + case "cloudsql": + return ResultsBackendGCS + case "rds": + return ResultsBackendS3 + default: + return "" + } +} + // tenantRulesFlag implements flag.Value for parsing tenant rules type tenantRulesFlag struct { target *map[string]float64 diff --git a/tools/querytee/goldfish/config_test.go b/tools/querytee/goldfish/config_test.go new file mode 100644 index 0000000000000..94a12b540e6fa --- /dev/null +++ b/tools/querytee/goldfish/config_test.go @@ -0,0 +1,98 @@ +package goldfish + +import ( + "testing" + + "github.com/grafana/loki/v3/pkg/storage/bucket" +) + +func TestResultsStorageValidationRequiresSQLBackend(t *testing.T) { + cfg := Config{ + Enabled: true, + ResultsStorage: ResultsStorageConfig{ + Enabled: true, + Mode: ResultsPersistenceModeMismatchOnly, + }, + } + + err := cfg.Validate() + if err == nil || err.Error() != "goldfish.results.enabled requires a SQL storage backend (cloudsql or rds)" { + t.Fatalf("expected SQL backend requirement error, got %v", err) + } +} + +func TestResultsStorageValidationInfersGCS(t *testing.T) { + cfg := Config{ + Enabled: true, + StorageConfig: StorageConfig{ + Type: "cloudsql", + CloudSQLUser: "user", + CloudSQLDatabase: "db", + }, + ResultsStorage: ResultsStorageConfig{ + Enabled: true, + Bucket: bucket.Config{}, + }, + } + cfg.ResultsStorage.Bucket.GCS.BucketName = "my-bucket" + + if err := cfg.Validate(); err != nil { + 
t.Fatalf("unexpected error: %v", err) + } + + if cfg.ResultsStorage.Backend != ResultsBackendGCS { + t.Fatalf("expected backend to be inferred as gcs, got %s", cfg.ResultsStorage.Backend) + } + + if cfg.ResultsStorage.Mode != ResultsPersistenceModeMismatchOnly { + t.Fatalf("expected mismatch-only mode, got %s", cfg.ResultsStorage.Mode) + } + + if cfg.ResultsStorage.Compression != ResultsCompressionGzip { + t.Fatalf("expected default compression gzip, got %s", cfg.ResultsStorage.Compression) + } + + if cfg.ResultsStorage.ObjectPrefix != defaultResultsObjectPrefix { + t.Fatalf("expected default object prefix, got %s", cfg.ResultsStorage.ObjectPrefix) + } +} + +func TestResultsStorageValidationUnsupportedBackend(t *testing.T) { + cfg := Config{ + Enabled: true, + StorageConfig: StorageConfig{ + Type: "cloudsql", + CloudSQLUser: "user", + CloudSQLDatabase: "db", + }, + ResultsStorage: ResultsStorageConfig{ + Enabled: true, + Backend: "unsupported", + }, + } + + err := cfg.Validate() + if err == nil || err.Error() != "unsupported goldfish.results.backend: unsupported" { + t.Fatalf("expected unsupported backend error, got %v", err) + } +} + +func TestResultsStorageValidationMode(t *testing.T) { + cfg := Config{ + Enabled: true, + StorageConfig: StorageConfig{ + Type: "cloudsql", + CloudSQLUser: "user", + CloudSQLDatabase: "db", + }, + ResultsStorage: ResultsStorageConfig{ + Enabled: true, + Mode: "always", + }, + } + + err := cfg.Validate() + if err == nil || err.Error() != "unsupported goldfish.results.mode: always" { + t.Fatalf("expected unsupported mode error, got %v", err) + } +} diff --git a/tools/querytee/goldfish/end_to_end_test.go b/tools/querytee/goldfish/end_to_end_test.go index dfa219704d13e..122bbb14c9028 100644 --- a/tools/querytee/goldfish/end_to_end_test.go +++ b/tools/querytee/goldfish/end_to_end_test.go @@ -32,7 +32,7 @@ func TestGoldfishEndToEnd(t *testing.T) { storage := &mockStorage{} // Create manager - manager, err := NewManager(config, storage, 
log.NewNopLogger(), prometheus.NewRegistry()) + manager, err := NewManager(config, storage, nil, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) defer manager.Close() @@ -177,7 +177,7 @@ func TestGoldfishMismatchDetection(t *testing.T) { } storage := &mockStorage{} - manager, err := NewManager(config, storage, log.NewNopLogger(), prometheus.NewRegistry()) + manager, err := NewManager(config, storage, nil, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) defer manager.Close() @@ -287,7 +287,7 @@ func TestGoldfishNewEngineDetection(t *testing.T) { } storage := &mockStorage{} - manager, err := NewManager(config, storage, log.NewNopLogger(), prometheus.NewRegistry()) + manager, err := NewManager(config, storage, nil, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) defer manager.Close() diff --git a/tools/querytee/goldfish/manager.go b/tools/querytee/goldfish/manager.go index afc005846c0fb..3f8dd4a7957f5 100644 --- a/tools/querytee/goldfish/manager.go +++ b/tools/querytee/goldfish/manager.go @@ -3,6 +3,7 @@ package goldfish import ( "bytes" "context" + "errors" "fmt" "io" "net/http" @@ -27,13 +28,14 @@ const ( ) // Manager coordinates Goldfish sampling and comparison operations. -// It handles query sampling decisions, response comparison, and storage of results. +// It handles query sampling decisions, response comparison, persistence, and storage of results. type Manager struct { - config Config - sampler *Sampler - storage goldfish.Storage - logger log.Logger - metrics *metrics + config Config + sampler *Sampler + storage goldfish.Storage + resultStore ResultStore + logger log.Logger + metrics *metrics } type metrics struct { @@ -46,16 +48,17 @@ type metrics struct { // NewManager creates a new Goldfish manager with the provided configuration. // Returns an error if the configuration is invalid. 
-func NewManager(config Config, storage goldfish.Storage, logger log.Logger, registerer prometheus.Registerer) (*Manager, error) { +func NewManager(config Config, storage goldfish.Storage, resultStore ResultStore, logger log.Logger, registerer prometheus.Registerer) (*Manager, error) { if err := config.Validate(); err != nil { return nil, fmt.Errorf("invalid config: %w", err) } m := &Manager{ - config: config, - sampler: NewSampler(config.SamplingConfig), - storage: storage, - logger: logger, + config: config, + sampler: NewSampler(config.SamplingConfig), + storage: storage, + resultStore: resultStore, + logger: logger, metrics: &metrics{ sampledQueries: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Name: "goldfish_sampled_queries_total", @@ -98,19 +101,21 @@ func (m *Manager) ShouldSample(tenantID string) bool { } // ProcessQueryPair processes a sampled query pair from both cells. -// It extracts performance statistics, compares responses, and stores the results asynchronously. +// It extracts performance statistics, compares responses, persists raw payloads when configured, and stores metadata/results. 
func (m *Manager) ProcessQueryPair(ctx context.Context, req *http.Request, cellAResp, cellBResp *ResponseData) { if !m.config.Enabled { return } correlationID := uuid.New().String() + tenantID := extractTenant(req) + queryType := getQueryType(req.URL.Path) + level.Info(m.logger).Log("msg", "Processing query pair in Goldfish", "correlation_id", correlationID, - "tenant", extractTenant(req), - "query_type", getQueryType(req.URL.Path)) + "tenant", tenantID, + "query_type", queryType) - // Create query sample with performance statistics startTime := parseTime(req.URL.Query().Get("start")) endTime := parseTime(req.URL.Query().Get("end")) @@ -122,13 +127,15 @@ func (m *Manager) ProcessQueryPair(ctx context.Context, req *http.Request, cellA endTime = time.Now() } + sampledAt := time.Now() + sample := &goldfish.QuerySample{ CorrelationID: correlationID, - TenantID: extractTenant(req), + TenantID: tenantID, User: extractUserFromQueryTags(req, m.logger), IsLogsDrilldown: isLogsDrilldownRequest(req), Query: req.URL.Query().Get("query"), - QueryType: getQueryType(req.URL.Path), + QueryType: queryType, StartTime: startTime, EndTime: endTime, Step: parseDuration(req.URL.Query().Get("step")), @@ -146,15 +153,35 @@ func (m *Manager) ProcessQueryPair(ctx context.Context, req *http.Request, cellA CellBSpanID: cellBResp.SpanID, CellAUsedNewEngine: cellAResp.UsedNewEngine, CellBUsedNewEngine: cellBResp.UsedNewEngine, - SampledAt: time.Now(), + SampledAt: sampledAt, } m.metrics.sampledQueries.Inc() + comparisonStart := time.Now() + result := CompareResponses(sample, m.config.PerformanceTolerance) + m.metrics.comparisonDuration.Observe(time.Since(comparisonStart).Seconds()) + m.metrics.comparisonResults.WithLabelValues(string(result.ComparisonStatus)).Inc() + + // Persist raw payloads when configured + var persistedA, persistedB *StoredResult + if m.resultStore != nil { + persistedA, persistedB = m.persistResultPayloads(ctx, sample, cellAResp, cellBResp, result) + if persistedA != nil { 
+ sample.CellAResultURI = persistedA.URI + sample.CellAResultSize = persistedA.Size + sample.CellAResultCompression = persistedA.Compression + } + if persistedB != nil { + sample.CellBResultURI = persistedB.URI + sample.CellBResultSize = persistedB.Size + sample.CellBResultCompression = persistedB.Compression + } + } + // Track whether the sample was stored successfully sampleStored := false - // Store the sample if storage is available if m.storage != nil { if err := m.storage.StoreQuerySample(ctx, sample); err != nil { level.Error(m.logger).Log("msg", "failed to store query sample", "correlation_id", correlationID, "err", err) @@ -165,14 +192,6 @@ func (m *Manager) ProcessQueryPair(ctx context.Context, req *http.Request, cellA } } - // Compare responses using simplified comparator - start := time.Now() - result := CompareResponses(sample, m.config.PerformanceTolerance) - - m.metrics.comparisonDuration.Observe(time.Since(start).Seconds()) - - m.metrics.comparisonResults.WithLabelValues(string(result.ComparisonStatus)).Inc() - // Log user extraction debug info user := sample.User if user != "" && user != unknownUser { @@ -185,7 +204,6 @@ func (m *Manager) ProcessQueryPair(ctx context.Context, req *http.Request, cellA logLevel = level.Warn } - // Build log fields logFields := []interface{}{ "msg", "query comparison completed", "correlation_id", correlationID, @@ -200,6 +218,13 @@ func (m *Manager) ProcessQueryPair(ctx context.Context, req *http.Request, cellA "cell_b_entries_returned", sample.CellBStats.TotalEntriesReturned, } + if persistedA != nil { + logFields = append(logFields, "cell_a_result_uri", persistedA.URI, "cell_a_result_size", persistedA.Size) + } + if persistedB != nil { + logFields = append(logFields, "cell_b_result_uri", persistedB.URI, "cell_b_result_size", persistedB.Size) + } + // Add performance ratios if available if result.PerformanceMetrics.QueryTimeRatio > 0 { logFields = append(logFields, "query_time_ratio", 
result.PerformanceMetrics.QueryTimeRatio) @@ -207,7 +232,6 @@ func (m *Manager) ProcessQueryPair(ctx context.Context, req *http.Request, cellA // Add difference summary if there are any if len(result.DifferenceDetails) > 0 { - // Count types of differences perfDiffs := 0 contentDiffs := 0 for key := range result.DifferenceDetails { @@ -254,13 +278,87 @@ func (m *Manager) ProcessQueryPair(ctx context.Context, req *http.Request, cellA } } -// Close closes the manager and its storage connections. +func (m *Manager) persistResultPayloads(ctx context.Context, sample *goldfish.QuerySample, cellAResp, cellBResp *ResponseData, comparison goldfish.ComparisonResult) (*StoredResult, *StoredResult) { + if !m.shouldPersistResults(comparison) { + return nil, nil + } + + persistSingle := func(cellLabel string, resp *ResponseData, hash string, statusCode int) *StoredResult { + if resp == nil { + return nil + } + + stored, err := m.resultStore.Store(ctx, resp.Body, StoreOptions{ + CorrelationID: sample.CorrelationID, + CellLabel: cellLabel, + BackendName: resp.BackendName, + TenantID: sample.TenantID, + QueryType: sample.QueryType, + Hash: hash, + StatusCode: statusCode, + Timestamp: sample.SampledAt, + ContentType: "application/json", + }) + if err != nil { + level.Error(m.logger).Log("msg", "failed to persist query payload", "correlation_id", sample.CorrelationID, "cell", cellLabel, "err", err) + m.metrics.storageOperations.WithLabelValues("store_payload", "error").Inc() + return nil + } + + m.metrics.storageOperations.WithLabelValues("store_payload", "success").Inc() + level.Info(m.logger).Log("msg", "persisted query payload", "correlation_id", sample.CorrelationID, "cell", cellLabel, "uri", stored.URI, "compressed_size", stored.Size) + return stored + } + + var storedA, storedB *StoredResult + if m.resultStore != nil { + storedA = persistSingle("cell-a", cellAResp, sample.CellAResponseHash, sample.CellAStatusCode) + storedB = persistSingle("cell-b", cellBResp, 
sample.CellBResponseHash, sample.CellBStatusCode) + } + + return storedA, storedB +} + +func (m *Manager) shouldPersistResults(result goldfish.ComparisonResult) bool { + if m.resultStore == nil { + return false + } + + switch m.config.ResultsStorage.Mode { + case ResultsPersistenceModeAll: + return true + case ResultsPersistenceModeMismatchOnly: + return result.ComparisonStatus != goldfish.ComparisonStatusMatch + default: + return false + } +} + +// Close closes the manager and its dependent storage connections. // Should be called when the manager is no longer needed to properly clean up resources. func (m *Manager) Close() error { + var errs []error + + if m.resultStore != nil { + if err := m.resultStore.Close(context.Background()); err != nil { + errs = append(errs, err) + } + } + if m.storage != nil { - return m.storage.Close() + if err := m.storage.Close(); err != nil { + errs = append(errs, err) + } + } + + switch len(errs) { + case 0: + return nil + case 1: + return errs[0] + default: + return errors.Join(errs...) } - return nil } // ResponseData contains response data from a backend cell including performance statistics. 
@@ -275,6 +373,7 @@ type ResponseData struct { UsedNewEngine bool TraceID string SpanID string + BackendName string } // CaptureResponse captures response data for comparison including trace ID and span ID @@ -300,6 +399,8 @@ func CaptureResponse(resp *http.Response, duration time.Duration, traceID, spanI // Log error but don't fail the capture level.Warn(log.NewNopLogger()).Log("msg", "failed to extract response statistics", "err", err) } + } else { + size = int64(len(body)) } return &ResponseData{ diff --git a/tools/querytee/goldfish/manager_test.go b/tools/querytee/goldfish/manager_test.go index caf05900e0a52..6fa477fe66971 100644 --- a/tools/querytee/goldfish/manager_test.go +++ b/tools/querytee/goldfish/manager_test.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/loki/v3/pkg/goldfish" + "github.com/grafana/loki/v3/pkg/storage/bucket" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -103,7 +104,7 @@ func TestManager_ShouldSample(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { storage := &mockStorage{} - manager, err := NewManager(tt.config, storage, log.NewNopLogger(), prometheus.NewRegistry()) + manager, err := NewManager(tt.config, storage, nil, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) got := manager.ShouldSample(tt.tenantID) @@ -121,7 +122,7 @@ func TestManager_ProcessQueryPair(t *testing.T) { } storage := &mockStorage{} - manager, err := NewManager(config, storage, log.NewNopLogger(), prometheus.NewRegistry()) + manager, err := NewManager(config, storage, nil, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) req, _ := http.NewRequest("GET", "/loki/api/v1/query_range?query=count_over_time({job=\"test\"}[5m])&start=1700000000&end=1700001000&step=60s", nil) @@ -223,7 +224,7 @@ func Test_ProcessQueryPair_populatesTraceIDs(t *testing.T) { } storage := &mockStorage{} - manager, err 
:= NewManager(config, storage, log.NewNopLogger(), prometheus.NewRegistry()) + manager, err := NewManager(config, storage, nil, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) req, _ := http.NewRequest("GET", "/loki/api/v1/query_range?query=count_over_time({job=\"test\"}[5m])&start=1700000000&end=1700001000&step=60s", nil) @@ -266,7 +267,7 @@ func Test_ProcessQueryPair_populatesTraceIDs(t *testing.T) { func TestManager_Close(t *testing.T) { storage := &mockStorage{} - manager, err := NewManager(Config{Enabled: true}, storage, log.NewNopLogger(), prometheus.NewRegistry()) + manager, err := NewManager(Config{Enabled: true}, storage, nil, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) err = manager.Close() @@ -307,7 +308,7 @@ func TestProcessQueryPairCapturesUser(t *testing.T) { } storage := &mockStorage{} - manager, err := NewManager(config, storage, log.NewNopLogger(), prometheus.NewRegistry()) + manager, err := NewManager(config, storage, nil, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) req, _ := http.NewRequest("GET", "/loki/api/v1/query_range?query=count_over_time({job=\"test\"}[5m])&start=1700000000&end=1700001000&step=60s", nil) @@ -445,7 +446,7 @@ func TestProcessQueryPair_CapturesLogsDrilldown(t *testing.T) { } storage := &mockStorage{} - manager, err := NewManager(config, storage, log.NewNopLogger(), prometheus.NewRegistry()) + manager, err := NewManager(config, storage, nil, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) req, _ := http.NewRequest("GET", "/loki/api/v1/query_range?query=count_over_time({job=\"test\"}[5m])&start=1700000000&end=1700001000&step=60s", nil) @@ -487,3 +488,130 @@ func TestProcessQueryPair_CapturesLogsDrilldown(t *testing.T) { }) } } + +func TestManagerResultPersistenceModes(t *testing.T) { + baseConfig := Config{ + Enabled: true, + SamplingConfig: SamplingConfig{ + DefaultRate: 1.0, + }, + StorageConfig: StorageConfig{ + Type: "cloudsql", 
+ CloudSQLUser: "user", + CloudSQLDatabase: "db", + }, + ResultsStorage: ResultsStorageConfig{ + Enabled: true, + Backend: ResultsBackendGCS, + Compression: ResultsCompressionGzip, + Bucket: bucket.Config{}, + }, + } + baseConfig.ResultsStorage.Bucket.GCS.BucketName = "bucket" + + tests := []struct { + name string + mode ResultsPersistenceMode + cellAHash string + cellBHash string + expectStores int + }{ + { + name: "mismatch only stores when hashes differ", + mode: ResultsPersistenceModeMismatchOnly, + cellAHash: "hash-a", + cellBHash: "hash-b", + expectStores: 2, + }, + { + name: "mismatch only skips identical hashes", + mode: ResultsPersistenceModeMismatchOnly, + cellAHash: "hash-same", + cellBHash: "hash-same", + expectStores: 0, + }, + { + name: "all mode stores for every sample", + mode: ResultsPersistenceModeAll, + cellAHash: "hash-same", + cellBHash: "hash-same", + expectStores: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + config := baseConfig + config.ResultsStorage.Mode = tt.mode + + storage := &mockStorage{} + results := &mockResultStore{} + manager, err := NewManager(config, storage, results, log.NewNopLogger(), prometheus.NewRegistry()) + require.NoError(t, err) + + req, _ := http.NewRequest("GET", "/loki/api/v1/query_range?query=sum(rate({job=\"app\"}[1m]))", nil) + req.Header.Set("X-Scope-OrgID", "tenant1") + + cellA := &ResponseData{ + Body: []byte(`{"status":"success","data":{"resultType":"matrix","result":[]}}`), + StatusCode: 200, + Duration: 90 * time.Millisecond, + Stats: goldfish.QueryStats{ExecTimeMs: 90}, + Hash: tt.cellAHash, + Size: 140, + BackendName: "cell-a", + } + + cellB := &ResponseData{ + Body: []byte(`{"status":"success","data":{"resultType":"matrix","result":[]}}`), + StatusCode: 200, + Duration: 110 * time.Millisecond, + Stats: goldfish.QueryStats{ExecTimeMs: 110}, + Hash: tt.cellBHash, + Size: 150, + BackendName: "cell-b", + } + + manager.ProcessQueryPair(context.Background(), req, cellA, 
cellB) + + require.Equal(t, tt.expectStores, len(results.calls)) + + if tt.expectStores > 0 { + require.Len(t, storage.samples, 1) + sample := storage.samples[0] + assert.NotEmpty(t, sample.CellAResultURI) + assert.NotEmpty(t, sample.CellBResultURI) + assert.Equal(t, "cell-a", results.calls[0].opts.CellLabel) + if tt.expectStores == 2 { + assert.Equal(t, "cell-b", results.calls[1].opts.CellLabel) + } + } + }) + } +} + +type resultStoreCall struct { + opts StoreOptions + originalSize int64 +} + +type mockResultStore struct { + calls []resultStoreCall + closed bool +} + +func (m *mockResultStore) Store(_ context.Context, payload []byte, opts StoreOptions) (*StoredResult, error) { + m.calls = append(m.calls, resultStoreCall{opts: opts, originalSize: int64(len(payload))}) + uri := "mock://" + opts.CellLabel + "/" + opts.CorrelationID + return &StoredResult{ + URI: uri, + Size: int64(len(payload)), + OriginalSize: int64(len(payload)), + Compression: ResultsCompressionGzip, + }, nil +} + +func (m *mockResultStore) Close(context.Context) error { + m.closed = true + return nil +} diff --git a/tools/querytee/goldfish/result_store.go b/tools/querytee/goldfish/result_store.go new file mode 100644 index 0000000000000..b278b3f9a49dc --- /dev/null +++ b/tools/querytee/goldfish/result_store.go @@ -0,0 +1,184 @@ +package goldfish + +import ( + "bytes" + "compress/gzip" + "context" + "encoding/json" + "fmt" + "path" + "strings" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/thanos-io/objstore" + + bucketclient "github.com/grafana/loki/v3/pkg/storage/bucket" +) + +// ResultStore handles persistence of raw query results to object storage. +type ResultStore interface { + Store(ctx context.Context, payload []byte, opts StoreOptions) (*StoredResult, error) + Close(ctx context.Context) error +} + +// StoreOptions contains metadata required to persist a result payload. 
+type StoreOptions struct { + CorrelationID string + CellLabel string + BackendName string + TenantID string + QueryType string + Hash string + StatusCode int + Timestamp time.Time + ContentType string +} + +// StoredResult describes the object stored in the backing bucket. +type StoredResult struct { + URI string + Size int64 + OriginalSize int64 + Compression string +} + +type bucketResultStore struct { + bucket objstore.InstrumentedBucket + prefix string + compression string + backend string + logger log.Logger +} + +// NewResultStore creates a ResultStore based on configuration. +func NewResultStore(ctx context.Context, cfg ResultsStorageConfig, logger log.Logger) (ResultStore, error) { + bucketClient, err := bucketclient.NewClient(ctx, cfg.Backend, cfg.Bucket, "goldfish-results", logger) + if err != nil { + return nil, fmt.Errorf("create bucket client: %w", err) + } + + return &bucketResultStore{ + bucket: bucketClient, + prefix: strings.Trim(cfg.ObjectPrefix, "/"), + compression: cfg.Compression, + backend: cfg.Backend, + logger: logger, + }, nil +} + +func (s *bucketResultStore) Store(ctx context.Context, payload []byte, opts StoreOptions) (*StoredResult, error) { + encoded, err := encodePayload(payload, s.compression) + if err != nil { + return nil, err + } + + key := buildObjectKey(s.prefix, opts.CorrelationID, opts.CellLabel, opts.Timestamp, s.compression) + if err := s.bucket.Upload(ctx, key, bytes.NewReader(encoded)); err != nil { + return nil, fmt.Errorf("upload object: %w", err) + } + + // Best-effort metadata sidecar for quick inspection. 
+ if meta := buildMetadata(payload, opts, s.compression); meta != nil { + metaKey := key + ".meta.json" + if err := s.bucket.Upload(ctx, metaKey, bytes.NewReader(meta)); err != nil { + level.Warn(s.logger).Log( + "component", "goldfish-result-store", + "msg", "failed to upload metadata object", + "object", metaKey, + "err", err, + ) + } + } + + uri := fmt.Sprintf("%s://%s/%s", s.backend, s.bucket.Name(), key) + + level.Debug(s.logger).Log( + "component", "goldfish-result-store", + "object", key, + "bucket", s.bucket.Name(), + "backend", s.backend, + "size_bytes", len(encoded), + ) + + return &StoredResult{ + URI: uri, + Size: int64(len(encoded)), + OriginalSize: int64(len(payload)), + Compression: s.compression, + }, nil +} + +func (s *bucketResultStore) Close(context.Context) error { + return s.bucket.Close() +} + +func buildObjectKey(prefix, correlationID, cell string, ts time.Time, compression string) string { + if ts.IsZero() { + ts = time.Now().UTC() + } else { + ts = ts.UTC() + } + + datePath := fmt.Sprintf("%04d/%02d/%02d", ts.Year(), ts.Month(), ts.Day()) + safePrefix := strings.Trim(prefix, "/") + safeCorrelation := strings.ReplaceAll(correlationID, "..", "--") + safeCell := strings.ReplaceAll(cell, "..", "--") + + ext := ".json" + if compression == ResultsCompressionGzip { + ext += ".gz" + } + + parts := []string{safePrefix, datePath, safeCorrelation, fmt.Sprintf("%s%s", safeCell, ext)} + return path.Join(filterEmpty(parts)...) 
+} + +func filterEmpty(items []string) []string { + out := make([]string, 0, len(items)) + for _, item := range items { + if item != "" { + out = append(out, item) + } + } + return out +} + +func encodePayload(data []byte, compression string) ([]byte, error) { + switch strings.ToLower(compression) { + case ResultsCompressionNone: + return append([]byte(nil), data...), nil + case ResultsCompressionGzip: + var buf bytes.Buffer + gz := gzip.NewWriter(&buf) + if _, err := gz.Write(data); err != nil { + return nil, fmt.Errorf("gzip write: %w", err) + } + if err := gz.Close(); err != nil { + return nil, fmt.Errorf("gzip close: %w", err) + } + return buf.Bytes(), nil + default: + return nil, fmt.Errorf("unknown compression: %s", compression) + } +} + +func buildMetadata(payload []byte, opts StoreOptions, compression string) []byte { + meta := map[string]interface{}{ + "correlation_id": opts.CorrelationID, + "cell": opts.CellLabel, + "tenant": opts.TenantID, + "query_type": opts.QueryType, + "status_code": opts.StatusCode, + "hash_fnv32": opts.Hash, + "stored_at": time.Now().UTC().Format(time.RFC3339), + "compression": compression, + "original_bytes": len(payload), + } + b, err := json.Marshal(meta) + if err != nil { + return nil + } + return b +} diff --git a/tools/querytee/goldfish/result_store_test.go b/tools/querytee/goldfish/result_store_test.go new file mode 100644 index 0000000000000..7f75807380d17 --- /dev/null +++ b/tools/querytee/goldfish/result_store_test.go @@ -0,0 +1,92 @@ +package goldfish + +import ( + "compress/gzip" + "context" + "encoding/json" + "io" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" +) + +func TestBucketResultStoreStoresCompressedPayload(t *testing.T) { + bkt := objstore.NewInMemBucket() + inst := objstore.WrapWith(bkt, objstore.BucketMetrics(prometheus.NewRegistry(), "test")) + store := &bucketResultStore{ + 
bucket: inst, + prefix: "goldfish/results", + compression: ResultsCompressionGzip, + backend: ResultsBackendGCS, + logger: log.NewNopLogger(), + } + + payload := []byte(`{"status":"success","data":{"resultType":"matrix","result":[]}}`) + ts := time.Unix(1700000000, 0) + opts := StoreOptions{ + CorrelationID: "abc-123", + CellLabel: "cell-a", + TenantID: "tenant-1", + QueryType: "query_range", + Hash: "deadbeef", + StatusCode: 200, + Timestamp: ts, + } + + res, err := store.Store(context.Background(), payload, opts) + require.NoError(t, err) + require.Equal(t, ResultsCompressionGzip, res.Compression) + require.NotZero(t, res.Size) + require.Equal(t, int64(len(payload)), res.OriginalSize) + + key := buildObjectKey(store.prefix, opts.CorrelationID, opts.CellLabel, ts, store.compression) + rc, err := inst.Get(context.Background(), key) + require.NoError(t, err) + defer rc.Close() + + gz, err := gzip.NewReader(rc) + require.NoError(t, err) + decompressed, err := io.ReadAll(gz) + require.NoError(t, err) + require.Equal(t, payload, decompressed) + + metaRC, err := inst.Get(context.Background(), key+".meta.json") + require.NoError(t, err) + defer metaRC.Close() + + var meta map[string]any + require.NoError(t, json.NewDecoder(metaRC).Decode(&meta)) + require.Equal(t, opts.CorrelationID, meta["correlation_id"]) + require.Equal(t, store.compression, meta["compression"]) +} + +func TestBucketResultStoreStoresUncompressedPayload(t *testing.T) { + bkt := objstore.NewInMemBucket() + inst := objstore.WrapWith(bkt, objstore.BucketMetrics(prometheus.NewRegistry(), "test")) + store := &bucketResultStore{ + bucket: inst, + prefix: "goldfish/results", + compression: ResultsCompressionNone, + backend: ResultsBackendS3, + logger: log.NewNopLogger(), + } + + payload := []byte("{}") + opts := StoreOptions{CorrelationID: "abc", CellLabel: "cell-b"} + + res, err := store.Store(context.Background(), payload, opts) + require.NoError(t, err) + require.Equal(t, ResultsCompressionNone, 
res.Compression) + require.Equal(t, int64(len(payload)), res.Size) + + key := buildObjectKey(store.prefix, opts.CorrelationID, opts.CellLabel, opts.Timestamp, store.compression) + rc, err := inst.Get(context.Background(), key) + require.NoError(t, err) + body, err := io.ReadAll(rc) + require.NoError(t, err) + require.Equal(t, payload, body) +} diff --git a/tools/querytee/proxy.go b/tools/querytee/proxy.go index b67c4be3ce2c7..5aa2dfa1d0abe 100644 --- a/tools/querytee/proxy.go +++ b/tools/querytee/proxy.go @@ -194,9 +194,21 @@ func NewProxy(cfg ProxyConfig, logger log.Logger, readRoutes, writeRoutes []Rout return nil, errors.Wrap(err, "failed to create goldfish storage") } + var resultStore goldfish.ResultStore + if cfg.Goldfish.ResultsStorage.Enabled { + resultStore, err = goldfish.NewResultStore(context.Background(), cfg.Goldfish.ResultsStorage, logger) + if err != nil { + storage.Close() + return nil, errors.Wrap(err, "failed to create goldfish result store") + } + } + // Create Goldfish manager - goldfishManager, err := goldfish.NewManager(cfg.Goldfish, storage, logger, registerer) + goldfishManager, err := goldfish.NewManager(cfg.Goldfish, storage, resultStore, logger, registerer) if err != nil { + if resultStore != nil { + _ = resultStore.Close(context.Background()) + } storage.Close() return nil, errors.Wrap(err, "failed to create goldfish manager") } @@ -204,7 +216,9 @@ func NewProxy(cfg ProxyConfig, logger log.Logger, readRoutes, writeRoutes []Rout level.Info(logger).Log("msg", "Goldfish enabled", "storage_type", cfg.Goldfish.StorageConfig.Type, - "default_rate", cfg.Goldfish.SamplingConfig.DefaultRate) + "default_rate", cfg.Goldfish.SamplingConfig.DefaultRate, + "results_mode", string(cfg.Goldfish.ResultsStorage.Mode), + "results_backend", cfg.Goldfish.ResultsStorage.Backend) } return p, nil diff --git a/tools/querytee/proxy_endpoint.go b/tools/querytee/proxy_endpoint.go index 6fd494ecfdba0..79c3a9b42a559 100644 --- a/tools/querytee/proxy_endpoint.go +++ 
b/tools/querytee/proxy_endpoint.go @@ -359,6 +359,7 @@ func (p *ProxyEndpoint) processWithGoldfish(r *http.Request, cellAResp, cellBRes level.Error(p.logger).Log("msg", "failed to capture cell A response", "err", err) return } + cellAData.BackendName = cellAResp.backend.name cellBData, err := goldfish.CaptureResponse(&http.Response{ StatusCode: cellBResp.status, @@ -368,6 +369,7 @@ func (p *ProxyEndpoint) processWithGoldfish(r *http.Request, cellAResp, cellBRes level.Error(p.logger).Log("msg", "failed to capture cell B response", "err", err) return } + cellBData.BackendName = cellBResp.backend.name p.goldfishManager.ProcessQueryPair(ctx, r, cellAData, cellBData) } diff --git a/tools/querytee/proxy_endpoint_test.go b/tools/querytee/proxy_endpoint_test.go index f857569f8ee6a..66ce31e5daa10 100644 --- a/tools/querytee/proxy_endpoint_test.go +++ b/tools/querytee/proxy_endpoint_test.go @@ -459,7 +459,7 @@ func Test_endToEnd_traceIDFlow(t *testing.T) { DefaultRate: 1.0, // Always sample for testing }, } - goldfishManager, err := querytee_goldfish.NewManager(goldfishConfig, storage, log.NewNopLogger(), prometheus.NewRegistry()) + goldfishManager, err := querytee_goldfish.NewManager(goldfishConfig, storage, nil, log.NewNopLogger(), prometheus.NewRegistry()) require.NoError(t, err) endpoint := NewProxyEndpoint(backends, "test", NewProxyMetrics(nil), log.NewNopLogger(), nil, false).WithGoldfish(goldfishManager)