Skip to content

data/sqlutil: (WIP) Add instrumented/cancellable FrameFromRowsWithContext #1278

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions data/sqlutil/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package sqlutil

import (
"context"
"errors"

"github.com/prometheus/client_golang/prometheus"
)

// Name components shared by every sqlutil metric. The collectors pass these
// as the Prometheus Namespace and Subsystem options.
const (
	namespace = "grafana"
	subsystem = "datasources"
)

// metricLabelKeys lists the label names declared on every sqlutil metric.
// Labels attached to a context under any other key are ignored.
var metricLabelKeys = []string{
	"query_type",
	"datasource_type",
}

// Prometheus collectors updated by FrameFromRowsWithContext. All of them are
// declared with the label names in metricLabelKeys and must be registered
// through RegisterMetrics before they show up in a registry.
var (
	// rowsProcessed accumulates the number of rows read across all queries.
	rowsProcessed = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "sqlutil_rows_processed_total",
			Help:      "Total rows processed by FrameFromRows",
		},
		metricLabelKeys,
	)

	// rowCountHistogram observes the row count of each individual query.
	// Buckets double from 1 for 10 steps.
	rowCountHistogram = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "sqlutil_rows_per_query",
			Help:      "Histogram of row counts returned by FrameFromRows",
			Buckets:   prometheus.ExponentialBuckets(1, 2, 10),
		},
		metricLabelKeys,
	)

	// cellsProcessed accumulates the number of cells (row x column values)
	// read across all queries.
	cellsProcessed = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "sqlutil_cells_processed_total",
			Help:      "Total number of individual SQL cells processed",
		},
		metricLabelKeys,
	)

	// cellCountHistogram observes the cell count of each individual query.
	// Buckets double from 1 for 12 steps.
	cellCountHistogram = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "sqlutil_cells_per_query",
			Help:      "Histogram of the number of SQL cells processed per query",
			Buckets:   prometheus.ExponentialBuckets(1, 2, 12),
		},
		metricLabelKeys,
	)
)

// RegisterMetrics adds the sqlutil Prometheus collectors to reg.
//
// Registration is delegated to registerAll, which tolerates collectors that
// are already registered and only reports other registration failures.
func RegisterMetrics(reg prometheus.Registerer) error {
	sqlutilCollectors := []prometheus.Collector{
		rowsProcessed,
		rowCountHistogram,
		cellsProcessed,
		cellCountHistogram,
	}
	return registerAll(reg, sqlutilCollectors...)
}

// registerAll registers each collector with reg, in order.
//
// When reg reports that an equivalent collector is already registered, the
// error is swallowed and, for counter and histogram vectors, the local
// collector value is overwritten with the already-registered one so that both
// refer to the same underlying metrics. Any other registration error stops the
// loop and is returned as-is.
func registerAll(reg prometheus.Registerer, collectors ...prometheus.Collector) error {
	for _, collector := range collectors {
		err := reg.Register(collector)
		if err == nil {
			continue
		}

		var dup prometheus.AlreadyRegisteredError
		if !errors.As(err, &dup) {
			return err
		}

		// Duplicate registration: adopt the existing collector when the
		// concrete vector types line up; otherwise just move on.
		switch mine := collector.(type) {
		case *prometheus.CounterVec:
			if theirs, ok := dup.ExistingCollector.(*prometheus.CounterVec); ok {
				*mine = *theirs
			}
		case *prometheus.HistogramVec:
			if theirs, ok := dup.ExistingCollector.(*prometheus.HistogramVec); ok {
				*mine = *theirs
			}
		}
	}
	return nil
}

// ctxKeyMetricLabels is the unexported context key under which metric labels
// are stored. Using a private struct type prevents collisions with keys set by
// other packages.
type ctxKeyMetricLabels struct{}

// ContextWithMetricLabels returns a context with the given Prometheus labels attached.
// Callers should provide keys matching metricLabelKeys: "query_type" and "datasource_type".
//
// The labels are copied before being stored, so the returned context is not
// affected by (and does not race with) later modifications the caller makes to
// the map it passed in. A nil map is stored as an empty label set.
func ContextWithMetricLabels(ctx context.Context, labels map[string]string) context.Context {
	snapshot := make(map[string]string, len(labels))
	for k, v := range labels {
		snapshot[k] = v
	}
	return context.WithValue(ctx, ctxKeyMetricLabels{}, snapshot)
}

// getMetricLabels builds the label set used when recording sqlutil metrics.
//
// The result always contains exactly the keys in metricLabelKeys. Each value
// is taken from the map attached via ContextWithMetricLabels when present and
// is the empty string otherwise; keys outside metricLabelKeys are dropped.
func getMetricLabels(ctx context.Context) prometheus.Labels {
	// A missing or differently-typed context value leaves attached nil;
	// indexing a nil map yields "", which is the desired default.
	attached, _ := ctx.Value(ctxKeyMetricLabels{}).(map[string]string)

	resolved := make(prometheus.Labels, len(metricLabelKeys))
	for _, name := range metricLabelKeys {
		resolved[name] = attached[name]
	}
	return resolved
}
83 changes: 83 additions & 0 deletions data/sqlutil/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package sqlutil_test

import (
"context"
"testing"

"github.com/grafana/grafana-plugin-sdk-go/data/sqlutil"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
)

// TestFrameFromRowsWithContext_MetricsRecorded runs a 2x3 result set through
// FrameFromRowsWithContext against a fresh registry and checks that the row
// and cell counters hold the exact totals and that each histogram received at
// least one observation under the labels attached to the context.
func TestFrameFromRowsWithContext_MetricsRecorded(t *testing.T) {
	registry := prometheus.NewRegistry()
	require.NoError(t, sqlutil.RegisterMetrics(registry))

	wantLabels := map[string]string{
		"query_type":      "test",
		"datasource_type": "fake",
	}
	ctx := sqlutil.ContextWithMetricLabels(context.Background(), map[string]string{
		"query_type":      "test",
		"datasource_type": "fake",
	})

	// 2 rows × 3 columns
	rows := makeSingleResultSet(
		[]string{"a", "b", "c"},
		[]interface{}{1, 2, 3},
		[]interface{}{4, 5, 6},
	)
	require.NoError(t, rows.Err())

	_, err := sqlutil.FrameFromRowsWithContext(ctx, rows, 100)
	require.NoError(t, err)

	// Gather and inspect metrics
	families, err := registry.Gather()
	require.NoError(t, err)

	counters := []struct {
		name string
		want float64
	}{
		{"grafana_datasources_sqlutil_rows_processed_total", 2},
		{"grafana_datasources_sqlutil_cells_processed_total", 6},
	}
	for _, tc := range counters {
		m := findMetricWithLabels(families, tc.name, wantLabels)
		require.NotNil(t, m, "metric %s not found", tc.name)
		require.Equal(t, tc.want, m.GetCounter().GetValue(), "metric %s value mismatch", tc.name)
	}

	histograms := []struct {
		name       string
		minSamples uint64
	}{
		{"grafana_datasources_sqlutil_rows_per_query", 1},
		{"grafana_datasources_sqlutil_cells_per_query", 1},
	}
	for _, tc := range histograms {
		m := findMetricWithLabels(families, tc.name, wantLabels)
		require.NotNil(t, m, "metric %s not found", tc.name)
		require.GreaterOrEqual(t, m.GetHistogram().GetSampleCount(), tc.minSamples, "metric %s histogram count too low", tc.name)
	}
}

// findMetricWithLabels returns the first metric in the family called name
// whose label set contains every entry of expectedLabels with an equal value.
// Labels on the metric that are not mentioned in expectedLabels are ignored.
// It returns nil when no family has that name or no metric carries all of the
// expected labels.
func findMetricWithLabels(metrics []*dto.MetricFamily, name string, expectedLabels map[string]string) *dto.Metric {
	for _, mf := range metrics {
		if mf.GetName() != name {
			continue
		}
		for _, m := range mf.Metric {
			// Count expected labels found with the right value. A metric
			// that simply lacks an expected label must not match, so the
			// count has to reach len(expectedLabels).
			matched := 0
			for _, label := range m.GetLabel() {
				want, ok := expectedLabels[label.GetName()]
				if !ok {
					continue
				}
				if want != label.GetValue() {
					matched = -1
					break
				}
				matched++
			}
			if matched == len(expectedLabels) {
				return m
			}
		}
	}
	return nil
}
114 changes: 114 additions & 0 deletions data/sqlutil/sql.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package sqlutil

import (
"context"
"database/sql"
"fmt"

"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
)

// FrameFromRows returns a new Frame populated with the data from rows. The field types
Expand Down Expand Up @@ -79,3 +83,113 @@ func FrameFromRows(rows *sql.Rows, rowLimit int64, converters ...Converter) (*da

return frame, nil
}

// FrameFromRowsWithContext is an enhanced version of FrameFromRows that adds observability and cancellation support.
//
// Compared to FrameFromRows:
//   - Accepts a context.Context, enabling support for cancellation and timeouts.
//   - Emits Prometheus metrics for rows and cells (total and per-query histograms).
//   - Adds OpenTelemetry tracing with a span named "FrameFromRowsWithContext", including row and cell counts.
//   - Aborts processing early if the context is canceled (e.g. timeout or client disconnect).
//
// When the context is done mid-iteration, the rows read so far are returned
// together with a warning notice and a nil error; the span is marked as
// errored with the context's error. When rowLimit rows have been read, the
// frame is returned with a warning notice describing the limit.
//
// Every error returned by this function is also recorded on the span.
//
// Callers can use sqlutil.ContextWithMetricLabels(ctx, map[string]string{...}) to attach metric labels.
// Allowed labels: "query_type", "datasource_type".
func FrameFromRowsWithContext(ctx context.Context, rows *sql.Rows, rowLimit int64, converters ...Converter) (*data.Frame, error) {
	labels := getMetricLabels(ctx)

	var (
		rowCount  int64
		cellCount int64
	)

	// Start OpenTelemetry tracing span
	tracer := otel.Tracer("grafana/sqlutil")
	ctx, span := tracer.Start(ctx, "FrameFromRowsWithContext")
	defer span.End()

	// Emit metrics + span attributes at the end. This runs on every return
	// path, so partial results (cancellation, row limit, errors) are counted.
	defer func() {
		rowsProcessed.With(labels).Add(float64(rowCount))
		rowCountHistogram.With(labels).Observe(float64(rowCount))

		cellsProcessed.With(labels).Add(float64(cellCount))
		cellCountHistogram.With(labels).Observe(float64(cellCount))

		span.SetAttributes(
			attribute.Int64("sqlutil.row_count", rowCount),
			attribute.Int64("sqlutil.cell_count", cellCount),
		)
	}()

	// markFailed records err on the span so that every failing return path is
	// visible in traces, not just the ones inside the row loop.
	markFailed := func(err error) {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
	}

	types, err := rows.ColumnTypes()
	if err != nil {
		markFailed(err)
		return nil, err
	}

	if isDynamic, remaining := removeDynamicConverter(converters); isDynamic {
		dynFrame, dynErr := frameDynamic(Rows{itr: rows}, rowLimit, types, remaining)
		if dynErr != nil {
			markFailed(dynErr)
			return dynFrame, dynErr
		}
		// The dynamic path does its own iteration, so derive the counts from
		// the resulting frame instead of reporting zero for these queries.
		// NOTE(review): assumes the dynamic frame has one field per SQL
		// column — confirm against frameDynamic.
		if dynFrame != nil {
			rowCount = int64(dynFrame.Rows())
			cellCount = rowCount * int64(len(dynFrame.Fields))
		}
		return dynFrame, nil
	}

	names, err := rows.Columns()
	if err != nil {
		markFailed(err)
		return nil, err
	}

	scanRow, err := MakeScanRow(types, names, converters...)
	if err != nil {
		markFailed(err)
		return nil, err
	}

	frame := NewFrame(names, scanRow.Converters...)

	for {
		for rows.Next() {
			// Abort if the context was cancelled or its deadline passed.
			// The partial frame is still returned, flagged with a notice.
			if ctxErr := ctx.Err(); ctxErr != nil {
				frame.AppendNotices(data.Notice{
					Severity: data.NoticeSeverityWarning,
					Text:     "Query was cancelled",
				})
				markFailed(ctxErr)
				return frame, nil
			}

			if rowCount == rowLimit {
				frame.AppendNotices(data.Notice{
					Severity: data.NoticeSeverityWarning,
					Text:     fmt.Sprintf("Results have been limited to %v because the SQL row limit was reached", rowLimit),
				})
				return frame, nil
			}

			r := scanRow.NewScannableRow()
			if err := rows.Scan(r...); err != nil {
				markFailed(err)
				return nil, err
			}

			if err := Append(frame, r, scanRow.Converters...); err != nil {
				markFailed(err)
				return nil, err
			}

			rowCount++
			cellCount += int64(len(r))
		}

		// Move on to the next result set unless the limit is already reached
		// or there are no more result sets.
		if rowCount == rowLimit || !rows.NextResultSet() {
			break
		}
	}

	if err := rows.Err(); err != nil {
		markFailed(err)
		return frame, backend.DownstreamError(err)
	}

	return frame, nil
}
Loading