3 changes: 3 additions & 0 deletions exporter/awsemfexporter/config.go
@@ -59,6 +59,9 @@ type Config struct {
// MetricDeclarations is the list of rules to be used to set dimensions for exported metrics.
MetricDeclarations []*MetricDeclaration `mapstructure:"metric_declarations"`

// GaugeMetricsToCompact is the list of gauge metric names whose data points are compacted into values and counts
GaugeMetricsToCompact []string `mapstructure:"gauge_metrics_to_compact"`

// MetricDescriptors is the list of override metric descriptors that are sent to the CloudWatch
MetricDescriptors []MetricDescriptor `mapstructure:"metric_descriptors"`

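For context, a sketch of how the new option might be set in a collector configuration; the awsemf exporter key matches this component, and the metric name is just an example taken from the tests below:

exporters:
  awsemf:
    # Compact the data points of these gauge metrics into values/counts
    gauge_metrics_to_compact:
      - container_gpu_utilization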
217 changes: 157 additions & 60 deletions exporter/awsemfexporter/grouped_metric.go
@@ -5,6 +5,7 @@ package awsemfexporter // import "github.com/open-telemetry/opentelemetry-collec

import (
"encoding/json"
"math"
"strings"

"go.opentelemetry.io/collector/pdata/pmetric"
@@ -41,50 +42,20 @@ func addToGroupedMetric(
return nil
}

for i := 0; i < dps.Len(); i++ {
// Drop stale or NaN metric values
if isStaleNanInf, attrs := dps.IsStaleNaNInf(i); isStaleNanInf {
if config != nil && config.logger != nil {
config.logger.Debug("dropped metric with nan value",
zap.String("metric.name", pmd.Name()),
zap.Any("metric.attributes", attrs))
}
continue
}
dps, retained := dps.CalculateDeltaDatapoints(i, metadata.instrumentationScopeName, config.DetailedMetrics, calculators)
if !retained {
continue
}

for i, dp := range dps {
labels := dp.labels

if metricType, ok := labels["Type"]; ok {
if (metricType == "Pod" || metricType == "Container") && config.EKSFargateContainerInsightsEnabled {
addKubernetesWrapper(labels)
}
}

// if patterns were found in config file and weren't replaced by resource attributes, replace those patterns with metric labels.
// if patterns are provided for a valid key and that key doesn't exist in the resource attributes, it is replaced with `undefined`.
if !patternReplaceSucceeded {
if strings.Contains(metadata.logGroup, "undefined") {
metadata.logGroup, _ = replacePatterns(config.LogGroupName, labels, config.logger)
}
if strings.Contains(metadata.logStream, "undefined") {
metadata.logStream, _ = replacePatterns(config.LogStreamName, labels, config.logger)
}
}

metric := &metricInfo{
value: dp.value,
unit: translateUnit(pmd, descriptor),
}
filteredDps := filterAndCalculateDps(dps, pmd.Name(), metadata, config, calculators)

if shouldCompactMetrics(pmd, config) {
compactedMetrics, labels, updatedMetadata := compactGaugeMetrics(filteredDps, metadata, patternReplaceSucceeded, config)
if compactedMetrics != nil {
upsertGroupedMetric(groupedMetrics, updatedMetadata, labels, pmd.Name(), compactedMetrics, translateUnit(pmd, descriptor), config.logger)
}
} else {
for i, dp := range filteredDps {
labels := enrichLabels(dp, config)
metadata = replacePatternsIfNeeded(metadata, labels, config, patternReplaceSucceeded)
if dp.timestampMs > 0 {
metadata.timestampMs = dp.timestampMs
}

// Extra params to use when grouping metrics
if metadata.metricDataType != pmetric.MetricTypeSummary || !config.DetailedMetrics {
// Summary metrics can be split into separate datapoints when using DetailedMetrics, but we still want to group
@@ -99,28 +70,10 @@
metadata.metricDataType = pmetric.MetricTypeSum
}
}

groupKey := aws.NewKey(metadata.groupedMetricMetadata, labels)
if _, ok := groupedMetrics[groupKey]; ok {
// if MetricName already exists in metrics map, print warning log
if _, ok := groupedMetrics[groupKey].metrics[dp.name]; ok {
config.logger.Warn(
"Duplicate metric found",
zap.String("Name", dp.name),
zap.Any("Labels", labels),
)
} else {
groupedMetrics[groupKey].metrics[dp.name] = metric
}
} else {
groupedMetrics[groupKey] = &groupedMetric{
labels: labels,
metrics: map[string]*metricInfo{(dp.name): metric},
metadata: metadata,
}
}
upsertGroupedMetric(groupedMetrics, metadata, labels, dp.name, dp.value, translateUnit(pmd, descriptor), config.logger)
}
}

return nil
}

@@ -224,3 +177,147 @@ func translateUnit(metric pmetric.Metric, descriptor map[string]MetricDescriptor
}
return unit
}

func shouldCompactMetrics(pmd pmetric.Metric, config *Config) bool {
if pmd.Type() != pmetric.MetricTypeGauge {
return false
}
// Check if the current metric is in the GaugeMetricsToCompact list
for _, name := range config.GaugeMetricsToCompact {
if name == pmd.Name() {
return true
}
}
return false
}

func filterAndCalculateDps(dps dataPoints, metricName string, metadata cWMetricMetadata, config *Config, calculators *emfCalculators) []dataPoint {
var result []dataPoint
for i := 0; i < dps.Len(); i++ {
if isStale, attrs := dps.IsStaleNaNInf(i); isStale {
if config != nil && config.logger != nil {
config.logger.Debug("dropped metric with nan value",
zap.String("metric.name", metricName),
zap.Any("metric.attributes", attrs))
}
continue
}
calculated, retained := dps.CalculateDeltaDatapoints(i, metadata.instrumentationScopeName, config.DetailedMetrics, calculators)
if retained {
result = append(result, calculated...)
}
}
return result
}

func enrichLabels(dp dataPoint, config *Config) map[string]string {
labels := dp.labels
if metricType, ok := labels["Type"]; ok {
if (metricType == "Pod" || metricType == "Container") && config.EKSFargateContainerInsightsEnabled {
addKubernetesWrapper(labels)
}
}
return labels
}

func replacePatternsIfNeeded(metadata cWMetricMetadata, labels map[string]string, config *Config, patternReplaceSucceeded bool) cWMetricMetadata {
if !patternReplaceSucceeded {
if strings.Contains(metadata.logGroup, "undefined") {
metadata.logGroup, _ = replacePatterns(config.LogGroupName, labels, config.logger)
}
if strings.Contains(metadata.logStream, "undefined") {
metadata.logStream, _ = replacePatterns(config.LogStreamName, labels, config.logger)
}
}
return metadata
}

func upsertGroupedMetric(
groupedMetrics map[any]*groupedMetric,
metadata cWMetricMetadata,
labels map[string]string,
metricName string,
metricVal any,
unit string,
logger *zap.Logger,
) {
metric := &metricInfo{value: metricVal, unit: unit}
groupKey := aws.NewKey(metadata.groupedMetricMetadata, labels)

if _, ok := groupedMetrics[groupKey]; ok {
// if MetricName already exists in metrics map, print warning log
if _, ok := groupedMetrics[groupKey].metrics[metricName]; ok {
logger.Warn("Duplicate metric found", zap.String("Name", metricName), zap.Any("Labels", labels))
} else {
groupedMetrics[groupKey].metrics[metricName] = metric
}
} else {
groupedMetrics[groupKey] = &groupedMetric{
labels: labels,
metrics: map[string]*metricInfo{metricName: metric},
metadata: metadata,
}
}
}

Review thread on compactGaugeMetrics:

Reviewer: iiuc it's not really compacting gauges but converting to CWHistogram. The name could be more aligned with what it actually does, like convertGaugesToCWHistogram or something.

Author: Previously I named it buildHistogram, but from Rick's perspective it should be a kind of compaction or deduplication, so I renamed all occurrences of Histogram to Compact. Pasting Rick's comment for reference:

> Correct me if I'm wrong, but if this is the core of the aggregation logic, then this isn't really creating a histogram. It's aggregating to a series of value/count pairs for each unique floating point value. Considering the precision of float64, I don't think we'll get much aggregation out of it.
>
> If we wanted to create an actual histogram that aggregates a range of datapoints, we'd need to define buckets where each bucket represents a range of values, store all of the incoming datapoints into those buckets, and then convert the buckets to values/counts.

// compactGaugeMetrics converts a collection of gauge data points into a compact representation, e.g. values and counts.
func compactGaugeMetrics(
dps []dataPoint,
metadata cWMetricMetadata,
patternReplaceSucceeded bool,
config *Config,
) (compactedMetrics *cWMetricHistogram, labels map[string]string, updatedMetadata cWMetricMetadata) {
var values []float64
var timestampMs int64

// Extract float values from data points and find the latest timestamp
for _, dp := range dps {
if dp.timestampMs > timestampMs {
timestampMs = dp.timestampMs
}
if v, ok := dp.value.(float64); ok {
values = append(values, v)
labels = enrichLabels(dp, config)
}
}

if len(values) == 0 {
return nil, nil, metadata
}

updatedMetadata = replacePatternsIfNeeded(metadata, labels, config, patternReplaceSucceeded)
updatedMetadata.metricDataType = pmetric.MetricTypeGauge
updatedMetadata.timestampMs = timestampMs

compactedMetrics = &cWMetricHistogram{
Values: []float64{},
Counts: []float64{},
Count: uint64(len(values)),
Sum: 0,
Min: math.MaxFloat64,
Max: -math.MaxFloat64,
}

// Calculate sum, min, max and count frequencies for each unique value
countMap := make(map[float64]float64)
for _, v := range values {
compactedMetrics.Sum += v
if v < compactedMetrics.Min {
compactedMetrics.Min = v
}
if v > compactedMetrics.Max {
compactedMetrics.Max = v
}
countMap[v]++
}

// Pre-allocate slices to avoid multiple allocations during append
compactedMetrics.Values = make([]float64, 0, len(countMap))
compactedMetrics.Counts = make([]float64, 0, len(countMap))
for val, cnt := range countMap {
compactedMetrics.Values = append(compactedMetrics.Values, val)
compactedMetrics.Counts = append(compactedMetrics.Counts, cnt)
}

return compactedMetrics, labels, updatedMetadata
}

Review thread on the countMap aggregation loop:

Reviewer (Rick): Correct me if I'm wrong, but if this is the core of the aggregation logic, then this isn't really creating a histogram. It's aggregating to a series of value/count pairs for each unique floating point value. Considering the precision of float64, I don't think we'll get much aggregation out of it.

If we wanted to create an actual histogram that aggregates a range of datapoints, we'd need to define buckets where each bucket represents a range of values, store all of the incoming datapoints into those buckets, and then convert the buckets to values/counts.

Author: You are correct; in fact, we are aggregating to a CloudWatch histogram instead of an OpenTelemetry histogram. For a CW histogram, it's exactly in the format of values and counts, and the CloudWatch backend does the calculation for percentile values, e.g. P90.
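For illustration, a minimal sketch of the bucketed alternative Rick describes above; the bucketize helper and its fixed bucket width are hypothetical, not part of this PR (math is already imported in this file):

// bucketize is a hypothetical helper, not part of this PR: it aggregates raw
// values into fixed-width buckets and converts the buckets to values/counts.
func bucketize(values []float64, width float64) (outValues, outCounts []float64) {
	counts := make(map[float64]float64)
	for _, v := range values {
		// Snap each value to the lower bound of its bucket, e.g. with
		// width=5, the values 10, 11, and 14 all land in bucket 10.
		bucket := math.Floor(v/width) * width
		counts[bucket]++
	}
	for b, c := range counts {
		outValues = append(outValues, b)
		outCounts = append(outCounts, c)
	}
	return outValues, outCounts
}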
87 changes: 87 additions & 0 deletions exporter/awsemfexporter/grouped_metric_test.go
@@ -508,6 +508,69 @@ func TestAddToGroupedMetric(t *testing.T) {
}
}
})

t.Run("Gauge metric converted to compacted metric of values and counts ", func(t *testing.T) {
emfCalcs := setupEmfCalculators()
defer require.NoError(t, shutdownEmfCalculators(emfCalcs))

// Create a gauge metric with multiple data points
gpuMetricName := "container_gpu_utilization"
gaugeMetric := generateTestGaugeMetricWithDataPoints(gpuMetricName, doubleValueType, []float64{10, 25, 15, 30, 20})

// Create a config that includes the metric in GaugeMetricsToCompact
cfg := createDefaultConfig().(*Config)
cfg.GaugeMetricsToCompact = []string{gpuMetricName}

// Set up the test
groupedMetrics := make(map[any]*groupedMetric)
rms := gaugeMetric.ResourceMetrics()
ilms := rms.At(0).ScopeMetrics()
metrics := ilms.At(0).Metrics()

// Call addToGroupedMetric
err := addToGroupedMetric(
metrics.At(0),
groupedMetrics,
generateTestMetricMetadata(namespace, timestamp, logGroup, logStreamName, instrumentationLibName, metrics.At(0).Type()),
true,
nil,
cfg,
emfCalcs,
)

// Verify results
assert.NoError(t, err)
assert.Len(t, groupedMetrics, 1, "Should have one grouped metric")

// Get the grouped metric
var group *groupedMetric
for _, g := range groupedMetrics {
group = g
break
}

// Verify the metric was compacted
assert.NotNil(t, group)
assert.Contains(t, group.metrics, gpuMetricName)

// Verify the metric value is in format of values and counts
metricInfo := group.metrics[gpuMetricName]
compactedMetrics, ok := metricInfo.value.(*cWMetricHistogram)
assert.True(t, ok, "Metric should be converted to a format of values and counts")

// Verify compactedMetrics properties
assert.Equal(t, uint64(5), compactedMetrics.Count, "CompactedMetrics should have count of 5")
assert.Equal(t, 100.0, compactedMetrics.Sum, "CompactedMetrics sum should match the sum of all values")
assert.Equal(t, 10.0, compactedMetrics.Min, "CompactedMetrics min should match the minimum value")
assert.Equal(t, 30.0, compactedMetrics.Max, "CompactedMetrics max should match the maximum value")

// Check that we have the right number of unique values and counts
assert.Len(t, compactedMetrics.Values, 5, "CompactedMetrics should have 5 unique values")
assert.Len(t, compactedMetrics.Counts, 5, "CompactedMetrics should have 5 counts")

// Verify the metadata was updated
assert.Equal(t, pmetric.MetricTypeGauge, group.metadata.metricDataType)
})
}

func TestAddKubernetesWrapper(t *testing.T) {
@@ -630,3 +693,27 @@ func generateTestMetricMetadata(namespace string, timestamp int64, logGroup, log
instrumentationScopeName: instrumentationScopeName,
}
}

func generateTestGaugeMetricWithDataPoints(name string, valueType metricValueType, values []float64) pmetric.Metrics {
otelMetrics := pmetric.NewMetrics()
rs := otelMetrics.ResourceMetrics().AppendEmpty()
metrics := rs.ScopeMetrics().AppendEmpty().Metrics()
metric := metrics.AppendEmpty()
metric.SetName(name)
metric.SetUnit("Count")
gaugeMetric := metric.SetEmptyGauge()

for _, val := range values {
gaugeDatapoint := gaugeMetric.DataPoints().AppendEmpty()
gaugeDatapoint.Attributes().PutStr("label1", "value1")

switch valueType {
case doubleValueType:
gaugeDatapoint.SetDoubleValue(val)
default:
gaugeDatapoint.SetIntValue(int64(val))
}
}

return otelMetrics
}