@@ -14,7 +14,9 @@ import (
1414 "math"
1515 "os"
1616 "runtime"
17+ "sort"
1718 "strconv"
19+ "strings"
1820 "sync"
1921 "sync/atomic"
2022 "time"
@@ -455,7 +457,42 @@ func (mr *MetricsRecorder) GetTimeSeriesData(childMetrics bool) []tspb.TimeSerie
455457 if ! ChildMetricsStorageEnabled .Get (& mr .settings .SV ) {
456458 return nil
457459 }
458- return nil // TODO(jasonlmfong): to be implemented
460+
461+ mr .mu .RLock ()
462+ defer mr .mu .RUnlock ()
463+
464+ if mr .mu .nodeRegistry == nil {
465+ // We haven't yet processed initialization information; do nothing.
466+ if log .V (1 ) {
467+ log .Dev .Warning (context .TODO (), "MetricsRecorder.GetChildMetricsData() called before NodeID allocation" )
468+ }
469+ return nil
470+ }
471+
472+ data := make ([]tspb.TimeSeriesData , 0 )
473+
474+ // Record child metrics from app registry for system tenant only.
475+ now := mr .clock .Now ()
476+ recorder := registryRecorder {
477+ registry : mr .mu .appRegistry ,
478+ format : nodeTimeSeriesPrefix ,
479+ source : mr .mu .desc .NodeID .String (),
480+ timestampNanos : now .UnixNano (),
481+ }
482+ recorder .recordChangefeedChildMetrics (& data )
483+
484+ // Record child metrics from app-level registries for secondary tenants
485+ for tenantID , r := range mr .mu .tenantRegistries {
486+ tenantRecorder := registryRecorder {
487+ registry : r ,
488+ format : nodeTimeSeriesPrefix ,
489+ source : tsutil .MakeTenantSource (mr .mu .desc .NodeID .String (), tenantID .String ()),
490+ timestampNanos : now .UnixNano (),
491+ }
492+ tenantRecorder .recordChangefeedChildMetrics (& data )
493+ }
494+
495+ return data
459496 }
460497
461498 lastDataCount := atomic .LoadInt64 (& mr .lastDataCount )
@@ -906,6 +943,93 @@ func (rr registryRecorder) recordChild(
906943 })
907944}
908945
946+ // recordChangefeedChildMetrics iterates through changefeed metrics in the registry and processes child metrics
947+ // for those that have EnableLowFreqChildCollection set to true in their metadata.
948+ // Records up to 1024 child metrics to prevent unbounded memory usage and performance issues.
949+ //
950+ // NB: Only available for Counter and Gauge metrics.
951+ func (rr registryRecorder ) recordChangefeedChildMetrics (dest * []tspb.TimeSeriesData ) {
952+ maxChildMetrics := 1024
953+
954+ var childMetricsCount int
955+ labels := rr .registry .GetLabels ()
956+ rr .registry .Each (func (name string , v interface {}) {
957+ // Filter for changefeed metrics only
958+ if ! strings .HasPrefix (name , "changefeed." ) {
959+ return
960+ }
961+
962+ prom , ok := v .(metric.PrometheusExportable )
963+ if ! ok {
964+ return
965+ }
966+ promIter , ok := v .(metric.PrometheusIterable )
967+ if ! ok {
968+ return
969+ }
970+
971+ // Check if the metric has child collection enabled in its metadata
972+ if iterable , ok := v .(metric.Iterable ); ok {
973+ metadata := iterable .GetMetadata ()
974+ if ! metadata .GetEnableLowFreqChildCollection () {
975+ return // Skip this metric if child collection is not enabled
976+ }
977+ } else {
978+ // If we can't get metadata, skip child collection for safety
979+ return
980+ }
981+ m := prom .ToPrometheusMetric ()
982+ m .Label = append (labels , prom .GetLabels (false /* useStaticLabels */ )... )
983+
984+ processChildMetric := func (childMetric * prometheusgo.Metric ) {
985+ if childMetricsCount >= maxChildMetrics {
986+ return // Stop processing once we hit the limit
987+ }
988+
989+ // Sanitize and sort labels using the same sanitization as Prometheus export
990+ sanitizedLabels := make ([]struct { key , value string }, len (childMetric .Label ))
991+ for i , label := range childMetric .Label {
992+ sanitizedLabels [i ].key = metric .ExportedLabel (label .GetName ())
993+ sanitizedLabels [i ].value = label .GetValue ()
994+ }
995+ sort .Slice (sanitizedLabels , func (i , j int ) bool {
996+ return sanitizedLabels [i ].key < sanitizedLabels [j ].key
997+ })
998+
999+ // Build labels suffix using sanitized and sorted key-value pairs
1000+ var labelsSuffix string
1001+ for _ , label := range sanitizedLabels {
1002+ labelsSuffix += fmt .Sprintf ("%s=\" %s\" ," , label .key , label .value )
1003+ }
1004+ // Remove trailing comma and wrap with curly braces
1005+ if len (labelsSuffix ) > 0 {
1006+ labelsSuffix = "{" + labelsSuffix [:len (labelsSuffix )- 1 ] + "}"
1007+ }
1008+
1009+ var value float64
1010+ if childMetric .Gauge != nil {
1011+ value = * childMetric .Gauge .Value
1012+ } else if childMetric .Counter != nil {
1013+ value = * childMetric .Counter .Value
1014+ } else {
1015+ return
1016+ }
1017+ * dest = append (* dest , tspb.TimeSeriesData {
1018+ Name : fmt .Sprintf (rr .format , prom .GetName (false /* useStaticLabels */ )+ labelsSuffix ),
1019+ Source : rr .source ,
1020+ Datapoints : []tspb.TimeSeriesDatapoint {
1021+ {
1022+ TimestampNanos : rr .timestampNanos ,
1023+ Value : value ,
1024+ },
1025+ },
1026+ })
1027+ childMetricsCount ++
1028+ }
1029+ promIter .Each (m .Label , processChildMetric )
1030+ })
1031+ }
1032+
9091033// GetTotalMemory returns either the total system memory (in bytes) or if
9101034// possible the cgroups available memory.
9111035func GetTotalMemory (ctx context.Context ) (int64 , error ) {
0 commit comments