From f4ae6deb8e00f602302979fe983ad744904d9620 Mon Sep 17 00:00:00 2001 From: phuhung273 Date: Wed, 1 Jan 2025 02:25:21 +0000 Subject: [PATCH 1/8] Prometheus add nodes gauge for SQS mode --- cmd/node-termination-handler.go | 28 ++++---- pkg/ec2helper/ec2helper.go | 92 ++++++++++++++++++++++++ pkg/ec2helper/ec2helper_test.go | 74 +++++++++++++++++++ pkg/node/node.go | 26 +++++++ pkg/node/node_test.go | 32 ++++++++- pkg/observability/opentelemetry.go | 86 ++++++++++++++++++++-- pkg/observability/opentelemetry_test.go | 96 ++++++++++++++++++++++++- 7 files changed, 415 insertions(+), 19 deletions(-) create mode 100644 pkg/ec2helper/ec2helper.go create mode 100644 pkg/ec2helper/ec2helper_test.go diff --git a/cmd/node-termination-handler.go b/cmd/node-termination-handler.go index 700543ea..238a5b7a 100644 --- a/cmd/node-termination-handler.go +++ b/cmd/node-termination-handler.go @@ -120,7 +120,20 @@ func main() { log.Fatal().Err(err).Msg("Unable to instantiate a node for various kubernetes node functions,") } - metrics, err := observability.InitMetrics(nthConfig.EnablePrometheus, nthConfig.PrometheusPort) + cfg := aws.NewConfig().WithRegion(nthConfig.AWSRegion).WithEndpoint(nthConfig.AWSEndpoint).WithSTSRegionalEndpoint(endpoints.RegionalSTSEndpoint) + sess := session.Must(session.NewSessionWithOptions(session.Options{ + Config: *cfg, + SharedConfigState: session.SharedConfigEnable, + })) + creds, err := sess.Config.Credentials.Get() + if err != nil { + log.Fatal().Err(err).Msg("Unable to get AWS credentials") + } + log.Debug().Msgf("AWS Credentials retrieved from provider: %s", creds.ProviderName) + + ec2 := ec2.New(sess) + + metrics, err := observability.InitMetrics(nthConfig, node, ec2) if err != nil { nthConfig.Print() log.Fatal().Err(err).Msg("Unable to instantiate observability metrics,") @@ -204,17 +217,6 @@ func main() { } } if nthConfig.EnableSQSTerminationDraining { - cfg := aws.NewConfig().WithRegion(nthConfig.AWSRegion).WithEndpoint(nthConfig.AWSEndpoint).WithSTSRegionalEndpoint(endpoints.RegionalSTSEndpoint) - sess := session.Must(session.NewSessionWithOptions(session.Options{ - Config: *cfg, - SharedConfigState: session.SharedConfigEnable, - })) - creds, err := sess.Config.Credentials.Get() - if err != nil { - log.Fatal().Err(err).Msg("Unable to get AWS credentials") - } - log.Debug().Msgf("AWS Credentials retrieved from provider: %s", creds.ProviderName) - completeLifecycleActionDelay := time.Duration(nthConfig.CompleteLifecycleActionDelaySeconds) * time.Second sqsMonitor := sqsevent.SQSMonitor{ CheckIfManaged: nthConfig.CheckTagBeforeDraining, @@ -224,7 +226,7 @@ func main() { CancelChan: cancelChan, SQS: sqsevent.GetSqsClient(sess), ASG: autoscaling.New(sess), - EC2: ec2.New(sess), + EC2: ec2, BeforeCompleteLifecycleAction: func() { <-time.After(completeLifecycleActionDelay) }, } monitoringFns[sqsEvents] = sqsMonitor diff --git a/pkg/ec2helper/ec2helper.go b/pkg/ec2helper/ec2helper.go new file mode 100644 index 00000000..2405cce2 --- /dev/null +++ b/pkg/ec2helper/ec2helper.go @@ -0,0 +1,92 @@ +// Copyright 2016-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package ec2helper + +import ( + "fmt" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/aws/aws-sdk-go/service/ec2/ec2iface" +) + +type IEC2Helper interface { + GetInstanceIdsMapByTagKey(tag string) (map[string]bool, error) +} + +type EC2Helper struct { + ec2ServiceClient ec2iface.EC2API +} + +func New(ec2 ec2iface.EC2API) EC2Helper { + return EC2Helper{ + ec2ServiceClient: ec2, + } +} + +func (h EC2Helper) GetInstanceIdsByTagKey(tag string) ([]string, error) { + ids := []string{} + nextToken := "" + + for { + result, err := h.ec2ServiceClient.DescribeInstances(&ec2.DescribeInstancesInput{ + Filters: []*ec2.Filter{ + { + Name: aws.String("tag-key"), + Values: []*string{aws.String(tag)}, + }, + }, + NextToken: &nextToken, + }) + + if err != nil { + return ids, err + } + + if result == nil || len(result.Reservations) == 0 || + len(result.Reservations[0].Instances) == 0 { + return ids, fmt.Errorf("failed to describe instances") + } + + for _, reservation := range result.Reservations { + for _, instance := range reservation.Instances { + if instance.InstanceId == nil { + continue + } + ids = append(ids, *instance.InstanceId) + } + } + + if result.NextToken == nil { + break + } + nextToken = *result.NextToken + } + + return ids, nil +} + +func (h EC2Helper) GetInstanceIdsMapByTagKey(tag string) (map[string]bool, error) { + idMap := map[string]bool{} + ids, err := h.GetInstanceIdsByTagKey(tag) + if err != nil { + return idMap, err + } + + for _, id := range ids { + idMap[id] = true + } + + return idMap, nil +} diff --git a/pkg/ec2helper/ec2helper_test.go b/pkg/ec2helper/ec2helper_test.go new file mode 100644 index 00000000..01e097c0 --- /dev/null +++ b/pkg/ec2helper/ec2helper_test.go @@ -0,0 +1,74 @@ +// Copyright 2016-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package ec2helper_test + +import ( + "testing" + + "github.com/aws/aws-node-termination-handler/pkg/ec2helper" + h "github.com/aws/aws-node-termination-handler/pkg/test" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" +) + +const ( + instanceId1 = "i-1" + instanceId2 = "i-2" +) + +func TestGetInstanceIdsByTagKey(t *testing.T) { + ec2Mock := h.MockedEC2{ + DescribeInstancesResp: getDescribeInstancesResp(), + } + ec2Helper := ec2helper.New(ec2Mock) + instanceIds, err := ec2Helper.GetInstanceIdsByTagKey("myNTHManagedTag") + h.Ok(t, err) + + h.Equals(t, 2, len(instanceIds)) + h.Equals(t, instanceId1, instanceIds[0]) + h.Equals(t, instanceId2, instanceIds[1]) +} + +func TestGetInstanceIdsMapByTagKey(t *testing.T) { + ec2Mock := h.MockedEC2{ + DescribeInstancesResp: getDescribeInstancesResp(), + } + ec2Helper := ec2helper.New(ec2Mock) + instanceIdsMap, err := ec2Helper.GetInstanceIdsMapByTagKey("myNTHManagedTag") + h.Ok(t, err) + + _, exist := instanceIdsMap[instanceId1] + h.Equals(t, true, exist) + _, exist = instanceIdsMap[instanceId2] + h.Equals(t, true, exist) + _, exist = instanceIdsMap["non-existent instance id"] + h.Equals(t, false, exist) +} + +func getDescribeInstancesResp() ec2.DescribeInstancesOutput { + return ec2.DescribeInstancesOutput{ + Reservations: []*ec2.Reservation{ + { + Instances: []*ec2.Instance{ + { + InstanceId: aws.String(instanceId1), + }, + { + InstanceId: aws.String(instanceId2), + }, + }, + }, + }, + } +} diff --git a/pkg/node/node.go b/pkg/node/node.go index 7e323d13..b8b40082 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -635,6 +635,32 @@ func (n Node) fetchKubernetesNode(nodeName string) (*corev1.Node, error) { return &matchingNodes.Items[0], nil } +// fetchKubernetesNode will send an http request to the k8s api server and return list of AWS EC2 instance id +func (n Node) FetchKubernetesNodeInstanceIds() ([]string, error) { + ids := []string{} + + if n.nthConfig.DryRun { + return ids, nil + } + matchingNodes, err := n.drainHelper.Client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) + if err != nil { + log.Warn().Msgf("Unable to list Nodes") + return ids, err + } + + if matchingNodes == nil || len(matchingNodes.Items) == 0 { + return ids, nil + } + + for _, node := range matchingNodes.Items { + // sample providerID: aws:///us-west-2a/i-0abcd1234efgh5678 + parts := strings.Split(node.Spec.ProviderID, "/") + ids = append(ids, parts[len(parts)-1]) + } + + return ids, nil +} + func (n Node) fetchAllPods(nodeName string) (*corev1.PodList, error) { if n.nthConfig.DryRun { log.Info().Msgf("Would have retrieved running pod list on node %s, but dry-run flag was set", nodeName) diff --git a/pkg/node/node_test.go b/pkg/node/node_test.go index 0e4c393b..e94ca88c 100644 --- a/pkg/node/node_test.go +++ b/pkg/node/node_test.go @@ -15,6 +15,7 @@ package node_test import ( "context" + "fmt" "strconv" "strings" "testing" @@ -35,7 +36,11 @@ import ( ) // Size of the fakeRecorder buffer -const recorderBufferSize = 10 +const ( + recorderBufferSize = 10 + instanceId1 = "i-0abcd1234efgh5678" + instanceId2 = "i-0wxyz5678ijkl1234" +) var nodeName = "NAME" @@ -379,6 +384,31 @@ func TestUncordonIfRebootedTimeParseFailure(t *testing.T) { h.Assert(t, err != nil, "Failed to return error on UncordonIfReboted failure to parse time") } +func TestFetchKubernetesNodeInstanceIds(t *testing.T) { + client := fake.NewSimpleClientset( + &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "node-1"}, + Spec: v1.NodeSpec{ProviderID: fmt.Sprintf("aws:///us-west-2a/%s", instanceId1)}, + }, + &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "node-2"}, + Spec: v1.NodeSpec{ProviderID: fmt.Sprintf("aws:///us-west-2a/%s", instanceId2)}, + }, + ) + + _, err := client.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{}) + h.Ok(t, err) + + node, err := newNode(config.Config{}, client) + h.Ok(t, err) + + instanceIds, err := node.FetchKubernetesNodeInstanceIds() + h.Ok(t, err) + h.Equals(t, 2, len(instanceIds)) + h.Equals(t, instanceId1, instanceIds[0]) + h.Equals(t, instanceId2, instanceIds[1]) +} + func TestFilterOutDaemonSetPods(t *testing.T) { tNode, err := newNode(config.Config{IgnoreDaemonSets: true}, fake.NewSimpleClientset()) h.Ok(t, err) diff --git a/pkg/observability/opentelemetry.go b/pkg/observability/opentelemetry.go index 706fff07..581792d5 100644 --- a/pkg/observability/opentelemetry.go +++ b/pkg/observability/opentelemetry.go @@ -21,6 +21,10 @@ import ( "strconv" "time" + "github.com/aws/aws-node-termination-handler/pkg/config" + "github.com/aws/aws-node-termination-handler/pkg/ec2helper" + "github.com/aws/aws-node-termination-handler/pkg/node" + "github.com/aws/aws-sdk-go/service/ec2/ec2iface" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/zerolog/log" @@ -44,14 +48,19 @@ var ( // Metrics represents the stats for observability type Metrics struct { enabled bool + nthConfig config.Config + ec2Helper ec2helper.EC2Helper + node *node.Node meter api.Meter actionsCounter api.Int64Counter actionsCounterV2 api.Int64Counter errorEventsCounter api.Int64Counter + nodesGauge api.Int64Gauge + instancesGauge api.Int64Gauge } // InitMetrics will initialize, register and expose, via http server, the metrics with Opentelemetry. -func InitMetrics(enabled bool, port int) (Metrics, error) { +func InitMetrics(nthConfig config.Config, node *node.Node, ec2 ec2iface.EC2API) (Metrics, error) { exporter, err := prometheus.New() if err != nil { return Metrics{}, fmt.Errorf("failed to create Prometheus exporter: %w", err) @@ -61,7 +70,10 @@ func InitMetrics(enabled bool, port int) (Metrics, error) { if err != nil { return Metrics{}, fmt.Errorf("failed to register metrics with Prometheus provider: %w", err) } - metrics.enabled = enabled + metrics.enabled = nthConfig.EnablePrometheus + metrics.ec2Helper = ec2helper.New(ec2) + metrics.node = node + metrics.nthConfig = nthConfig // Starts an async process to collect golang runtime stats // go.opentelemetry.io/contrib/instrumentation/runtime @@ -70,13 +82,46 @@ func InitMetrics(enabled bool, port int) (Metrics, error) { return Metrics{}, fmt.Errorf("failed to start Go runtime metrics collection: %w", err) } - if enabled { - serveMetrics(port) + if metrics.enabled { + metrics.initCronMetrics() + serveMetrics(nthConfig.PrometheusPort) } return metrics, nil } +func (m Metrics) initCronMetrics() { + // Run a periodic task + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for range ticker.C { + m.serveNodeMetrics() + } +} + +func (m Metrics) serveNodeMetrics() { + instanceIdsMap, err := m.ec2Helper.GetInstanceIdsMapByTagKey(m.nthConfig.ManagedTag) + if err != nil { + log.Err(err).Msg("Failed to get AWS instance ids") + } else { + m.InstancesRecord(int64(len(instanceIdsMap))) + } + + nodeInstanceIds, err := m.node.FetchKubernetesNodeInstanceIds() + if err != nil { + log.Err(err).Msg("Failed to get node instance ids") + } else { + nodeCount := 0 + for _, id := range nodeInstanceIds { + if _, ok := instanceIdsMap[id]; ok { + nodeCount++ + } + } + m.NodesRecord(int64(nodeCount)) + } +} + // ErrorEventsInc will increment one for the event errors counter, partitioned by action, and only if metrics are enabled. func (m Metrics) ErrorEventsInc(where string) { if !m.enabled { @@ -105,6 +150,22 @@ func (m Metrics) NodeActionsInc(action, nodeName string, eventID string, err err m.actionsCounterV2.Add(context.Background(), 1, api.WithAttributes(labelsV2...)) } +func (m Metrics) NodesRecord(num int64) { + if !m.enabled { + return + } + + m.nodesGauge.Record(context.Background(), num) +} + +func (m Metrics) InstancesRecord(num int64) { + if !m.enabled { + return + } + + m.instancesGauge.Record(context.Background(), num) +} + func registerMetricsWith(provider *metric.MeterProvider) (Metrics, error) { meter := provider.Meter("aws.node.termination.handler") @@ -131,11 +192,28 @@ func registerMetricsWith(provider *metric.MeterProvider) (Metrics, error) { return Metrics{}, fmt.Errorf("failed to create Prometheus counter %q: %w", name, err) } errorEventsCounter.Add(context.Background(), 0) + + name = "nth_managed_nodes" + nodesGauge, err := meter.Int64Gauge(name, api.WithDescription("Number of nodes processing")) + if err != nil { + return Metrics{}, fmt.Errorf("failed to create Prometheus gauge %q: %w", name, err) + } + nodesGauge.Record(context.Background(), 0) + + name = "nth_managed_instances" + instancesGauge, err := meter.Int64Gauge(name, api.WithDescription("Number of instances processing")) + if err != nil { + return Metrics{}, fmt.Errorf("failed to create Prometheus gauge %q: %w", name, err) + } + instancesGauge.Record(context.Background(), 0) + return Metrics{ meter: meter, errorEventsCounter: errorEventsCounter, actionsCounter: actionsCounter, actionsCounterV2: actionsCounterV2, + nodesGauge: nodesGauge, + instancesGauge: instancesGauge, }, nil } diff --git a/pkg/observability/opentelemetry_test.go b/pkg/observability/opentelemetry_test.go index 237f456e..687d5b93 100644 --- a/pkg/observability/opentelemetry_test.go +++ b/pkg/observability/opentelemetry_test.go @@ -25,13 +25,25 @@ import ( "testing" "time" + "github.com/rs/zerolog/log" + "github.com/prometheus/client_golang/prometheus/promhttp" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/prometheus" api "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/sdk/metric" - + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/kubectl/pkg/drain" + + "github.com/aws/aws-node-termination-handler/pkg/config" + "github.com/aws/aws-node-termination-handler/pkg/ec2helper" + "github.com/aws/aws-node-termination-handler/pkg/node" h "github.com/aws/aws-node-termination-handler/pkg/test" + "github.com/aws/aws-node-termination-handler/pkg/uptime" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" ) var ( @@ -48,6 +60,9 @@ var ( errorStatus = "error" mockDefaultPort = 9092 mockClosedPort = 9093 + instanceId1 = "i-1" + instanceId2 = "i-2" + instanceId3 = "i-3" ) func TestInitMetrics(t *testing.T) { @@ -109,6 +124,8 @@ func TestRegisterMetricsWith(t *testing.T) { const errorEventMetricsTotal = 23 const successActionMetricsTotal = 31 const errorActionMetricsTotal = 97 + const managedInstancesTotal = 3 + const managedNodesTotal = 5 metrics := getMetrics(t) @@ -126,6 +143,9 @@ func TestRegisterMetricsWith(t *testing.T) { metrics.actionsCounterV2.Add(context.Background(), 1, api.WithAttributes(errorActionlabels...)) } + metrics.NodesRecord(managedNodesTotal) + metrics.InstancesRecord(managedInstancesTotal) + responseRecorder := mockMetricsRequest() validateStatus(t, responseRecorder) @@ -135,6 +155,57 @@ func TestRegisterMetricsWith(t *testing.T) { validateEventErrorTotal(t, metricsMap, errorEventMetricsTotal) validateActionTotalV2(t, metricsMap, successActionMetricsTotal, successStatus) validateActionTotalV2(t, metricsMap, errorActionMetricsTotal, errorStatus) + validateGauge(t, metricsMap, managedNodesTotal, "nth_managed_nodes") + validateGauge(t, metricsMap, managedInstancesTotal, "nth_managed_instances") +} + +func TestServeNodeMetrics(t *testing.T) { + metrics := getMetrics(t) + metrics.ec2Helper = ec2helper.New(h.MockedEC2{ + DescribeInstancesResp: ec2.DescribeInstancesOutput{ + Reservations: []*ec2.Reservation{ + { + Instances: []*ec2.Instance{ + { + InstanceId: aws.String(instanceId1), + }, + { + InstanceId: aws.String(instanceId2), + }, + { + InstanceId: aws.String(instanceId3), + }, + }, + }, + }, + }, + }) + + helper := getDrainHelper(fake.NewSimpleClientset( + &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "node-1"}, + Spec: v1.NodeSpec{ProviderID: fmt.Sprintf("aws:///us-west-2a/%s", instanceId1)}, + }, + &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "node-2"}, + Spec: v1.NodeSpec{ProviderID: fmt.Sprintf("aws:///us-west-2a/%s", instanceId2)}, + }, + )) + + node, err := node.NewWithValues(config.Config{}, helper, uptime.Uptime) + h.Ok(t, err) + + metrics.node = node + metrics.serveNodeMetrics() + + responseRecorder := mockMetricsRequest() + + validateStatus(t, responseRecorder) + + metricsMap := getMetricsMap(responseRecorder.Body.String()) + + validateGauge(t, metricsMap, 2, "nth_managed_nodes") + validateGauge(t, metricsMap, 3, "nth_managed_instances") } func TestServeMetrics(t *testing.T) { @@ -225,6 +296,20 @@ func getMetricsMap(body string) map[string]string { return metricsMap } +func getDrainHelper(client *fake.Clientset) *drain.Helper { + return &drain.Helper{ + Ctx: context.TODO(), + Client: client, + Force: true, + GracePeriodSeconds: -1, + IgnoreAllDaemonSets: true, + DeleteEmptyDirData: true, + Timeout: time.Duration(120) * time.Second, + Out: log.Logger, + ErrOut: log.Logger, + } +} + func validateEventErrorTotal(t *testing.T, metricsMap map[string]string, expectedTotal int) { eventErrorTotalKey := fmt.Sprintf("events_error_total{event_error_where=\"%v\",otel_scope_name=\"%v\",otel_scope_version=\"\"}", mockErrorEvent, mockNth) actualValue, exists := metricsMap[eventErrorTotalKey] @@ -242,3 +327,12 @@ func validateActionTotalV2(t *testing.T, metricsMap map[string]string, expectedT } h.Equals(t, strconv.Itoa(expectedTotal), actualValue) } + +func validateGauge(t *testing.T, metricsMap map[string]string, expectedTotal int, name string) { + actionTotalKey := fmt.Sprintf("%v{otel_scope_name=\"%v\",otel_scope_version=\"\"}", name, mockNth) + actualValue, exists := metricsMap[actionTotalKey] + if !exists { + actualValue = "0" + } + h.Equals(t, strconv.Itoa(expectedTotal), actualValue) +} From a3ba27e32f7f7d25447f7170bd155ef8ac138144 Mon Sep 17 00:00:00 2001 From: phuhung273 Date: Wed, 8 Jan 2025 15:01:41 +0000 Subject: [PATCH 2/8] add nil check, fix client creation order --- cmd/node-termination-handler.go | 40 +++++++++++++++--------------- pkg/ec2helper/ec2helper.go | 20 +++++++++------ pkg/node/node.go | 19 +++++++++++--- pkg/observability/opentelemetry.go | 7 +++--- 4 files changed, 52 insertions(+), 34 deletions(-) diff --git a/cmd/node-termination-handler.go b/cmd/node-termination-handler.go index 238a5b7a..e7687110 100644 --- a/cmd/node-termination-handler.go +++ b/cmd/node-termination-handler.go @@ -120,25 +120,6 @@ func main() { log.Fatal().Err(err).Msg("Unable to instantiate a node for various kubernetes node functions,") } - cfg := aws.NewConfig().WithRegion(nthConfig.AWSRegion).WithEndpoint(nthConfig.AWSEndpoint).WithSTSRegionalEndpoint(endpoints.RegionalSTSEndpoint) - sess := session.Must(session.NewSessionWithOptions(session.Options{ - Config: *cfg, - SharedConfigState: session.SharedConfigEnable, - })) - creds, err := sess.Config.Credentials.Get() - if err != nil { - log.Fatal().Err(err).Msg("Unable to get AWS credentials") - } - log.Debug().Msgf("AWS Credentials retrieved from provider: %s", creds.ProviderName) - - ec2 := ec2.New(sess) - - metrics, err := observability.InitMetrics(nthConfig, node, ec2) - if err != nil { - nthConfig.Print() - log.Fatal().Err(err).Msg("Unable to instantiate observability metrics,") - } - err = observability.InitProbes(nthConfig.EnableProbes, nthConfig.ProbesPort, nthConfig.ProbesEndpoint) if err != nil { nthConfig.Print() @@ -167,6 +148,25 @@ func main() { log.Fatal().Msgf("Unable to find the AWS region to process queue events.") } + cfg := aws.NewConfig().WithRegion(nthConfig.AWSRegion).WithEndpoint(nthConfig.AWSEndpoint).WithSTSRegionalEndpoint(endpoints.RegionalSTSEndpoint) + sess := session.Must(session.NewSessionWithOptions(session.Options{ + Config: *cfg, + SharedConfigState: session.SharedConfigEnable, + })) + creds, err := sess.Config.Credentials.Get() + if err != nil { + log.Fatal().Err(err).Msg("Unable to get AWS credentials") + } + log.Debug().Msgf("AWS Credentials retrieved from provider: %s", creds.ProviderName) + + ec2Client := ec2.New(sess) + + metrics, err := observability.InitMetrics(nthConfig, node, ec2Client) + if err != nil { + nthConfig.Print() + log.Fatal().Err(err).Msg("Unable to instantiate observability metrics") + } + recorder, err := observability.InitK8sEventRecorder(nthConfig.EmitKubernetesEvents, nthConfig.NodeName, nthConfig.EnableSQSTerminationDraining, nodeMetadata, nthConfig.KubernetesEventsExtraAnnotations, clientset) if err != nil { nthConfig.Print() @@ -226,7 +226,7 @@ func main() { CancelChan: cancelChan, SQS: sqsevent.GetSqsClient(sess), ASG: autoscaling.New(sess), - EC2: ec2, + EC2: ec2Client, BeforeCompleteLifecycleAction: func() { <-time.After(completeLifecycleActionDelay) }, } monitoringFns[sqsEvents] = sqsMonitor diff --git a/pkg/ec2helper/ec2helper.go b/pkg/ec2helper/ec2helper.go index 2405cce2..c6ff33eb 100644 --- a/pkg/ec2helper/ec2helper.go +++ b/pkg/ec2helper/ec2helper.go @@ -37,7 +37,7 @@ func New(ec2 ec2iface.EC2API) EC2Helper { func (h EC2Helper) GetInstanceIdsByTagKey(tag string) ([]string, error) { ids := []string{} - nextToken := "" + var nextToken string for { result, err := h.ec2ServiceClient.DescribeInstances(&ec2.DescribeInstancesInput{ @@ -51,17 +51,19 @@ func (h EC2Helper) GetInstanceIdsByTagKey(tag string) ([]string, error) { }) if err != nil { - return ids, err + return nil, err } - if result == nil || len(result.Reservations) == 0 || - len(result.Reservations[0].Instances) == 0 { - return ids, fmt.Errorf("failed to describe instances") + if result == nil || result.Reservations == nil { + return nil, fmt.Errorf("failed to describe instances") } for _, reservation := range result.Reservations { + if reservation.Instances == nil { + continue + } for _, instance := range reservation.Instances { - if instance.InstanceId == nil { + if instance == nil || instance.InstanceId == nil { continue } ids = append(ids, *instance.InstanceId) @@ -81,7 +83,11 @@ func (h EC2Helper) GetInstanceIdsMapByTagKey(tag string) (map[string]bool, error idMap := map[string]bool{} ids, err := h.GetInstanceIdsByTagKey(tag) if err != nil { - return idMap, err + return nil, err + } + + if ids == nil { + return nil, fmt.Errorf("failed to describe instances") } for _, id := range ids { diff --git a/pkg/node/node.go b/pkg/node/node.go index b8b40082..4efb76f8 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -17,6 +17,7 @@ import ( "context" "encoding/json" "fmt" + "regexp" "strconv" "strings" "time" @@ -74,6 +75,7 @@ const ( var ( maxRetryDeadline time.Duration = 5 * time.Second conflictRetryInterval time.Duration = 750 * time.Millisecond + instanceIDRegex = regexp.MustCompile(`^i-.*`) ) // Node represents a kubernetes node with functions to manipulate its state via the kubernetes api server @@ -640,22 +642,31 @@ func (n Node) FetchKubernetesNodeInstanceIds() ([]string, error) { ids := []string{} if n.nthConfig.DryRun { + log.Info().Msgf("Would have retrieved nodes, but dry-run flag was set") return ids, nil } matchingNodes, err := n.drainHelper.Client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) if err != nil { log.Warn().Msgf("Unable to list Nodes") - return ids, err + return nil, err } - if matchingNodes == nil || len(matchingNodes.Items) == 0 { - return ids, nil + if matchingNodes == nil || matchingNodes.Items == nil { + return nil, fmt.Errorf("failed to list nodes") } for _, node := range matchingNodes.Items { // sample providerID: aws:///us-west-2a/i-0abcd1234efgh5678 parts := strings.Split(node.Spec.ProviderID, "/") - ids = append(ids, parts[len(parts)-1]) + if len(parts) < 2 { + log.Warn().Msgf("Found invalid providerID: %s", node.Spec.ProviderID) + continue + } + + instanceId := parts[len(parts)-1] + if instanceIDRegex.MatchString(instanceId) { + ids = append(ids, parts[len(parts)-1]) + } } return ids, nil diff --git a/pkg/observability/opentelemetry.go b/pkg/observability/opentelemetry.go index 581792d5..d131ddad 100644 --- a/pkg/observability/opentelemetry.go +++ b/pkg/observability/opentelemetry.go @@ -83,7 +83,7 @@ func InitMetrics(nthConfig config.Config, node *node.Node, ec2 ec2iface.EC2API) } if metrics.enabled { - metrics.initCronMetrics() + go metrics.initCronMetrics() serveMetrics(nthConfig.PrometheusPort) } @@ -102,14 +102,15 @@ func (m Metrics) initCronMetrics() { func (m Metrics) serveNodeMetrics() { instanceIdsMap, err := m.ec2Helper.GetInstanceIdsMapByTagKey(m.nthConfig.ManagedTag) - if err != nil { + if err != nil || instanceIdsMap == nil { log.Err(err).Msg("Failed to get AWS instance ids") + return } else { m.InstancesRecord(int64(len(instanceIdsMap))) } nodeInstanceIds, err := m.node.FetchKubernetesNodeInstanceIds() - if err != nil { + if err != nil || nodeInstanceIds == nil { log.Err(err).Msg("Failed to get node instance ids") } else { nodeCount := 0 From d139b861312351665a6efdf94295b9d27de932b7 Mon Sep 17 00:00:00 2001 From: phuhung273 Date: Mon, 13 Jan 2025 14:49:13 +0000 Subject: [PATCH 3/8] restore client creation order --- cmd/node-termination-handler.go | 40 +++++++++-------- pkg/observability/opentelemetry.go | 58 ++++++++++++------------- pkg/observability/opentelemetry_test.go | 8 ++-- 3 files changed, 54 insertions(+), 52 deletions(-) diff --git a/cmd/node-termination-handler.go b/cmd/node-termination-handler.go index e7687110..74384565 100644 --- a/cmd/node-termination-handler.go +++ b/cmd/node-termination-handler.go @@ -120,6 +120,12 @@ func main() { log.Fatal().Err(err).Msg("Unable to instantiate a node for various kubernetes node functions,") } + metrics, err := observability.InitMetrics(nthConfig) + if err != nil { + nthConfig.Print() + log.Fatal().Err(err).Msg("Unable to instantiate observability metrics,") + } + err = observability.InitProbes(nthConfig.EnableProbes, nthConfig.ProbesPort, nthConfig.ProbesEndpoint) if err != nil { nthConfig.Print() @@ -148,25 +154,6 @@ func main() { log.Fatal().Msgf("Unable to find the AWS region to process queue events.") } - cfg := aws.NewConfig().WithRegion(nthConfig.AWSRegion).WithEndpoint(nthConfig.AWSEndpoint).WithSTSRegionalEndpoint(endpoints.RegionalSTSEndpoint) - sess := session.Must(session.NewSessionWithOptions(session.Options{ - Config: *cfg, - SharedConfigState: session.SharedConfigEnable, - })) - creds, err := sess.Config.Credentials.Get() - if err != nil { - log.Fatal().Err(err).Msg("Unable to get AWS credentials") - } - log.Debug().Msgf("AWS Credentials retrieved from provider: %s", creds.ProviderName) - - ec2Client := ec2.New(sess) - - metrics, err := observability.InitMetrics(nthConfig, node, ec2Client) - if err != nil { - nthConfig.Print() - log.Fatal().Err(err).Msg("Unable to instantiate observability metrics") - } - recorder, err := observability.InitK8sEventRecorder(nthConfig.EmitKubernetesEvents, nthConfig.NodeName, nthConfig.EnableSQSTerminationDraining, nodeMetadata, nthConfig.KubernetesEventsExtraAnnotations, clientset) if err != nil { nthConfig.Print() @@ -217,6 +204,21 @@ func main() { } } if nthConfig.EnableSQSTerminationDraining { + cfg := aws.NewConfig().WithRegion(nthConfig.AWSRegion).WithEndpoint(nthConfig.AWSEndpoint).WithSTSRegionalEndpoint(endpoints.RegionalSTSEndpoint) + sess := session.Must(session.NewSessionWithOptions(session.Options{ + Config: *cfg, + SharedConfigState: session.SharedConfigEnable, + })) + creds, err := sess.Config.Credentials.Get() + if err != nil { + log.Fatal().Err(err).Msg("Unable to get AWS credentials") + } + log.Debug().Msgf("AWS Credentials retrieved from provider: %s", creds.ProviderName) + + ec2Client := ec2.New(sess) + + go metrics.InitNodeMetrics(node, ec2Client) + completeLifecycleActionDelay := time.Duration(nthConfig.CompleteLifecycleActionDelaySeconds) * time.Second sqsMonitor := sqsevent.SQSMonitor{ CheckIfManaged: nthConfig.CheckTagBeforeDraining, diff --git a/pkg/observability/opentelemetry.go b/pkg/observability/opentelemetry.go index d131ddad..699faf2f 100644 --- a/pkg/observability/opentelemetry.go +++ b/pkg/observability/opentelemetry.go @@ -47,20 +47,20 @@ var ( // Metrics represents the stats for observability type Metrics struct { - enabled bool - nthConfig config.Config - ec2Helper ec2helper.EC2Helper - node *node.Node - meter api.Meter - actionsCounter api.Int64Counter - actionsCounterV2 api.Int64Counter - errorEventsCounter api.Int64Counter - nodesGauge api.Int64Gauge - instancesGauge api.Int64Gauge + enabled bool + nthConfig config.Config + ec2Helper ec2helper.EC2Helper + node *node.Node + meter api.Meter + actionsCounter api.Int64Counter + actionsCounterV2 api.Int64Counter + errorEventsCounter api.Int64Counter + nthTaggedNodesGauge api.Int64Gauge + nthTaggedInstancesGauge api.Int64Gauge } // InitMetrics will initialize, register and expose, via http server, the metrics with Opentelemetry. -func InitMetrics(nthConfig config.Config, node *node.Node, ec2 ec2iface.EC2API) (Metrics, error) { +func InitMetrics(nthConfig config.Config) (Metrics, error) { exporter, err := prometheus.New() if err != nil { return Metrics{}, fmt.Errorf("failed to create Prometheus exporter: %w", err) @@ -71,8 +71,6 @@ func InitMetrics(nthConfig config.Config, node *node.Node, ec2 ec2iface.EC2API) return Metrics{}, fmt.Errorf("failed to register metrics with Prometheus provider: %w", err) } metrics.enabled = nthConfig.EnablePrometheus - metrics.ec2Helper = ec2helper.New(ec2) - metrics.node = node metrics.nthConfig = nthConfig // Starts an async process to collect golang runtime stats @@ -83,14 +81,16 @@ func InitMetrics(nthConfig config.Config, node *node.Node, ec2 ec2iface.EC2API) } if metrics.enabled { - go metrics.initCronMetrics() serveMetrics(nthConfig.PrometheusPort) } return metrics, nil } -func (m Metrics) initCronMetrics() { +func (m Metrics) InitNodeMetrics(node *node.Node, ec2 ec2iface.EC2API) { + m.ec2Helper = ec2helper.New(ec2) + m.node = node + // Run a periodic task ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() @@ -156,7 +156,7 @@ func (m Metrics) NodesRecord(num int64) { return } - m.nodesGauge.Record(context.Background(), num) + m.nthTaggedNodesGauge.Record(context.Background(), num) } func (m Metrics) InstancesRecord(num int64) { @@ -164,7 +164,7 @@ func (m Metrics) InstancesRecord(num int64) { return } - m.instancesGauge.Record(context.Background(), num) + m.nthTaggedInstancesGauge.Record(context.Background(), num) } func registerMetricsWith(provider *metric.MeterProvider) (Metrics, error) { @@ -194,27 +194,27 @@ func registerMetricsWith(provider *metric.MeterProvider) (Metrics, error) { } errorEventsCounter.Add(context.Background(), 0) - name = "nth_managed_nodes" - nodesGauge, err := meter.Int64Gauge(name, api.WithDescription("Number of nodes processing")) + name = "nth_tagged_nodes" + nthTaggedNodesGauge, err := meter.Int64Gauge(name, api.WithDescription("Number of nodes processing")) if err != nil { return Metrics{}, fmt.Errorf("failed to create Prometheus gauge %q: %w", name, err) } - nodesGauge.Record(context.Background(), 0) + nthTaggedNodesGauge.Record(context.Background(), 0) - name = "nth_managed_instances" - instancesGauge, err := meter.Int64Gauge(name, api.WithDescription("Number of instances processing")) + name = "nth_tagged_instances" + nthTaggedInstancesGauge, err := meter.Int64Gauge(name, api.WithDescription("Number of instances processing")) if err != nil { return Metrics{}, fmt.Errorf("failed to create Prometheus gauge %q: %w", name, err) } - instancesGauge.Record(context.Background(), 0) + nthTaggedInstancesGauge.Record(context.Background(), 0) return Metrics{ - meter: meter, - errorEventsCounter: errorEventsCounter, - actionsCounter: actionsCounter, - actionsCounterV2: actionsCounterV2, - nodesGauge: nodesGauge, - instancesGauge: instancesGauge, + meter: meter, + errorEventsCounter: errorEventsCounter, + actionsCounter: actionsCounter, + actionsCounterV2: actionsCounterV2, + nthTaggedNodesGauge: nthTaggedNodesGauge, + nthTaggedInstancesGauge: nthTaggedInstancesGauge, }, nil } diff --git a/pkg/observability/opentelemetry_test.go b/pkg/observability/opentelemetry_test.go index 687d5b93..cf07f5e3 100644 --- a/pkg/observability/opentelemetry_test.go +++ b/pkg/observability/opentelemetry_test.go @@ -155,8 +155,8 @@ func TestRegisterMetricsWith(t *testing.T) { validateEventErrorTotal(t, metricsMap, errorEventMetricsTotal) validateActionTotalV2(t, metricsMap, successActionMetricsTotal, successStatus) validateActionTotalV2(t, metricsMap, errorActionMetricsTotal, errorStatus) - validateGauge(t, metricsMap, managedNodesTotal, "nth_managed_nodes") - validateGauge(t, metricsMap, managedInstancesTotal, "nth_managed_instances") + validateGauge(t, metricsMap, managedNodesTotal, "nth_tagged_nodes") + validateGauge(t, metricsMap, managedInstancesTotal, "nth_tagged_instances") } func TestServeNodeMetrics(t *testing.T) { @@ -204,8 +204,8 @@ func TestServeNodeMetrics(t *testing.T) { metricsMap := getMetricsMap(responseRecorder.Body.String()) - validateGauge(t, metricsMap, 2, "nth_managed_nodes") - validateGauge(t, metricsMap, 3, "nth_managed_instances") + validateGauge(t, metricsMap, 2, "nth_tagged_nodes") + validateGauge(t, metricsMap, 3, "nth_tagged_instances") } func TestServeMetrics(t *testing.T) { From aff2a6058bbaaf2014d056b678efb707f8094628 Mon Sep 17 00:00:00 2001 From: phuhung273 Date: Wed, 15 Jan 2025 14:20:46 +0000 Subject: [PATCH 4/8] EnablePrometheus check --- cmd/node-termination-handler.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/node-termination-handler.go b/cmd/node-termination-handler.go index 74384565..77016516 100644 --- a/cmd/node-termination-handler.go +++ b/cmd/node-termination-handler.go @@ -217,7 +217,9 @@ func main() { ec2Client := ec2.New(sess) - go metrics.InitNodeMetrics(node, ec2Client) + if nthConfig.EnablePrometheus { + go metrics.InitNodeMetrics(node, ec2Client) + } completeLifecycleActionDelay := time.Duration(nthConfig.CompleteLifecycleActionDelaySeconds) * time.Second sqsMonitor := sqsevent.SQSMonitor{ From 316906ea1bd59100d1f749cc935253fe65dc1107 Mon Sep 17 00:00:00 2001 From: phuhung273 Date: Thu, 16 Jan 2025 14:02:32 +0000 Subject: [PATCH 5/8] add e2e test --- test/e2e/prometheus-metrics-sqs-test | 241 +++++++++++++++++++++++++++ 1 file changed, 241 insertions(+) create mode 100755 test/e2e/prometheus-metrics-sqs-test diff --git a/test/e2e/prometheus-metrics-sqs-test b/test/e2e/prometheus-metrics-sqs-test new file mode 100755 index 00000000..ea386bed --- /dev/null +++ b/test/e2e/prometheus-metrics-sqs-test @@ -0,0 +1,241 @@ +#!/bin/bash +set -euo pipefail + +# Available env vars: +# $TMP_DIR +# $CLUSTER_NAME +# $KUBECONFIG +# $NODE_TERMINATION_HANDLER_DOCKER_REPO +# $NODE_TERMINATION_HANDLER_DOCKER_TAG +# $WEBHOOK_DOCKER_REPO +# $WEBHOOK_DOCKER_TAG +# $AEMM_URL +# $AEMM_VERSION + +echo "Starting EC2 State Change SQS Test for Node Termination Handler in SQS mode with Prometheus server enabled" +START_TIME=$(date -u +"%Y-%m-%dT%TZ") +SCRIPTPATH="$( cd "$(dirname "$0")" ; pwd -P )" +PROMETHEUS_HELM_VERSION="41.7.4" + +common_helm_args=() + +helm repo add prometheus-community https://prometheus-community.github.io/helm-charts +helm repo update +retry 5 helm install kube-prometheus-stack prometheus-community/kube-prometheus-stack --version ${PROMETHEUS_HELM_VERSION} --set prometheusOperator.admissionWebhooks.enabled="false" --set grafana.enabled="false" --set nodeExporter.enabled="false" --set kubeStateMetrics.enabled="false" + +localstack_helm_args=( + upgrade + --install + --namespace default + "$CLUSTER_NAME-localstack" + "$SCRIPTPATH/../../config/helm/localstack/" + --set defaultRegion="${AWS_REGION}" + --wait +) + +set -x +helm "${localstack_helm_args[@]}" +set +x + +sleep 10 + +RUN_INSTANCE_CMD="awslocal ec2 run-instances --private-ip-address ${WORKER_IP} --region ${AWS_REGION} --tag-specifications 'ResourceType=instance,Tags=[{Key=aws:autoscaling:groupName,Value=nth-integ-test},{Key=aws-node-termination-handler/managed,Value=blah}]'" +set -x +localstack_pod=$(kubectl get pods --selector app=localstack --field-selector="status.phase=Running" \ + -o go-template --template '{{range .items}}{{.metadata.name}} {{.metadata.creationTimestamp}}{{"\n"}}{{end}}' \ + | awk '$2 >= "'"${START_TIME//+0000/Z}"'" { print $1 }') +echo "🥑 Using localstack pod $localstack_pod" +run_instances_resp=$(kubectl exec -i "${localstack_pod}" -- bash -c "$RUN_INSTANCE_CMD") +instance_id=$(echo "${run_instances_resp}" | jq -r '.Instances[] .InstanceId') +echo "🥑 Started mock EC2 instance ($instance_id)" + +CREATE_SQS_CMD="awslocal sqs create-queue --queue-name "${CLUSTER_NAME}-queue" --attributes MessageRetentionPeriod=300 --region ${AWS_REGION}" +queue_url=$(kubectl exec -i "${localstack_pod}" -- bash -c "$CREATE_SQS_CMD" | jq -r .QueueUrl) + +echo "🥑 Created SQS Queue ${queue_url}" + +anth_helm_args=( + upgrade + --install + --namespace kube-system + "$CLUSTER_NAME-anth" + "$SCRIPTPATH/../../config/helm/aws-node-termination-handler/" + --set instanceMetadataURL="${INSTANCE_METADATA_URL:-"http://$AEMM_URL:$IMDS_PORT"}" + --set image.repository="$NODE_TERMINATION_HANDLER_DOCKER_REPO" + --set image.tag="$NODE_TERMINATION_HANDLER_DOCKER_TAG" + --set enablePrometheusServer="true" + --set podMonitor.create="true" + --set daemonsetTolerations="" + --set awsAccessKeyID=foo + --set awsSecretAccessKey=bar + --set awsRegion="${AWS_REGION}" + --set awsEndpoint="http://localstack.default" + --set checkTagBeforeDraining=false + --set enableSqsTerminationDraining=true + --set "queueURL=${queue_url}" + --wait + --force +) +[[ -n "${NODE_TERMINATION_HANDLER_DOCKER_PULL_POLICY-}" ]] && + anth_helm_args+=(--set image.pullPolicy="$NODE_TERMINATION_HANDLER_DOCKER_PULL_POLICY") +[[ ${#common_helm_args[@]} -gt 0 ]] && + anth_helm_args+=("${common_helm_args[@]}") + +set -x +helm "${anth_helm_args[@]}" +set +x + +emtp_helm_args=( + upgrade + --install + --namespace default + "$CLUSTER_NAME-emtp" + "$SCRIPTPATH/../../config/helm/webhook-test-proxy/" + --set webhookTestProxy.image.repository="$WEBHOOK_DOCKER_REPO" + --set webhookTestProxy.image.tag="$WEBHOOK_DOCKER_TAG" + --wait +) +[[ -n "${WEBHOOK_DOCKER_PULL_POLICY-}" ]] && + emtp_helm_args+=(--set webhookTestProxy.image.pullPolicy="$WEBHOOK_DOCKER_PULL_POLICY") +[[ ${#common_helm_args[@]} -gt 0 ]] && + emtp_helm_args+=("${common_helm_args[@]}") + +set -x +helm "${emtp_helm_args[@]}" +set +x + +TAINT_CHECK_CYCLES=15 +TAINT_CHECK_SLEEP=15 + +DEPLOYED=0 + +for i in $(seq 1 10); do + if [[ $(kubectl get deployments regular-pod-test -o jsonpath='{.status.unavailableReplicas}') -eq 0 ]]; then + echo "✅ Verified regular-pod-test pod was scheduled and started!" + DEPLOYED=1 + break + fi + sleep 5 +done + +if [[ $DEPLOYED -eq 0 ]]; then + echo "❌ regular-pod-test pod deployment failed" + fail_and_exit 2 +fi + + +EC2_STATE_CHANGE_EVENT=$(cat < /dev/null; then + echo "✅ Verified the worker node was cordoned!" + cordoned=1 + fi + + if [[ $cordoned -eq 1 && $(kubectl get deployments regular-pod-test -o=jsonpath='{.status.unavailableReplicas}') -eq 1 ]]; then + echo "✅ Verified the regular-pod-test pod was evicted!" + evicted=1 + fi + + if [[ ${evicted} -eq 1 && $(kubectl exec -i "${localstack_pod}" -- bash -c "$GET_ATTRS_SQS_CMD" | jq '(.Attributes.ApproximateNumberOfMessagesNotVisible|tonumber) + (.Attributes.ApproximateNumberOfMessages|tonumber)' ) -eq 0 ]]; then + kubectl exec -i "${localstack_pod}" -- bash -c "$GET_ATTRS_SQS_CMD" + echo "✅ Verified the message was deleted from the queue after processing!" + fi + + echo "Assertion Loop $i/$TAINT_CHECK_CYCLES, sleeping for $TAINT_CHECK_SLEEP seconds" + sleep $TAINT_CHECK_SLEEP +done + +if [[ $cordoned -eq 0 ]]; then + echo "❌ Worker node was not cordoned" +else + echo "❌ regular-pod-test was not evicted" +fi + + +POD_NAME=$(get_nth_worker_pod) +echo "✅ Fetched the pod $POD_NAME " + +kubectl -n kube-system port-forward "$POD_NAME" 7000:9092 & +PORT_FORWARD_PID=$! +trap 'kill ${PORT_FORWARD_PID}' EXIT SIGINT SIGTERM ERR +echo "✅ Port-forwarded pod $POD_NAME" + +sleep 10 + +for i in $(seq 1 $TAINT_CHECK_CYCLES); do + METRICS_RESPONSE=$(curl -L localhost:7000/metrics) + echo "✅ Fetched /metrics." + failed="" + for METRIC in cordon-and-drain post-drain nth_tagged_instances nth_tagged_nodes runtime_go_gc runtime_go_goroutines runtime_go_mem; do + if [[ $METRICS_RESPONSE == *"$METRIC"* ]]; then + echo "✅ Metric $METRIC!" + else + echo "⚠️ Metric $METRIC" + failed=$METRIC + break + fi + done + if [ -z $failed ]; then + break + fi + echo "Metrics Loop $i/$TAINT_CHECK_CYCLES, sleeping for $TAINT_CHECK_SLEEP seconds" + sleep $TAINT_CHECK_SLEEP +done + +if [[ -n $failed ]];then + exit 4 +fi + +metric_name="actions_total" +for action in cordon-and-drain post-drain; do + labels='node_action="'$action'",node_status="success",otel_scope_name="aws.node.termination.handler",otel_scope_version=""' + query="$metric_name{$labels}" + counter_value=$(echo "$METRICS_RESPONSE" | grep -E "${query}[[:space:]]+[0-9]+" | awk '{print $NF}') + if (($counter_value < 1)); then + echo "❌ Failed counter count for metric action:$action" + exit 5 + fi + echo "✅ Fetched counter:$counter_value for metric with action:$action" +done + +for gauge in nth_tagged_instances; do + query=''$gauge'{otel_scope_name="aws.node.termination.handler",otel_scope_version=""}' + counter_value=$(echo "$METRICS_RESPONSE" | grep -E "${query}[[:space:]]+[0-9]+" | awk '{print $NF}') + if (($counter_value < 1)); then + echo "❌ Failed gauge count for metric:$gauge" + exit 5 + fi + echo "✅ Fetched gauge:$counter_value for metric:$gauge" +done + + +exit 0 From a95474ed160372ecebdb3cdd1fefb252eb7337d2 Mon Sep 17 00:00:00 2001 From: phuhung273 Date: Sun, 26 Jan 2025 16:48:39 +0000 Subject: [PATCH 6/8] improve test + logging --- cmd/node-termination-handler.go | 4 +- pkg/ec2helper/ec2helper.go | 4 +- pkg/ec2helper/ec2helper_test.go | 80 ++++++++++++++++++++++++++++ pkg/node/node.go | 8 +-- pkg/node/node_test.go | 54 +++++++++++++++++++ pkg/observability/opentelemetry.go | 16 +++--- test/e2e/prometheus-metrics-sqs-test | 36 +++++++------ 7 files changed, 170 insertions(+), 32 deletions(-) diff --git a/cmd/node-termination-handler.go b/cmd/node-termination-handler.go index 77016516..0e53e20b 100644 --- a/cmd/node-termination-handler.go +++ b/cmd/node-termination-handler.go @@ -120,7 +120,7 @@ func main() { log.Fatal().Err(err).Msg("Unable to instantiate a node for various kubernetes node functions,") } - metrics, err := observability.InitMetrics(nthConfig) + metrics, err := observability.InitMetrics(nthConfig.EnablePrometheus, nthConfig.PrometheusPort) if err != nil { nthConfig.Print() log.Fatal().Err(err).Msg("Unable to instantiate observability metrics,") @@ -218,7 +218,7 @@ func main() { ec2Client := ec2.New(sess) if nthConfig.EnablePrometheus { - go metrics.InitNodeMetrics(node, ec2Client) + go metrics.InitNodeMetrics(nthConfig, node, ec2Client) } completeLifecycleActionDelay := time.Duration(nthConfig.CompleteLifecycleActionDelaySeconds) * time.Second diff --git a/pkg/ec2helper/ec2helper.go b/pkg/ec2helper/ec2helper.go index c6ff33eb..9e5bdfa1 100644 --- a/pkg/ec2helper/ec2helper.go +++ b/pkg/ec2helper/ec2helper.go @@ -55,7 +55,7 @@ func (h EC2Helper) GetInstanceIdsByTagKey(tag string) ([]string, error) { } if result == nil || result.Reservations == nil { - return nil, fmt.Errorf("failed to describe instances") + return nil, fmt.Errorf("describe instances success but return empty response for tag key: %s", tag) } for _, reservation := range result.Reservations { @@ -87,7 +87,7 @@ func (h EC2Helper) GetInstanceIdsMapByTagKey(tag string) (map[string]bool, error } if ids == nil { - return nil, fmt.Errorf("failed to describe instances") + return nil, fmt.Errorf("get instance ids success but return empty response for tag key: %s", tag) } for _, id := range ids { diff --git a/pkg/ec2helper/ec2helper_test.go b/pkg/ec2helper/ec2helper_test.go index 01e097c0..da535868 100644 --- a/pkg/ec2helper/ec2helper_test.go +++ b/pkg/ec2helper/ec2helper_test.go @@ -19,6 +19,7 @@ import ( "github.com/aws/aws-node-termination-handler/pkg/ec2helper" h "github.com/aws/aws-node-termination-handler/pkg/test" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/ec2" ) @@ -40,6 +41,85 @@ func TestGetInstanceIdsByTagKey(t *testing.T) { h.Equals(t, instanceId2, instanceIds[1]) } +func TestGetInstanceIdsByTagKeyAPIError(t *testing.T) { + ec2Mock := h.MockedEC2{ + DescribeInstancesResp: getDescribeInstancesResp(), + DescribeInstancesErr: awserr.New("ThrottlingException", "Rate exceeded", nil), + } + ec2Helper := ec2helper.New(ec2Mock) + _, err := ec2Helper.GetInstanceIdsByTagKey("myNTHManagedTag") + h.Nok(t, err) +} + +func TestGetInstanceIdsByTagKeyNilResponse(t *testing.T) { + ec2Mock := h.MockedEC2{} + ec2Helper := ec2helper.New(ec2Mock) + _, err := ec2Helper.GetInstanceIdsByTagKey("myNTHManagedTag") + h.Nok(t, err) +} + +func TestGetInstanceIdsByTagKeyNilReservations(t *testing.T) { + ec2Mock := h.MockedEC2{ + DescribeInstancesResp: ec2.DescribeInstancesOutput{ + Reservations: nil, + }, + } + ec2Helper := ec2helper.New(ec2Mock) + _, err := ec2Helper.GetInstanceIdsByTagKey("myNTHManagedTag") + h.Nok(t, err) +} + +func TestGetInstanceIdsByTagKeyEmptyReservation(t *testing.T) { + ec2Mock := h.MockedEC2{ + DescribeInstancesResp: ec2.DescribeInstancesOutput{ + Reservations: []*ec2.Reservation{}, + }, + } + ec2Helper := ec2helper.New(ec2Mock) + instanceIds, err := ec2Helper.GetInstanceIdsByTagKey("myNTHManagedTag") + h.Ok(t, err) + h.Equals(t, 0, len(instanceIds)) +} + +func TestGetInstanceIdsByTagKeyEmptyInstances(t *testing.T) { + ec2Mock := h.MockedEC2{ + DescribeInstancesResp: ec2.DescribeInstancesOutput{ + Reservations: []*ec2.Reservation{ + { + Instances: []*ec2.Instance{}, + }, + }, + }, + } + ec2Helper := ec2helper.New(ec2Mock) + instanceIds, err := ec2Helper.GetInstanceIdsByTagKey("myNTHManagedTag") + h.Ok(t, err) + h.Equals(t, 0, len(instanceIds)) +} + +func TestGetInstanceIdsByTagKeyNilInstancesId(t *testing.T) { + ec2Mock := h.MockedEC2{ + DescribeInstancesResp: ec2.DescribeInstancesOutput{ + Reservations: []*ec2.Reservation{ + { + Instances: []*ec2.Instance{ + { + InstanceId: nil, + }, + { + InstanceId: aws.String(instanceId1), + }, + }, + }, + }, + }, + } + ec2Helper := ec2helper.New(ec2Mock) + instanceIds, err := ec2Helper.GetInstanceIdsByTagKey("myNTHManagedTag") + h.Ok(t, err) + h.Equals(t, 1, len(instanceIds)) +} + func TestGetInstanceIdsMapByTagKey(t *testing.T) { ec2Mock := h.MockedEC2{ DescribeInstancesResp: getDescribeInstancesResp(), diff --git a/pkg/node/node.go b/pkg/node/node.go index 4efb76f8..f2a5d28c 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -652,20 +652,22 @@ func (n Node) FetchKubernetesNodeInstanceIds() ([]string, error) { } if matchingNodes == nil || matchingNodes.Items == nil { - return nil, fmt.Errorf("failed to list nodes") + return nil, fmt.Errorf("list nodes success but return empty response") } for _, node := range matchingNodes.Items { // sample providerID: aws:///us-west-2a/i-0abcd1234efgh5678 parts := strings.Split(node.Spec.ProviderID, "/") - if len(parts) < 2 { - log.Warn().Msgf("Found invalid providerID: %s", node.Spec.ProviderID) + if len(parts) != 5 { + log.Warn().Msgf("Invalid providerID format found for node %s: %s (expected format: aws:///region/instance-id)", node.Name, node.Spec.ProviderID) continue } instanceId := parts[len(parts)-1] if instanceIDRegex.MatchString(instanceId) { ids = append(ids, parts[len(parts)-1]) + } else { + log.Warn().Msgf("Invalid instance id format found for node %s: %s (expected format: ^i-.*)", node.Name, instanceId) } } diff --git a/pkg/node/node_test.go b/pkg/node/node_test.go index e94ca88c..84a3fe1e 100644 --- a/pkg/node/node_test.go +++ b/pkg/node/node_test.go @@ -409,6 +409,60 @@ func TestFetchKubernetesNodeInstanceIds(t *testing.T) { h.Equals(t, instanceId2, instanceIds[1]) } +func TestFetchKubernetesNodeInstanceIdsEmptyResponse(t *testing.T) { + client := fake.NewSimpleClientset() + + _, err := client.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{}) + h.Ok(t, err) + + node, err := newNode(config.Config{}, client) + h.Ok(t, err) + + _, err = node.FetchKubernetesNodeInstanceIds() + h.Nok(t, err) +} + +func TestFetchKubernetesNodeInstanceIdsInvalidProviderID(t *testing.T) { + client := fake.NewSimpleClientset( + &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "invalid-providerId-1"}, + Spec: v1.NodeSpec{ProviderID: "dummyProviderId"}, + }, + &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "invalid-providerId-2"}, + Spec: v1.NodeSpec{ProviderID: fmt.Sprintf("aws:/%s", instanceId2)}, + }, + &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "invalid-providerId-3"}, + Spec: v1.NodeSpec{ProviderID: fmt.Sprintf("us-west-2a/%s", instanceId2)}, + }, + &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "invalid-providerId-4"}, + Spec: v1.NodeSpec{ProviderID: fmt.Sprintf("aws:///us-west-2a/%s/dummyPart", instanceId2)}, + }, + &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "valid-providerId-2"}, + Spec: v1.NodeSpec{ProviderID: fmt.Sprintf("aws:///us-west-2a/%s", instanceId2)}, + }, + &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: "valid-providerId-1"}, + Spec: v1.NodeSpec{ProviderID: fmt.Sprintf("aws:///us-west-2a/%s", instanceId1)}, + }, + ) + + _, err := client.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{}) + h.Ok(t, err) + + node, err := newNode(config.Config{}, client) + h.Ok(t, err) + + instanceIds, err := node.FetchKubernetesNodeInstanceIds() + h.Ok(t, err) + h.Equals(t, 2, len(instanceIds)) + h.Equals(t, instanceId1, instanceIds[0]) + h.Equals(t, instanceId2, instanceIds[1]) +} + func TestFilterOutDaemonSetPods(t *testing.T) { tNode, err := newNode(config.Config{IgnoreDaemonSets: true}, fake.NewSimpleClientset()) h.Ok(t, err) diff --git a/pkg/observability/opentelemetry.go b/pkg/observability/opentelemetry.go index 699faf2f..ad9d4b4c 100644 --- a/pkg/observability/opentelemetry.go +++ b/pkg/observability/opentelemetry.go @@ -60,7 +60,7 @@ type Metrics struct { } // InitMetrics will initialize, register and expose, via http server, the metrics with Opentelemetry. -func InitMetrics(nthConfig config.Config) (Metrics, error) { +func InitMetrics(enabled bool, port int) (Metrics, error) { exporter, err := prometheus.New() if err != nil { return Metrics{}, fmt.Errorf("failed to create Prometheus exporter: %w", err) @@ -70,8 +70,7 @@ func InitMetrics(nthConfig config.Config) (Metrics, error) { if err != nil { return Metrics{}, fmt.Errorf("failed to register metrics with Prometheus provider: %w", err) } - metrics.enabled = nthConfig.EnablePrometheus - metrics.nthConfig = nthConfig + metrics.enabled = enabled // Starts an async process to collect golang runtime stats // go.opentelemetry.io/contrib/instrumentation/runtime @@ -80,14 +79,15 @@ func InitMetrics(nthConfig config.Config) (Metrics, error) { return Metrics{}, fmt.Errorf("failed to start Go runtime metrics collection: %w", err) } - if metrics.enabled { - serveMetrics(nthConfig.PrometheusPort) + if enabled { + serveMetrics(port) } return metrics, nil } -func (m Metrics) InitNodeMetrics(node *node.Node, ec2 ec2iface.EC2API) { +func (m Metrics) InitNodeMetrics(nthConfig config.Config, node *node.Node, ec2 ec2iface.EC2API) { + m.nthConfig = nthConfig m.ec2Helper = ec2helper.New(ec2) m.node = node @@ -105,10 +105,10 @@ func (m Metrics) serveNodeMetrics() { if err != nil || instanceIdsMap == nil { log.Err(err).Msg("Failed to get AWS instance ids") return - } else { - m.InstancesRecord(int64(len(instanceIdsMap))) } + m.InstancesRecord(int64(len(instanceIdsMap))) + nodeInstanceIds, err := m.node.FetchKubernetesNodeInstanceIds() if err != nil || nodeInstanceIds == nil { log.Err(err).Msg("Failed to get node instance ids") diff --git a/test/e2e/prometheus-metrics-sqs-test b/test/e2e/prometheus-metrics-sqs-test index ea386bed..a6f97d7f 100755 --- a/test/e2e/prometheus-metrics-sqs-test +++ b/test/e2e/prometheus-metrics-sqs-test @@ -39,15 +39,20 @@ set +x sleep 10 -RUN_INSTANCE_CMD="awslocal ec2 run-instances --private-ip-address ${WORKER_IP} --region ${AWS_REGION} --tag-specifications 'ResourceType=instance,Tags=[{Key=aws:autoscaling:groupName,Value=nth-integ-test},{Key=aws-node-termination-handler/managed,Value=blah}]'" +MANAGED_INSTANCE_CMD="awslocal ec2 run-instances --private-ip-address ${WORKER_IP} --region ${AWS_REGION} --tag-specifications 'ResourceType=instance,Tags=[{Key=aws:autoscaling:groupName,Value=nth-integ-test},{Key=aws-node-termination-handler/managed,Value=blah}]'" +MANAGED_INSTANCE_WITHOUT_TAG_VALUE_CMD="awslocal ec2 run-instances --region ${AWS_REGION} --tag-specifications 'ResourceType=instance,Tags=[{Key=aws:autoscaling:groupName,Value=nth-integ-test},{Key=aws-node-termination-handler/managed,Value=\"\"}]'" +UNMANAGED_INSTANCE_CMD="awslocal ec2 run-instances --region ${AWS_REGION} --tag-specifications 'ResourceType=instance,Tags=[{Key=aws:autoscaling:groupName,Value=nth-integ-test}]'" set -x localstack_pod=$(kubectl get pods --selector app=localstack --field-selector="status.phase=Running" \ -o go-template --template '{{range .items}}{{.metadata.name}} {{.metadata.creationTimestamp}}{{"\n"}}{{end}}' \ | awk '$2 >= "'"${START_TIME//+0000/Z}"'" { print $1 }') echo "🥑 Using localstack pod $localstack_pod" -run_instances_resp=$(kubectl exec -i "${localstack_pod}" -- bash -c "$RUN_INSTANCE_CMD") -instance_id=$(echo "${run_instances_resp}" | jq -r '.Instances[] .InstanceId') -echo "🥑 Started mock EC2 instance ($instance_id)" + +for instance_cmd in "$MANAGED_INSTANCE_WITHOUT_TAG_VALUE_CMD" "$UNMANAGED_INSTANCE_CMD" "$MANAGED_INSTANCE_CMD"; do + run_instances_resp=$(kubectl exec -i "${localstack_pod}" -- bash -c "$instance_cmd") + instance_id=$(echo "${run_instances_resp}" | jq -r '.Instances[] .InstanceId') + echo "🥑 Started mock EC2 instance ($instance_id)" +done CREATE_SQS_CMD="awslocal sqs create-queue --queue-name "${CLUSTER_NAME}-queue" --attributes MessageRetentionPeriod=300 --region ${AWS_REGION}" queue_url=$(kubectl exec -i "${localstack_pod}" -- bash -c "$CREATE_SQS_CMD" | jq -r .QueueUrl) @@ -168,6 +173,7 @@ for i in $(seq 1 $TAINT_CHECK_CYCLES); do if [[ ${evicted} -eq 1 && $(kubectl exec -i "${localstack_pod}" -- bash -c "$GET_ATTRS_SQS_CMD" | jq '(.Attributes.ApproximateNumberOfMessagesNotVisible|tonumber) + (.Attributes.ApproximateNumberOfMessages|tonumber)' ) -eq 0 ]]; then kubectl exec -i "${localstack_pod}" -- bash -c "$GET_ATTRS_SQS_CMD" echo "✅ Verified the message was deleted from the queue after processing!" + break fi echo "Assertion Loop $i/$TAINT_CHECK_CYCLES, sleeping for $TAINT_CHECK_SLEEP seconds" @@ -211,7 +217,7 @@ for i in $(seq 1 $TAINT_CHECK_CYCLES); do sleep $TAINT_CHECK_SLEEP done -if [[ -n $failed ]];then +if [[ -n $failed ]]; then exit 4 fi @@ -227,15 +233,11 @@ for action in cordon-and-drain post-drain; do echo "✅ Fetched counter:$counter_value for metric with action:$action" done -for gauge in nth_tagged_instances; do - query=''$gauge'{otel_scope_name="aws.node.termination.handler",otel_scope_version=""}' - counter_value=$(echo "$METRICS_RESPONSE" | grep -E "${query}[[:space:]]+[0-9]+" | awk '{print $NF}') - if (($counter_value < 1)); then - echo "❌ Failed gauge count for metric:$gauge" - exit 5 - fi - echo "✅ Fetched gauge:$counter_value for metric:$gauge" -done - - -exit 0 +gauge="nth_tagged_instances" +query=''$gauge'{otel_scope_name="aws.node.termination.handler",otel_scope_version=""}' +counter_value=$(echo "$METRICS_RESPONSE" | grep -E "${query}[[:space:]]+[0-9]+" | awk '{print $NF}') +if (($counter_value < 2)); then + echo "❌ Failed gauge count for metric:$gauge" + exit 5 +fi +echo "✅ Fetched gauge:$counter_value for metric:$gauge" From 61dc478bf65ee98dfcd195318c805a8d1883dcdb Mon Sep 17 00:00:00 2001 From: Tran Pham Phu Hung Date: Fri, 7 Feb 2025 22:21:51 +0700 Subject: [PATCH 7/8] add initMetricErr nil check, e2e remove misleading log --- cmd/node-termination-handler.go | 8 ++++---- test/e2e/prometheus-metrics-sqs-test | 7 ------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/cmd/node-termination-handler.go b/cmd/node-termination-handler.go index 0e53e20b..78163e40 100644 --- a/cmd/node-termination-handler.go +++ b/cmd/node-termination-handler.go @@ -120,10 +120,10 @@ func main() { log.Fatal().Err(err).Msg("Unable to instantiate a node for various kubernetes node functions,") } - metrics, err := observability.InitMetrics(nthConfig.EnablePrometheus, nthConfig.PrometheusPort) - if err != nil { + metrics, initMetricsErr := observability.InitMetrics(nthConfig.EnablePrometheus, nthConfig.PrometheusPort) + if initMetricsErr != nil { nthConfig.Print() - log.Fatal().Err(err).Msg("Unable to instantiate observability metrics,") + log.Fatal().Err(initMetricsErr).Msg("Unable to instantiate observability metrics,") } err = observability.InitProbes(nthConfig.EnableProbes, nthConfig.ProbesPort, nthConfig.ProbesEndpoint) @@ -217,7 +217,7 @@ func main() { ec2Client := ec2.New(sess) - if nthConfig.EnablePrometheus { + if initMetricsErr == nil && nthConfig.EnablePrometheus { go metrics.InitNodeMetrics(nthConfig, node, ec2Client) } diff --git a/test/e2e/prometheus-metrics-sqs-test b/test/e2e/prometheus-metrics-sqs-test index a6f97d7f..4319bc3d 100755 --- a/test/e2e/prometheus-metrics-sqs-test +++ b/test/e2e/prometheus-metrics-sqs-test @@ -180,13 +180,6 @@ for i in $(seq 1 $TAINT_CHECK_CYCLES); do sleep $TAINT_CHECK_SLEEP done -if [[ $cordoned -eq 0 ]]; then - echo "❌ Worker node was not cordoned" -else - echo "❌ regular-pod-test was not evicted" -fi - - POD_NAME=$(get_nth_worker_pod) echo "✅ Fetched the pod $POD_NAME " From c0fa29c170ddabb7d78c52a6bf7f34a26d8bc50d Mon Sep 17 00:00:00 2001 From: Tran Pham Phu Hung Date: Sun, 9 Feb 2025 22:30:41 +0700 Subject: [PATCH 8/8] added cordoned, evicted, message_deleted test --- test/e2e/prometheus-metrics-sqs-test | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/test/e2e/prometheus-metrics-sqs-test b/test/e2e/prometheus-metrics-sqs-test index 4319bc3d..7ddbf227 100755 --- a/test/e2e/prometheus-metrics-sqs-test +++ b/test/e2e/prometheus-metrics-sqs-test @@ -158,6 +158,7 @@ GET_ATTRS_SQS_CMD="awslocal sqs get-queue-attributes --queue-url ${queue_url} -- cordoned=0 evicted=0 +message_deleted=0 test_node="${TEST_NODE:-$CLUSTER_NAME-worker}" for i in $(seq 1 $TAINT_CHECK_CYCLES); do if [[ $cordoned -eq 0 ]] && kubectl get nodes "${test_node}" | grep SchedulingDisabled > /dev/null; then @@ -173,6 +174,7 @@ for i in $(seq 1 $TAINT_CHECK_CYCLES); do if [[ ${evicted} -eq 1 && $(kubectl exec -i "${localstack_pod}" -- bash -c "$GET_ATTRS_SQS_CMD" | jq '(.Attributes.ApproximateNumberOfMessagesNotVisible|tonumber) + (.Attributes.ApproximateNumberOfMessages|tonumber)' ) -eq 0 ]]; then kubectl exec -i "${localstack_pod}" -- bash -c "$GET_ATTRS_SQS_CMD" echo "✅ Verified the message was deleted from the queue after processing!" + message_deleted=1 break fi @@ -180,6 +182,17 @@ for i in $(seq 1 $TAINT_CHECK_CYCLES); do sleep $TAINT_CHECK_SLEEP done +if [[ $cordoned -eq 0 ]]; then + echo "❌ Worker node was not cordoned" + exit 3 +elif [[ $evicted -eq 0 ]]; then + echo "❌ regular-pod-test was not evicted" + exit 3 +elif [[ $message_deleted -eq 0 ]]; then + echo "❌ Message was not removed from the queue after processing" + exit 3 +fi + POD_NAME=$(get_nth_worker_pod) echo "✅ Fetched the pod $POD_NAME "