Skip to content

Commit 904e345

Browse files
authored
Merge branch 'aws-cwa-dev' into mpmann/more-efa-metrics
2 parents 4b4b3ff + cd34549 commit 904e345

File tree

16 files changed

+639
-57
lines changed

16 files changed

+639
-57
lines changed

exporter/awsemfexporter/emf_exporter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ func (emf *emfExporter) start(_ context.Context, host component.Host) error {
231231
// Below are optimizations to minimize the amount of
232232
// metrics processing. We have two scenarios
233233
// 1. AppSignal - Only run Process function for AppSignal related useragent
234-
// 2. Enhanced Container Insights - Only run ProcessMetrics function for CI EBS related useragent
234+
// 2. Enhanced Container Insights - Only run ProcessMetrics function for CI related useragent
235235
if emf.config.IsAppSignalsEnabled() || emf.config.IsEnhancedContainerInsights() {
236236
userAgent := useragent.NewUserAgent()
237237
emf.svcStructuredLog.Handlers().Build.PushFrontNamed(userAgent.Handler())

exporter/awsemfexporter/internal/useragent/useragent.go

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,16 @@ const (
2828

2929
// TODO: Available in semconv/v1.21.0+. Replace after collector dependency is v0.91.0+.
3030
attributeTelemetryDistroVersion = "telemetry.distro.version"
31-
32-
attributeEBS = "ci_ebs"
33-
ebsMetricPrefix = "node_diskio_ebs"
31+
attributeEBS = "ci_ebs"
32+
attributeLocalInstanceStore = "ci_lis"
3433
)
3534

35+
// Map of NVMe feature attributes to their corresponding metric prefixes
36+
var featureMetricPrefixes = map[string]string{
37+
attributeEBS: "node_diskio_ebs",
38+
attributeLocalInstanceStore: "node_diskio_instance_store",
39+
}
40+
3641
type UserAgent struct {
3742
mu sync.RWMutex
3843
prebuiltStr string
@@ -95,8 +100,15 @@ func (ua *UserAgent) Process(labels map[string]string) {
95100

96101
// ProcessMetrics checks metric names for specific patterns and updates user agent accordingly
97102
func (ua *UserAgent) ProcessMetrics(metrics pmetric.Metrics) {
98-
// Check if we've already detected NVME
99-
if _, exists := ua.featureList[attributeEBS]; exists {
103+
// Check if all NVMe features are already detected
104+
allFeaturesFound := true
105+
for feature := range featureMetricPrefixes {
106+
if _, exists := ua.featureList[feature]; !exists {
107+
allFeaturesFound = false
108+
break
109+
}
110+
}
111+
if allFeaturesFound {
100112
return
101113
}
102114

@@ -107,14 +119,17 @@ func (ua *UserAgent) ProcessMetrics(metrics pmetric.Metrics) {
107119
ms := ilms.At(j).Metrics()
108120
for k := 0; k < ms.Len(); k++ {
109121
metric := ms.At(k)
110-
if strings.HasPrefix(metric.Name(), ebsMetricPrefix) {
111-
ua.featureList[attributeEBS] = struct{}{}
112-
ua.build()
113-
return
122+
for feature, prefix := range featureMetricPrefixes {
123+
if strings.HasPrefix(metric.Name(), prefix) {
124+
if _, exists := ua.featureList[feature]; !exists {
125+
ua.featureList[feature] = struct{}{}
126+
}
127+
}
114128
}
115129
}
116130
}
117131
}
132+
ua.build()
118133
}
119134

120135
// build the user agent string from the items in the cache. Format is telemetry-sdk (<lang1>/<ver1>;<lang2>/<ver2>).

exporter/awsemfexporter/internal/useragent/useragent_test.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ func TestUserAgent(t *testing.T) {
7575
metrics: []string{"node_diskio_ebs_something"},
7676
want: "feature:(ci_ebs)",
7777
},
78+
"WithLocalInstanceStoreMetrics": {
79+
metrics: []string{"node_diskio_instance_store_something"},
80+
want: "feature:(ci_lis)",
81+
},
7882
"WithBothTelemetryAndEBS": {
7983
labelSets: []map[string]string{
8084
{
@@ -85,14 +89,32 @@ func TestUserAgent(t *testing.T) {
8589
metrics: []string{"node_diskio_ebs_something"},
8690
want: "telemetry-sdk (test/1.0) feature:(ci_ebs)",
8791
},
92+
"WithBothTelemetryAndLocalInstanceStore": {
93+
labelSets: []map[string]string{
94+
{
95+
semconv.AttributeTelemetrySDKLanguage: "test",
96+
attributeTelemetryDistroVersion: "1.0",
97+
},
98+
},
99+
metrics: []string{"node_diskio_instance_store_something"},
100+
want: "telemetry-sdk (test/1.0) feature:(ci_lis)",
101+
},
88102
"WithNonEBSMetrics": {
89103
metrics: []string{"some_other_metric"},
90104
want: "",
91105
},
92-
"WithMultipleFeatures": {
106+
"WithMultipleEBSMetrics": {
93107
metrics: []string{"node_diskio_ebs_something", "node_diskio_ebs_something_else"},
94108
want: "feature:(ci_ebs)",
95109
},
110+
"WithMultipleLocalInstanceStoreMetrics": {
111+
metrics: []string{"node_diskio_instance_store_something", "node_diskio_instance_store_something_else"},
112+
want: "feature:(ci_lis)",
113+
},
114+
"WithMixedEBSAndInstanceStoreMetrics": {
115+
metrics: []string{"node_diskio_ebs_something", "node_diskio_ebs_something", "node_diskio_instance_store_something", "node_diskio_instance_store_something"},
116+
want: "feature:(ci_ebs ci_lis)", // Both features should be detected and sorted alphabetically
117+
},
96118
}
97119
for name, testCase := range testCases {
98120
t.Run(name, func(t *testing.T) {

exporter/awsemfexporter/metric_translator.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,8 @@ func (mt metricTranslator) translateOTelToGroupedMetric(rm pmetric.ResourceMetri
147147
strings.HasPrefix(serviceName.Str(), "containerInsightsDCGMExporterScraper") ||
148148
strings.HasPrefix(serviceName.Str(), "containerInsightsNeuronMonitorScraper") ||
149149
strings.HasPrefix(serviceName.Str(), "containerInsightsKueueMetricsScraper") ||
150-
strings.HasPrefix(serviceName.Str(), "containerInsightsNVMeExporterScraper") {
150+
strings.HasPrefix(serviceName.Str(), "containerInsightsNVMeEBSScraper") ||
151+
strings.HasPrefix(serviceName.Str(), "containerInsightsNVMeLISScraper") {
151152
// the prometheus metrics that come from the container insight receiver need to be clearly tagged as coming from container insights
152153
metricReceiver = containerInsightsReceiver
153154
}

exporter/awsemfexporter/metric_translator_test.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,9 @@ func TestTranslateOtToGroupedMetric(t *testing.T) {
279279
kueueMetric := createTestResourceMetricsHelper(defaultNumberOfTestMetrics + 1)
280280
kueueMetric.Resource().Attributes().PutStr(conventions.AttributeServiceName, "containerInsightsKueueMetricsScraper")
281281
nvmeMetric := createTestResourceMetricsHelper(defaultNumberOfTestMetrics + 1)
282-
nvmeMetric.Resource().Attributes().PutStr(conventions.AttributeServiceName, "containerInsightsNVMeExporterScraper")
282+
nvmeMetric.Resource().Attributes().PutStr(conventions.AttributeServiceName, "containerInsightsNVMeEBSScraper")
283+
nvmeLisMetric := createTestResourceMetricsHelper(defaultNumberOfTestMetrics + 1)
284+
nvmeLisMetric.Resource().Attributes().PutStr(conventions.AttributeServiceName, "containerInsightsNVMeLISScraper")
283285

284286
counterSumMetrics := map[string]*metricInfo{
285287
"spanCounter": {
@@ -407,7 +409,20 @@ func TestTranslateOtToGroupedMetric(t *testing.T) {
407409
map[string]string{
408410
"spanName": "testSpan",
409411
},
410-
"myServiceNS/containerInsightsNVMeExporterScraper",
412+
"myServiceNS/containerInsightsNVMeEBSScraper",
413+
containerInsightsReceiver,
414+
},
415+
{
416+
"nvme lis receiver",
417+
nvmeLisMetric,
418+
map[string]string{
419+
"isItAnError": "false",
420+
"spanName": "testSpan",
421+
},
422+
map[string]string{
423+
"spanName": "testSpan",
424+
},
425+
"myServiceNS/containerInsightsNVMeLISScraper",
411426
containerInsightsReceiver,
412427
},
413428
}

exporter/awsxrayexporter/internal/translator/segment.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -334,8 +334,8 @@ func MakeSegment(span ptrace.Span, resource pcommon.Resource, indexedAttrs []str
334334
attributes := span.Attributes()
335335

336336
var (
337-
startTime = timestampToFloatSeconds(span.StartTimestamp())
338-
endTime = timestampToFloatSeconds(span.EndTimestamp())
337+
startTimePtr = awsP.Float64(timestampToFloatSeconds(span.StartTimestamp()))
338+
endTimePtr, inProgress = makeEndTimeAndInProgress(span, attributes)
339339
httpfiltered, http = makeHTTP(span)
340340
isError, isFault, isThrottle, causefiltered, cause = makeCause(span, httpfiltered, resource)
341341
origin = determineAwsOrigin(resource)
@@ -456,8 +456,9 @@ func MakeSegment(span ptrace.Span, resource pcommon.Resource, indexedAttrs []str
456456
ID: awsxray.String(traceutil.SpanIDToHexOrEmptyString(span.SpanID())),
457457
TraceID: awsxray.String(traceID),
458458
Name: awsxray.String(name),
459-
StartTime: awsP.Float64(startTime),
460-
EndTime: awsP.Float64(endTime),
459+
StartTime: startTimePtr,
460+
EndTime: endTimePtr,
461+
InProgress: inProgress,
461462
ParentID: awsxray.String(traceutil.SpanIDToHexOrEmptyString(span.ParentSpanID())),
462463
Fault: awsP.Bool(isFault),
463464
Error: awsP.Bool(isError),
@@ -758,6 +759,17 @@ func fixAnnotationKey(key string) string {
758759
}, key)
759760
}
760761

762+
func makeEndTimeAndInProgress(span ptrace.Span, attributes pcommon.Map) (*float64, *bool) {
763+
if inProgressAttr, ok := attributes.Get(awsxray.AWSXRayInProgressAttribute); ok && inProgressAttr.Type() == pcommon.ValueTypeBool {
764+
inProgressVal := inProgressAttr.Bool()
765+
attributes.Remove(awsxray.AWSXRayInProgressAttribute)
766+
if inProgressVal {
767+
return nil, &inProgressVal
768+
}
769+
}
770+
return awsP.Float64(timestampToFloatSeconds(span.EndTimestamp())), nil
771+
}
772+
761773
func trimAwsSdkPrefix(name string, span ptrace.Span) string {
762774
if isAwsSdkSpan(span) {
763775
if strings.HasPrefix(name, "AWS.SDK.") {

exporter/awsxrayexporter/internal/translator/segment_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1444,6 +1444,11 @@ func TestLocalRootConsumer(t *testing.T) {
14441444
// Checks these values are the same for both
14451445
assert.Equal(t, segments[0].StartTime, segments[1].StartTime)
14461446
assert.Equal(t, segments[0].EndTime, segments[1].EndTime)
1447+
1448+
// Check that segment EndTime matches span's EndTime
1449+
expectedEndTime := float64(span.EndTimestamp()) / float64(time.Second)
1450+
assert.Equal(t, expectedEndTime, *segments[0].EndTime)
1451+
assert.Equal(t, expectedEndTime, *segments[1].EndTime)
14471452
}
14481453

14491454
func TestNonLocalRootConsumerProcess(t *testing.T) {
@@ -1966,6 +1971,8 @@ func constructSpanAttributes(attributes map[string]any) pcommon.Map {
19661971
attrs.PutInt(key, int64(cast))
19671972
} else if cast, ok := value.(int64); ok {
19681973
attrs.PutInt(key, cast)
1974+
} else if cast, ok := value.(bool); ok {
1975+
attrs.PutBool(key, cast)
19691976
} else if cast, ok := value.([]string); ok {
19701977
slice := attrs.PutEmptySlice(key)
19711978
for _, v := range cast {
@@ -2041,6 +2048,40 @@ func constructTimedEventsWithSentMessageEvent(tm pcommon.Timestamp) ptrace.SpanE
20412048
}
20422049

20432050
// newTraceID generates a new valid X-Ray TraceID
2051+
func TestSpanWithInProgressTrue(t *testing.T) {
2052+
spanName := "/api/test"
2053+
parentSpanID := newSegmentID()
2054+
attributes := make(map[string]any)
2055+
attributes[awsxray.AWSXRayInProgressAttribute] = true
2056+
resource := constructDefaultResource()
2057+
span := constructServerSpan(parentSpanID, spanName, ptrace.StatusCodeOk, "OK", attributes)
2058+
2059+
segment, _ := MakeSegment(span, resource, nil, false, nil, false)
2060+
2061+
assert.NotNil(t, segment)
2062+
assert.NotNil(t, segment.InProgress)
2063+
assert.True(t, *segment.InProgress)
2064+
assert.Nil(t, segment.EndTime)
2065+
assert.Nil(t, segment.Metadata["default"][awsxray.AWSXRayInProgressAttribute])
2066+
}
2067+
2068+
func TestSpanWithInProgressFalse(t *testing.T) {
2069+
spanName := "/api/test"
2070+
parentSpanID := newSegmentID()
2071+
attributes := make(map[string]any)
2072+
attributes[awsxray.AWSXRayInProgressAttribute] = false
2073+
resource := constructDefaultResource()
2074+
span := constructServerSpan(parentSpanID, spanName, ptrace.StatusCodeOk, "OK", attributes)
2075+
2076+
segment, _ := MakeSegment(span, resource, nil, false, nil, false)
2077+
2078+
assert.NotNil(t, segment)
2079+
assert.Nil(t, segment.InProgress)
2080+
assert.NotNil(t, segment.EndTime)
2081+
assert.Empty(t, segment.Annotations)
2082+
assert.Nil(t, segment.Metadata["default"][awsxray.AWSXRayInProgressAttribute])
2083+
}
2084+
20442085
func newTraceID() pcommon.TraceID {
20452086
var r [16]byte
20462087
epoch := time.Now().Unix()

receiver/awscontainerinsightreceiver/internal/nvme/metric_unit.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
)
88

99
const (
10-
// Original Metric Names
10+
// EBS Original Metric Names
1111
ebsReadOpsTotal = "aws_ebs_csi_read_ops_total"
1212
ebsWriteOpsTotal = "aws_ebs_csi_write_ops_total"
1313
ebsReadBytesTotal = "aws_ebs_csi_read_bytes_total"
@@ -19,9 +19,21 @@ const (
1919
ebsExceededEC2IOPSTime = "aws_ebs_csi_ec2_exceeded_iops_seconds_total"
2020
ebsExceededEC2TPTime = "aws_ebs_csi_ec2_exceeded_tp_seconds_total"
2121
ebsVolumeQueueLength = "aws_ebs_csi_volume_queue_length"
22+
23+
// LIS Original Metric Names
24+
lisReadOpsTotal = "aws_ec2_instance_store_csi_read_ops_total"
25+
lisWriteOpsTotal = "aws_ec2_instance_store_csi_write_ops_total"
26+
lisReadBytesTotal = "aws_ec2_instance_store_csi_read_bytes_total"
27+
lisWriteBytesTotal = "aws_ec2_instance_store_csi_write_bytes_total"
28+
lisReadTime = "aws_ec2_instance_store_csi_read_seconds_total"
29+
lisWriteTime = "aws_ec2_instance_store_csi_write_seconds_total"
30+
lisExceededIOPSTime = "aws_ec2_instance_store_csi_ec2_exceeded_iops_seconds_total"
31+
lisExceededTPTime = "aws_ec2_instance_store_csi_ec2_exceeded_tp_seconds_total"
32+
lisVolumeQueueLength = "aws_ec2_instance_store_csi_volume_queue_length"
2233
)
2334

2435
var MetricToUnit = map[string]string{
36+
// EBS metrics
2537
ebsReadOpsTotal: containerinsight.UnitCount,
2638
ebsWriteOpsTotal: containerinsight.UnitCount,
2739
ebsReadBytesTotal: containerinsight.UnitBytes,
@@ -33,4 +45,15 @@ var MetricToUnit = map[string]string{
3345
ebsExceededEC2IOPSTime: containerinsight.UnitSecond,
3446
ebsExceededEC2TPTime: containerinsight.UnitSecond,
3547
ebsVolumeQueueLength: containerinsight.UnitCount,
48+
49+
// LIS metrics
50+
lisReadOpsTotal: containerinsight.UnitCount,
51+
lisWriteOpsTotal: containerinsight.UnitCount,
52+
lisReadBytesTotal: containerinsight.UnitBytes,
53+
lisWriteBytesTotal: containerinsight.UnitBytes,
54+
lisReadTime: containerinsight.UnitSecond,
55+
lisWriteTime: containerinsight.UnitSecond,
56+
lisExceededIOPSTime: containerinsight.UnitSecond,
57+
lisExceededTPTime: containerinsight.UnitSecond,
58+
lisVolumeQueueLength: containerinsight.UnitCount,
3659
}

receiver/awscontainerinsightreceiver/internal/nvme/nvmescraper_config.go renamed to receiver/awscontainerinsightreceiver/internal/nvme/nvme_ebs_scraper_config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818

1919
const (
2020
collectionInterval = 60 * time.Second
21-
jobName = "containerInsightsNVMeExporterScraper"
21+
jobName = "containerInsightsNVMeEBSScraper"
2222
scraperMetricsPath = "/metrics"
2323
scraperK8sServiceSelector = "app=ebs-csi-node"
2424
)
@@ -29,7 +29,7 @@ type hostInfoProvider interface {
2929
GetInstanceType() string
3030
}
3131

32-
func GetScraperConfig(hostInfoProvider hostInfoProvider) *config.ScrapeConfig {
32+
func GetEbsScraperConfig(hostInfoProvider hostInfoProvider) *config.ScrapeConfig {
3333
return &config.ScrapeConfig{
3434
ScrapeInterval: model.Duration(collectionInterval),
3535
ScrapeTimeout: model.Duration(collectionInterval),

receiver/awscontainerinsightreceiver/internal/nvme/nvmescraper_test.go renamed to receiver/awscontainerinsightreceiver/internal/nvme/nvme_ebs_scraper_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func TestNewNVMEScraperEndToEnd(t *testing.T) {
141141
Consumer: mConsumer,
142142
Host: componenttest.NewNopHost(),
143143
HostInfoProvider: mockHostInfoProvider{},
144-
ScraperConfigs: GetScraperConfig(mockHostInfoProvider{}),
144+
ScraperConfigs: GetEbsScraperConfig(mockHostInfoProvider{}),
145145
Logger: settings.Logger,
146146
})
147147
assert.NoError(t, err)
@@ -218,5 +218,5 @@ func TestNewNVMEScraperEndToEnd(t *testing.T) {
218218

219219
func TestNvmeScraperJobName(t *testing.T) {
220220
// needs to start with containerInsights
221-
assert.True(t, strings.HasPrefix(jobName, "containerInsightsNVMeExporterScraper"))
221+
assert.True(t, strings.HasPrefix(jobName, "containerInsightsNVMeEBSScraper"))
222222
}

0 commit comments

Comments
 (0)