From 2c02b818fe2363ac62451486fa3e70b75d1baee5 Mon Sep 17 00:00:00 2001 From: Jason Fong Date: Thu, 20 Nov 2025 14:03:29 -0500 Subject: [PATCH] ts: record child metrics with low frequency poller This change adds the ability to record changefeed child metrics in the time series database. Each child metric is stored under the parent metric name suffixed with its sanitized, sorted labels. Epic: CRDB-55079 Release: None --- pkg/server/status/recorder.go | 126 +++++++++- pkg/server/status/recorder_test.go | 369 +++++++++++++++++++++++++++++ pkg/ts/keys_test.go | 8 + pkg/util/metric/metric.go | 11 +- pkg/util/metric/metric.proto | 6 +- pkg/util/metric/registry.go | 6 +- 6 files changed, 520 insertions(+), 6 deletions(-) diff --git a/pkg/server/status/recorder.go b/pkg/server/status/recorder.go index db8c8262fd93..14f9c64afb64 100644 --- a/pkg/server/status/recorder.go +++ b/pkg/server/status/recorder.go @@ -14,7 +14,9 @@ import ( "math" "os" "runtime" + "sort" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -455,7 +457,42 @@ func (mr *MetricsRecorder) GetTimeSeriesData(childMetrics bool) []tspb.TimeSerie if !ChildMetricsStorageEnabled.Get(&mr.settings.SV) { return nil } - return nil // TODO(jasonlmfong): to be implemented + + mr.mu.RLock() + defer mr.mu.RUnlock() + + if mr.mu.nodeRegistry == nil { + // We haven't yet processed initialization information; do nothing. + if log.V(1) { + log.Dev.Warning(context.TODO(), "MetricsRecorder.GetChildMetricsData() called before NodeID allocation") + } + return nil + } + + data := make([]tspb.TimeSeriesData, 0) + + // Record child metrics from app registry for system tenant only. 
+ now := mr.clock.Now() + recorder := registryRecorder{ + registry: mr.mu.appRegistry, + format: nodeTimeSeriesPrefix, + source: mr.mu.desc.NodeID.String(), + timestampNanos: now.UnixNano(), + } + recorder.recordChangefeedChildMetrics(&data) + + // Record child metrics from app-level registries for secondary tenants + for tenantID, r := range mr.mu.tenantRegistries { + tenantRecorder := registryRecorder{ + registry: r, + format: nodeTimeSeriesPrefix, + source: tsutil.MakeTenantSource(mr.mu.desc.NodeID.String(), tenantID.String()), + timestampNanos: now.UnixNano(), + } + tenantRecorder.recordChangefeedChildMetrics(&data) + } + + return data } lastDataCount := atomic.LoadInt64(&mr.lastDataCount) @@ -906,6 +943,93 @@ func (rr registryRecorder) recordChild( }) } +// recordChangefeedChildMetrics iterates through changefeed metrics in the registry and records child metrics +// for those whose metadata has EnableLowFreqChildCollection set to true or left unset (the getter defaults to true). +// Records at most 1024 child metrics per call (i.e. per registry) to bound memory usage and recording cost. +// +// NB: Only Counter and Gauge children are recorded; children of any other metric type are skipped. 
+func (rr registryRecorder) recordChangefeedChildMetrics(dest *[]tspb.TimeSeriesData) { + maxChildMetrics := 1024 + + var childMetricsCount int + labels := rr.registry.GetLabels() + rr.registry.Each(func(name string, v interface{}) { + // Filter for changefeed metrics only + if !strings.HasPrefix(name, "changefeed.") { + return + } + + prom, ok := v.(metric.PrometheusExportable) + if !ok { + return + } + promIter, ok := v.(metric.PrometheusIterable) + if !ok { + return + } + + // Check if the metric has child collection enabled in its metadata + if iterable, ok := v.(metric.Iterable); ok { + metadata := iterable.GetMetadata() + if !metadata.GetEnableLowFreqChildCollection() { + return // Skip this metric if child collection is not enabled + } + } else { + // If we can't get metadata, skip child collection for safety + return + } + m := prom.ToPrometheusMetric() + m.Label = append(labels[:len(labels):len(labels)], prom.GetLabels(false /* useStaticLabels */)...) + + processChildMetric := func(childMetric *prometheusgo.Metric) { + if childMetricsCount >= maxChildMetrics { + return // Skip remaining children once we hit the limit + } + + // Sanitize and sort labels using the same sanitization as Prometheus export + sanitizedLabels := make([]struct{ key, value string }, len(childMetric.Label)) + for i, label := range childMetric.Label { + sanitizedLabels[i].key = metric.ExportedLabel(label.GetName()) + sanitizedLabels[i].value = label.GetValue() + } + sort.SliceStable(sanitizedLabels, func(i, j int) bool { + return sanitizedLabels[i].key < sanitizedLabels[j].key + }) + + // Build labels suffix using sanitized and sorted key-value pairs + var labelsSuffix string + for _, label := range sanitizedLabels { + labelsSuffix += fmt.Sprintf("%s=\"%s\",", label.key, label.value) + } + // Remove trailing comma and wrap with curly braces + if len(labelsSuffix) > 0 { + labelsSuffix = "{" + labelsSuffix[:len(labelsSuffix)-1] + "}" + } + + var value float64 + if childMetric.Gauge != nil { + value = childMetric.Gauge.GetValue() 
+ } else if childMetric.Counter != nil { + value = childMetric.Counter.GetValue() + } else { + return + } + *dest = append(*dest, tspb.TimeSeriesData{ + Name: fmt.Sprintf(rr.format, prom.GetName(false /* useStaticLabels */)+labelsSuffix), + Source: rr.source, + Datapoints: []tspb.TimeSeriesDatapoint{ + { + TimestampNanos: rr.timestampNanos, + Value: value, + }, + }, + }) + childMetricsCount++ + } + promIter.Each(m.Label, processChildMetric) + }) +} + // GetTotalMemory returns either the total system memory (in bytes) or if // possible the cgroups available memory. func GetTotalMemory(ctx context.Context) (int64, error) { diff --git a/pkg/server/status/recorder_test.go b/pkg/server/status/recorder_test.go index aa6d2b2ed1e7..4ba489fa75e4 100644 --- a/pkg/server/status/recorder_test.go +++ b/pkg/server/status/recorder_test.go @@ -14,6 +14,7 @@ import ( "reflect" "sort" "strconv" + "strings" "sync" "testing" "time" @@ -258,6 +259,164 @@ func TestMetricsRecorderLabels(t *testing.T) { if a, e := actualData, expectedData; !reflect.DeepEqual(a, e) { t.Errorf("recorder did not yield expected time series collection; diff:\n %v", pretty.Diff(e, a)) } + + // ======================================== + // Verify that GetTimeSeriesData with childMetrics=true returns child labels + // ======================================== + + // Add aggmetrics with child labels to the app registry + aggCounter := aggmetric.NewCounter( + metric.Metadata{Name: "changefeed.agg_counter"}, + "some-id", "feed_id", + ) + appReg.AddMetric(aggCounter) + // Add child metrics with specific labels + child1 := aggCounter.AddChild("123", "feed-a") + child1.Inc(100) + child2 := aggCounter.AddChild("456", "feed-b") + child2.Inc(200) + + aggGauge := aggmetric.NewGauge( + metric.Metadata{Name: "changefeed.agg_gauge"}, + "db", "status!!", + ) + appReg.AddMetric(aggGauge) + // Add child metrics with different label combinations + child3 := aggGauge.AddChild("mine", "active") + child3.Update(1) + child4 := 
aggGauge.AddChild("yours", "paused") + child4.Update(0) + + // Enable the cluster setting for child metrics storage + ChildMetricsStorageEnabled.Override(context.Background(), &st.SV, true) + + // Get time series data with child metrics enabled + childData := recorder.GetTimeSeriesData(true) + + var foundCounterFeedA123 bool + var foundCounterFeedB456 bool + var foundGaugeMineActive bool + var foundGaugeYoursPaused bool + + for _, ts := range childData { + // Check for changefeed.agg_counter with some_id and feed_id labels + if strings.Contains(ts.Name, "cr.node.changefeed.agg_counter") { + if strings.Contains(ts.Name, "feed_id=\"feed-a\"") && strings.Contains(ts.Name, "some_id=\"123\"") { + foundCounterFeedA123 = true + require.Equal(t, "7", ts.Source, "Expected source to be node ID 7") + require.Len(t, ts.Datapoints, 1) + require.Equal(t, float64(100), ts.Datapoints[0].Value) + } + if strings.Contains(ts.Name, "feed_id=\"feed-b\"") && strings.Contains(ts.Name, "some_id=\"456\"") { + foundCounterFeedB456 = true + require.Equal(t, "7", ts.Source, "Expected source to be node ID 7") + require.Len(t, ts.Datapoints, 1) + require.Equal(t, float64(200), ts.Datapoints[0].Value) + } + } + + // Check for changefeed.agg_gauge with db and status labels + if strings.Contains(ts.Name, "cr.node.changefeed.agg_gauge") { + if strings.Contains(ts.Name, "db=\"mine\"") && strings.Contains(ts.Name, "status__=\"active\"") { + foundGaugeMineActive = true + require.Equal(t, "7", ts.Source, "Expected source to be node ID 7") + require.Len(t, ts.Datapoints, 1) + require.Equal(t, float64(1), ts.Datapoints[0].Value) + } + if strings.Contains(ts.Name, "db=\"yours\"") && strings.Contains(ts.Name, "status__=\"paused\"") { + foundGaugeYoursPaused = true + require.Equal(t, "7", ts.Source, "Expected source to be node ID 7") + require.Len(t, ts.Datapoints, 1) + require.Equal(t, float64(0), ts.Datapoints[0].Value) + } + } + } + + require.True(t, foundCounterFeedA123, "Expected to find 
changefeed.agg_counter with some_id=123 and feed_id=feed-a") + require.True(t, foundCounterFeedB456, "Expected to find changefeed.agg_counter with some_id=456 and feed_id=feed-b") + require.True(t, foundGaugeMineActive, "Expected to find changefeed.agg_gauge with db=mine and status=active") + require.True(t, foundGaugeYoursPaused, "Expected to find changefeed.agg_gauge with db=yours and status=paused") + + // ======================================== + // Verify that tenant changefeed child metrics are collected with proper source + // ======================================== + + // Add changefeed aggmetrics to the tenant registry with EnableLowFreqChildCollection + enableLowFreqChildCollection := true + tenantAggCounter := aggmetric.NewCounter( + metric.Metadata{ + Name: "changefeed.emitted_messages", + EnableLowFreqChildCollection: &enableLowFreqChildCollection, + }, + "scope", "feed_id", + ) + regTenant.AddMetric(tenantAggCounter) + + // Add child metrics for different changefeeds in the tenant + tenantChild1 := tenantAggCounter.AddChild("default", "tenant-feed-1") + tenantChild1.Inc(500) + tenantChild2 := tenantAggCounter.AddChild("user", "tenant-feed-2") + tenantChild2.Inc(750) + + tenantAggGauge := aggmetric.NewGauge( + metric.Metadata{ + Name: "changefeed.running", + EnableLowFreqChildCollection: &enableLowFreqChildCollection, + }, + "scope", + ) + regTenant.AddMetric(tenantAggGauge) + + tenantChild3 := tenantAggGauge.AddChild("default") + tenantChild3.Update(2) + tenantChild4 := tenantAggGauge.AddChild("user") + tenantChild4.Update(1) + + // Get time series data with child metrics enabled + tenantChildData := recorder.GetTimeSeriesData(true) + + var foundTenantCounterFeed1 bool + var foundTenantCounterFeed2 bool + var foundTenantGaugeDefault bool + var foundTenantGaugeUser bool + + for _, ts := range tenantChildData { + // Verify tenant changefeed metrics have correct source (7-123 for node 7, tenant 123) + if strings.Contains(ts.Name, 
"cr.node.changefeed.emitted_messages") { + if strings.Contains(ts.Name, `scope="default"`) && strings.Contains(ts.Name, `feed_id="tenant-feed-1"`) { + foundTenantCounterFeed1 = true + require.Equal(t, "7-123", ts.Source, "Expected tenant source format node-tenant") + require.Len(t, ts.Datapoints, 1) + require.Equal(t, float64(500), ts.Datapoints[0].Value) + } + if strings.Contains(ts.Name, `scope="user"`) && strings.Contains(ts.Name, `feed_id="tenant-feed-2"`) { + foundTenantCounterFeed2 = true + require.Equal(t, "7-123", ts.Source, "Expected tenant source format node-tenant") + require.Len(t, ts.Datapoints, 1) + require.Equal(t, float64(750), ts.Datapoints[0].Value) + } + } + + if strings.Contains(ts.Name, "cr.node.changefeed.running") { + if strings.Contains(ts.Name, `scope="default"`) { + foundTenantGaugeDefault = true + require.Equal(t, "7-123", ts.Source, "Expected tenant source format node-tenant") + require.Len(t, ts.Datapoints, 1) + require.Equal(t, float64(2), ts.Datapoints[0].Value) + } + if strings.Contains(ts.Name, `scope="user"`) { + foundTenantGaugeUser = true + require.Equal(t, "7-123", ts.Source, "Expected tenant source format node-tenant") + require.Len(t, ts.Datapoints, 1) + require.Equal(t, float64(1), ts.Datapoints[0].Value) + } + } + } + + require.True(t, foundTenantCounterFeed1, "Expected to find tenant changefeed.emitted_messages with scope=default and feed_id=tenant-feed-1") + require.True(t, foundTenantCounterFeed2, "Expected to find tenant changefeed.emitted_messages with scope=user and feed_id=tenant-feed-2") + require.True(t, foundTenantGaugeDefault, "Expected to find tenant changefeed.running with scope=default") + require.True(t, foundTenantGaugeUser, "Expected to find tenant changefeed.running with scope=user") } func TestRegistryRecorder_RecordChild(t *testing.T) { @@ -763,3 +922,213 @@ func BenchmarkExtractValueAllocs(b *testing.B) { } b.ReportAllocs() } + +func TestRecordChangefeedChildMetrics(t *testing.T) { + defer 
leaktest.AfterTest(t)() + + manual := timeutil.NewManualTime(timeutil.Unix(0, 100)) + + t.Run("empty registry", func(t *testing.T) { + reg := metric.NewRegistry() + recorder := registryRecorder{ + registry: reg, + format: nodeTimeSeriesPrefix, + source: "test-source", + timestampNanos: manual.Now().UnixNano(), + } + + var dest []tspb.TimeSeriesData + recorder.recordChangefeedChildMetrics(&dest) + + require.Empty(t, dest) + }) + + t.Run("non-changefeed metrics ignored", func(t *testing.T) { + reg := metric.NewRegistry() + + // Add non-changefeed metrics + gauge := metric.NewGauge(metric.Metadata{Name: "sql.connections"}) + reg.AddMetric(gauge) + gauge.Update(10) + + counter := metric.NewCounter(metric.Metadata{Name: "kv.requests"}) + reg.AddMetric(counter) + counter.Inc(5) + + recorder := registryRecorder{ + registry: reg, + format: nodeTimeSeriesPrefix, + source: "test-source", + timestampNanos: manual.Now().UnixNano(), + } + + var dest []tspb.TimeSeriesData + recorder.recordChangefeedChildMetrics(&dest) + + require.Empty(t, dest) + }) + + t.Run("changefeed metrics without child collection", func(t *testing.T) { + reg := metric.NewRegistry() + + // Add changefeed aggmetric with child collection explicitly disabled + enableLowFreqChildCollection := false + gauge := aggmetric.NewGauge( + metric.Metadata{ + Name: "changefeed.test_metric", + EnableLowFreqChildCollection: &enableLowFreqChildCollection, + }, + "job_id", "feed_id", + ) + reg.AddMetric(gauge) + + // Add child metrics with labels + child1 := gauge.AddChild("123", "abc") + child1.Update(50) + + recorder := registryRecorder{ + registry: reg, + format: nodeTimeSeriesPrefix, + source: "test-source", + timestampNanos: manual.Now().UnixNano(), + } + + var dest []tspb.TimeSeriesData + recorder.recordChangefeedChildMetrics(&dest) + + // Should be empty since EnableLowFreqChildCollection is false + require.Empty(t, dest) + }) + + t.Run("changefeed aggmetrics with child collection", func(t *testing.T) { + reg := 
metric.NewRegistry() + + // Create an aggmetric which supports child metrics + // Note: aggmetrics automatically have child collection capabilities + gauge := aggmetric.NewGauge(metric.Metadata{Name: "changefeed.test_metric"}, "job_id", "feed_id") + reg.AddMetric(gauge) + + // Add child metrics with labels + child1 := gauge.AddChild("123", "abc") + child1.Update(50) + + child2 := gauge.AddChild("456", "def") + child2.Update(75) + + recorder := registryRecorder{ + registry: reg, + format: nodeTimeSeriesPrefix, + source: "test-source", + timestampNanos: manual.Now().UnixNano(), + } + + var dest []tspb.TimeSeriesData + recorder.recordChangefeedChildMetrics(&dest) + + // Should have data for child metrics (aggmetrics have child collection enabled by default) + require.Len(t, dest, 2) + + // Verify the data structure + for _, data := range dest { + require.Contains(t, data.Name, "changefeed.test_metric") + require.Equal(t, "test-source", data.Source) + require.Len(t, data.Datapoints, 1) + require.Equal(t, manual.Now().UnixNano(), data.Datapoints[0].TimestampNanos) + require.Contains(t, []float64{50, 75}, data.Datapoints[0].Value) + } + }) + + t.Run("cardinality limit enforcement", func(t *testing.T) { + reg := metric.NewRegistry() + + gauge := aggmetric.NewGauge(metric.Metadata{Name: "changefeed.high_cardinality_metric"}, "job_id") + reg.AddMetric(gauge) + + // Add more than 1024 child metrics to test the limit + for i := 0; i < 1500; i++ { + child := gauge.AddChild(strconv.Itoa(i)) + child.Update(int64(i)) + } + + recorder := registryRecorder{ + registry: reg, + format: nodeTimeSeriesPrefix, + source: "test-source", + timestampNanos: manual.Now().UnixNano(), + } + + var dest []tspb.TimeSeriesData + recorder.recordChangefeedChildMetrics(&dest) + + // Should be limited to 1024 child metrics + require.Len(t, dest, 1024) + }) + + t.Run("label sanitization and sorting", func(t *testing.T) { + reg := metric.NewRegistry() + + gauge := aggmetric.NewGauge(metric.Metadata{Name: 
"changefeed.label_test"}, "job-id", "feed.name") + reg.AddMetric(gauge) + + // Add child with labels that need sanitization + child := gauge.AddChild("test@123", "my-feed!") + child.Update(42) + + recorder := registryRecorder{ + registry: reg, + format: nodeTimeSeriesPrefix, + source: "test-source", + timestampNanos: manual.Now().UnixNano(), + } + + var dest []tspb.TimeSeriesData + recorder.recordChangefeedChildMetrics(&dest) + + require.Len(t, dest, 1) + + // Verify that the metric name contains sanitized labels + metricName := dest[0].Name + require.Contains(t, metricName, "changefeed.label_test{feed_name=\"my-feed!\",job_id=\"test@123\"}") + }) + + t.Run("counter and gauge support", func(t *testing.T) { + reg := metric.NewRegistry() + + // Test with gauge + gauge := aggmetric.NewGauge(metric.Metadata{Name: "changefeed.gauge_metric"}, "type") + reg.AddMetric(gauge) + gaugeChild := gauge.AddChild("gauge") + gaugeChild.Update(100) + + // Test with counter + counter := aggmetric.NewCounter(metric.Metadata{Name: "changefeed.counter_metric"}, "type") + reg.AddMetric(counter) + counterChild := counter.AddChild("counter") + counterChild.Inc(50) + + recorder := registryRecorder{ + registry: reg, + format: nodeTimeSeriesPrefix, + source: "test-source", + timestampNanos: manual.Now().UnixNano(), + } + + var dest []tspb.TimeSeriesData + recorder.recordChangefeedChildMetrics(&dest) + + require.Len(t, dest, 2) + + // Find gauge and counter data points + var gaugeValue, counterValue float64 + for _, data := range dest { + if data.Datapoints[0].Value == 100 { + gaugeValue = data.Datapoints[0].Value + } else if data.Datapoints[0].Value == 50 { + counterValue = data.Datapoints[0].Value + } + } + + require.Equal(t, float64(100), gaugeValue) + require.Equal(t, float64(50), counterValue) + }) +} diff --git a/pkg/ts/keys_test.go b/pkg/ts/keys_test.go index b3820bc4b1eb..7615c28e0173 100644 --- a/pkg/ts/keys_test.go +++ b/pkg/ts/keys_test.go @@ -72,6 +72,14 @@ func TestDataKeys(t 
*testing.T) { 12, "/System/tsd//1m/1924-09-18T08:00:00Z/", }, + { + "metric.with:special/chars{label=\"@@ αβ_ !星\",}", + "source:with/special-chars", + 0, + Resolution10s, + 83, + "/System/tsd/metric.with:special/chars{label=\"@@ αβ_ !星\",}/10s/1970-01-01T00:00:00Z/source:with/special-chars", + }, } for i, tc := range testCases { diff --git a/pkg/util/metric/metric.go b/pkg/util/metric/metric.go index 7d151392fb63..bc5ad7994fed 100644 --- a/pkg/util/metric/metric.go +++ b/pkg/util/metric/metric.go @@ -230,11 +230,20 @@ func (m *Metadata) GetLabels(useStaticLabels bool) []*prometheusgo.LabelPair { return lps } +// GetEnableLowFreqChildCollection returns the value of EnableLowFreqChildCollection, +// defaulting to true when the field is unset (nil). +func (m *Metadata) GetEnableLowFreqChildCollection() bool { + if m.EnableLowFreqChildCollection == nil { + return true + } + return *m.EnableLowFreqChildCollection +} + // AddLabel adds a label/value pair for this metric. func (m *Metadata) AddLabel(name, value string) { m.Labels = append(m.Labels, &LabelPair{ - Name: proto.String(exportedLabel(name)), + Name: proto.String(ExportedLabel(name)), Value: proto.String(value), }) } diff --git a/pkg/util/metric/metric.proto b/pkg/util/metric/metric.proto index b2d6387d7139..84019f71d0dc 100644 --- a/pkg/util/metric/metric.proto +++ b/pkg/util/metric/metric.proto @@ -117,6 +117,10 @@ message Metadata { // field is only output to metrics.yaml for documentation purposes, // with plans for future use in automated dashboard generation. required string how_to_use = 12 [(gogoproto.nullable) = false]; + // enable_low_freq_child_collection enables child metrics to be collected + // in the low-frequency child metrics poller. When true or unset, child metrics + // with labels will be recorded in the time series database. + optional bool enable_low_freq_child_collection = 13; - // Next ID: 10 (then 14). + // Next ID: 10 (then 14). 
} diff --git a/pkg/util/metric/registry.go b/pkg/util/metric/registry.go index 928c91c0ca30..c5375f11c06d 100644 --- a/pkg/util/metric/registry.go +++ b/pkg/util/metric/registry.go @@ -79,7 +79,7 @@ func NewRegistry() *Registry { func (r *Registry) AddLabel(name string, value interface{}) { r.Lock() defer r.Unlock() - r.labels = append(r.labels, labelPair{name: exportedLabel(name), value: value}) + r.labels = append(r.labels, labelPair{name: ExportedLabel(name), value: value}) r.computedLabels = append(r.computedLabels, &prometheusgo.LabelPair{}) } @@ -288,8 +288,8 @@ func ExportedName(name string) string { return prometheusNameReplaceRE.ReplaceAllString(name, "_") } -// exportedLabel takes a metric name and generates a valid prometheus name. -func exportedLabel(name string) string { +// ExportedLabel takes a metric label and generates a valid prometheus label name. +func ExportedLabel(name string) string { return prometheusLabelReplaceRE.ReplaceAllString(name, "_") }