diff --git a/data/sqlutil/metrics.go b/data/sqlutil/metrics.go new file mode 100644 index 000000000..6e2202158 --- /dev/null +++ b/data/sqlutil/metrics.go @@ -0,0 +1,123 @@ +package sqlutil + +import ( + "context" + "errors" + + "github.com/prometheus/client_golang/prometheus" +) + +const ( + namespace = "grafana" + subsystem = "datasources" +) + +// The allowed label keys used across all metrics +var metricLabelKeys = []string{"query_type", "datasource_type"} + +var rowsProcessed = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "sqlutil_rows_processed_total", + Help: "Total rows processed by FrameFromRows", + }, + metricLabelKeys, +) + +var rowCountHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "sqlutil_rows_per_query", + Help: "Histogram of row counts returned by FrameFromRows", + Buckets: prometheus.ExponentialBuckets(1, 2, 10), + }, + metricLabelKeys, +) + +var cellsProcessed = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "sqlutil_cells_processed_total", + Help: "Total number of individual SQL cells processed", + }, + metricLabelKeys, +) + +var cellCountHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "sqlutil_cells_per_query", + Help: "Histogram of the number of SQL cells processed per query", + Buckets: prometheus.ExponentialBuckets(1, 2, 12), + }, + metricLabelKeys, +) + +// RegisterMetrics registers Prometheus metrics for sqlutil. +// It safely handles duplicate registration and returns any non-duplicate errors. 
+func RegisterMetrics(reg prometheus.Registerer) error { + return registerAll(reg, + rowsProcessed, + rowCountHistogram, + cellsProcessed, + cellCountHistogram, + ) +} + +func registerAll(reg prometheus.Registerer, collectors ...prometheus.Collector) error { + for _, c := range collectors { + if err := reg.Register(c); err != nil { + var are prometheus.AlreadyRegisteredError + if errors.As(err, &are) { + // Copy underlying collector pointer to avoid nil metric errors + switch v := c.(type) { + case *prometheus.CounterVec: + if existing, ok := are.ExistingCollector.(*prometheus.CounterVec); ok { + *v = *existing + } + case *prometheus.HistogramVec: + if existing, ok := are.ExistingCollector.(*prometheus.HistogramVec); ok { + *v = *existing + } + } + continue // skip AlreadyRegisteredError + } + return err + } + } + return nil +} + +// Context key for metric labels +type ctxKeyMetricLabels struct{} + +// ContextWithMetricLabels returns a context with the given Prometheus labels attached. +// Callers should provide keys matching metricLabelKeys: "query_type" and "datasource_type". +func ContextWithMetricLabels(ctx context.Context, labels map[string]string) context.Context { + return context.WithValue(ctx, ctxKeyMetricLabels{}, labels) +} + +// getMetricLabels extracts only the allowed metric labels from context. +// Missing keys are filled with empty strings. 
+func getMetricLabels(ctx context.Context) prometheus.Labels { + out := prometheus.Labels{} + for _, key := range metricLabelKeys { + out[key] = "" + } + + if v := ctx.Value(ctxKeyMetricLabels{}); v != nil { + if m, ok := v.(map[string]string); ok { + for _, key := range metricLabelKeys { + if val, exists := m[key]; exists { + out[key] = val + } + } + } + } + + return out +} diff --git a/data/sqlutil/metrics_test.go b/data/sqlutil/metrics_test.go new file mode 100644 index 000000000..be14089a4 --- /dev/null +++ b/data/sqlutil/metrics_test.go @@ -0,0 +1,83 @@ +package sqlutil_test + +import ( + "context" + "testing" + + "github.com/grafana/grafana-plugin-sdk-go/data/sqlutil" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/require" +) + +func TestFrameFromRowsWithContext_MetricsRecorded(t *testing.T) { + reg := prometheus.NewRegistry() + err := sqlutil.RegisterMetrics(reg) + require.NoError(t, err) + + ctx := sqlutil.ContextWithMetricLabels(context.Background(), map[string]string{ + "query_type": "test", + "datasource_type": "fake", + }) + + // 2 rows × 3 columns + rows := makeSingleResultSet( + []string{"a", "b", "c"}, + []interface{}{1, 2, 3}, + []interface{}{4, 5, 6}, + ) + require.NoError(t, rows.Err()) + + _, err = sqlutil.FrameFromRowsWithContext(ctx, rows, 100) + require.NoError(t, err) + + // Gather and inspect metrics + metrics, err := reg.Gather() + require.NoError(t, err) + + labels := map[string]string{ + "query_type": "test", + "datasource_type": "fake", + } + + assertCounter := func(name string, want float64) { + m := findMetricWithLabels(metrics, name, labels) + require.NotNil(t, m, "metric %s not found", name) + require.Equal(t, want, m.GetCounter().GetValue(), "metric %s value mismatch", name) + } + + assertHistogram := func(name string, minSamples uint64) { + m := findMetricWithLabels(metrics, name, labels) + require.NotNil(t, m, "metric %s not found", name) + 
require.GreaterOrEqual(t, m.GetHistogram().GetSampleCount(), minSamples, "metric %s histogram count too low", name) + } + + assertCounter("grafana_datasources_sqlutil_rows_processed_total", 2) + assertCounter("grafana_datasources_sqlutil_cells_processed_total", 6) + assertHistogram("grafana_datasources_sqlutil_rows_per_query", 1) + assertHistogram("grafana_datasources_sqlutil_cells_per_query", 1) +} + +// findMetricWithLabels finds a metric by name and label set +func findMetricWithLabels(metrics []*dto.MetricFamily, name string, expectedLabels map[string]string) *dto.Metric { + for _, mf := range metrics { + if mf.GetName() != name { + continue + } + for _, m := range mf.Metric { + matches := true + for _, label := range m.GetLabel() { + if val, ok := expectedLabels[label.GetName()]; ok { + if val != label.GetValue() { + matches = false + break + } + } + } + if matches { + return m + } + } + } + return nil +} diff --git a/data/sqlutil/sql.go b/data/sqlutil/sql.go index 4cc16b214..2fbdd9928 100644 --- a/data/sqlutil/sql.go +++ b/data/sqlutil/sql.go @@ -1,11 +1,15 @@ package sqlutil import ( + "context" "database/sql" "fmt" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" ) // FrameFromRows returns a new Frame populated with the data from rows. The field types @@ -79,3 +83,113 @@ func FrameFromRows(rows *sql.Rows, rowLimit int64, converters ...Converter) (*da return frame, nil } + +// FrameFromRowsWithContext is an enhanced version of FrameFromRows that adds observability and cancellation support. +// +// Compared to FrameFromRows: +// - Accepts a context.Context, enabling support for cancellation and timeouts. +// - Emits Prometheus metrics for rows and cells (total and per-query histograms). +// - Adds OpenTelemetry tracing with a span named "FrameFromRowsWithContext", including row and cell counts. 
+// - Aborts processing early if the context is canceled (e.g. timeout or client disconnect). +// +// Callers can use sqlutil.ContextWithMetricLabels(ctx, map[string]string{...}) to attach metric labels. +// Allowed labels: "query_type", "datasource_type". +func FrameFromRowsWithContext(ctx context.Context, rows *sql.Rows, rowLimit int64, converters ...Converter) (*data.Frame, error) { + labels := getMetricLabels(ctx) + + var ( + rowCount int64 + cellCount int64 + ) + + // Start OpenTelemetry tracing span + tracer := otel.Tracer("grafana/sqlutil") + ctx, span := tracer.Start(ctx, "FrameFromRowsWithContext") + defer span.End() + + // Emit metrics + span attributes at the end + defer func() { + rowsProcessed.With(labels).Add(float64(rowCount)) + rowCountHistogram.With(labels).Observe(float64(rowCount)) + + cellsProcessed.With(labels).Add(float64(cellCount)) + cellCountHistogram.With(labels).Observe(float64(cellCount)) + + span.SetAttributes( + attribute.Int64("sqlutil.row_count", rowCount), + attribute.Int64("sqlutil.cell_count", cellCount), + ) + }() + + types, err := rows.ColumnTypes() + if err != nil { + return nil, err + } + + if isDynamic, converters := removeDynamicConverter(converters); isDynamic { + rows := Rows{itr: rows} + return frameDynamic(rows, rowLimit, types, converters) + } + + names, err := rows.Columns() + if err != nil { + return nil, err + } + + scanRow, err := MakeScanRow(types, names, converters...) + if err != nil { + return nil, err + } + + frame := NewFrame(names, scanRow.Converters...) 
+ + for { + for rows.Next() { + // Abort if context was cancelled + if ctx.Err() != nil { + frame.AppendNotices(data.Notice{ + Severity: data.NoticeSeverityWarning, + Text: "Query was cancelled", + }) + span.SetStatus(codes.Error, "context canceled") + return frame, nil + } + + if rowCount == rowLimit { + frame.AppendNotices(data.Notice{ + Severity: data.NoticeSeverityWarning, + Text: fmt.Sprintf("Results have been limited to %v because the SQL row limit was reached", rowLimit), + }) + return frame, nil + } + + r := scanRow.NewScannableRow() + if err := rows.Scan(r...); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + if err := Append(frame, r, scanRow.Converters...); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + rowCount++ + cellCount += int64(len(r)) + } + + if rowCount == rowLimit || !rows.NextResultSet() { + break + } + } + + if err := rows.Err(); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return frame, backend.DownstreamError(err) + } + + return frame, nil +} diff --git a/data/sqlutil/sql_test.go b/data/sqlutil/sql_test.go index 3f07ed4c5..e6582d1e5 100644 --- a/data/sqlutil/sql_test.go +++ b/data/sqlutil/sql_test.go @@ -165,13 +165,52 @@ func makeMultipleResultSets( return rows } +type cancellingRows struct { + baseRows + ctx context.Context + cancel context.CancelFunc + rows [][]interface{} + currentRow int +} + +func (r *cancellingRows) Next(dest []driver.Value) error { + r.currentRow++ + if r.currentRow >= len(r.rows) { + return io.EOF + } + + // Cancel context after yielding first row + if r.currentRow == 1 { + r.cancel() + } + + for i := range dest { + dest[i] = r.rows[r.currentRow][i] + } + return nil +} + +func makeCancellingRows(ctx context.Context, cancel context.CancelFunc, columnNames []string, data ...[]interface{}) *sql.Rows { + db := &fakeDB{ + rows: &cancellingRows{ + baseRows: 
baseRows{columnNames: columnNames}, + ctx: ctx, + cancel: cancel, + rows: data, + currentRow: -1, + }, + } + rows, _ := sql.OpenDB(db).Query("") + return rows +} + func TestFrameFromRows(t *testing.T) { ptr := func(s string) *string { return &s } for _, tt := range []struct { name string - rows *sql.Rows + makeRows func() *sql.Rows rowLimit int64 converters []sqlutil.Converter frame *data.Frame @@ -179,22 +218,14 @@ func TestFrameFromRows(t *testing.T) { }{ { name: "rows not implements driver.RowsNextResultSet", - rows: makeSingleResultSet( //nolint:rowserrcheck - []string{ - "a", - "b", - "c", - }, - []interface{}{ - 1, 2, 3, - }, - []interface{}{ - 4, 5, 6, - }, - []interface{}{ - 7, 8, 9, - }, - ), + makeRows: func() *sql.Rows { + return makeSingleResultSet( + []string{"a", "b", "c"}, + []interface{}{1, 2, 3}, + []interface{}{4, 5, 6}, + []interface{}{7, 8, 9}, + ) + }, rowLimit: 100, converters: nil, frame: &data.Frame{ @@ -208,22 +239,14 @@ func TestFrameFromRows(t *testing.T) { }, { name: "rows not implements driver.RowsNextResultSet, limit reached", - rows: makeSingleResultSet( //nolint:rowserrcheck - []string{ - "a", - "b", - "c", - }, - []interface{}{ - 1, 2, 3, - }, - []interface{}{ - 4, 5, 6, - }, - []interface{}{ - 7, 8, 9, - }, - ), + makeRows: func() *sql.Rows { + return makeSingleResultSet( + []string{"a", "b", "c"}, + []interface{}{1, 2, 3}, + []interface{}{4, 5, 6}, + []interface{}{7, 8, 9}, + ) + }, rowLimit: 2, converters: nil, frame: &data.Frame{ @@ -245,24 +268,16 @@ func TestFrameFromRows(t *testing.T) { }, { name: "rows implements driver.RowsNextResultSet, but contains only one result set", - rows: makeMultipleResultSets( //nolint:rowserrcheck - []string{ - "a", - "b", - "c", - }, - [][]interface{}{ - { - 1, 2, 3, - }, - { - 4, 5, 6, - }, - { - 7, 8, 9, + makeRows: func() *sql.Rows { + return makeMultipleResultSets( + []string{"a", "b", "c"}, + [][]interface{}{ + {1, 2, 3}, + {4, 5, 6}, + {7, 8, 9}, }, - }, - ), + ) + }, rowLimit: 100, 
converters: nil, frame: &data.Frame{ @@ -276,26 +291,18 @@ func TestFrameFromRows(t *testing.T) { }, { name: "rows implements driver.RowsNextResultSet, but contains more then one result set", - rows: makeMultipleResultSets( //nolint:rowserrcheck - []string{ - "a", - "b", - "c", - }, - [][]interface{}{ - { - 1, 2, 3, - }, - { - 4, 5, 6, + makeRows: func() *sql.Rows { + return makeMultipleResultSets( + []string{"a", "b", "c"}, + [][]interface{}{ + {1, 2, 3}, + {4, 5, 6}, }, - }, - [][]interface{}{ - { - 7, 8, 9, + [][]interface{}{ + {7, 8, 9}, }, - }, - ), + ) + }, rowLimit: 100, converters: nil, frame: &data.Frame{ @@ -309,26 +316,18 @@ func TestFrameFromRows(t *testing.T) { }, { name: "rows implements driver.RowsNextResultSet, limit reached", - rows: makeMultipleResultSets( //nolint:rowserrcheck - []string{ - "a", - "b", - "c", - }, - [][]interface{}{ - { - 1, 2, 3, + makeRows: func() *sql.Rows { + return makeMultipleResultSets( + []string{"a", "b", "c"}, + [][]interface{}{ + {1, 2, 3}, + {4, 5, 6}, }, - { - 4, 5, 6, + [][]interface{}{ + {7, 8, 9}, }, - }, - [][]interface{}{ - { - 7, 8, 9, - }, - }, - ), + ) + }, rowLimit: 2, converters: nil, frame: &data.Frame{ @@ -342,7 +341,22 @@ func TestFrameFromRows(t *testing.T) { }, } { t.Run(tt.name, func(t *testing.T) { - frame, err := sqlutil.FrameFromRows(tt.rows, tt.rowLimit, tt.converters...) + rows := tt.makeRows() + require.NoError(t, rows.Err()) + frame, err := sqlutil.FrameFromRows(rows, tt.rowLimit, tt.converters...) + require.NoError(t, err) + if tt.err { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, tt.frame, frame) + } + }) + t.Run(tt.name+" (FrameFromRowsWithContext)", func(t *testing.T) { + rows := tt.makeRows() + require.NoError(t, rows.Err()) + frame, err := sqlutil.FrameFromRowsWithContext(context.Background(), rows, tt.rowLimit, tt.converters...) 
+ // error expectation is checked against tt.err below if tt.err { require.Error(t, err) } else { @@ -352,3 +366,30 @@ func TestFrameFromRows(t *testing.T) { }) } } + +func TestFrameFromRowsWithContext_Cancelled(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + columnNames := []string{"x"} + in := [][]interface{}{ + {1}, + {2}, // should be skipped due to context cancel + } + + rows := makeCancellingRows(ctx, cancel, columnNames, in...) + require.NoError(t, rows.Err()) + frame, err := sqlutil.FrameFromRowsWithContext(ctx, rows, 100) + require.NoError(t, err) + require.NotNil(t, frame) + + require.Len(t, frame.Fields, 1) + require.Equal(t, 1, frame.Fields[0].Len()) // Only 1 row processed + + require.NotNil(t, frame.Meta) + require.NotEmpty(t, frame.Meta.Notices) + + notice := frame.Meta.Notices[0] + require.Equal(t, data.NoticeSeverityWarning, notice.Severity) + require.Contains(t, notice.Text, "cancelled") + require.Empty(t, notice.Link) +}