diff --git a/cmd/node-termination-handler.go b/cmd/node-termination-handler.go
index 700543ea..78163e40 100644
--- a/cmd/node-termination-handler.go
+++ b/cmd/node-termination-handler.go
@@ -120,10 +120,10 @@ func main() {
         log.Fatal().Err(err).Msg("Unable to instantiate a node for various kubernetes node functions,")
     }
 
-    metrics, err := observability.InitMetrics(nthConfig.EnablePrometheus, nthConfig.PrometheusPort)
-    if err != nil {
+    metrics, initMetricsErr := observability.InitMetrics(nthConfig.EnablePrometheus, nthConfig.PrometheusPort)
+    if initMetricsErr != nil {
         nthConfig.Print()
-        log.Fatal().Err(err).Msg("Unable to instantiate observability metrics,")
+        log.Fatal().Err(initMetricsErr).Msg("Unable to instantiate observability metrics,")
     }
 
     err = observability.InitProbes(nthConfig.EnableProbes, nthConfig.ProbesPort, nthConfig.ProbesEndpoint)
@@ -215,6 +215,12 @@ func main() {
     }
     log.Debug().Msgf("AWS Credentials retrieved from provider: %s", creds.ProviderName)
 
+    ec2Client := ec2.New(sess)
+
+    if initMetricsErr == nil && nthConfig.EnablePrometheus {
+        go metrics.InitNodeMetrics(nthConfig, node, ec2Client)
+    }
+
     completeLifecycleActionDelay := time.Duration(nthConfig.CompleteLifecycleActionDelaySeconds) * time.Second
     sqsMonitor := sqsevent.SQSMonitor{
         CheckIfManaged:                nthConfig.CheckTagBeforeDraining,
@@ -224,7 +230,7 @@ func main() {
         CancelChan:                    cancelChan,
         SQS:                           sqsevent.GetSqsClient(sess),
         ASG:                           autoscaling.New(sess),
-        EC2:                           ec2.New(sess),
+        EC2:                           ec2Client,
         BeforeCompleteLifecycleAction: func() { <-time.After(completeLifecycleActionDelay) },
     }
     monitoringFns[sqsEvents] = sqsMonitor
diff --git a/pkg/ec2helper/ec2helper.go b/pkg/ec2helper/ec2helper.go
new file mode 100644
index 00000000..9e5bdfa1
--- /dev/null
+++ b/pkg/ec2helper/ec2helper.go
@@ -0,0 +1,98 @@
+// Copyright 2016-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"). You may
+// not use this file except in compliance with the License. A copy of the
+// License is located at
+//
+//     http://aws.amazon.com/apache2.0/
+//
+// or in the "license" file accompanying this file. This file is distributed
+// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+// express or implied. See the License for the specific language governing
+// permissions and limitations under the License.
+
+package ec2helper
+
+import (
+    "fmt"
+
+    "github.com/aws/aws-sdk-go/aws"
+    "github.com/aws/aws-sdk-go/service/ec2"
+    "github.com/aws/aws-sdk-go/service/ec2/ec2iface"
+)
+
+type IEC2Helper interface {
+    GetInstanceIdsMapByTagKey(tag string) (map[string]bool, error)
+}
+
+type EC2Helper struct {
+    ec2ServiceClient ec2iface.EC2API
+}
+
+func New(ec2 ec2iface.EC2API) EC2Helper {
+    return EC2Helper{
+        ec2ServiceClient: ec2,
+    }
+}
+
+func (h EC2Helper) GetInstanceIdsByTagKey(tag string) ([]string, error) {
+    ids := []string{}
+    var nextToken string
+
+    for {
+        result, err := h.ec2ServiceClient.DescribeInstances(&ec2.DescribeInstancesInput{
+            Filters: []*ec2.Filter{
+                {
+                    Name:   aws.String("tag-key"),
+                    Values: []*string{aws.String(tag)},
+                },
+            },
+            NextToken: &nextToken,
+        })
+
+        if err != nil {
+            return nil, err
+        }
+
+        if result == nil || result.Reservations == nil {
+            return nil, fmt.Errorf("DescribeInstances succeeded but returned an empty response for tag key: %s", tag)
+        }
+
+        for _, reservation := range result.Reservations {
+            if reservation.Instances == nil {
+                continue
+            }
+            for _, instance := range reservation.Instances {
+                if instance == nil || instance.InstanceId == nil {
+                    continue
+                }
+                ids = append(ids, *instance.InstanceId)
+            }
+        }
+
+        if result.NextToken == nil {
+            break
+        }
+        nextToken = *result.NextToken
+    }
+
+    return ids, nil
+}
+
+func (h EC2Helper) GetInstanceIdsMapByTagKey(tag string) (map[string]bool, error) {
+    idMap := map[string]bool{}
+    ids, err := h.GetInstanceIdsByTagKey(tag)
+    if err != nil {
+        return nil, err
+    }
+
+    if ids == nil {
+        return nil, fmt.Errorf("getting instance ids succeeded but returned an empty response for tag key: %s", tag)
+    }
+
+    for _, id := range ids {
+        idMap[id] = true
+    }
+
+    return idMap, nil
+}
diff --git a/pkg/ec2helper/ec2helper_test.go b/pkg/ec2helper/ec2helper_test.go
new file mode 100644
index 00000000..da535868
--- /dev/null
+++ b/pkg/ec2helper/ec2helper_test.go
@@ -0,0 +1,154 @@
+// Copyright 2016-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"). You may
+// not use this file except in compliance with the License. A copy of the
+// License is located at
+//
+//     http://aws.amazon.com/apache2.0/
+//
+// or in the "license" file accompanying this file. This file is distributed
+// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+// express or implied. See the License for the specific language governing
+// permissions and limitations under the License.
+
+package ec2helper_test
+
+import (
+    "testing"
+
+    "github.com/aws/aws-node-termination-handler/pkg/ec2helper"
+    h "github.com/aws/aws-node-termination-handler/pkg/test"
+    "github.com/aws/aws-sdk-go/aws"
+    "github.com/aws/aws-sdk-go/aws/awserr"
+    "github.com/aws/aws-sdk-go/service/ec2"
+)
+
+const (
+    instanceId1 = "i-1"
+    instanceId2 = "i-2"
+)
+
+func TestGetInstanceIdsByTagKey(t *testing.T) {
+    ec2Mock := h.MockedEC2{
+        DescribeInstancesResp: getDescribeInstancesResp(),
+    }
+    ec2Helper := ec2helper.New(ec2Mock)
+    instanceIds, err := ec2Helper.GetInstanceIdsByTagKey("myNTHManagedTag")
+    h.Ok(t, err)
+
+    h.Equals(t, 2, len(instanceIds))
+    h.Equals(t, instanceId1, instanceIds[0])
+    h.Equals(t, instanceId2, instanceIds[1])
+}
+
+func TestGetInstanceIdsByTagKeyAPIError(t *testing.T) {
+    ec2Mock := h.MockedEC2{
+        DescribeInstancesResp: getDescribeInstancesResp(),
+        DescribeInstancesErr:  awserr.New("ThrottlingException", "Rate exceeded", nil),
+    }
+    ec2Helper := ec2helper.New(ec2Mock)
+    _, err := ec2Helper.GetInstanceIdsByTagKey("myNTHManagedTag")
+    h.Nok(t, err)
+}
+
+func TestGetInstanceIdsByTagKeyNilResponse(t *testing.T) {
+    ec2Mock := h.MockedEC2{}
+    ec2Helper := ec2helper.New(ec2Mock)
+    _, err := ec2Helper.GetInstanceIdsByTagKey("myNTHManagedTag")
+    h.Nok(t, err)
+}
+
+func TestGetInstanceIdsByTagKeyNilReservations(t *testing.T) {
+    ec2Mock := h.MockedEC2{
+        DescribeInstancesResp: ec2.DescribeInstancesOutput{
+            Reservations: nil,
+        },
+    }
+    ec2Helper := ec2helper.New(ec2Mock)
+    _, err := ec2Helper.GetInstanceIdsByTagKey("myNTHManagedTag")
+    h.Nok(t, err)
+}
+
+func TestGetInstanceIdsByTagKeyEmptyReservation(t *testing.T) {
+    ec2Mock := h.MockedEC2{
+        DescribeInstancesResp: ec2.DescribeInstancesOutput{
+            Reservations: []*ec2.Reservation{},
+        },
+    }
+    ec2Helper := ec2helper.New(ec2Mock)
+    instanceIds, err := ec2Helper.GetInstanceIdsByTagKey("myNTHManagedTag")
+    h.Ok(t, err)
+    h.Equals(t, 0, len(instanceIds))
+}
+
+func TestGetInstanceIdsByTagKeyEmptyInstances(t *testing.T) {
+    ec2Mock := h.MockedEC2{
+        DescribeInstancesResp: ec2.DescribeInstancesOutput{
+            Reservations: []*ec2.Reservation{
+                {
+                    Instances: []*ec2.Instance{},
+                },
+            },
+        },
+    }
+    ec2Helper := ec2helper.New(ec2Mock)
+    instanceIds, err := ec2Helper.GetInstanceIdsByTagKey("myNTHManagedTag")
+    h.Ok(t, err)
+    h.Equals(t, 0, len(instanceIds))
+}
+
+func TestGetInstanceIdsByTagKeyNilInstancesId(t *testing.T) {
+    ec2Mock := h.MockedEC2{
+        DescribeInstancesResp: ec2.DescribeInstancesOutput{
+            Reservations: []*ec2.Reservation{
+                {
+                    Instances: []*ec2.Instance{
+                        {
+                            InstanceId: nil,
+                        },
+                        {
+                            InstanceId: aws.String(instanceId1),
+                        },
+                    },
+                },
+            },
+        },
+    }
+    ec2Helper := ec2helper.New(ec2Mock)
+    instanceIds, err := ec2Helper.GetInstanceIdsByTagKey("myNTHManagedTag")
+    h.Ok(t, err)
+    h.Equals(t, 1, len(instanceIds))
+}
+
+func TestGetInstanceIdsMapByTagKey(t *testing.T) {
+    ec2Mock := h.MockedEC2{
+        DescribeInstancesResp: getDescribeInstancesResp(),
+    }
+    ec2Helper := ec2helper.New(ec2Mock)
+    instanceIdsMap, err := ec2Helper.GetInstanceIdsMapByTagKey("myNTHManagedTag")
+    h.Ok(t, err)
+
+    _, exist := instanceIdsMap[instanceId1]
+    h.Equals(t, true, exist)
+    _, exist = instanceIdsMap[instanceId2]
+    h.Equals(t, true, exist)
+    _, exist = instanceIdsMap["non-existent instance id"]
+    h.Equals(t, false, exist)
+}
+
+func getDescribeInstancesResp() ec2.DescribeInstancesOutput {
+    return ec2.DescribeInstancesOutput{
+        Reservations: []*ec2.Reservation{
+            {
+                Instances: []*ec2.Instance{
+                    {
+                        InstanceId: aws.String(instanceId1),
+                    },
+                    {
+                        InstanceId: aws.String(instanceId2),
+                    },
+                },
+            },
+        },
+    }
+}
diff --git a/pkg/node/node.go b/pkg/node/node.go
index 7e323d13..f2a5d28c 100644
--- a/pkg/node/node.go
+++ b/pkg/node/node.go
@@ -17,6 +17,7 @@ import (
     "context"
     "encoding/json"
     "fmt"
+    "regexp"
     "strconv"
     "strings"
     "time"
@@ -74,6 +75,7 @@ const (
 var (
     maxRetryDeadline      time.Duration = 5 * time.Second
     conflictRetryInterval time.Duration = 750 * time.Millisecond
+    instanceIDRegex                     = regexp.MustCompile(`^i-.*`)
 )
 
 // Node represents a kubernetes node with functions to manipulate its state via the kubernetes api server
@@ -635,6 +637,43 @@ func (n Node) fetchKubernetesNode(nodeName string) (*corev1.Node, error) {
     return &matchingNodes.Items[0], nil
 }
 
+// FetchKubernetesNodeInstanceIds sends an http request to the k8s api server and returns the list of AWS EC2 instance ids backing the cluster's nodes
+func (n Node) FetchKubernetesNodeInstanceIds() ([]string, error) {
+    ids := []string{}
+
+    if n.nthConfig.DryRun {
+        log.Info().Msgf("Would have retrieved nodes, but dry-run flag was set")
+        return ids, nil
+    }
+    matchingNodes, err := n.drainHelper.Client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
+    if err != nil {
+        log.Warn().Msgf("Unable to list Nodes")
+        return nil, err
+    }
+
+    if matchingNodes == nil || matchingNodes.Items == nil {
+        return nil, fmt.Errorf("listing nodes succeeded but returned an empty response")
+    }
+
+    for _, node := range matchingNodes.Items {
+        // sample providerID: aws:///us-west-2a/i-0abcd1234efgh5678
+        parts := strings.Split(node.Spec.ProviderID, "/")
+        if len(parts) != 5 {
+            log.Warn().Msgf("Invalid providerID format found for node %s: %s (expected format: aws:///availability-zone/instance-id)", node.Name, node.Spec.ProviderID)
+            continue
+        }
+
+        instanceId := parts[len(parts)-1]
+        if instanceIDRegex.MatchString(instanceId) {
+            ids = append(ids, instanceId)
+        } else {
+            log.Warn().Msgf("Invalid instance id format found for node %s: %s (expected format: ^i-.*)", node.Name, instanceId)
+        }
+    }
+
+    return ids, nil
+}
+
 func (n Node) fetchAllPods(nodeName string) (*corev1.PodList, error) {
     if n.nthConfig.DryRun {
         log.Info().Msgf("Would have retrieved running pod list on node %s, but dry-run flag was set", nodeName)
diff --git a/pkg/node/node_test.go b/pkg/node/node_test.go
index 0e4c393b..84a3fe1e 100644
--- a/pkg/node/node_test.go
+++ b/pkg/node/node_test.go
@@ -15,6 +15,7 @@ package node_test
 
 import (
     "context"
+    "fmt"
     "strconv"
     "strings"
     "testing"
@@ -35,7 +36,11 @@ import (
 )
 
 // Size of the fakeRecorder buffer
-const recorderBufferSize = 10
+const (
+    recorderBufferSize = 10
+    instanceId1        = "i-0abcd1234efgh5678"
+    instanceId2        = "i-0wxyz5678ijkl1234"
+)
 
 var nodeName = "NAME"
 
@@ -379,6 +384,85 @@ func TestUncordonIfRebootedTimeParseFailure(t *testing.T) {
     h.Assert(t, err != nil, "Failed to return error on UncordonIfReboted failure to parse time")
 }
 
+func TestFetchKubernetesNodeInstanceIds(t *testing.T) {
+    client := fake.NewSimpleClientset(
+        &v1.Node{
+            ObjectMeta: metav1.ObjectMeta{Name: "node-1"},
+            Spec:       v1.NodeSpec{ProviderID: fmt.Sprintf("aws:///us-west-2a/%s", instanceId1)},
+        },
+        &v1.Node{
+            ObjectMeta: metav1.ObjectMeta{Name: "node-2"},
+            Spec:       v1.NodeSpec{ProviderID: fmt.Sprintf("aws:///us-west-2a/%s", instanceId2)},
+        },
+    )
+
+    _, err := client.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
+    h.Ok(t, err)
+
+    node, err := newNode(config.Config{}, client)
+    h.Ok(t, err)
+
+    instanceIds, err := node.FetchKubernetesNodeInstanceIds()
+    h.Ok(t, err)
+    h.Equals(t, 2, len(instanceIds))
+    h.Equals(t, instanceId1, instanceIds[0])
+    h.Equals(t, instanceId2, instanceIds[1])
+}
+
+func TestFetchKubernetesNodeInstanceIdsEmptyResponse(t *testing.T) {
+    client := fake.NewSimpleClientset()
+
+    _, err := client.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
+    h.Ok(t, err)
+
+    node, err := newNode(config.Config{}, client)
+    h.Ok(t, err)
+
+    _, err = node.FetchKubernetesNodeInstanceIds()
+    h.Nok(t, err)
+}
+
+func TestFetchKubernetesNodeInstanceIdsInvalidProviderID(t *testing.T) {
+    client := fake.NewSimpleClientset(
+        &v1.Node{
+            ObjectMeta: metav1.ObjectMeta{Name: "invalid-providerId-1"},
+            Spec:       v1.NodeSpec{ProviderID: "dummyProviderId"},
+        },
+        &v1.Node{
+            ObjectMeta: metav1.ObjectMeta{Name: "invalid-providerId-2"},
+            Spec:       v1.NodeSpec{ProviderID: fmt.Sprintf("aws:/%s", instanceId2)},
+        },
+        &v1.Node{
+            ObjectMeta: metav1.ObjectMeta{Name: "invalid-providerId-3"},
+            Spec:       v1.NodeSpec{ProviderID: fmt.Sprintf("us-west-2a/%s", instanceId2)},
+        },
+        &v1.Node{
+            ObjectMeta: metav1.ObjectMeta{Name: "invalid-providerId-4"},
+            Spec:       v1.NodeSpec{ProviderID: fmt.Sprintf("aws:///us-west-2a/%s/dummyPart", instanceId2)},
+        },
+        &v1.Node{
+            ObjectMeta: metav1.ObjectMeta{Name: "valid-providerId-2"},
+            Spec:       v1.NodeSpec{ProviderID: fmt.Sprintf("aws:///us-west-2a/%s", instanceId2)},
+        },
+        &v1.Node{
+            ObjectMeta: metav1.ObjectMeta{Name: "valid-providerId-1"},
+            Spec:       v1.NodeSpec{ProviderID: fmt.Sprintf("aws:///us-west-2a/%s", instanceId1)},
+        },
+    )
+
+    _, err := client.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
+    h.Ok(t, err)
+
+    node, err := newNode(config.Config{}, client)
+    h.Ok(t, err)
+
+    instanceIds, err := node.FetchKubernetesNodeInstanceIds()
+    h.Ok(t, err)
+    h.Equals(t, 2, len(instanceIds))
+    h.Equals(t, instanceId1, instanceIds[0])
+    h.Equals(t, instanceId2, instanceIds[1])
+}
+
 func TestFilterOutDaemonSetPods(t *testing.T) {
     tNode, err := newNode(config.Config{IgnoreDaemonSets: true}, fake.NewSimpleClientset())
     h.Ok(t, err)
diff --git a/pkg/observability/opentelemetry.go b/pkg/observability/opentelemetry.go
index 706fff07..ad9d4b4c 100644
--- a/pkg/observability/opentelemetry.go
+++ b/pkg/observability/opentelemetry.go
@@ -21,6 +21,10 @@ import (
     "strconv"
     "time"
 
+    "github.com/aws/aws-node-termination-handler/pkg/config"
+    "github.com/aws/aws-node-termination-handler/pkg/ec2helper"
+    "github.com/aws/aws-node-termination-handler/pkg/node"
+    "github.com/aws/aws-sdk-go/service/ec2/ec2iface"
     "github.com/prometheus/client_golang/prometheus/promhttp"
     "github.com/rs/zerolog/log"
 
@@ -43,11 +47,16 @@ var (
 
 // Metrics represents the stats for observability
 type Metrics struct {
-    enabled            bool
-    meter              api.Meter
-    actionsCounter     api.Int64Counter
-    actionsCounterV2   api.Int64Counter
-    errorEventsCounter api.Int64Counter
+    enabled                 bool
+    nthConfig               config.Config
+    ec2Helper               ec2helper.EC2Helper
+    node                    *node.Node
+    meter                   api.Meter
+    actionsCounter          api.Int64Counter
+    actionsCounterV2        api.Int64Counter
+    errorEventsCounter      api.Int64Counter
+    nthTaggedNodesGauge     api.Int64Gauge
+    nthTaggedInstancesGauge api.Int64Gauge
 }
 
 // InitMetrics will initialize, register and expose, via http server, the metrics with Opentelemetry.
@@ -77,6 +86,43 @@ func InitMetrics(enabled bool, port int) (Metrics, error) {
     return metrics, nil
 }
 
+func (m Metrics) InitNodeMetrics(nthConfig config.Config, node *node.Node, ec2 ec2iface.EC2API) {
+    m.nthConfig = nthConfig
+    m.ec2Helper = ec2helper.New(ec2)
+    m.node = node
+
+    // Periodically refresh the tagged-node and tagged-instance gauges
+    ticker := time.NewTicker(30 * time.Second)
+    defer ticker.Stop()
+
+    for range ticker.C {
+        m.serveNodeMetrics()
+    }
+}
+
+func (m Metrics) serveNodeMetrics() {
+    instanceIdsMap, err := m.ec2Helper.GetInstanceIdsMapByTagKey(m.nthConfig.ManagedTag)
+    if err != nil || instanceIdsMap == nil {
+        log.Err(err).Msg("Failed to get AWS instance ids")
+        return
+    }
+
+    m.InstancesRecord(int64(len(instanceIdsMap)))
+
+    nodeInstanceIds, err := m.node.FetchKubernetesNodeInstanceIds()
+    if err != nil || nodeInstanceIds == nil {
+        log.Err(err).Msg("Failed to get node instance ids")
+    } else {
+        nodeCount := 0
+        for _, id := range nodeInstanceIds {
+            if _, ok := instanceIdsMap[id]; ok {
+                nodeCount++
+            }
+        }
+        m.NodesRecord(int64(nodeCount))
+    }
+}
+
 // ErrorEventsInc will increment one for the event errors counter, partitioned by action, and only if metrics are enabled.
 func (m Metrics) ErrorEventsInc(where string) {
     if !m.enabled {
@@ -105,6 +151,22 @@ func (m Metrics) NodeActionsInc(action, nodeName string, eventID string, err error) {
     m.actionsCounterV2.Add(context.Background(), 1, api.WithAttributes(labelsV2...))
 }
 
+func (m Metrics) NodesRecord(num int64) {
+    if !m.enabled {
+        return
+    }
+
+    m.nthTaggedNodesGauge.Record(context.Background(), num)
+}
+
+func (m Metrics) InstancesRecord(num int64) {
+    if !m.enabled {
+        return
+    }
+
+    m.nthTaggedInstancesGauge.Record(context.Background(), num)
+}
+
 func registerMetricsWith(provider *metric.MeterProvider) (Metrics, error) {
     meter := provider.Meter("aws.node.termination.handler")
 
@@ -131,11 +193,28 @@ func registerMetricsWith(provider *metric.MeterProvider) (Metrics, error) {
         return Metrics{}, fmt.Errorf("failed to create Prometheus counter %q: %w", name, err)
     }
     errorEventsCounter.Add(context.Background(), 0)
+
+    name = "nth_tagged_nodes"
+    nthTaggedNodesGauge, err := meter.Int64Gauge(name, api.WithDescription("Number of nodes backed by instances with the NTH managed tag"))
+    if err != nil {
+        return Metrics{}, fmt.Errorf("failed to create Prometheus gauge %q: %w", name, err)
+    }
+    nthTaggedNodesGauge.Record(context.Background(), 0)
+
+    name = "nth_tagged_instances"
+    nthTaggedInstancesGauge, err := meter.Int64Gauge(name, api.WithDescription("Number of EC2 instances with the NTH managed tag"))
+    if err != nil {
+        return Metrics{}, fmt.Errorf("failed to create Prometheus gauge %q: %w", name, err)
+    }
+    nthTaggedInstancesGauge.Record(context.Background(), 0)
+
     return Metrics{
-        meter:              meter,
-        errorEventsCounter: errorEventsCounter,
-        actionsCounter:     actionsCounter,
-        actionsCounterV2:   actionsCounterV2,
+        meter:                   meter,
+        errorEventsCounter:      errorEventsCounter,
+        actionsCounter:          actionsCounter,
+        actionsCounterV2:        actionsCounterV2,
+        nthTaggedNodesGauge:     nthTaggedNodesGauge,
+        nthTaggedInstancesGauge: nthTaggedInstancesGauge,
     }, nil
 }
diff --git a/pkg/observability/opentelemetry_test.go b/pkg/observability/opentelemetry_test.go
index 237f456e..cf07f5e3 100644
--- a/pkg/observability/opentelemetry_test.go
+++ b/pkg/observability/opentelemetry_test.go
@@ -25,13 +25,25 @@ import (
     "testing"
     "time"
 
+    "github.com/rs/zerolog/log"
+
     "github.com/prometheus/client_golang/prometheus/promhttp"
     "go.opentelemetry.io/otel/attribute"
     "go.opentelemetry.io/otel/exporters/prometheus"
     api "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/metric" - + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/kubectl/pkg/drain" + + "github.com/aws/aws-node-termination-handler/pkg/config" + "github.com/aws/aws-node-termination-handler/pkg/ec2helper" + "github.com/aws/aws-node-termination-handler/pkg/node" h "github.com/aws/aws-node-termination-handler/pkg/test" + "github.com/aws/aws-node-termination-handler/pkg/uptime" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" ) var ( @@ -48,6 +60,9 @@ var ( errorStatus = "error" mockDefaultPort = 9092 mockClosedPort = 9093 + instanceId1 = "i-1" + instanceId2 = "i-2" + instanceId3 = "i-3" ) func TestInitMetrics(t *testing.T) { @@ -109,6 +124,8 @@ func TestRegisterMetricsWith(t *testing.T) { const errorEventMetricsTotal = 23 const successActionMetricsTotal = 31 const errorActionMetricsTotal = 97 + const managedInstancesTotal = 3 + const managedNodesTotal = 5 metrics := getMetrics(t) @@ -126,6 +143,9 @@ func TestRegisterMetricsWith(t *testing.T) { metrics.actionsCounterV2.Add(context.Background(), 1, api.WithAttributes(errorActionlabels...)) } + metrics.NodesRecord(managedNodesTotal) + metrics.InstancesRecord(managedInstancesTotal) + responseRecorder := mockMetricsRequest() validateStatus(t, responseRecorder) @@ -135,6 +155,57 @@ func TestRegisterMetricsWith(t *testing.T) { validateEventErrorTotal(t, metricsMap, errorEventMetricsTotal) validateActionTotalV2(t, metricsMap, successActionMetricsTotal, successStatus) validateActionTotalV2(t, metricsMap, errorActionMetricsTotal, errorStatus) + validateGauge(t, metricsMap, managedNodesTotal, "nth_tagged_nodes") + validateGauge(t, metricsMap, managedInstancesTotal, "nth_tagged_instances") +} + +func TestServeNodeMetrics(t *testing.T) { + metrics := getMetrics(t) + metrics.ec2Helper = ec2helper.New(h.MockedEC2{ + DescribeInstancesResp: ec2.DescribeInstancesOutput{ + Reservations: []*ec2.Reservation{ + { + Instances: []*ec2.Instance{ + { + InstanceId: aws.String(instanceId1), + }, + { + InstanceId: aws.String(instanceId2), + }, + { + InstanceId: aws.String(instanceId3), + }, + }, + }, + }, + }, + }) + + helper := getDrainHelper(fake.NewSimpleClientset( + &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "node-1"}, + Spec: v1.NodeSpec{ProviderID: fmt.Sprintf("aws:///us-west-2a/%s", instanceId1)}, + }, + &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "node-2"}, + Spec: v1.NodeSpec{ProviderID: fmt.Sprintf("aws:///us-west-2a/%s", instanceId2)}, + }, + )) + + node, err := node.NewWithValues(config.Config{}, helper, uptime.Uptime) + h.Ok(t, err) + + metrics.node = node + metrics.serveNodeMetrics() + + responseRecorder := mockMetricsRequest() + + validateStatus(t, responseRecorder) + + metricsMap := getMetricsMap(responseRecorder.Body.String()) + + validateGauge(t, metricsMap, 2, "nth_tagged_nodes") + validateGauge(t, metricsMap, 3, "nth_tagged_instances") } func TestServeMetrics(t *testing.T) { @@ -225,6 +296,20 @@ func getMetricsMap(body string) map[string]string { return metricsMap } +func getDrainHelper(client *fake.Clientset) *drain.Helper { + return &drain.Helper{ + Ctx: context.TODO(), + Client: client, + Force: true, + GracePeriodSeconds: -1, + IgnoreAllDaemonSets: true, + DeleteEmptyDirData: true, + Timeout: time.Duration(120) * time.Second, + Out: log.Logger, + ErrOut: log.Logger, + } +} + func validateEventErrorTotal(t *testing.T, metricsMap map[string]string, expectedTotal int) { eventErrorTotalKey := 
fmt.Sprintf("events_error_total{event_error_where=\"%v\",otel_scope_name=\"%v\",otel_scope_version=\"\"}", mockErrorEvent, mockNth) actualValue, exists := metricsMap[eventErrorTotalKey] @@ -242,3 +327,12 @@ func validateActionTotalV2(t *testing.T, metricsMap map[string]string, expectedT } h.Equals(t, strconv.Itoa(expectedTotal), actualValue) } + +func validateGauge(t *testing.T, metricsMap map[string]string, expectedTotal int, name string) { + actionTotalKey := fmt.Sprintf("%v{otel_scope_name=\"%v\",otel_scope_version=\"\"}", name, mockNth) + actualValue, exists := metricsMap[actionTotalKey] + if !exists { + actualValue = "0" + } + h.Equals(t, strconv.Itoa(expectedTotal), actualValue) +} diff --git a/test/e2e/prometheus-metrics-sqs-test b/test/e2e/prometheus-metrics-sqs-test new file mode 100755 index 00000000..7ddbf227 --- /dev/null +++ b/test/e2e/prometheus-metrics-sqs-test @@ -0,0 +1,249 @@ +#!/bin/bash +set -euo pipefail + +# Available env vars: +# $TMP_DIR +# $CLUSTER_NAME +# $KUBECONFIG +# $NODE_TERMINATION_HANDLER_DOCKER_REPO +# $NODE_TERMINATION_HANDLER_DOCKER_TAG +# $WEBHOOK_DOCKER_REPO +# $WEBHOOK_DOCKER_TAG +# $AEMM_URL +# $AEMM_VERSION + +echo "Starting EC2 State Change SQS Test for Node Termination Handler in SQS mode with Prometheus server enabled" +START_TIME=$(date -u +"%Y-%m-%dT%TZ") +SCRIPTPATH="$( cd "$(dirname "$0")" ; pwd -P )" +PROMETHEUS_HELM_VERSION="41.7.4" + +common_helm_args=() + +helm repo add prometheus-community https://prometheus-community.github.io/helm-charts +helm repo update +retry 5 helm install kube-prometheus-stack prometheus-community/kube-prometheus-stack --version ${PROMETHEUS_HELM_VERSION} --set prometheusOperator.admissionWebhooks.enabled="false" --set grafana.enabled="false" --set nodeExporter.enabled="false" --set kubeStateMetrics.enabled="false" + +localstack_helm_args=( + upgrade + --install + --namespace default + "$CLUSTER_NAME-localstack" + "$SCRIPTPATH/../../config/helm/localstack/" + --set defaultRegion="${AWS_REGION}" + --wait +) + +set -x +helm "${localstack_helm_args[@]}" +set +x + +sleep 10 + +MANAGED_INSTANCE_CMD="awslocal ec2 run-instances --private-ip-address ${WORKER_IP} --region ${AWS_REGION} --tag-specifications 'ResourceType=instance,Tags=[{Key=aws:autoscaling:groupName,Value=nth-integ-test},{Key=aws-node-termination-handler/managed,Value=blah}]'" +MANAGED_INSTANCE_WITHOUT_TAG_VALUE_CMD="awslocal ec2 run-instances --region ${AWS_REGION} --tag-specifications 'ResourceType=instance,Tags=[{Key=aws:autoscaling:groupName,Value=nth-integ-test},{Key=aws-node-termination-handler/managed,Value=\"\"}]'" +UNMANAGED_INSTANCE_CMD="awslocal ec2 run-instances --region ${AWS_REGION} --tag-specifications 'ResourceType=instance,Tags=[{Key=aws:autoscaling:groupName,Value=nth-integ-test}]'" +set -x +localstack_pod=$(kubectl get pods --selector app=localstack --field-selector="status.phase=Running" \ + -o go-template --template '{{range .items}}{{.metadata.name}} {{.metadata.creationTimestamp}}{{"\n"}}{{end}}' \ + | awk '$2 >= "'"${START_TIME//+0000/Z}"'" { print $1 }') +echo "🥑 Using localstack pod $localstack_pod" + +for instance_cmd in "$MANAGED_INSTANCE_WITHOUT_TAG_VALUE_CMD" "$UNMANAGED_INSTANCE_CMD" "$MANAGED_INSTANCE_CMD"; do + run_instances_resp=$(kubectl exec -i "${localstack_pod}" -- bash -c "$instance_cmd") + instance_id=$(echo "${run_instances_resp}" | jq -r '.Instances[] .InstanceId') + echo "🥑 Started mock EC2 instance ($instance_id)" +done + +CREATE_SQS_CMD="awslocal sqs create-queue --queue-name "${CLUSTER_NAME}-queue" --attributes 
+CREATE_SQS_CMD="awslocal sqs create-queue --queue-name "${CLUSTER_NAME}-queue" --attributes MessageRetentionPeriod=300 --region ${AWS_REGION}"
+queue_url=$(kubectl exec -i "${localstack_pod}" -- bash -c "$CREATE_SQS_CMD" | jq -r .QueueUrl)
+
+echo "🥑 Created SQS Queue ${queue_url}"
+
+anth_helm_args=(
+    upgrade
+    --install
+    --namespace kube-system
+    "$CLUSTER_NAME-anth"
+    "$SCRIPTPATH/../../config/helm/aws-node-termination-handler/"
+    --set instanceMetadataURL="${INSTANCE_METADATA_URL:-"http://$AEMM_URL:$IMDS_PORT"}"
+    --set image.repository="$NODE_TERMINATION_HANDLER_DOCKER_REPO"
+    --set image.tag="$NODE_TERMINATION_HANDLER_DOCKER_TAG"
+    --set enablePrometheusServer="true"
+    --set podMonitor.create="true"
+    --set daemonsetTolerations=""
+    --set awsAccessKeyID=foo
+    --set awsSecretAccessKey=bar
+    --set awsRegion="${AWS_REGION}"
+    --set awsEndpoint="http://localstack.default"
+    --set checkTagBeforeDraining=false
+    --set enableSqsTerminationDraining=true
+    --set "queueURL=${queue_url}"
+    --wait
+    --force
+)
+[[ -n "${NODE_TERMINATION_HANDLER_DOCKER_PULL_POLICY-}" ]] &&
+    anth_helm_args+=(--set image.pullPolicy="$NODE_TERMINATION_HANDLER_DOCKER_PULL_POLICY")
+[[ ${#common_helm_args[@]} -gt 0 ]] &&
+    anth_helm_args+=("${common_helm_args[@]}")
+
+set -x
+helm "${anth_helm_args[@]}"
+set +x
+
+emtp_helm_args=(
+    upgrade
+    --install
+    --namespace default
+    "$CLUSTER_NAME-emtp"
+    "$SCRIPTPATH/../../config/helm/webhook-test-proxy/"
+    --set webhookTestProxy.image.repository="$WEBHOOK_DOCKER_REPO"
+    --set webhookTestProxy.image.tag="$WEBHOOK_DOCKER_TAG"
+    --wait
+)
+[[ -n "${WEBHOOK_DOCKER_PULL_POLICY-}" ]] &&
+    emtp_helm_args+=(--set webhookTestProxy.image.pullPolicy="$WEBHOOK_DOCKER_PULL_POLICY")
+[[ ${#common_helm_args[@]} -gt 0 ]] &&
+    emtp_helm_args+=("${common_helm_args[@]}")
+
+set -x
+helm "${emtp_helm_args[@]}"
+set +x
+
+TAINT_CHECK_CYCLES=15
+TAINT_CHECK_SLEEP=15
+
+DEPLOYED=0
+
+for i in $(seq 1 10); do
+    if [[ $(kubectl get deployments regular-pod-test -o jsonpath='{.status.unavailableReplicas}') -eq 0 ]]; then
+        echo "✅ Verified regular-pod-test pod was scheduled and started!"
+        DEPLOYED=1
+        break
+    fi
+    sleep 5
+done
+
+if [[ $DEPLOYED -eq 0 ]]; then
+    echo "❌ regular-pod-test pod deployment failed"
+    fail_and_exit 2
+fi
+
+
+EC2_STATE_CHANGE_EVENT=$(cat < /dev/null; then
+        echo "✅ Verified the worker node was cordoned!"
+        cordoned=1
+    fi
+
+    if [[ $cordoned -eq 1 && $(kubectl get deployments regular-pod-test -o=jsonpath='{.status.unavailableReplicas}') -eq 1 ]]; then
+        echo "✅ Verified the regular-pod-test pod was evicted!"
+        evicted=1
+    fi
+
+    if [[ ${evicted} -eq 1 && $(kubectl exec -i "${localstack_pod}" -- bash -c "$GET_ATTRS_SQS_CMD" | jq '(.Attributes.ApproximateNumberOfMessagesNotVisible|tonumber) + (.Attributes.ApproximateNumberOfMessages|tonumber)' ) -eq 0 ]]; then
+        kubectl exec -i "${localstack_pod}" -- bash -c "$GET_ATTRS_SQS_CMD"
+        echo "✅ Verified the message was deleted from the queue after processing!"
+        message_deleted=1
+        break
+    fi
+
+    echo "Assertion Loop $i/$TAINT_CHECK_CYCLES, sleeping for $TAINT_CHECK_SLEEP seconds"
+    sleep $TAINT_CHECK_SLEEP
+done
+
+if [[ $cordoned -eq 0 ]]; then
+    echo "❌ Worker node was not cordoned"
+    exit 3
+elif [[ $evicted -eq 0 ]]; then
+    echo "❌ regular-pod-test was not evicted"
+    exit 3
+elif [[ $message_deleted -eq 0 ]]; then
+    echo "❌ Message was not removed from the queue after processing"
+    exit 3
+fi
+
+POD_NAME=$(get_nth_worker_pod)
+echo "✅ Fetched the pod $POD_NAME "
+
+kubectl -n kube-system port-forward "$POD_NAME" 7000:9092 &
+PORT_FORWARD_PID=$!
+trap 'kill ${PORT_FORWARD_PID}' EXIT SIGINT SIGTERM ERR
+echo "✅ Port-forwarded pod $POD_NAME"
+
+sleep 10
+
+for i in $(seq 1 $TAINT_CHECK_CYCLES); do
+    METRICS_RESPONSE=$(curl -L localhost:7000/metrics)
+    echo "✅ Fetched /metrics."
+    failed=""
+    for METRIC in cordon-and-drain post-drain nth_tagged_instances nth_tagged_nodes runtime_go_gc runtime_go_goroutines runtime_go_mem; do
+        if [[ $METRICS_RESPONSE == *"$METRIC"* ]]; then
+            echo "✅ Metric $METRIC found!"
+        else
+            echo "⚠️ Metric $METRIC not found"
+            failed=$METRIC
+            break
+        fi
+    done
+    if [ -z "$failed" ]; then
+        break
+    fi
+    echo "Metrics Loop $i/$TAINT_CHECK_CYCLES, sleeping for $TAINT_CHECK_SLEEP seconds"
+    sleep $TAINT_CHECK_SLEEP
+done
+
+if [[ -n $failed ]]; then
+    exit 4
+fi
+
+metric_name="actions_total"
+for action in cordon-and-drain post-drain; do
+    labels='node_action="'$action'",node_status="success",otel_scope_name="aws.node.termination.handler",otel_scope_version=""'
+    query="$metric_name{$labels}"
+    counter_value=$(echo "$METRICS_RESPONSE" | grep -E "${query}[[:space:]]+[0-9]+" | awk '{print $NF}')
+    if (($counter_value < 1)); then
+        echo "❌ Failed counter count for metric action:$action"
+        exit 5
+    fi
+    echo "✅ Fetched counter:$counter_value for metric with action:$action"
+done
+
+gauge="nth_tagged_instances"
+query=''$gauge'{otel_scope_name="aws.node.termination.handler",otel_scope_version=""}'
+counter_value=$(echo "$METRICS_RESPONSE" | grep -E "${query}[[:space:]]+[0-9]+" | awk '{print $NF}')
+if (($counter_value < 2)); then
+    echo "❌ Failed gauge count for metric:$gauge"
+    exit 5
+fi
+echo "✅ Fetched gauge:$counter_value for metric:$gauge"
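+
+# Manual spot-check (not part of the automated assertions above): with the same
+# port-forward still in place, the new gauges should appear in the scrape output, e.g.
+#   curl -L localhost:7000/metrics | grep -E 'nth_tagged_(nodes|instances)'
+# is expected to print lines of the form
+#   nth_tagged_instances{otel_scope_name="aws.node.termination.handler",otel_scope_version=""} <count>
+#   nth_tagged_nodes{otel_scope_name="aws.node.termination.handler",otel_scope_version=""} <count>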