diff --git a/pkg/server/status/recorder.go b/pkg/server/status/recorder.go index db8c8262fd93..14f9c64afb64 100644 --- a/pkg/server/status/recorder.go +++ b/pkg/server/status/recorder.go @@ -14,7 +14,9 @@ import ( "math" "os" "runtime" + "sort" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -455,7 +457,42 @@ func (mr *MetricsRecorder) GetTimeSeriesData(childMetrics bool) []tspb.TimeSerie if !ChildMetricsStorageEnabled.Get(&mr.settings.SV) { return nil } - return nil // TODO(jasonlmfong): to be implemented + + mr.mu.RLock() + defer mr.mu.RUnlock() + + if mr.mu.nodeRegistry == nil { + // We haven't yet processed initialization information; do nothing. + if log.V(1) { + log.Dev.Warning(context.TODO(), "MetricsRecorder.GetChildMetricsData() called before NodeID allocation") + } + return nil + } + + data := make([]tspb.TimeSeriesData, 0) + + // Record child metrics from app registry for system tenant only. + now := mr.clock.Now() + recorder := registryRecorder{ + registry: mr.mu.appRegistry, + format: nodeTimeSeriesPrefix, + source: mr.mu.desc.NodeID.String(), + timestampNanos: now.UnixNano(), + } + recorder.recordChangefeedChildMetrics(&data) + + // Record child metrics from app-level registries for secondary tenants + for tenantID, r := range mr.mu.tenantRegistries { + tenantRecorder := registryRecorder{ + registry: r, + format: nodeTimeSeriesPrefix, + source: tsutil.MakeTenantSource(mr.mu.desc.NodeID.String(), tenantID.String()), + timestampNanos: now.UnixNano(), + } + tenantRecorder.recordChangefeedChildMetrics(&data) + } + + return data } lastDataCount := atomic.LoadInt64(&mr.lastDataCount) @@ -906,6 +943,93 @@ func (rr registryRecorder) recordChild( }) } +// recordChangefeedChildMetrics iterates through changefeed metrics in the registry and processes child metrics +// for those that have EnableLowFreqChildCollection set to true in their metadata. +// Records up to 1024 child metrics to prevent unbounded memory usage and performance issues. +// +// NB: Only available for Counter and Gauge metrics. +func (rr registryRecorder) recordChangefeedChildMetrics(dest *[]tspb.TimeSeriesData) { + maxChildMetrics := 1024 + + var childMetricsCount int + labels := rr.registry.GetLabels() + rr.registry.Each(func(name string, v interface{}) { + // Filter for changefeed metrics only + if !strings.HasPrefix(name, "changefeed.") { + return + } + + prom, ok := v.(metric.PrometheusExportable) + if !ok { + return + } + promIter, ok := v.(metric.PrometheusIterable) + if !ok { + return + } + + // Check if the metric has child collection enabled in its metadata + if iterable, ok := v.(metric.Iterable); ok { + metadata := iterable.GetMetadata() + if !metadata.GetEnableLowFreqChildCollection() { + return // Skip this metric if child collection is not enabled + } + } else { + // If we can't get metadata, skip child collection for safety + return + } + m := prom.ToPrometheusMetric() + m.Label = append(labels, prom.GetLabels(false /* useStaticLabels */)...) + + processChildMetric := func(childMetric *prometheusgo.Metric) { + if childMetricsCount >= maxChildMetrics { + return // Stop processing once we hit the limit + } + + // Sanitize and sort labels using the same sanitization as Prometheus export + sanitizedLabels := make([]struct{ key, value string }, len(childMetric.Label)) + for i, label := range childMetric.Label { + sanitizedLabels[i].key = metric.ExportedLabel(label.GetName()) + sanitizedLabels[i].value = label.GetValue() + } + sort.Slice(sanitizedLabels, func(i, j int) bool { + return sanitizedLabels[i].key < sanitizedLabels[j].key + }) + + // Build labels suffix using sanitized and sorted key-value pairs + var labelsSuffix string + for _, label := range sanitizedLabels { + labelsSuffix += fmt.Sprintf("%s=\"%s\",", label.key, label.value) + } + // Remove trailing comma and wrap with curly braces + if len(labelsSuffix) > 0 { + labelsSuffix = "{" + labelsSuffix[:len(labelsSuffix)-1] + "}" + } + + var value float64 + if childMetric.Gauge != nil { + value = *childMetric.Gauge.Value + } else if childMetric.Counter != nil { + value = *childMetric.Counter.Value + } else { + return + } + *dest = append(*dest, tspb.TimeSeriesData{ + Name: fmt.Sprintf(rr.format, prom.GetName(false /* useStaticLabels */)+labelsSuffix), + Source: rr.source, + Datapoints: []tspb.TimeSeriesDatapoint{ + { + TimestampNanos: rr.timestampNanos, + Value: value, + }, + }, + }) + childMetricsCount++ + } + promIter.Each(m.Label, processChildMetric) + }) +} + // GetTotalMemory returns either the total system memory (in bytes) or if // possible the cgroups available memory. func GetTotalMemory(ctx context.Context) (int64, error) { diff --git a/pkg/server/status/recorder_test.go b/pkg/server/status/recorder_test.go index aa6d2b2ed1e7..4ba489fa75e4 100644 --- a/pkg/server/status/recorder_test.go +++ b/pkg/server/status/recorder_test.go @@ -14,6 +14,7 @@ import ( "reflect" "sort" "strconv" + "strings" "sync" "testing" "time" @@ -258,6 +259,164 @@ func TestMetricsRecorderLabels(t *testing.T) { if a, e := actualData, expectedData; !reflect.DeepEqual(a, e) { t.Errorf("recorder did not yield expected time series collection; diff:\n %v", pretty.Diff(e, a)) } + + // ======================================== + // Verify that GetTimeSeriesData with includeChildMetrics=true returns child labels + // ======================================== + + // Add aggmetrics with child labels to the app registry + aggCounter := aggmetric.NewCounter( + metric.Metadata{Name: "changefeed.agg_counter"}, + "some-id", "feed_id", + ) + appReg.AddMetric(aggCounter) + // Add child metrics with specific labels + child1 := aggCounter.AddChild("123", "feed-a") + child1.Inc(100) + child2 := aggCounter.AddChild("456", "feed-b") + child2.Inc(200) + + aggGauge := aggmetric.NewGauge( + metric.Metadata{Name: "changefeed.agg_gauge"}, + "db", "status!!", + ) + appReg.AddMetric(aggGauge) + // Add child metrics with different label combinations + child3 := aggGauge.AddChild("mine", "active") + child3.Update(1) + child4 := aggGauge.AddChild("yours", "paused") + child4.Update(0) + + // Enable the cluster setting for child metrics storage + ChildMetricsStorageEnabled.Override(context.Background(), &st.SV, true) + + // Get time series data with child metrics enabled + childData := recorder.GetTimeSeriesData(true) + + var foundCounterFeedA123 bool + var foundCounterFeedB456 bool + var foundGaugeMineActive bool + var foundGaugeYoursPaused bool + + for _, ts := range childData { + // Check for changefeed.agg_counter with some_id and feed_id labels + if strings.Contains(ts.Name, "cr.node.changefeed.agg_counter") { + if strings.Contains(ts.Name, "feed_id=\"feed-a\"") && strings.Contains(ts.Name, "some_id=\"123\"") { + foundCounterFeedA123 = true + require.Equal(t, "7", ts.Source, "Expected source to be node ID 7") + require.Len(t, ts.Datapoints, 1) + require.Equal(t, float64(100), ts.Datapoints[0].Value) + } + if strings.Contains(ts.Name, "feed_id=\"feed-b\"") && strings.Contains(ts.Name, "some_id=\"456\"") { + foundCounterFeedB456 = true + require.Equal(t, "7", ts.Source, "Expected source to be node ID 7") + require.Len(t, ts.Datapoints, 1) + require.Equal(t, float64(200), ts.Datapoints[0].Value) + } + } + + // Check for changefeed.agg_gauge with db and status labels + if strings.Contains(ts.Name, "cr.node.changefeed.agg_gauge") { + if strings.Contains(ts.Name, "db=\"mine\"") && strings.Contains(ts.Name, "status__=\"active\"") { + foundGaugeMineActive = true + require.Equal(t, "7", ts.Source, "Expected source to be node ID 7") + require.Len(t, ts.Datapoints, 1) + require.Equal(t, float64(1), ts.Datapoints[0].Value) + } + if strings.Contains(ts.Name, "db=\"yours\"") && strings.Contains(ts.Name, "status__=\"paused\"") { + foundGaugeYoursPaused = true + require.Equal(t, "7", ts.Source, "Expected source to be node ID 7") + require.Len(t, ts.Datapoints, 1) + require.Equal(t, float64(0), ts.Datapoints[0].Value) + } + } + } + + require.True(t, foundCounterFeedA123, "Expected to find changefeed.agg_counter with some_id=123 and feed_id=feed-a") + require.True(t, foundCounterFeedB456, "Expected to find changefeed.agg_counter with some_id=456 and feed_id=feed-b") + require.True(t, foundGaugeMineActive, "Expected to find changefeed.agg_gauge with db=mine and status=active") + require.True(t, foundGaugeYoursPaused, "Expected to find changefeed.agg_gauge with db=yours and status=paused") + + // ======================================== + // Verify that tenant changefeed child metrics are collected with proper source + // ======================================== + + // Add changefeed aggmetrics to the tenant registry with EnableLowFreqChildCollection + enableLowFreqChildCollection := true + tenantAggCounter := aggmetric.NewCounter( + metric.Metadata{ + Name: "changefeed.emitted_messages", + EnableLowFreqChildCollection: &enableLowFreqChildCollection, + }, + "scope", "feed_id", + ) + regTenant.AddMetric(tenantAggCounter) + + // Add child metrics for different changefeeds in the tenant + tenantChild1 := tenantAggCounter.AddChild("default", "tenant-feed-1") + tenantChild1.Inc(500) + tenantChild2 := tenantAggCounter.AddChild("user", "tenant-feed-2") + tenantChild2.Inc(750) + + tenantAggGauge := aggmetric.NewGauge( + metric.Metadata{ + Name: "changefeed.running", + EnableLowFreqChildCollection: &enableLowFreqChildCollection, + }, + "scope", + ) + regTenant.AddMetric(tenantAggGauge) + + tenantChild3 := tenantAggGauge.AddChild("default") + tenantChild3.Update(2) + tenantChild4 := tenantAggGauge.AddChild("user") + tenantChild4.Update(1) + + // Get time series data with child metrics enabled + tenantChildData := recorder.GetTimeSeriesData(true) + + var foundTenantCounterFeed1 bool + var foundTenantCounterFeed2 bool + var foundTenantGaugeDefault bool + var foundTenantGaugeUser bool + + for _, ts := range tenantChildData { + // Verify tenant changefeed metrics have correct source (7-123 for node 7, tenant 123) + if strings.Contains(ts.Name, "cr.node.changefeed.emitted_messages") { + if strings.Contains(ts.Name, `scope="default"`) && strings.Contains(ts.Name, `feed_id="tenant-feed-1"`) { + foundTenantCounterFeed1 = true + require.Equal(t, "7-123", ts.Source, "Expected tenant source format node-tenant") + require.Len(t, ts.Datapoints, 1) + require.Equal(t, float64(500), ts.Datapoints[0].Value) + } + if strings.Contains(ts.Name, `scope="user"`) && strings.Contains(ts.Name, `feed_id="tenant-feed-2"`) { + foundTenantCounterFeed2 = true + require.Equal(t, "7-123", ts.Source, "Expected tenant source format node-tenant") + require.Len(t, ts.Datapoints, 1) + require.Equal(t, float64(750), ts.Datapoints[0].Value) + } + } + + if strings.Contains(ts.Name, "cr.node.changefeed.running") { + if strings.Contains(ts.Name, `scope="default"`) { + foundTenantGaugeDefault = true + require.Equal(t, "7-123", ts.Source, "Expected tenant source format node-tenant") + require.Len(t, ts.Datapoints, 1) + require.Equal(t, float64(2), ts.Datapoints[0].Value) + } + if strings.Contains(ts.Name, `scope="user"`) { + foundTenantGaugeUser = true + require.Equal(t, "7-123", ts.Source, "Expected tenant source format node-tenant") + require.Len(t, ts.Datapoints, 1) + require.Equal(t, float64(1), ts.Datapoints[0].Value) + } + } + } + + require.True(t, foundTenantCounterFeed1, "Expected to find tenant changefeed.emitted_messages with scope=default and feed_id=tenant-feed-1") + require.True(t, foundTenantCounterFeed2, "Expected to find tenant changefeed.emitted_messages with scope=user and feed_id=tenant-feed-2") + require.True(t, foundTenantGaugeDefault, "Expected to find tenant changefeed.running with scope=default") + require.True(t, foundTenantGaugeUser, "Expected to find tenant changefeed.running with scope=user") } func TestRegistryRecorder_RecordChild(t *testing.T) { @@ -763,3 +922,213 @@ func BenchmarkExtractValueAllocs(b *testing.B) { } b.ReportAllocs() } + +func TestRecordChangefeedChildMetrics(t *testing.T) { + defer leaktest.AfterTest(t)() + + manual := timeutil.NewManualTime(timeutil.Unix(0, 100)) + + t.Run("empty registry", func(t *testing.T) { + reg := metric.NewRegistry() + recorder := registryRecorder{ + registry: reg, + format: nodeTimeSeriesPrefix, + source: "test-source", + timestampNanos: manual.Now().UnixNano(), + } + + var dest []tspb.TimeSeriesData + recorder.recordChangefeedChildMetrics(&dest) + + require.Empty(t, dest) + }) + + t.Run("non-changefeed metrics ignored", func(t *testing.T) { + reg := metric.NewRegistry() + + // Add non-changefeed metrics + gauge := metric.NewGauge(metric.Metadata{Name: "sql.connections"}) + reg.AddMetric(gauge) + gauge.Update(10) + + counter := metric.NewCounter(metric.Metadata{Name: "kv.requests"}) + reg.AddMetric(counter) + counter.Inc(5) + + recorder := registryRecorder{ + registry: reg, + format: nodeTimeSeriesPrefix, + source: "test-source", + timestampNanos: manual.Now().UnixNano(), + } + + var dest []tspb.TimeSeriesData + recorder.recordChangefeedChildMetrics(&dest) + + require.Empty(t, dest) + }) + + t.Run("changefeed metrics without child collection", func(t *testing.T) { + reg := metric.NewRegistry() + + // Add changefeed aggmetric with child collection explicitly disabled + enableLowFreqChildCollection := false + gauge := aggmetric.NewGauge( + metric.Metadata{ + Name: "changefeed.test_metric", + EnableLowFreqChildCollection: &enableLowFreqChildCollection, + }, + "job_id", "feed_id", + ) + reg.AddMetric(gauge) + + // Add child metrics with labels + child1 := gauge.AddChild("123", "abc") + child1.Update(50) + + recorder := registryRecorder{ + registry: reg, + format: nodeTimeSeriesPrefix, + source: "test-source", + timestampNanos: manual.Now().UnixNano(), + } + + var dest []tspb.TimeSeriesData + recorder.recordChangefeedChildMetrics(&dest) + + // Should be empty since EnableLowFreqChildCollection is false + require.Empty(t, dest) + }) + + t.Run("changefeed aggmetrics with child collection", func(t *testing.T) { + reg := metric.NewRegistry() + + // Create an aggmetric which supports child metrics + // Note: aggmetrics automatically have child collection capabilities + gauge := aggmetric.NewGauge(metric.Metadata{Name: "changefeed.test_metric"}, "job_id", "feed_id") + reg.AddMetric(gauge) + + // Add child metrics with labels + child1 := gauge.AddChild("123", "abc") + child1.Update(50) + + child2 := gauge.AddChild("456", "def") + child2.Update(75) + + recorder := registryRecorder{ + registry: reg, + format: nodeTimeSeriesPrefix, + source: "test-source", + timestampNanos: manual.Now().UnixNano(), + } + + var dest []tspb.TimeSeriesData + recorder.recordChangefeedChildMetrics(&dest) + + // Should have data for child metrics (aggmetrics have child collection enabled by default) + require.Len(t, dest, 2) + + // Verify the data structure + for _, data := range dest { + require.Contains(t, data.Name, "changefeed.test_metric") + require.Equal(t, "test-source", data.Source) + require.Len(t, data.Datapoints, 1) + require.Equal(t, manual.Now().UnixNano(), data.Datapoints[0].TimestampNanos) + require.Contains(t, []float64{50, 75}, data.Datapoints[0].Value) + } + }) + + t.Run("cardinality limit enforcement", func(t *testing.T) { + reg := metric.NewRegistry() + + gauge := aggmetric.NewGauge(metric.Metadata{Name: "changefeed.high_cardinality_metric"}, "job_id") + reg.AddMetric(gauge) + + // Add more than 1024 child metrics to test the limit + for i := 0; i < 1500; i++ { + child := gauge.AddChild(strconv.Itoa(i)) + child.Update(int64(i)) + } + + recorder := registryRecorder{ + registry: reg, + format: nodeTimeSeriesPrefix, + source: "test-source", + timestampNanos: manual.Now().UnixNano(), + } + + var dest []tspb.TimeSeriesData + recorder.recordChangefeedChildMetrics(&dest) + + // Should be limited to 1024 child metrics + require.Len(t, dest, 1024) + }) + + t.Run("label sanitization and sorting", func(t *testing.T) { + reg := metric.NewRegistry() + + gauge := aggmetric.NewGauge(metric.Metadata{Name: "changefeed.label_test"}, "job-id", "feed.name") + reg.AddMetric(gauge) + + // Add child with labels that need sanitization + child := gauge.AddChild("test@123", "my-feed!") + child.Update(42) + + recorder := registryRecorder{ + registry: reg, + format: nodeTimeSeriesPrefix, + source: "test-source", + timestampNanos: manual.Now().UnixNano(), + } + + var dest []tspb.TimeSeriesData + recorder.recordChangefeedChildMetrics(&dest) + + require.Len(t, dest, 1) + + // Verify that the metric name contains sanitized labels + metricName := dest[0].Name + require.Contains(t, metricName, "changefeed.label_test{feed_name=\"my-feed!\",job_id=\"test@123\"}") + }) + + t.Run("counter and gauge support", func(t *testing.T) { + reg := metric.NewRegistry() + + // Test with gauge + gauge := aggmetric.NewGauge(metric.Metadata{Name: "changefeed.gauge_metric"}, "type") + reg.AddMetric(gauge) + gaugeChild := gauge.AddChild("gauge") + gaugeChild.Update(100) + + // Test with counter + counter := aggmetric.NewCounter(metric.Metadata{Name: "changefeed.counter_metric"}, "type") + reg.AddMetric(counter) + counterChild := counter.AddChild("counter") + counterChild.Inc(50) + + recorder := registryRecorder{ + registry: reg, + format: nodeTimeSeriesPrefix, + source: "test-source", + timestampNanos: manual.Now().UnixNano(), + } + + var dest []tspb.TimeSeriesData + recorder.recordChangefeedChildMetrics(&dest) + + require.Len(t, dest, 2) + + // Find gauge and counter data points + var gaugeValue, counterValue float64 + for _, data := range dest { + if data.Datapoints[0].Value == 100 { + gaugeValue = data.Datapoints[0].Value + } else if data.Datapoints[0].Value == 50 { + counterValue = data.Datapoints[0].Value + } + } + + require.Equal(t, float64(100), gaugeValue) + require.Equal(t, float64(50), counterValue) + }) +} diff --git a/pkg/ts/keys_test.go b/pkg/ts/keys_test.go index b3820bc4b1eb..7615c28e0173 100644 --- a/pkg/ts/keys_test.go +++ b/pkg/ts/keys_test.go @@ -72,6 +72,14 @@ func TestDataKeys(t *testing.T) { 12, "/System/tsd//1m/1924-09-18T08:00:00Z/", }, + { + "metric.with:special/chars{label=\"@@ αβ_ !星\",}", + "source:with/special-chars", + 0, + Resolution10s, + 83, + "/System/tsd/metric.with:special/chars{label=\"@@ αβ_ !星\",}/10s/1970-01-01T00:00:00Z/source:with/special-chars", + }, } for i, tc := range testCases { diff --git a/pkg/util/metric/metric.go b/pkg/util/metric/metric.go index 7d151392fb63..bc5ad7994fed 100644 --- a/pkg/util/metric/metric.go +++ b/pkg/util/metric/metric.go @@ -230,11 +230,20 @@ func (m *Metadata) GetLabels(useStaticLabels bool) []*prometheusgo.LabelPair { return lps } +// Returns the value for EnableLowFreqChildCollection, +// defaults to True when it is not supplied. +func (m *Metadata) GetEnableLowFreqChildCollection() bool { + if m.EnableLowFreqChildCollection == nil { + return true + } + return *m.EnableLowFreqChildCollection +} + // AddLabel adds a label/value pair for this metric. func (m *Metadata) AddLabel(name, value string) { m.Labels = append(m.Labels, &LabelPair{ - Name: proto.String(exportedLabel(name)), + Name: proto.String(ExportedLabel(name)), Value: proto.String(value), }) } diff --git a/pkg/util/metric/metric.proto b/pkg/util/metric/metric.proto index b2d6387d7139..84019f71d0dc 100644 --- a/pkg/util/metric/metric.proto +++ b/pkg/util/metric/metric.proto @@ -117,6 +117,10 @@ message Metadata { // field is only output to metrics.yaml for documentation purposes, // with plans for future use in automated dashboard generation. required string how_to_use = 12 [(gogoproto.nullable) = false]; + // enable_low_freq_child_collection enables child metrics to be collected + // in the low-frequency child metrics poller. When true, child metrics with + // labels will be recorded in the time series database. + optional bool enable_low_freq_child_collection = 13; - // Next ID: 10 (then 13). + // Next ID: 10 (then 14). } diff --git a/pkg/util/metric/registry.go b/pkg/util/metric/registry.go index 928c91c0ca30..c5375f11c06d 100644 --- a/pkg/util/metric/registry.go +++ b/pkg/util/metric/registry.go @@ -79,7 +79,7 @@ func NewRegistry() *Registry { func (r *Registry) AddLabel(name string, value interface{}) { r.Lock() defer r.Unlock() - r.labels = append(r.labels, labelPair{name: exportedLabel(name), value: value}) + r.labels = append(r.labels, labelPair{name: ExportedLabel(name), value: value}) r.computedLabels = append(r.computedLabels, &prometheusgo.LabelPair{}) } @@ -288,8 +288,8 @@ func ExportedName(name string) string { return prometheusNameReplaceRE.ReplaceAllString(name, "_") } -// exportedLabel takes a metric name and generates a valid prometheus name. -func exportedLabel(name string) string { +// ExportedLabel takes a metric label and generates a valid prometheus label name. +func ExportedLabel(name string) string { return prometheusLabelReplaceRE.ReplaceAllString(name, "_") }