Skip to content

Commit 4748184

Browse files
committed
ts: record child metrics with low frequency poller
This change adds the ability to get changefeed child metrics to be recorded. The child metrics have their names augmented with their labels. Epic: CRDB-55079 Release: None
1 parent 032bf32 commit 4748184

File tree

6 files changed

+428
-6
lines changed

6 files changed

+428
-6
lines changed

pkg/server/status/recorder.go

Lines changed: 128 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ import (
1414
"math"
1515
"os"
1616
"runtime"
17+
"sort"
1718
"strconv"
19+
"strings"
1820
"sync"
1921
"sync/atomic"
2022
"time"
@@ -40,6 +42,7 @@ import (
4042
"github.com/cockroachdb/cockroach/pkg/util/hlc"
4143
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
4244
"github.com/cockroachdb/cockroach/pkg/util/log"
45+
4346
// Import the logmetrics package to trigger its own init function, which inits and injects
4447
// metrics functionality into pkg/util/log.
4548
_ "github.com/cockroachdb/cockroach/pkg/util/log/logmetrics"
@@ -455,7 +458,42 @@ func (mr *MetricsRecorder) GetTimeSeriesData(childMetrics bool) []tspb.TimeSerie
455458
if !ChildMetricsStorageEnabled.Get(&mr.settings.SV) {
456459
return nil
457460
}
458-
return nil // TODO(jasonlmfong): to be implemented
461+
462+
mr.mu.RLock()
463+
defer mr.mu.RUnlock()
464+
465+
if mr.mu.nodeRegistry == nil {
466+
// We haven't yet processed initialization information; do nothing.
467+
if log.V(1) {
468+
log.Dev.Warning(context.TODO(), "MetricsRecorder.GetChildMetricsData() called before NodeID allocation")
469+
}
470+
return nil
471+
}
472+
473+
data := make([]tspb.TimeSeriesData, 0)
474+
475+
// Record child metrics from app registry for system tenant only.
476+
now := mr.clock.Now()
477+
recorder := registryRecorder{
478+
registry: mr.mu.appRegistry,
479+
format: nodeTimeSeriesPrefix,
480+
source: mr.mu.desc.NodeID.String(),
481+
timestampNanos: now.UnixNano(),
482+
}
483+
recorder.recordChangefeedChildMetrics(&data)
484+
485+
// Record child metrics from app-level registries for secondary tenants
486+
for tenantID, r := range mr.mu.tenantRegistries {
487+
tenantRecorder := registryRecorder{
488+
registry: r,
489+
format: nodeTimeSeriesPrefix,
490+
source: tsutil.MakeTenantSource(mr.mu.desc.NodeID.String(), tenantID.String()),
491+
timestampNanos: now.UnixNano(),
492+
}
493+
tenantRecorder.recordChangefeedChildMetrics(&data)
494+
}
495+
496+
return data
459497
}
460498

461499
lastDataCount := atomic.LoadInt64(&mr.lastDataCount)
@@ -906,6 +944,95 @@ func (rr registryRecorder) recordChild(
906944
})
907945
}
908946

947+
// recordChangefeedChildMetrics iterates through changefeed metrics in the registry and processes child metrics
948+
// for those that have EnableLowFreqChildCollection set to true in their metadata.
949+
// Records up to 1024 child metrics to prevent unbounded memory usage and performance issues.
950+
//
951+
// NB: Only available for Counter and Gauge metrics.
952+
func (rr registryRecorder) recordChangefeedChildMetrics(
953+
dest *[]tspb.TimeSeriesData,
954+
) {
955+
maxChildMetrics := 1024
956+
957+
var childMetricsCount int
958+
labels := rr.registry.GetLabels()
959+
rr.registry.Each(func(name string, v interface{}) {
960+
// Filter for changefeed metrics only
961+
if !strings.HasPrefix(name, "changefeed.") {
962+
return
963+
}
964+
965+
prom, ok := v.(metric.PrometheusExportable)
966+
if !ok {
967+
return
968+
}
969+
promIter, ok := v.(metric.PrometheusIterable)
970+
if !ok {
971+
return
972+
}
973+
974+
// Check if the metric has child collection enabled in its metadata
975+
if iterable, ok := v.(metric.Iterable); ok {
976+
metadata := iterable.GetMetadata()
977+
if !metadata.GetEnableLowFreqChildCollection() {
978+
return // Skip this metric if child collection is not enabled
979+
}
980+
} else {
981+
// If we can't get metadata, skip child collection for safety
982+
return
983+
}
984+
m := prom.ToPrometheusMetric()
985+
m.Label = append(labels, prom.GetLabels(false /* useStaticLabels */)...)
986+
987+
processChildMetric := func(childMetric *prometheusgo.Metric) {
988+
if childMetricsCount >= maxChildMetrics {
989+
return // Stop processing once we hit the limit
990+
}
991+
992+
// Sanitize and sort labels using the same sanitization as Prometheus export
993+
sanitizedLabels := make([]struct{ key, value string }, len(childMetric.Label))
994+
for i, label := range childMetric.Label {
995+
sanitizedLabels[i].key = metric.ExportedLabel(label.GetName())
996+
sanitizedLabels[i].value = label.GetValue()
997+
}
998+
sort.Slice(sanitizedLabels, func(i, j int) bool {
999+
return sanitizedLabels[i].key < sanitizedLabels[j].key
1000+
})
1001+
1002+
// Build labels suffix using sanitized and sorted key-value pairs
1003+
var labelsSuffix string
1004+
for _, label := range sanitizedLabels {
1005+
labelsSuffix += fmt.Sprintf("%s=\"%s\",", label.key, label.value)
1006+
}
1007+
// Remove trailing comma and wrap with curly braces
1008+
if len(labelsSuffix) > 0 {
1009+
labelsSuffix = "{" + labelsSuffix[:len(labelsSuffix)-1] + "}"
1010+
}
1011+
1012+
var value float64
1013+
if childMetric.Gauge != nil {
1014+
value = *childMetric.Gauge.Value
1015+
} else if childMetric.Counter != nil {
1016+
value = *childMetric.Counter.Value
1017+
} else {
1018+
return
1019+
}
1020+
*dest = append(*dest, tspb.TimeSeriesData{
1021+
Name: fmt.Sprintf(rr.format, prom.GetName(false /* useStaticLabels */)+labelsSuffix),
1022+
Source: rr.source,
1023+
Datapoints: []tspb.TimeSeriesDatapoint{
1024+
{
1025+
TimestampNanos: rr.timestampNanos,
1026+
Value: value,
1027+
},
1028+
},
1029+
})
1030+
childMetricsCount++
1031+
}
1032+
promIter.Each(m.Label, processChildMetric)
1033+
})
1034+
}
1035+
9091036
// GetTotalMemory returns either the total system memory (in bytes) or if
9101037
// possible the cgroups available memory.
9111038
func GetTotalMemory(ctx context.Context) (int64, error) {

0 commit comments

Comments
 (0)