Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/server/module_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func TestIntegrationWillRunInstrumentationServerWhenTargetHasNoHttpServer(t *tes
if dbType == "sqlite3" {
t.Skip("skipping - sqlite not supported for storage server target")
}
// TODO - fix this test for postgres
if dbType == "postgres" {
t.Skip("skipping - test not working with postgres in Drone. Works locally.")
}
Comment on lines +35 to +38
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Postgres skip hides integration regressions; narrow or remove it

The unconditional dbType == "postgres" skip means this integration path is never exercised for postgres, so regressions can go unnoticed. Since the comment already acknowledges this is a TODO, it would be better to scope the skip to CI or tie it to a tracked issue.

-	// TODO - fix this test for postgres
-	if dbType == "postgres" {
-		t.Skip("skipping - test not working with postgres in Drone. Works locally.")
-	}
+	// TODO(grafana/<issue-id>): fix this test for postgres in CI
+	if dbType == "postgres" && os.Getenv("CI") != "" {
+		t.Skip("skipping in CI - test not working with postgres in Drone. Works locally.")
+	}

This keeps local postgres runs meaningful while unblocking Drone until the underlying issue is fixed.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// TODO - fix this test for postgres
if dbType == "postgres" {
t.Skip("skipping - test not working with postgres in Drone. Works locally.")
}
// TODO(grafana/<issue-id>): fix this test for postgres in CI
if dbType == "postgres" && os.Getenv("CI") != "" {
t.Skip("skipping in CI - test not working with postgres in Drone. Works locally.")
}
🤖 Prompt for AI Agents
In pkg/server/module_server_test.go around lines 35 to 38, the test currently
unconditionally skips when dbType == "postgres", which hides postgres
regressions; change the skip to be conditional (e.g., only skip in CI/Drone) or
attach it to a tracked issue. Update the logic to detect CI (for example via an
environment variable like CI or DRONE) and call t.Skipf with the CI condition
and include the issue/PR number in the message, or remove the skip entirely and
fix the underlying flakiness so postgres runs in CI; ensure the test still runs
locally for developers by only skipping when the CI flag is set.


_, cfg := db.InitTestDBWithCfg(t)
cfg.HTTPPort = "3001"
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/unified/resource/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"sync"
"time"

"github.com/grafana/grafana/pkg/cmd/grafana-cli/logger"
"github.com/hashicorp/golang-lru/v2/expirable"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -171,7 +170,7 @@ func (s *searchSupport) Search(ctx context.Context, req *ResourceSearchRequest)

// init is called during startup. any failure will block startup and continued execution
func (s *searchSupport) init(ctx context.Context) error {
_, span := s.tracer.Start(ctx, tracingPrexfixSearch+"Init")
ctx, span := s.tracer.Start(ctx, tracingPrexfixSearch+"Init")
defer span.End()
start := time.Now().Unix()

Expand Down Expand Up @@ -214,6 +213,7 @@ func (s *searchSupport) init(ctx context.Context) error {
}()

end := time.Now().Unix()
s.log.Info("search index initialized", "duration_secs", end-start, "total_docs", s.search.TotalDocs())
if IndexMetrics != nil {
IndexMetrics.IndexCreationTime.WithLabelValues().Observe(float64(end - start))
}
Expand Down Expand Up @@ -277,7 +277,7 @@ func (s *searchSupport) handleEvent(ctx context.Context, evt *WrittenEvent) {
// record latency from when event was created to when it was indexed
latencySeconds := float64(time.Now().UnixMicro()-evt.ResourceVersion) / 1e6
if latencySeconds > 5 {
logger.Warn("high index latency", "latency", latencySeconds)
s.log.Warn("high index latency", "latency", latencySeconds)
}
if IndexMetrics != nil {
IndexMetrics.IndexLatency.WithLabelValues(evt.Key.Resource).Observe(latencySeconds)
Expand Down Expand Up @@ -307,7 +307,7 @@ func (s *searchSupport) getOrCreateIndex(ctx context.Context, key NamespacedReso
}

func (s *searchSupport) build(ctx context.Context, nsr NamespacedResource, size int64, rv int64) (ResourceIndex, int64, error) {
_, span := s.tracer.Start(ctx, tracingPrexfixSearch+"Build")
ctx, span := s.tracer.Start(ctx, tracingPrexfixSearch+"Build")
defer span.End()

builder, err := s.builders.get(ctx, nsr)
Expand Down
57 changes: 11 additions & 46 deletions pkg/storage/unified/resource/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,12 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
}
}

err := s.Init(ctx)
if err != nil {
s.log.Error("error initializing resource server", "error", err)
return nil, err
}

return s, nil
}

Expand Down Expand Up @@ -294,16 +300,16 @@ func (s *server) Init(ctx context.Context) error {
}
}

// Start watching for changes
if s.initErr == nil {
s.initErr = s.initWatcher()
}

// initialize the search index
if s.initErr == nil && s.search != nil {
s.initErr = s.search.init(ctx)
}

// Start watching for changes
if s.initErr == nil {
s.initErr = s.initWatcher()
}

if s.initErr != nil {
s.log.Error("error initializing resource server", "error", s.initErr)
}
Expand Down Expand Up @@ -446,10 +452,6 @@ func (s *server) Create(ctx context.Context, req *CreateRequest) (*CreateRespons
ctx, span := s.tracer.Start(ctx, "storage_server.Create")
defer span.End()

if err := s.Init(ctx); err != nil {
return nil, err
}

rsp := &CreateResponse{}
user, ok := claims.From(ctx)
if !ok || user == nil {
Expand Down Expand Up @@ -488,10 +490,6 @@ func (s *server) Update(ctx context.Context, req *UpdateRequest) (*UpdateRespons
ctx, span := s.tracer.Start(ctx, "storage_server.Update")
defer span.End()

if err := s.Init(ctx); err != nil {
return nil, err
}

rsp := &UpdateResponse{}
user, ok := claims.From(ctx)
if !ok || user == nil {
Expand Down Expand Up @@ -542,10 +540,6 @@ func (s *server) Delete(ctx context.Context, req *DeleteRequest) (*DeleteRespons
ctx, span := s.tracer.Start(ctx, "storage_server.Delete")
defer span.End()

if err := s.Init(ctx); err != nil {
return nil, err
}

rsp := &DeleteResponse{}
if req.ResourceVersion < 0 {
return nil, apierrors.NewBadRequest("update must include the previous version")
Expand Down Expand Up @@ -634,9 +628,6 @@ func (s *server) Delete(ctx context.Context, req *DeleteRequest) (*DeleteRespons
}

func (s *server) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) {
if err := s.Init(ctx); err != nil {
return nil, err
}
user, ok := claims.From(ctx)
if !ok || user == nil {
return &ReadResponse{
Expand Down Expand Up @@ -693,9 +684,6 @@ func (s *server) List(ctx context.Context, req *ListRequest) (*ListResponse, err
}}, nil
}

if err := s.Init(ctx); err != nil {
return nil, err
}
if req.Limit < 1 {
req.Limit = 50 // default max 50 items in a page
}
Expand Down Expand Up @@ -786,10 +774,6 @@ func (s *server) initWatcher() error {
func (s *server) Watch(req *WatchRequest, srv ResourceStore_WatchServer) error {
ctx := srv.Context()

if err := s.Init(ctx); err != nil {
return err
}

user, ok := claims.From(ctx)
if !ok || user == nil {
return apierrors.NewUnauthorized("no user found in context")
Expand Down Expand Up @@ -930,9 +914,6 @@ func (s *server) Watch(req *WatchRequest, srv ResourceStore_WatchServer) error {
}

func (s *server) Search(ctx context.Context, req *ResourceSearchRequest) (*ResourceSearchResponse, error) {
if err := s.Init(ctx); err != nil {
return nil, err
}
if s.search == nil {
return nil, fmt.Errorf("search index not configured")
}
Expand All @@ -941,25 +922,16 @@ func (s *server) Search(ctx context.Context, req *ResourceSearchRequest) (*Resou

// History implements ResourceServer.
func (s *server) History(ctx context.Context, req *HistoryRequest) (*HistoryResponse, error) {
if err := s.Init(ctx); err != nil {
return nil, err
}
return s.search.History(ctx, req)
}

// Origin implements ResourceServer.
func (s *server) Origin(ctx context.Context, req *OriginRequest) (*OriginResponse, error) {
if err := s.Init(ctx); err != nil {
return nil, err
}
return s.search.Origin(ctx, req)
}

// IsHealthy implements ResourceServer.
func (s *server) IsHealthy(ctx context.Context, req *HealthCheckRequest) (*HealthCheckResponse, error) {
if err := s.Init(ctx); err != nil {
return nil, err
}
return s.diagnostics.IsHealthy(ctx, req)
}

Expand All @@ -971,9 +943,6 @@ func (s *server) PutBlob(ctx context.Context, req *PutBlobRequest) (*PutBlobResp
Code: http.StatusNotImplemented,
}}, nil
}
if err := s.Init(ctx); err != nil {
return nil, err
}

rsp, err := s.blob.PutResourceBlob(ctx, req)
if err != nil {
Expand Down Expand Up @@ -1016,10 +985,6 @@ func (s *server) GetBlob(ctx context.Context, req *GetBlobRequest) (*GetBlobResp
}}, nil
}

if err := s.Init(ctx); err != nil {
return nil, err
}

// The linked blob is stored in the resource metadata attributes
obj, status := s.getPartialObject(ctx, req.Resource, req.ResourceVersion)
if status != nil {
Expand Down
11 changes: 5 additions & 6 deletions pkg/storage/unified/search/bleve.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,6 @@ func (b *bleveBackend) BuildIndex(ctx context.Context,
// The builder will write all documents before returning
builder func(index resource.ResourceIndex) (int64, error),
) (resource.ResourceIndex, error) {
b.cacheMu.Lock()
defer b.cacheMu.Unlock()

_, span := b.tracer.Start(ctx, tracingPrexfixBleve+"BuildIndex")
defer span.End()

Expand All @@ -99,9 +96,9 @@ func (b *bleveBackend) BuildIndex(ctx context.Context,
if size > b.opts.FileThreshold {
dir := filepath.Join(b.opts.Root, key.Namespace, fmt.Sprintf("%s.%s", key.Resource, key.Group))
index, err = bleve.New(dir, mapper)
if err == nil {
b.log.Info("TODO, check last RV so we can see if the numbers have changed", "dir", dir)
}

// TODO, check last RV so we can see if the numbers have changed

resource.IndexMetrics.IndexTenants.WithLabelValues(key.Namespace, "file").Inc()
} else {
index, err = bleve.NewMemOnly(mapper)
Expand Down Expand Up @@ -137,7 +134,9 @@ func (b *bleveBackend) BuildIndex(ctx context.Context,
return nil, err
}

b.cacheMu.Lock()
b.cache[key] = idx
b.cacheMu.Unlock()
return idx, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/unified/sql/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (b *backend) Stop(_ context.Context) error {

// GetResourceStats implements Backend.
func (b *backend) GetResourceStats(ctx context.Context, namespace string, minCount int) ([]resource.ResourceStats, error) {
_, span := b.tracer.Start(ctx, tracePrefix+".GetResourceStats")
ctx, span := b.tracer.Start(ctx, tracePrefix+".GetResourceStats")
defer span.End()

req := &sqlStatsRequest{
Expand Down
Loading