Skip to content

Commit dc672ff

Browse files
authored
BugFix: Aggregrate neuroncore utilization across core/device pair (#1713)
1 parent 4123ffc commit dc672ff

File tree

2 files changed

+87
-9
lines changed

2 files changed

+87
-9
lines changed

plugins/processors/gpuattributes/internal/awsneuron_metric_modifier.go

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package internal
55

66
import (
7+
"strconv"
78
"strings"
89

910
"go.opentelemetry.io/collector/pdata/pmetric"
@@ -48,6 +49,8 @@ const (
4849
RuntimeTagOverride = "DEFAULT"
4950
NeuronExecutionErrorsAggregatedMetric = containerinsightscommon.NeuronExecutionErrors + "_total"
5051
NeuronDeviceHardwareEccEventsAggregatedMetric = containerinsightscommon.NeuronDeviceHardwareEccEvents + "_total"
52+
NeuronCoreLabel = "neuroncore"
53+
NeuronCorePerDevice = 2
5154
)
5255

5356
type AwsNeuronMetricModifier struct {
@@ -67,6 +70,11 @@ type MetricDatapointAggregationKey struct {
6770
deviceId string
6871
}
6972

73+
type NeuronCoreUtilizationDatapointAggregationKey struct {
74+
runtimeTag string
75+
coreID string
76+
}
77+
7078
var (
7179
metricModificationsMap = map[string]MetricModifications{
7280
containerinsightscommon.NeuronExecutionErrors: {DuplicationTypes: []string{containerinsightscommon.TypeNode}, UniqueAttribute: ErrorType, LogTypeSuffix: "", Unit: Count},
@@ -135,7 +143,17 @@ func (md *AwsNeuronMetricModifier) ModifyMetric(originalMetric pmetric.Metric, m
135143
keepSpecificDatapointBasedOnAttribute(originalMetric, MemoryLocation, "neuron_device")
136144
}
137145

138-
modifiedMetricSlice := md.extractDatapointsAsMetricsAndAggregate(originalMetric)
146+
var modifiedMetricSlice pmetric.MetricSlice
147+
148+
// For NeuronCoreUtilization metrics, perform additional aggregation to calculate the maximum utilization
149+
// value per core across all datapoints. This ensures we capture peak utilization rather than average values,
150+
// which is more useful for monitoring core performance and potential bottlenecks.
151+
if originalMetric.Name() == containerinsightscommon.NeuronCoreUtilization {
152+
modifiedMetricSlice = md.aggregateCoreUtilizationMetrics(originalMetric)
153+
} else {
154+
modifiedMetricSlice = md.extractDatapointsAsMetricsAndAggregate(originalMetric)
155+
}
156+
139157
md.duplicateMetrics(modifiedMetricSlice, originalMetricName, originalMetric.Sum().DataPoints(), metrics)
140158
}
141159

@@ -200,6 +218,7 @@ func keepSpecificDatapointBasedOnAttribute(originalMetric pmetric.Metric, attrib
200218
func (md *AwsNeuronMetricModifier) extractDatapointsAsMetricsAndAggregate(originalMetric pmetric.Metric) pmetric.MetricSlice {
201219
newMetricSlice := pmetric.NewMetricSlice()
202220
uniqueAttribute := metricModificationsMap[originalMetric.Name()].UniqueAttribute
221+
203222
if uniqueAttribute == "" {
204223
originalMetric.CopyTo(newMetricSlice.AppendEmpty())
205224
return newMetricSlice
@@ -286,6 +305,39 @@ func (md *AwsNeuronMetricModifier) duplicateMetrics(metricsSlice pmetric.MetricS
286305
}
287306
}
288307

308+
func (md *AwsNeuronMetricModifier) aggregateCoreUtilizationMetrics(originalMetric pmetric.Metric) pmetric.MetricSlice {
309+
newMetricSlice := pmetric.NewMetricSlice()
310+
originalMetricDatapoints := originalMetric.Sum().DataPoints()
311+
aggregatedValuesPerCore := map[NeuronCoreUtilizationDatapointAggregationKey]float64{}
312+
for i := 0; i < originalMetricDatapoints.Len(); i++ {
313+
originalDatapoint := originalMetricDatapoints.At(i)
314+
runtimeTag, _ := originalDatapoint.Attributes().Get(RuntimeTag)
315+
coreIDTag, _ := originalDatapoint.Attributes().Get(NeuronCoreLabel)
316+
key := NeuronCoreUtilizationDatapointAggregationKey{runtimeTag: runtimeTag.Str(), coreID: coreIDTag.Str()}
317+
aggregatedValuesPerCore[key] = max(aggregatedValuesPerCore[key], originalDatapoint.DoubleValue(), 0)
318+
}
319+
320+
if len(aggregatedValuesPerCore) == 0 {
321+
return newMetricSlice
322+
}
323+
324+
aggregatedMetric := setMetricMetadata(newMetricSlice.AppendEmpty(), originalMetric.Name(), originalMetric.Unit())
325+
aggregateDatapoints := aggregatedMetric.SetEmptySum().DataPoints()
326+
firstOriginalDatapoint := originalMetricDatapoints.At(0)
327+
// Creating body for the aggregated metric and add it to the new newMetricSlice for each Core
328+
for aggregatedMetricMetadata, value := range aggregatedValuesPerCore {
329+
datapoint := aggregateDatapoints.AppendEmpty()
330+
firstOriginalDatapoint.CopyTo(datapoint)
331+
datapoint.SetDoubleValue(value)
332+
datapoint.Attributes().PutStr(RuntimeTag, aggregatedMetricMetadata.runtimeTag)
333+
datapoint.Attributes().PutStr(NeuronCoreLabel, aggregatedMetricMetadata.coreID)
334+
datapoint.Attributes().PutStr(NeuronCoreAttributeKey, "core"+aggregatedMetricMetadata.coreID)
335+
coreID, _ := strconv.Atoi(aggregatedMetricMetadata.coreID)
336+
datapoint.Attributes().PutStr(NeuronDeviceAttributeKey, "device"+strconv.Itoa(coreID/NeuronCorePerDevice))
337+
}
338+
return newMetricSlice
339+
}
340+
289341
// This method creates new metrics by prefixing the metric name with each k8 concepts (pod, node and container).
290342
// It also adds logTypes to all the metric datapoint attributes.
291343
func duplicateMetricForType(metric pmetric.Metric, duplicateType string, originalMetricName string, metrics pmetric.MetricSlice) {

0 commit comments

Comments
 (0)