diff --git a/exporter/awsemfexporter/config.go b/exporter/awsemfexporter/config.go
index f91fb29f75233..ea43e6c56b6d8 100644
--- a/exporter/awsemfexporter/config.go
+++ b/exporter/awsemfexporter/config.go
@@ -59,6 +59,9 @@ type Config struct {
 	// MetricDeclarations is the list of rules to be used to set dimensions for exported metrics.
 	MetricDeclarations []*MetricDeclaration `mapstructure:"metric_declarations"`
 
+	// GaugeMetricsToCompact is the list of gauge metric names whose data points are compacted into values and counts
+	GaugeMetricsToCompact []string `mapstructure:"gauge_metrics_to_compact"`
+
 	// MetricDescriptors is the list of override metric descriptors that are sent to the CloudWatch
 	MetricDescriptors []MetricDescriptor `mapstructure:"metric_descriptors"`
 
diff --git a/exporter/awsemfexporter/grouped_metric.go b/exporter/awsemfexporter/grouped_metric.go
index a5499505cbbf6..c4978080ba5fd 100644
--- a/exporter/awsemfexporter/grouped_metric.go
+++ b/exporter/awsemfexporter/grouped_metric.go
@@ -5,6 +5,7 @@ package awsemfexporter // import "github.com/open-telemetry/opentelemetry-collec
 
 import (
 	"encoding/json"
+	"math"
 	"strings"
 
 	"go.opentelemetry.io/collector/pdata/pmetric"
@@ -41,50 +42,20 @@ func addToGroupedMetric(
 		return nil
 	}
 
-	for i := 0; i < dps.Len(); i++ {
-		// Drop stale or NaN metric values
-		if isStaleNanInf, attrs := dps.IsStaleNaNInf(i); isStaleNanInf {
-			if config != nil && config.logger != nil {
-				config.logger.Debug("dropped metric with nan value",
-					zap.String("metric.name", pmd.Name()),
-					zap.Any("metric.attributes", attrs))
-			}
-			continue
-		}
-		dps, retained := dps.CalculateDeltaDatapoints(i, metadata.instrumentationScopeName, config.DetailedMetrics, calculators)
-		if !retained {
-			continue
-		}
-
-		for i, dp := range dps {
-			labels := dp.labels
-
-			if metricType, ok := labels["Type"]; ok {
-				if (metricType == "Pod" || metricType == "Container") && config.EKSFargateContainerInsightsEnabled {
-					addKubernetesWrapper(labels)
-				}
-			}
-
-			// if patterns were found in config file and weren't replaced by resource attributes, replace those patterns with metric labels.
-			// if patterns are provided for a valid key and that key doesn't exist in the resource attributes, it is replaced with `undefined`.
-			if !patternReplaceSucceeded {
-				if strings.Contains(metadata.logGroup, "undefined") {
-					metadata.logGroup, _ = replacePatterns(config.LogGroupName, labels, config.logger)
-				}
-				if strings.Contains(metadata.logStream, "undefined") {
-					metadata.logStream, _ = replacePatterns(config.LogStreamName, labels, config.logger)
-				}
-			}
-
-			metric := &metricInfo{
-				value: dp.value,
-				unit:  translateUnit(pmd, descriptor),
-			}
+	filteredDps := filterAndCalculateDps(dps, pmd.Name(), metadata, config, calculators)
+	if shouldCompactMetrics(pmd, config) {
+		compactedMetrics, labels, updatedMetadata := compactGaugeMetrics(filteredDps, metadata, patternReplaceSucceeded, config)
+		if compactedMetrics != nil {
+			upsertGroupedMetric(groupedMetrics, updatedMetadata, labels, pmd.Name(), compactedMetrics, translateUnit(pmd, descriptor), config.logger)
+		}
+	} else {
+		for i, dp := range filteredDps {
+			labels := enrichLabels(dp, config)
+			metadata = replacePatternsIfNeeded(metadata, labels, config, patternReplaceSucceeded)
 			if dp.timestampMs > 0 {
 				metadata.timestampMs = dp.timestampMs
 			}
-			// Extra params to use when grouping metrics
 			if metadata.metricDataType != pmetric.MetricTypeSummary || !config.DetailedMetrics {
 				// Summary metrics can be split into separate datapoints when using DetailedMetrics, but we still want to group
@@ -99,28 +70,10 @@ func addToGroupedMetric(
 					metadata.metricDataType = pmetric.MetricTypeSum
 				}
 			}
-			groupKey := aws.NewKey(metadata.groupedMetricMetadata, labels)
-			if _, ok := groupedMetrics[groupKey]; ok {
-				// if MetricName already exists in metrics map, print warning log
-				if _, ok := groupedMetrics[groupKey].metrics[dp.name]; ok {
-					config.logger.Warn(
-						"Duplicate metric found",
-						zap.String("Name", dp.name),
-						zap.Any("Labels", labels),
-					)
-				} else {
-					groupedMetrics[groupKey].metrics[dp.name] = metric
-				}
-			} else {
-				groupedMetrics[groupKey] = &groupedMetric{
-					labels:   labels,
-					metrics:  map[string]*metricInfo{(dp.name): metric},
-					metadata: metadata,
-				}
-			}
+			upsertGroupedMetric(groupedMetrics, metadata, labels, dp.name, dp.value, translateUnit(pmd, descriptor), config.logger)
 		}
 	}
+
 	return nil
 }
@@ -224,3 +177,147 @@ func translateUnit(metric pmetric.Metric, descriptor map[string]MetricDescriptor
 	}
 	return unit
 }
+
+func shouldCompactMetrics(pmd pmetric.Metric, config *Config) bool {
+	if pmd.Type() != pmetric.MetricTypeGauge {
+		return false
+	}
+	// Check if the current metric is in the GaugeMetricsToCompact list
+	for _, name := range config.GaugeMetricsToCompact {
+		if name == pmd.Name() {
+			return true
+		}
+	}
+	return false
+}
+
+func filterAndCalculateDps(dps dataPoints, metricName string, metadata cWMetricMetadata, config *Config, calculators *emfCalculators) []dataPoint {
+	var result []dataPoint
+	for i := 0; i < dps.Len(); i++ {
+		if isStale, attrs := dps.IsStaleNaNInf(i); isStale {
+			if config != nil && config.logger != nil {
+				config.logger.Debug("dropped metric with nan value",
+					zap.String("metric.name", metricName),
+					zap.Any("metric.attributes", attrs))
+			}
+			continue
+		}
+		calculated, retained := dps.CalculateDeltaDatapoints(i, metadata.instrumentationScopeName, config.DetailedMetrics, calculators)
+		if retained {
+			result = append(result, calculated...)
+		}
+	}
+	return result
+}
+
+func enrichLabels(dp dataPoint, config *Config) map[string]string {
+	labels := dp.labels
+	if metricType, ok := labels["Type"]; ok {
+		if (metricType == "Pod" || metricType == "Container") && config.EKSFargateContainerInsightsEnabled {
+			addKubernetesWrapper(labels)
+		}
+	}
+	return labels
+}
+
+func replacePatternsIfNeeded(metadata cWMetricMetadata, labels map[string]string, config *Config, patternReplaceSucceeded bool) cWMetricMetadata {
+	if !patternReplaceSucceeded {
+		if strings.Contains(metadata.logGroup, "undefined") {
+			metadata.logGroup, _ = replacePatterns(config.LogGroupName, labels, config.logger)
+		}
+		if strings.Contains(metadata.logStream, "undefined") {
+			metadata.logStream, _ = replacePatterns(config.LogStreamName, labels, config.logger)
+		}
+	}
+	return metadata
+}
+
+func upsertGroupedMetric(
+	groupedMetrics map[any]*groupedMetric,
+	metadata cWMetricMetadata,
+	labels map[string]string,
+	metricName string,
+	metricVal any,
+	unit string,
+	logger *zap.Logger,
+) {
+	metric := &metricInfo{value: metricVal, unit: unit}
+	groupKey := aws.NewKey(metadata.groupedMetricMetadata, labels)
+
+	if _, ok := groupedMetrics[groupKey]; ok {
+		// if MetricName already exists in metrics map, print warning log
+		if _, ok := groupedMetrics[groupKey].metrics[metricName]; ok {
+			logger.Warn("Duplicate metric found", zap.String("Name", metricName), zap.Any("Labels", labels))
+		} else {
+			groupedMetrics[groupKey].metrics[metricName] = metric
+		}
+	} else {
+		groupedMetrics[groupKey] = &groupedMetric{
+			labels:   labels,
+			metrics:  map[string]*metricInfo{metricName: metric},
+			metadata: metadata,
+		}
+	}
+}
+
+// compactGaugeMetrics converts a collection of gauge data points into a compact values-and-counts representation.
+func compactGaugeMetrics(
+	dps []dataPoint,
+	metadata cWMetricMetadata,
+	patternReplaceSucceeded bool,
+	config *Config,
+) (compactedMetrics *cWMetricHistogram, labels map[string]string, updatedMetadata cWMetricMetadata) {
+	var values []float64
+	var timestampMs int64
+
+	// Extract float values from data points and find the latest timestamp
+	for _, dp := range dps {
+		if dp.timestampMs > timestampMs {
+			timestampMs = dp.timestampMs
+		}
+		if v, ok := dp.value.(float64); ok {
+			values = append(values, v)
+			labels = enrichLabels(dp, config)
+		}
+	}
+
+	if len(values) == 0 {
+		return nil, nil, metadata
+	}
+
+	updatedMetadata = replacePatternsIfNeeded(metadata, labels, config, patternReplaceSucceeded)
+	updatedMetadata.metricDataType = pmetric.MetricTypeGauge
+	updatedMetadata.timestampMs = timestampMs
+
+	compactedMetrics = &cWMetricHistogram{
+		Values: []float64{},
+		Counts: []float64{},
+		Count:  uint64(len(values)),
+		Sum:    0,
+		Min:    math.MaxFloat64,
+		Max:    -math.MaxFloat64,
+	}
+
+	// Calculate sum, min, max and count frequencies for each unique value
+	countMap := make(map[float64]float64)
+	for _, v := range values {
+		compactedMetrics.Sum += v
+		if v < compactedMetrics.Min {
+			compactedMetrics.Min = v
+		}
+		if v > compactedMetrics.Max {
+			compactedMetrics.Max = v
+		}
+		countMap[v]++
+	}
+
+	// Pre-allocate slices to avoid multiple allocations during append
+	compactedMetrics.Values = make([]float64, 0, len(countMap))
+	compactedMetrics.Counts = make([]float64, 0, len(countMap))
+	for val, cnt := range countMap {
+		compactedMetrics.Values = append(compactedMetrics.Values, val)
+		compactedMetrics.Counts = append(compactedMetrics.Counts, cnt)
+	}
+
+	return compactedMetrics, labels, updatedMetadata
+}
diff --git a/exporter/awsemfexporter/grouped_metric_test.go b/exporter/awsemfexporter/grouped_metric_test.go
index 6b7dd59a1a34b..b93c5c51a73dd 100644
--- a/exporter/awsemfexporter/grouped_metric_test.go
+++ b/exporter/awsemfexporter/grouped_metric_test.go
@@ -508,6 +508,69 @@ func TestAddToGroupedMetric(t *testing.T) {
 			}
 		}
 	})
+
+	t.Run("Gauge metric converted to compacted metric of values and counts", func(t *testing.T) {
+		emfCalcs := setupEmfCalculators()
+		defer require.NoError(t, shutdownEmfCalculators(emfCalcs))
+
+		// Create a gauge metric with multiple data points
+		gpuMetricName := "container_gpu_utilization"
+		gaugeMetric := generateTestGaugeMetricWithDataPoints(gpuMetricName, doubleValueType, []float64{10, 25, 15, 30, 20})
+
+		// Create a config that includes the metric in GaugeMetricsToCompact
+		cfg := createDefaultConfig().(*Config)
+		cfg.GaugeMetricsToCompact = []string{gpuMetricName}
+
+		// Set up the test
+		groupedMetrics := make(map[any]*groupedMetric)
+		rms := gaugeMetric.ResourceMetrics()
+		ilms := rms.At(0).ScopeMetrics()
+		metrics := ilms.At(0).Metrics()
+
+		// Call addToGroupedMetric
+		err := addToGroupedMetric(
+			metrics.At(0),
+			groupedMetrics,
+			generateTestMetricMetadata(namespace, timestamp, logGroup, logStreamName, instrumentationLibName, metrics.At(0).Type()),
+			true,
+			nil,
+			cfg,
+			emfCalcs,
+		)
+
+		// Verify results
+		assert.NoError(t, err)
+		assert.Len(t, groupedMetrics, 1, "Should have one grouped metric")
+
+		// Get the grouped metric
+		var group *groupedMetric
+		for _, g := range groupedMetrics {
+			group = g
+			break
+		}
+
+		// Verify the metric was compacted
+		assert.NotNil(t, group)
+		assert.Contains(t, group.metrics, gpuMetricName)
+
+		// Verify the metric value is in the values and counts format
+		metricInfo := group.metrics[gpuMetricName]
+		compactedMetrics, ok := metricInfo.value.(*cWMetricHistogram)
+		assert.True(t, ok, "Metric should be converted to the values and counts format")
+
+		// Verify compactedMetrics properties
+		assert.Equal(t, uint64(5), compactedMetrics.Count, "CompactedMetrics should have count of 5")
+		assert.Equal(t, 100.0, compactedMetrics.Sum, "CompactedMetrics sum should match the sum of all values")
+		assert.Equal(t, 10.0, compactedMetrics.Min, "CompactedMetrics min should match the minimum value")
+		assert.Equal(t, 30.0, compactedMetrics.Max, "CompactedMetrics max should match the maximum value")
+
+		// Check that we have the right number of unique values and counts
+		assert.Len(t, compactedMetrics.Values, 5, "CompactedMetrics should have 5 unique values")
+		assert.Len(t, compactedMetrics.Counts, 5, "CompactedMetrics should have 5 counts")
+
+		// Verify the metadata was updated
+		assert.Equal(t, pmetric.MetricTypeGauge, group.metadata.metricDataType)
+	})
 }
 
 func TestAddKubernetesWrapper(t *testing.T) {
@@ -630,3 +693,27 @@ func generateTestMetricMetadata(namespace string, timestamp int64, logGroup, log
 		instrumentationScopeName: instrumentationScopeName,
 	}
 }
+
+func generateTestGaugeMetricWithDataPoints(name string, valueType metricValueType, values []float64) pmetric.Metrics {
+	otelMetrics := pmetric.NewMetrics()
+	rs := otelMetrics.ResourceMetrics().AppendEmpty()
+	metrics := rs.ScopeMetrics().AppendEmpty().Metrics()
+	metric := metrics.AppendEmpty()
+	metric.SetName(name)
+	metric.SetUnit("Count")
+	gaugeMetric := metric.SetEmptyGauge()
+
+	for _, val := range values {
+		gaugeDatapoint := gaugeMetric.DataPoints().AppendEmpty()
+		gaugeDatapoint.Attributes().PutStr("label1", "value1")
+
+		switch valueType {
+		case doubleValueType:
+			gaugeDatapoint.SetDoubleValue(val)
+		default:
+			gaugeDatapoint.SetIntValue(int64(val))
+		}
+	}
+
+	return otelMetrics
+}
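
Reviewer note: the following is a minimal, self-contained sketch (not part of the patch) of the values-and-counts compaction that compactGaugeMetrics performs. The histogram type and compact helper here are hypothetical stand-ins for the exporter's cWMetricHistogram and compactGaugeMetrics, kept free of exporter internals so the sketch runs on its own.

// sketch.go — illustrative only, not part of the patch.
package main

import (
	"fmt"
	"math"
)

// histogram is a hypothetical stand-in for the exporter's cWMetricHistogram.
type histogram struct {
	Values []float64
	Counts []float64
	Count  uint64
	Sum    float64
	Min    float64
	Max    float64
}

// compact mirrors the core of compactGaugeMetrics: fold raw gauge samples
// into unique values with per-value counts, plus summary statistics.
func compact(values []float64) *histogram {
	if len(values) == 0 {
		return nil
	}
	h := &histogram{
		Count: uint64(len(values)),
		Min:   math.MaxFloat64,
		Max:   -math.MaxFloat64,
	}
	countMap := make(map[float64]float64)
	for _, v := range values {
		h.Sum += v
		h.Min = math.Min(h.Min, v)
		h.Max = math.Max(h.Max, v)
		countMap[v]++
	}
	// One Values/Counts pair per unique sample value.
	h.Values = make([]float64, 0, len(countMap))
	h.Counts = make([]float64, 0, len(countMap))
	for val, cnt := range countMap {
		h.Values = append(h.Values, val)
		h.Counts = append(h.Counts, cnt)
	}
	return h
}

func main() {
	// Six samples with one repeated value compact to five Values/Counts pairs.
	h := compact([]float64{10, 25, 15, 30, 20, 20})
	fmt.Printf("Count=%d Sum=%v Min=%v Max=%v\n", h.Count, h.Sum, h.Min, h.Max)
	// Pair order is unspecified because it comes from map iteration.
	fmt.Println("Values:", h.Values, "Counts:", h.Counts)
}

Because the Values/Counts pairs are produced by iterating a Go map, their order is unspecified across runs, which is presumably why the test above asserts only the lengths of the two slices rather than a particular ordering.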