Skip to content

Commit 3e1f555

Browse files
authored
Unistore Chore: Add database-level observability (#92266)
* add testing harness * fix mockery and linters * WIP * wip * fix transactions * fix transaction tracing; add tracing by default * rename package * move WithTx to simplify logic of DB implementations * fix potential issue with context deadline * add db instrumentation to dbutil * add otel tests * improve naming * minor fix in semantics and add comprehensive OTel testing * fix naming * instrument resourceVersionAtomicInc * provide a default testing tracer * fix docs * fix typo in docs * add semconv for k8s
1 parent 9125f0d commit 3e1f555

File tree

7 files changed

+851
-22
lines changed

7 files changed

+851
-22
lines changed

pkg/storage/unified/sql/backend.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
"time"
1111

1212
"github.com/google/uuid"
13+
"go.opentelemetry.io/otel/attribute"
14+
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
1315
"go.opentelemetry.io/otel/trace"
1416
"go.opentelemetry.io/otel/trace/noop"
1517
"google.golang.org/protobuf/proto"
@@ -164,7 +166,7 @@ func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64,
164166
// 3. TODO: Rebuild the whole folder tree structure if we're creating a folder
165167

166168
// 4. Atomically increment resource version for this kind
167-
rv, err := resourceVersionAtomicInc(ctx, tx, b.dialect, event.Key)
169+
rv, err := b.resourceVersionAtomicInc(ctx, tx, event.Key)
168170
if err != nil {
169171
return fmt.Errorf("increment resource version: %w", err)
170172
}
@@ -222,7 +224,7 @@ func (b *backend) update(ctx context.Context, event resource.WriteEvent) (int64,
222224
// 3. TODO: Rebuild the whole folder tree structure if we're creating a folder
223225

224226
// 4. Atomically increment resource version for this kind
225-
rv, err := resourceVersionAtomicInc(ctx, tx, b.dialect, event.Key)
227+
rv, err := b.resourceVersionAtomicInc(ctx, tx, event.Key)
226228
if err != nil {
227229
return fmt.Errorf("increment resource version: %w", err)
228230
}
@@ -282,7 +284,7 @@ func (b *backend) delete(ctx context.Context, event resource.WriteEvent) (int64,
282284
// 3. TODO: Rebuild the whole folder tree structure if we're creating a folder
283285

284286
// 4. Atomically increment resource version for this kind
285-
rv, err := resourceVersionAtomicInc(ctx, tx, b.dialect, event.Key)
287+
rv, err := b.resourceVersionAtomicInc(ctx, tx, event.Key)
286288
if err != nil {
287289
return fmt.Errorf("increment resource version: %w", err)
288290
}
@@ -661,10 +663,18 @@ func (b *backend) poll(ctx context.Context, grp string, res string, since int64,
661663
// TODO: Ideally we should attempt to update the RV in the resource and resource_history tables
662664
// in a single roundtrip. This would reduce the latency of the operation, and also increase the
663665
// throughput of the system. This is a good candidate for a future optimization.
664-
func resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialect, key *resource.ResourceKey) (newVersion int64, err error) {
666+
func (b *backend) resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, key *resource.ResourceKey) (newVersion int64, err error) {
667+
ctx, span := b.tracer.Start(ctx, "resourceVersionAtomicInc", trace.WithAttributes(
668+
semconv.K8SNamespaceName(key.Namespace),
669+
// TODO: the following attributes could use some standardization.
670+
attribute.String("k8s.resource.group", key.Group),
671+
attribute.String("k8s.resource.type", key.Resource),
672+
))
673+
defer span.End()
674+
665675
// 1. Lock the row and prevent concurrent updates until the transaction is committed.
666676
res, err := dbutil.QueryRow(ctx, x, sqlResourceVersionGet, sqlResourceVersionGetRequest{
667-
SQLTemplate: sqltemplate.New(d),
677+
SQLTemplate: sqltemplate.New(b.dialect),
668678
Group: key.Group,
669679
Resource: key.Resource,
670680

@@ -674,14 +684,14 @@ func resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, d sqltemp
674684
if errors.Is(err, sql.ErrNoRows) {
675685
// if there wasn't a row associated with the given resource, then we create it.
676686
if _, err = dbutil.Exec(ctx, x, sqlResourceVersionInsert, sqlResourceVersionUpsertRequest{
677-
SQLTemplate: sqltemplate.New(d),
687+
SQLTemplate: sqltemplate.New(b.dialect),
678688
Group: key.Group,
679689
Resource: key.Resource,
680690
}); err != nil {
681691
return 0, fmt.Errorf("insert into resource_version: %w", err)
682692
}
683693
res, err = dbutil.QueryRow(ctx, x, sqlResourceVersionGet, sqlResourceVersionGetRequest{
684-
SQLTemplate: sqltemplate.New(d),
694+
SQLTemplate: sqltemplate.New(b.dialect),
685695
Group: key.Group,
686696
Resource: key.Resource,
687697
Response: new(resourceVersionResponse),
@@ -702,7 +712,7 @@ func resourceVersionAtomicInc(ctx context.Context, x db.ContextExecer, d sqltemp
702712
nextRV := max(res.CurrentEpoch, res.ResourceVersion+1)
703713

704714
_, err = dbutil.Exec(ctx, x, sqlResourceVersionUpdate, sqlResourceVersionUpsertRequest{
705-
SQLTemplate: sqltemplate.New(d),
715+
SQLTemplate: sqltemplate.New(b.dialect),
706716
Group: key.Group,
707717
Resource: key.Resource,
708718
ResourceVersion: nextRV,

pkg/storage/unified/sql/backend_test.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111

1212
"github.com/grafana/grafana/pkg/storage/unified/resource"
1313
"github.com/grafana/grafana/pkg/storage/unified/sql/db/dbimpl"
14-
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
1514
"github.com/grafana/grafana/pkg/storage/unified/sql/test"
1615
"github.com/grafana/grafana/pkg/util/testutil"
1716
)
@@ -217,15 +216,13 @@ func expectUnsuccessfulResourceVersionAtomicInc(t *testing.T, b testBackend, err
217216
func TestResourceVersionAtomicInc(t *testing.T) {
218217
t.Parallel()
219218

220-
dialect := sqltemplate.MySQL
221-
222219
t.Run("happy path - insert new row", func(t *testing.T) {
223220
t.Parallel()
224221
b, ctx := setupBackendTest(t)
225222

226223
expectSuccessfulResourceVersionAtomicInc(t, b) // returns RV=1
227224

228-
v, err := resourceVersionAtomicInc(ctx, b.DB, dialect, resKey)
225+
v, err := b.resourceVersionAtomicInc(ctx, b.DB, resKey)
229226
require.NoError(t, err)
230227
require.Equal(t, int64(23456), v)
231228
})
@@ -238,7 +235,7 @@ func TestResourceVersionAtomicInc(t *testing.T) {
238235
b.QueryWithResult("select resource_version for update", 2, Rows{{12345, 23456}})
239236
b.ExecWithResult("update resource_version", 0, 1)
240237

241-
v, err := resourceVersionAtomicInc(ctx, b.DB, dialect, resKey)
238+
v, err := b.resourceVersionAtomicInc(ctx, b.DB, resKey)
242239
require.NoError(t, err)
243240
require.Equal(t, int64(23456), v)
244241
})
@@ -248,7 +245,7 @@ func TestResourceVersionAtomicInc(t *testing.T) {
248245
b, ctx := setupBackendTest(t)
249246
b.QueryWithErr("select resource_version for update", errTest)
250247

251-
v, err := resourceVersionAtomicInc(ctx, b.DB, dialect, resKey)
248+
v, err := b.resourceVersionAtomicInc(ctx, b.DB, resKey)
252249
require.Zero(t, v)
253250
require.Error(t, err)
254251
require.ErrorContains(t, err, "lock the resource version")
@@ -262,7 +259,7 @@ func TestResourceVersionAtomicInc(t *testing.T) {
262259
b.QueryWithResult("select resource_version", 0, Rows{})
263260
b.ExecWithErr("insert resource_version", errTest)
264261

265-
v, err := resourceVersionAtomicInc(ctx, b.DB, dialect, resKey)
262+
v, err := b.resourceVersionAtomicInc(ctx, b.DB, resKey)
266263
require.Zero(t, v)
267264
require.Error(t, err)
268265
require.ErrorContains(t, err, "insert into resource_version")
@@ -275,7 +272,7 @@ func TestResourceVersionAtomicInc(t *testing.T) {
275272
b.QueryWithResult("select resource_version for update", 2, Rows{{12345, 23456}})
276273
b.ExecWithErr("update resource_version", errTest)
277274

278-
v, err := resourceVersionAtomicInc(ctx, b.DB, dialect, resKey)
275+
v, err := b.resourceVersionAtomicInc(ctx, b.DB, resKey)
279276
require.Zero(t, v)
280277
require.Error(t, err)
281278
require.ErrorContains(t, err, "increase resource version")

pkg/storage/unified/sql/db/dbimpl/dbimpl.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,17 @@ import (
77
"sync"
88

99
"github.com/dlmiddlecote/sqlstats"
10-
"github.com/grafana/grafana/pkg/infra/tracing"
1110
"github.com/prometheus/client_golang/prometheus"
1211
"go.opentelemetry.io/otel/trace"
12+
"go.opentelemetry.io/otel/trace/noop"
1313
"xorm.io/xorm"
1414

1515
infraDB "github.com/grafana/grafana/pkg/infra/db"
1616
"github.com/grafana/grafana/pkg/infra/log"
1717
"github.com/grafana/grafana/pkg/setting"
1818
"github.com/grafana/grafana/pkg/storage/unified/sql/db"
1919
"github.com/grafana/grafana/pkg/storage/unified/sql/db/migrations"
20+
"github.com/grafana/grafana/pkg/storage/unified/sql/db/otel"
2021
)
2122

2223
const (
@@ -33,7 +34,10 @@ var errGrafanaDBInstrumentedNotSupported = errors.New("the Resource API is " +
3334
grafanaDBInstrumentQueriesKey + "` is enabled in [database], and that" +
3435
" setup is currently unsupported. Please, consider disabling that flag")
3536

36-
func ProvideResourceDB(grafanaDB infraDB.DB, cfg *setting.Cfg, tracer tracing.Tracer) (db.DBProvider, error) {
37+
func ProvideResourceDB(grafanaDB infraDB.DB, cfg *setting.Cfg, tracer trace.Tracer) (db.DBProvider, error) {
38+
if tracer == nil {
39+
tracer = noop.NewTracerProvider().Tracer("test-tracer")
40+
}
3741
p, err := newResourceDBProvider(grafanaDB, cfg, tracer)
3842
if err != nil {
3943
return nil, fmt.Errorf("provide Resource DB: %w", err)
@@ -148,6 +152,7 @@ func (p *resourceDBProvider) init(ctx context.Context) (db.DB, error) {
148152
}
149153

150154
d := NewDB(p.engine.DB().DB, p.engine.Dialect().DriverName())
155+
d = otel.NewInstrumentedDB(d, p.tracer)
151156

152157
return d, nil
153158
}

pkg/storage/unified/sql/db/dbimpl/regression_incident_2144_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func TestReproIncident2144UsingGrafanaDB(t *testing.T) {
7575
cfg := newCfgFromIniMap(t, cfgMap)
7676
setupDBForGrafana(t, ctx, cfgMap)
7777
grafanaDB := newTestInfraDB(t, cfgMap)
78-
resourceDB, err := ProvideResourceDB(grafanaDB, cfg, testGrafanaTracer{})
78+
resourceDB, err := ProvideResourceDB(grafanaDB, cfg, nil)
7979
require.NotNil(t, resourceDB)
8080
require.NoError(t, err)
8181
})
@@ -105,7 +105,7 @@ func TestReproIncident2144UsingGrafanaDB(t *testing.T) {
105105
t.Run("Resource API provides a reasonable error for this case", func(t *testing.T) {
106106
t.Parallel()
107107
cfg := newCfgFromIniMap(t, cfgMap)
108-
resourceDB, err := ProvideResourceDB(grafanaDB, cfg, testGrafanaTracer{})
108+
resourceDB, err := ProvideResourceDB(grafanaDB, cfg, nil)
109109
require.Nil(t, resourceDB)
110110
require.Error(t, err)
111111
require.ErrorIs(t, err, errGrafanaDBInstrumentedNotSupported)

0 commit comments

Comments
 (0)