Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 125 additions & 1 deletion pkg/server/status/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
"math"
"os"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -455,7 +457,42 @@ func (mr *MetricsRecorder) GetTimeSeriesData(childMetrics bool) []tspb.TimeSerie
if !ChildMetricsStorageEnabled.Get(&mr.settings.SV) {
return nil
}
return nil // TODO(jasonlmfong): to be implemented

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not for this PR, but it's worth thinking about utilities that can be refactored out of this function. There seems to be a lot of repetition.

mr.mu.RLock()
defer mr.mu.RUnlock()

if mr.mu.nodeRegistry == nil {
// We haven't yet processed initialization information; do nothing.
if log.V(1) {
log.Dev.Warning(context.TODO(), "MetricsRecorder.GetChildMetricsData() called before NodeID allocation")
}
return nil
}

data := make([]tspb.TimeSeriesData, 0)

// Record child metrics from app registry for system tenant only.
now := mr.clock.Now()
recorder := registryRecorder{
registry: mr.mu.appRegistry,
format: nodeTimeSeriesPrefix,
source: mr.mu.desc.NodeID.String(),
timestampNanos: now.UnixNano(),
}
recorder.recordChangefeedChildMetrics(&data)

// Record child metrics from app-level registries for secondary tenants
for tenantID, r := range mr.mu.tenantRegistries {
tenantRecorder := registryRecorder{
registry: r,
format: nodeTimeSeriesPrefix,
source: tsutil.MakeTenantSource(mr.mu.desc.NodeID.String(), tenantID.String()),
timestampNanos: now.UnixNano(),
}
tenantRecorder.recordChangefeedChildMetrics(&data)
}
Comment on lines +474 to +493
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changefeed metrics only live on the app registry (and the tenant app registries).


return data
}

lastDataCount := atomic.LoadInt64(&mr.lastDataCount)
Expand Down Expand Up @@ -906,6 +943,93 @@ func (rr registryRecorder) recordChild(
})
}

// recordChangefeedChildMetrics iterates through changefeed metrics in the registry and processes child metrics
// for those that have EnableLowFreqChildCollection set to true in their metadata.
// Records up to 1024 child metrics to prevent unbounded memory usage and performance issues.
//
// NB: Only available for Counter and Gauge metrics.
func (rr registryRecorder) recordChangefeedChildMetrics(dest *[]tspb.TimeSeriesData) {
maxChildMetrics := 1024

var childMetricsCount int
labels := rr.registry.GetLabels()
rr.registry.Each(func(name string, v interface{}) {
// Filter for changefeed metrics only
if !strings.HasPrefix(name, "changefeed.") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we actually do a check on the metadata further down below? Eg:

metadata := iterable.GetMetadata()
if metadata.Category != metric.Metadata_CHANGEFEEDS {
   return
}

You may need to annotate a few more metrics with the category, but I want to begin enforcing use of the category field. In the future, if we want to track metric storage volume by category, having strict annotation will be useful.

return
}

prom, ok := v.(metric.PrometheusExportable)
if !ok {
return
}
promIter, ok := v.(metric.PrometheusIterable)
if !ok {
return
}

// Check if the metric has child collection enabled in its metadata
if iterable, ok := v.(metric.Iterable); ok {
metadata := iterable.GetMetadata()
if !metadata.GetEnableLowFreqChildCollection() {
return // Skip this metric if child collection is not enabled
}
} else {
// If we can't get metadata, skip child collection for safety
return
}
m := prom.ToPrometheusMetric()
m.Label = append(labels, prom.GetLabels(false /* useStaticLabels */)...)

processChildMetric := func(childMetric *prometheusgo.Metric) {
if childMetricsCount >= maxChildMetrics {
return // Stop processing once we hit the limit
}

// Sanitize and sort labels using the same sanitization as Prometheus export
sanitizedLabels := make([]struct{ key, value string }, len(childMetric.Label))
for i, label := range childMetric.Label {
sanitizedLabels[i].key = metric.ExportedLabel(label.GetName())
sanitizedLabels[i].value = label.GetValue()
}
Comment on lines +991 to +994
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is aligned with prometheus export:
keys are cleaned and values are preserved

sort.Slice(sanitizedLabels, func(i, j int) bool {
return sanitizedLabels[i].key < sanitizedLabels[j].key
})

// Build labels suffix using sanitized and sorted key-value pairs
var labelsSuffix string
for _, label := range sanitizedLabels {
labelsSuffix += fmt.Sprintf("%s=\"%s\",", label.key, label.value)
}
// Remove trailing comma and wrap with curly braces
if len(labelsSuffix) > 0 {
labelsSuffix = "{" + labelsSuffix[:len(labelsSuffix)-1] + "}"
}

var value float64
if childMetric.Gauge != nil {
value = *childMetric.Gauge.Value
} else if childMetric.Counter != nil {
value = *childMetric.Counter.Value
} else {
return
}
*dest = append(*dest, tspb.TimeSeriesData{
Name: fmt.Sprintf(rr.format, prom.GetName(false /* useStaticLabels */)+labelsSuffix),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: suffix encoding and decoding should have their own functions. Could we create something to the effect of:

package metric

func EncodeLabeledName(format string, name string, labels []*io_prometheus_client.LabelPair) string {

}

Somewhere we can place an appropriate metric.DecodeLabeleledName(name string) (string, []*io_prometheus_client.LabelPair) {}?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic should be tight to the point where they may use helper functions like encodeLabels and decodeLabels.

Source: rr.source,
Datapoints: []tspb.TimeSeriesDatapoint{
{
TimestampNanos: rr.timestampNanos,
Value: value,
},
},
})
childMetricsCount++
}
promIter.Each(m.Label, processChildMetric)
})
}

// GetTotalMemory returns either the total system memory (in bytes) or if
// possible the cgroups available memory.
func GetTotalMemory(ctx context.Context) (int64, error) {
Expand Down
Loading