Skip to content

Commit 237e256

Browse files
authored
Add support for EC2 metadata as dimensions for host metrics (#1876)
1 parent 1bd930e commit 237e256

File tree

14 files changed

+724
-72
lines changed

14 files changed

+724
-72
lines changed

plugins/processors/ec2tagger/constants.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,9 @@ const sampleConfig = `
6262
const (
6363
Ec2InstanceTagKeyASG = "aws:autoscaling:groupName"
6464
CWDimensionASG = "AutoScalingGroupName"
65-
mdKeyInstanceId = "InstanceId"
66-
mdKeyImageId = "ImageId"
67-
mdKeyInstanceType = "InstanceType"
65+
MdKeyInstanceID = "InstanceId"
66+
MdKeyImageID = "ImageId"
67+
MdKeyInstanceType = "InstanceType"
6868
)
6969

7070
var (

plugins/processors/ec2tagger/ec2tagger.go

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -140,27 +140,36 @@ func (t *Tagger) updateOtelAttributes(attributes []pcommon.Map) {
140140
for _, attr := range attributes {
141141
if t.ec2TagCache != nil {
142142
for k, v := range t.ec2TagCache {
143-
attr.PutStr(k, v)
143+
if _, exists := attr.Get(k); !exists {
144+
attr.PutStr(k, v)
145+
}
144146
}
145147
}
146148
if t.ec2MetadataLookup.instanceId {
147-
attr.PutStr(mdKeyInstanceId, t.ec2MetadataRespond.instanceId)
149+
if _, exists := attr.Get(MdKeyInstanceID); !exists {
150+
attr.PutStr(MdKeyInstanceID, t.ec2MetadataRespond.instanceId)
151+
}
148152
}
149153
if t.ec2MetadataLookup.imageId {
150-
attr.PutStr(mdKeyImageId, t.ec2MetadataRespond.imageId)
154+
if _, exists := attr.Get(MdKeyImageID); !exists {
155+
attr.PutStr(MdKeyImageID, t.ec2MetadataRespond.imageId)
156+
}
151157
}
152158
if t.ec2MetadataLookup.instanceType {
153-
attr.PutStr(mdKeyInstanceType, t.ec2MetadataRespond.instanceType)
159+
if _, exists := attr.Get(MdKeyInstanceType); !exists {
160+
attr.PutStr(MdKeyInstanceType, t.ec2MetadataRespond.instanceType)
161+
}
154162
}
155163
if t.volumeSerialCache != nil {
156-
if devName, found := attr.Get(t.DiskDeviceTagKey); found {
157-
serial := t.volumeSerialCache.Serial(devName.Str())
158-
if serial != "" {
159-
attr.PutStr(AttributeVolumeId, serial)
164+
if _, exists := attr.Get(AttributeVolumeId); !exists {
165+
if devName, found := attr.Get(t.DiskDeviceTagKey); found {
166+
serial := t.volumeSerialCache.Serial(devName.Str())
167+
if serial != "" {
168+
attr.PutStr(AttributeVolumeId, serial)
169+
}
160170
}
161171
}
162172
}
163-
// If append_dimensions are applied, then remove the host dimension.
164173
attr.Remove("host")
165174
}
166175
}
@@ -459,11 +468,11 @@ For more information on IMDS, please follow this document https://docs.aws.amazo
459468
func (t *Tagger) deriveEC2MetadataFromIMDS(ctx context.Context) error {
460469
for _, tag := range t.EC2MetadataTags {
461470
switch tag {
462-
case mdKeyInstanceId:
471+
case MdKeyInstanceID:
463472
t.ec2MetadataLookup.instanceId = true
464-
case mdKeyImageId:
473+
case MdKeyImageID:
465474
t.ec2MetadataLookup.imageId = true
466-
case mdKeyInstanceType:
475+
case MdKeyInstanceType:
467476
t.ec2MetadataLookup.instanceType = true
468477
default:
469478
t.logger.Error("ec2tagger: Unsupported EC2 Metadata key", zap.String("mdKey", tag))

plugins/processors/ec2tagger/ec2tagger_test.go

Lines changed: 82 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ func TestStartSuccessWithNoTagsVolumesUpdate(t *testing.T) {
285285
cfg := createDefaultConfig().(*Config)
286286
cfg.RefreshTagsInterval = 0 * time.Second
287287
cfg.RefreshVolumesInterval = 0 * time.Second
288-
cfg.EC2MetadataTags = []string{mdKeyInstanceId, mdKeyImageId, mdKeyInstanceType}
288+
cfg.EC2MetadataTags = []string{MdKeyInstanceID, MdKeyImageID, MdKeyInstanceType}
289289
cfg.EC2InstanceTagKeys = []string{tagKey1, tagKey2, "AutoScalingGroupName"}
290290
cfg.EBSDeviceKeys = []string{device1, device2}
291291
_, cancel := context.WithCancel(context.Background())
@@ -330,7 +330,7 @@ func TestStartSuccessWithTagsVolumesUpdate(t *testing.T) {
330330
//use millisecond rather than second to speed up test execution
331331
cfg.RefreshTagsInterval = 20 * time.Millisecond
332332
cfg.RefreshVolumesInterval = 20 * time.Millisecond
333-
cfg.EC2MetadataTags = []string{mdKeyInstanceId, mdKeyImageId, mdKeyInstanceType}
333+
cfg.EC2MetadataTags = []string{MdKeyInstanceID, MdKeyImageID, MdKeyInstanceType}
334334
cfg.EC2InstanceTagKeys = []string{tagKey1, tagKey2, "AutoScalingGroupName"}
335335
cfg.EBSDeviceKeys = []string{device1, device2}
336336
_, cancel := context.WithCancel(context.Background())
@@ -387,7 +387,7 @@ func TestStartSuccessWithWildcardTagVolumeKey(t *testing.T) {
387387
cfg := createDefaultConfig().(*Config)
388388
cfg.RefreshTagsInterval = 0 * time.Second
389389
cfg.RefreshVolumesInterval = 0 * time.Second
390-
cfg.EC2MetadataTags = []string{mdKeyInstanceId, mdKeyImageId, mdKeyInstanceType}
390+
cfg.EC2MetadataTags = []string{MdKeyInstanceID, MdKeyImageID, MdKeyInstanceType}
391391
cfg.EC2InstanceTagKeys = []string{"*"}
392392
cfg.EBSDeviceKeys = []string{"*"}
393393
_, cancel := context.WithCancel(context.Background())
@@ -432,7 +432,7 @@ func TestApplyWithTagsVolumesUpdate(t *testing.T) {
432432
//use millisecond rather than second to speed up test execution
433433
cfg.RefreshTagsInterval = 20 * time.Millisecond
434434
cfg.RefreshVolumesInterval = 20 * time.Millisecond
435-
cfg.EC2MetadataTags = []string{mdKeyInstanceId, mdKeyImageId, mdKeyInstanceType}
435+
cfg.EC2MetadataTags = []string{MdKeyInstanceID, MdKeyImageID, MdKeyInstanceType}
436436
cfg.EC2InstanceTagKeys = []string{tagKey1, tagKey2, "AutoScalingGroupName"}
437437
cfg.EBSDeviceKeys = []string{device1, device2}
438438
cfg.DiskDeviceTagKey = "device"
@@ -499,7 +499,16 @@ func TestApplyWithTagsVolumesUpdate(t *testing.T) {
499499
//assume one second is long enough for the api to be called many times
500500
//so that all tags/volumes are updated
501501
time.Sleep(time.Second)
502-
updatedOutput, err := tagger.processMetrics(context.Background(), md)
502+
// Create fresh metrics for the second processing to test updated cache values
503+
freshMd := createTestMetrics([]map[string]string{
504+
{
505+
"host": "example.org",
506+
},
507+
{
508+
"device": device2,
509+
},
510+
})
511+
updatedOutput, err := tagger.processMetrics(context.Background(), freshMd)
503512
assert.Nil(t, err)
504513
expectedUpdatedOutput := createTestMetrics([]map[string]string{
505514
map[string]string{
@@ -527,7 +536,7 @@ func TestMetricsDroppedBeforeStarted(t *testing.T) {
527536
cfg := createDefaultConfig().(*Config)
528537
cfg.RefreshTagsInterval = 0 * time.Millisecond
529538
cfg.RefreshVolumesInterval = 0 * time.Millisecond
530-
cfg.EC2MetadataTags = []string{mdKeyInstanceId, mdKeyImageId, mdKeyInstanceType}
539+
cfg.EC2MetadataTags = []string{MdKeyInstanceID, MdKeyImageID, MdKeyInstanceType}
531540
cfg.EC2InstanceTagKeys = []string{"*"}
532541
cfg.EBSDeviceKeys = []string{"*"}
533542
_, cancel := context.WithCancel(context.Background())
@@ -593,7 +602,7 @@ func TestTaggerStartDoesNotBlock(t *testing.T) {
593602
cfg := createDefaultConfig().(*Config)
594603
cfg.RefreshTagsInterval = 0 * time.Second
595604
cfg.RefreshVolumesInterval = 0 * time.Second
596-
cfg.EC2MetadataTags = []string{mdKeyInstanceId, mdKeyImageId, mdKeyInstanceType}
605+
cfg.EC2MetadataTags = []string{MdKeyInstanceID, MdKeyImageID, MdKeyInstanceType}
597606
cfg.EC2InstanceTagKeys = []string{"*"}
598607
cfg.EBSDeviceKeys = []string{"*"}
599608
_, cancel := context.WithCancel(context.Background())
@@ -632,12 +641,77 @@ func TestTaggerStartDoesNotBlock(t *testing.T) {
632641
close(inited)
633642
}
634643

644+
// Test that existing attributes are not overwritten by ec2tagger
645+
func TestExistingAttributesNotOverwritten(t *testing.T) {
646+
cfg := createDefaultConfig().(*Config)
647+
cfg.RefreshTagsInterval = 0 * time.Second
648+
cfg.RefreshVolumesInterval = 0 * time.Second
649+
cfg.EC2MetadataTags = []string{MdKeyInstanceID, MdKeyImageID, MdKeyInstanceType}
650+
cfg.EC2InstanceTagKeys = []string{tagKey1, tagKey2}
651+
cfg.EBSDeviceKeys = []string{device1}
652+
cfg.DiskDeviceTagKey = "device"
653+
_, cancel := context.WithCancel(context.Background())
654+
ec2Client := &mockEC2Client{
655+
tagsCallCount: 0,
656+
tagsFailLimit: 0,
657+
tagsPartialLimit: 1,
658+
UseUpdatedTags: false,
659+
}
660+
ec2Provider := func(*configaws.CredentialConfig) ec2iface.EC2API {
661+
return ec2Client
662+
}
663+
volumeCache := &mockVolumeCache{cache: make(map[string]string)}
664+
BackoffSleepArray = []time.Duration{10 * time.Millisecond, 20 * time.Millisecond, 30 * time.Millisecond}
665+
defaultRefreshInterval = 50 * time.Millisecond
666+
tagger := &Tagger{
667+
Config: cfg,
668+
logger: processortest.NewNopSettings(component.MustNewType("ec2tagger")).Logger,
669+
cancelFunc: cancel,
670+
metadataProvider: &mockMetadataProvider{InstanceIdentityDocument: mockedInstanceIdentityDoc},
671+
ec2Provider: ec2Provider,
672+
volumeSerialCache: volumeCache,
673+
}
674+
err := tagger.Start(context.Background(), componenttest.NewNopHost())
675+
assert.Nil(t, err)
676+
677+
// Wait for tags and volumes to be retrieved
678+
time.Sleep(time.Second)
679+
680+
// Create metrics with existing attributes that should not be overwritten
681+
md := createTestMetrics([]map[string]string{
682+
{
683+
"InstanceId": "i-100000", // This should NOT be overwritten
684+
"InstanceType": "t2.micro", // This should NOT be overwritten
685+
tagKey1: "existing-value", // This should NOT be overwritten
686+
"device": device1,
687+
},
688+
})
689+
690+
output, err := tagger.processMetrics(context.Background(), md)
691+
assert.Nil(t, err)
692+
693+
// Expected output should preserve existing values and only add missing ones
694+
expectedOutput := createTestMetrics([]map[string]string{
695+
{
696+
"InstanceId": "i-100000", // Original value preserved
697+
"InstanceType": "t2.micro", // Original value preserved
698+
"ImageId": "ami-09edd32d9b0990d49", // Added from metadata (not existing)
699+
tagKey1: "existing-value", // Original value preserved
700+
tagKey2: tagVal2, // Added from EC2 tags (not existing)
701+
"VolumeId": volumeId1, // Added from volume cache (not existing)
702+
"device": device1,
703+
},
704+
})
705+
706+
checkAttributes(t, expectedOutput, output)
707+
}
708+
635709
// Test ec2tagger Start does not block for a long time
636710
func TestTaggerStartsWithoutTagOrVolume(t *testing.T) {
637711
cfg := createDefaultConfig().(*Config)
638712
cfg.RefreshTagsInterval = 0 * time.Second
639713
cfg.RefreshVolumesInterval = 0 * time.Second
640-
cfg.EC2MetadataTags = []string{mdKeyInstanceId, mdKeyImageId, mdKeyInstanceType}
714+
cfg.EC2MetadataTags = []string{MdKeyInstanceID, MdKeyImageID, MdKeyInstanceType}
641715
_, cancel := context.WithCancel(context.Background())
642716

643717
tagger := &Tagger{
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
[agent]
2+
collection_jitter = "0s"
3+
debug = false
4+
flush_interval = "1s"
5+
flush_jitter = "0s"
6+
hostname = ""
7+
interval = "60s"
8+
logfile = "/opt/aws/amazon-cloudwatch-agent/logs/amazon-cloudwatch-agent.log"
9+
logtarget = "lumberjack"
10+
metric_batch_size = 1000
11+
metric_buffer_limit = 10000
12+
omit_hostname = false
13+
precision = ""
14+
quiet = false
15+
round_interval = false
16+
run_as_user = "cwagent"
17+
18+
[inputs]
19+
20+
[[inputs.cpu]]
21+
fieldpass = ["usage_idle"]
22+
percpu = false
23+
totalcpu = true
24+
[inputs.cpu.tags]
25+
ImageId = "ami-0abcdef1234567890"
26+
InstanceType = "t3.medium"
27+
ServiceName = "MyServiceApplication"
28+
29+
[outputs]
30+
31+
[[outputs.cloudwatch]]
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
{
2+
"metrics": {
3+
"append_dimensions": {
4+
"InstanceId": "${aws:InstanceId}"
5+
},
6+
"metrics_collected": {
7+
"cpu": {
8+
"append_dimensions": {
9+
"InstanceType": "${aws:InstanceType}",
10+
"ImageId": "${aws:ImageId}",
11+
"AutoScalingGroupName": "${aws:AutoScalingGroupName}",
12+
"ServiceName": "MyServiceApplication"
13+
},
14+
"measurement": [
15+
"cpu_usage_idle"
16+
]
17+
}
18+
}
19+
}
20+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
exporters:
2+
awscloudwatch:
3+
force_flush_interval: 1m0s
4+
max_datums_per_call: 1000
5+
max_values_per_datum: 150
6+
middleware: agenthealth/metrics
7+
namespace: CWAgent
8+
region: us-west-2
9+
resource_to_telemetry_conversion:
10+
enabled: true
11+
extensions:
12+
agenthealth/metrics:
13+
is_usage_data_enabled: true
14+
stats:
15+
operations:
16+
- PutMetricData
17+
usage_flags:
18+
mode: EC2
19+
region_type: ACJ
20+
agenthealth/statuscode:
21+
is_status_code_enabled: true
22+
is_usage_data_enabled: true
23+
stats:
24+
usage_flags:
25+
mode: EC2
26+
region_type: ACJ
27+
entitystore:
28+
mode: ec2
29+
region: us-west-2
30+
processors:
31+
awsentity/resource:
32+
entity_type: Resource
33+
platform: ec2
34+
scrape_datapoint_attribute: true
35+
ec2tagger:
36+
ec2_metadata_tags:
37+
- InstanceId
38+
imds_retries: 1
39+
middleware: agenthealth/statuscode
40+
refresh_tags_interval: 0s
41+
refresh_volumes_interval: 0s
42+
receivers:
43+
telegraf_cpu:
44+
collection_interval: 1m0s
45+
initial_delay: 1s
46+
timeout: 0s
47+
service:
48+
extensions:
49+
- agenthealth/metrics
50+
- agenthealth/statuscode
51+
- entitystore
52+
pipelines:
53+
metrics/host:
54+
exporters:
55+
- awscloudwatch
56+
processors:
57+
- ec2tagger
58+
- awsentity/resource
59+
receivers:
60+
- telegraf_cpu
61+
telemetry:
62+
logs:
63+
encoding: console
64+
level: info
65+
output_paths:
66+
- /opt/aws/amazon-cloudwatch-agent/logs/amazon-cloudwatch-agent.log
67+
sampling:
68+
enabled: true
69+
initial: 2
70+
thereafter: 500
71+
tick: 10s
72+
metrics:
73+
level: None
74+
traces:
75+
level: None

0 commit comments

Comments
 (0)