diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 0c07c0c0d425..3f60fc027b52 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -89,6 +89,7 @@ func main() { op.CapacityReservationProvider, op.PlacementGroupProvider, op.AMIResolver, + op.InstanceStatusProvider, )...). Start(ctx) } diff --git a/kwok/main.go b/kwok/main.go index 8625b61b95a1..eff9f2412f0b 100644 --- a/kwok/main.go +++ b/kwok/main.go @@ -102,6 +102,7 @@ func main() { op.CapacityReservationProvider, op.PlacementGroupProvider, op.AMIResolver, + op.InstanceStatusProvider, )...). Start(ctx) wg.Wait() diff --git a/kwok/operator/operator.go b/kwok/operator/operator.go index 52dc9a485f8a..9895e7af79bc 100644 --- a/kwok/operator/operator.go +++ b/kwok/operator/operator.go @@ -59,6 +59,7 @@ import ( "github.com/aws/karpenter-provider-aws/pkg/providers/capacityreservation" "github.com/aws/karpenter-provider-aws/pkg/providers/instance" "github.com/aws/karpenter-provider-aws/pkg/providers/instanceprofile" + "github.com/aws/karpenter-provider-aws/pkg/providers/instancestatus" "github.com/aws/karpenter-provider-aws/pkg/providers/instancetype" "github.com/aws/karpenter-provider-aws/pkg/providers/launchtemplate" "github.com/aws/karpenter-provider-aws/pkg/providers/placementgroup" @@ -92,6 +93,7 @@ type Operator struct { VersionProvider *version.DefaultProvider InstanceTypesProvider *instancetype.DefaultProvider InstanceProvider instance.Provider + InstanceStatusProvider *instancestatus.DefaultProvider SSMProvider ssmp.Provider CapacityReservationProvider capacityreservation.Provider PlacementGroupProvider placementgroup.Provider @@ -202,6 +204,8 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval), ) + instanceStatusProvider := instancestatus.NewDefaultProvider(ec2api, operator.Clock) + // Setup field indexers on instanceID -- specifically for the interruption controller if options.FromContext(ctx).InterruptionQueue != "" { SetupIndexers(ctx, operator.Manager) @@ -223,6 +227,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont PricingProvider: pricingProvider, InstanceTypesProvider: instanceTypeProvider, InstanceProvider: instanceProvider, + InstanceStatusProvider: instanceStatusProvider, SSMProvider: ssmProvider, CapacityReservationProvider: capacityReservationProvider, PlacementGroupProvider: placementGroupProvider, diff --git a/pkg/aws/sdk.go b/pkg/aws/sdk.go index df8ea4bee3bb..c56efa0b8128 100644 --- a/pkg/aws/sdk.go +++ b/pkg/aws/sdk.go @@ -36,6 +36,7 @@ type EC2API interface { DescribeInstanceTypes(context.Context, *ec2.DescribeInstanceTypesInput, ...func(*ec2.Options)) (*ec2.DescribeInstanceTypesOutput, error) DescribeInstanceTypeOfferings(context.Context, *ec2.DescribeInstanceTypeOfferingsInput, ...func(*ec2.Options)) (*ec2.DescribeInstanceTypeOfferingsOutput, error) DescribeSpotPriceHistory(context.Context, *ec2.DescribeSpotPriceHistoryInput, ...func(*ec2.Options)) (*ec2.DescribeSpotPriceHistoryOutput, error) + DescribeInstanceStatus(context.Context, *ec2.DescribeInstanceStatusInput, ...func(*ec2.Options)) (*ec2.DescribeInstanceStatusOutput, error) CreateFleet(context.Context, *ec2.CreateFleetInput, ...func(*ec2.Options)) (*ec2.CreateFleetOutput, error) TerminateInstances(context.Context, *ec2.TerminateInstancesInput, ...func(*ec2.Options)) (*ec2.TerminateInstancesOutput, error) DescribeInstances(context.Context, *ec2.DescribeInstancesInput, ...func(*ec2.Options)) (*ec2.DescribeInstancesOutput, error) @@ -71,6 +72,7 @@ type SQSAPI interface { ReceiveMessage(context.Context, *sqs.ReceiveMessageInput, ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) DeleteMessage(context.Context, *sqs.DeleteMessageInput, ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error) SendMessage(context.Context, *sqs.SendMessageInput, ...func(*sqs.Options)) (*sqs.SendMessageOutput, error) + GetQueueUrl(context.Context, *sqs.GetQueueUrlInput, ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error) } type TimestreamWriteAPI interface { diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index 21e7be9c1919..e5c331d4277c 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -39,6 +39,7 @@ import ( ssminvalidation "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/ssm/invalidation" controllersversion "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/version" capacityreservationprovider "github.com/aws/karpenter-provider-aws/pkg/providers/capacityreservation" + "github.com/aws/karpenter-provider-aws/pkg/providers/instancestatus" "github.com/aws/karpenter-provider-aws/pkg/providers/launchtemplate" "github.com/aws/karpenter-provider-aws/pkg/providers/placementgroup" "github.com/aws/karpenter-provider-aws/pkg/providers/version" @@ -89,6 +90,7 @@ func NewControllers( capacityReservationProvider capacityreservationprovider.Provider, placementGroupProvider placementgroup.Provider, amiResolver amifamily.Resolver, + instanceStatusProvider instancestatus.Provider, ) []controller.Controller { controllers := []controller.Controller{ nodeclasshash.NewController(kubeClient), @@ -113,7 +115,10 @@ func NewControllers( if options.FromContext(ctx).InterruptionQueue != "" { sqsAPI := servicesqs.NewFromConfig(cfg) prov, _ := sqs.NewSQSProvider(ctx, sqsAPI) - controllers = append(controllers, interruption.NewController(kubeClient, cloudProvider, clk, recorder, prov, sqsAPI, unavailableOfferings, capacityReservationProvider)) + controllers = append(controllers, interruption.NewController(kubeClient, cloudProvider, clk, recorder, prov, sqsAPI, unavailableOfferings, capacityReservationProvider, instanceStatusProvider)) + } else { + // if no queue is configured, start the interruption controller with only instance status monitoring + controllers = append(controllers, interruption.NewController(kubeClient, cloudProvider, clk, recorder, nil, nil, unavailableOfferings, capacityReservationProvider, instanceStatusProvider)) } return controllers } diff --git a/pkg/controllers/interruption/controller.go b/pkg/controllers/interruption/controller.go index 70f05c10a64c..d316a0e182f8 100644 --- a/pkg/controllers/interruption/controller.go +++ b/pkg/controllers/interruption/controller.go @@ -23,7 +23,6 @@ import ( "sigs.k8s.io/karpenter/pkg/metrics" ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types" - sqsapi "github.com/aws/aws-sdk-go-v2/service/sqs" sqstypes "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/awslabs/operatorpkg/reconciler" "github.com/awslabs/operatorpkg/singleton" @@ -43,11 +42,17 @@ import ( "sigs.k8s.io/karpenter/pkg/events" + lop "github.com/samber/lo/parallel" + v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1" + sdk "github.com/aws/karpenter-provider-aws/pkg/aws" "github.com/aws/karpenter-provider-aws/pkg/cache" interruptionevents "github.com/aws/karpenter-provider-aws/pkg/controllers/interruption/events" "github.com/aws/karpenter-provider-aws/pkg/controllers/interruption/messages" + "github.com/aws/karpenter-provider-aws/pkg/controllers/interruption/messages/instancestatusfailure" + "github.com/aws/karpenter-provider-aws/pkg/operator/options" "github.com/aws/karpenter-provider-aws/pkg/providers/capacityreservation" + "github.com/aws/karpenter-provider-aws/pkg/providers/instancestatus" "github.com/aws/karpenter-provider-aws/pkg/providers/sqs" ) @@ -58,20 +63,32 @@ const ( NoAction Action = "NoAction" ) +var ( + // InstatusStatusInterval is used to rate limit calls to the EC2 DescribeInstanceStatus API + // since the Interruption controller runs in a hot loop with an SQS long poller sub-reconciler. + // Without this rate limit, if there are a lot of messages in the SQS queue, DescribeInstanceStatus + // could be called continuously since the long polling receives messages in small batches. + InstanceStatusInterval = 30 * time.Second +) + // Controller is an AWS interruption controller. // It continually polls an SQS queue for events from aws.ec2 and aws.health that // trigger node health events, spot interruption/rebalance events, and capacity reservation interruptions. type Controller struct { - kubeClient client.Client - cloudProvider cloudprovider.CloudProvider - clk clock.Clock - recorder events.Recorder - sqsProvider sqs.Provider - sqsAPI *sqsapi.Client + kubeClient client.Client + cloudProvider cloudprovider.CloudProvider + clk clock.Clock + recorder events.Recorder + // sqsProvider can be nil when a queue is not configured by the user + sqsProvider sqs.Provider + // sqsAPI can be nil when a queue is not configured by the user + sqsAPI sdk.SQSAPI unavailableOfferingsCache *cache.UnavailableOfferings + instanceStatusProvider instancestatus.Provider capacityReservationProvider capacityreservation.Provider parser *EventParser cm *pretty.ChangeMonitor + lastInstanceStatusRun time.Time } func NewController( @@ -80,9 +97,10 @@ func NewController( clk clock.Clock, recorder events.Recorder, sqsProvider sqs.Provider, - sqsAPI *sqsapi.Client, + sqsAPI sdk.SQSAPI, unavailableOfferingsCache *cache.UnavailableOfferings, capacityReservationProvider capacityreservation.Provider, + instanceStatusProvider instancestatus.Provider, ) *Controller { return &Controller{ kubeClient: kubeClient, @@ -93,6 +111,7 @@ func NewController( sqsAPI: sqsAPI, unavailableOfferingsCache: unavailableOfferingsCache, capacityReservationProvider: capacityReservationProvider, + instanceStatusProvider: instanceStatusProvider, parser: NewEventParser(DefaultParsers...), cm: pretty.NewChangeMonitor(), } @@ -100,24 +119,75 @@ func NewController( func (c *Controller) Reconcile(ctx context.Context) (reconciler.Result, error) { ctx = injection.WithControllerName(ctx, "interruption") - if c.sqsProvider == nil { - prov, err := sqs.NewSQSProvider(ctx, c.sqsAPI) - if err != nil { - log.FromContext(ctx).Error(err, "failed to create valid sqs provider") - return reconciler.Result{}, fmt.Errorf("creating sqs provider, %w", err) + + reconcilers := []func(context.Context) error{ + c.reconcileFromSQS, + c.reconcileInstanceStatus, + } + + errs := make([]error, len(reconcilers)) + lop.ForEach(reconcilers, func(r func(context.Context) error, i int) { + errs[i] = r(ctx) + }) + + if err := multierr.Combine(errs...); err != nil { + return reconciler.Result{}, fmt.Errorf("reconciling interruptions, %w", err) + } + + if options.FromContext(ctx).InterruptionQueue != "" { + return reconciler.Result{RequeueAfter: singleton.RequeueImmediately}, nil + } + return reconciler.Result{RequeueAfter: InstanceStatusInterval}, nil +} + +func (c *Controller) Register(_ context.Context, m manager.Manager) error { + return controllerruntime.NewControllerManagedBy(m). + Named("interruption"). + WatchesRawSource(singleton.Source()). + Complete(singleton.AsReconciler(c)) +} + +func (c *Controller) reconcileInstanceStatus(ctx context.Context) error { + // Pulling instance status more often can result in rate limiting when many SQS messages + // are received since the interruption controller runs in a hot loop + if c.clk.Since(c.lastInstanceStatusRun) < InstanceStatusInterval { + return nil + } + instanceStatuses, err := c.instanceStatusProvider.List(ctx) + if err != nil { + return fmt.Errorf("getting instance statuses %w", err) + } + + errs := make([]error, len(instanceStatuses)) + workqueue.ParallelizeUntil(ctx, 10, len(instanceStatuses), func(i int) { + categories := map[string]bool{} + for _, d := range instanceStatuses[i].Details { + categories[string(d.Category)] = true } - c.sqsProvider = prov + for cat := range categories { + InstanceStatusUnhealthy.Inc(map[string]string{categoryLabel: cat}) + } + if err := c.handleMessage(ctx, instancestatusfailure.Message(instanceStatuses[i])); err != nil { + errs[i] = fmt.Errorf("handling instance status check message, %w", err) + } + }) + if err = multierr.Combine(errs...); err != nil { + return err } - ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("queue", c.sqsProvider.Name())) - if c.cm.HasChanged(c.sqsProvider.Name(), nil) { - log.FromContext(ctx).V(1).Info("watching interruption queue") + return nil +} + +func (c *Controller) reconcileFromSQS(ctx context.Context) error { + if options.FromContext(ctx).InterruptionQueue == "" { + return nil } - sqsMessages, err := c.sqsProvider.GetSQSMessages(ctx) + sqsMessages, err := c.sqsMessages(ctx) if err != nil { - return reconciler.Result{}, fmt.Errorf("getting messages from queue, %w", err) + return err } + if len(sqsMessages) == 0 { - return reconciler.Result{RequeueAfter: singleton.RequeueImmediately}, nil + return nil } errs := make([]error, len(sqsMessages)) @@ -136,16 +206,34 @@ func (c *Controller) Reconcile(ctx context.Context) (reconciler.Result, error) { errs[i] = c.deleteMessage(ctx, sqsMessages[i]) }) if err = multierr.Combine(errs...); err != nil { - return reconciler.Result{}, err + return err } - return reconciler.Result{RequeueAfter: singleton.RequeueImmediately}, nil + return nil } -func (c *Controller) Register(_ context.Context, m manager.Manager) error { - return controllerruntime.NewControllerManagedBy(m). - Named("interruption"). - WatchesRawSource(singleton.Source()). - Complete(singleton.AsReconciler(c)) +func (c *Controller) sqsMessages(ctx context.Context) ([]*sqstypes.Message, error) { + if c.sqsAPI == nil { + return nil, nil + } + // If the provider was unable to instantiate, keep trying. + // This would most likely be due to a permissions issue that can be fixed at runtime. + if c.sqsProvider == nil { + prov, err := sqs.NewSQSProvider(ctx, c.sqsAPI) + if err != nil { + log.FromContext(ctx).Error(err, "failed to create valid sqs provider") + return nil, fmt.Errorf("creating sqs provider, %w", err) + } + c.sqsProvider = prov + } + ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("queue", c.sqsProvider.Name())) + if c.cm.HasChanged(c.sqsProvider.Name(), nil) { + log.FromContext(ctx).V(1).Info("watching interruption queue") + } + sqsMessages, err := c.sqsProvider.GetSQSMessages(ctx) + if err != nil { + return nil, fmt.Errorf("getting messages from queue, %w", err) + } + return sqsMessages, nil } // parseMessage parses the passed SQS message into an internal Message interface @@ -270,7 +358,7 @@ func (c *Controller) notifyForMessage(msg messages.Message, nodeClaim *karpv1.No case messages.RebalanceRecommendationKind: c.recorder.Publish(interruptionevents.RebalanceRecommendation(n, nodeClaim)...) - case messages.ScheduledChangeKind: + case messages.ScheduledChangeKind, messages.InstanceStatusFailure: c.recorder.Publish(interruptionevents.Unhealthy(n, nodeClaim)...) case messages.SpotInterruptionKind: @@ -291,7 +379,7 @@ func (c *Controller) notifyForMessage(msg messages.Message, nodeClaim *karpv1.No func actionForMessage(msg messages.Message) Action { switch msg.Kind() { - case messages.ScheduledChangeKind, messages.SpotInterruptionKind, messages.InstanceStoppedKind, messages.InstanceTerminatedKind, messages.CapacityReservationInterruptionKind: + case messages.ScheduledChangeKind, messages.SpotInterruptionKind, messages.InstanceStoppedKind, messages.InstanceTerminatedKind, messages.CapacityReservationInterruptionKind, messages.InstanceStatusFailure: return CordonAndDrain default: return NoAction diff --git a/pkg/controllers/interruption/messages/instancestatusfailure/model.go b/pkg/controllers/interruption/messages/instancestatusfailure/model.go new file mode 100644 index 000000000000..9ac2c6f21312 --- /dev/null +++ b/pkg/controllers/interruption/messages/instancestatusfailure/model.go @@ -0,0 +1,39 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package instancestatusfailure + +import ( + "time" + + "github.com/aws/karpenter-provider-aws/pkg/controllers/interruption/messages" + "github.com/aws/karpenter-provider-aws/pkg/providers/instancestatus" +) + +// Message contains the Instance Status from EC2.DescribeInstanceStatus +// This is not vended via EventBridge but is handled in a similar manner +// as other EventBridge messages. +type Message instancestatus.HealthStatus + +func (m Message) EC2InstanceIDs() []string { + return []string{m.InstanceID} +} + +func (Message) Kind() messages.Kind { + return messages.InstanceStatusFailure +} + +func (m Message) StartTime() time.Time { + return m.ImpairedSince +} diff --git a/pkg/controllers/interruption/messages/types.go b/pkg/controllers/interruption/messages/types.go index 48fc6ed1fa61..a74244db7164 100644 --- a/pkg/controllers/interruption/messages/types.go +++ b/pkg/controllers/interruption/messages/types.go @@ -41,6 +41,7 @@ const ( InstanceStoppedKind Kind = "instance_stopped" InstanceTerminatedKind Kind = "instance_terminated" CapacityReservationInterruptionKind Kind = "capacity_reservation_interrupted" + InstanceStatusFailure Kind = "instance_status_failure" NoOpKind Kind = "no_op" ) diff --git a/pkg/controllers/interruption/metrics.go b/pkg/controllers/interruption/metrics.go index 655183f4190b..57702e4e9f41 100644 --- a/pkg/controllers/interruption/metrics.go +++ b/pkg/controllers/interruption/metrics.go @@ -25,6 +25,7 @@ import ( const ( interruptionSubsystem = "interruption" messageTypeLabel = "message_type" + categoryLabel = "category" ) var ( @@ -59,4 +60,14 @@ var ( }, []string{}, ) + InstanceStatusUnhealthy = opmetrics.NewPrometheusCounter( + crmetrics.Registry, + prometheus.CounterOpts{ + Namespace: metrics.Namespace, + Subsystem: interruptionSubsystem, + Name: "instance_status_unhealthy_total", + Help: "Count of unhealthy instance statuses detected from EC2 DescribeInstanceStatus. Broken down by status check category.", + }, + []string{categoryLabel}, + ) ) diff --git a/pkg/controllers/interruption/suite_test.go b/pkg/controllers/interruption/suite_test.go index 014d2d386ad9..811ff6144b0e 100644 --- a/pkg/controllers/interruption/suite_test.go +++ b/pkg/controllers/interruption/suite_test.go @@ -24,6 +24,8 @@ import ( "sigs.k8s.io/karpenter/pkg/metrics" "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/ec2" + ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types" servicesqs "github.com/aws/aws-sdk-go-v2/service/sqs" sqstypes "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/aws/smithy-go" @@ -94,7 +96,8 @@ var _ = BeforeSuite(func() { sqsProvider = lo.Must(sqs.NewDefaultProvider(sqsapi, fmt.Sprintf("https://sqs.%s.amazonaws.com/%s/test-cluster", fake.DefaultRegion, fake.DefaultAccount))) cloudProvider := cloudprovider.New(awsEnv.InstanceTypesProvider, awsEnv.InstanceProvider, events.NewRecorder(&record.FakeRecorder{}), env.Client, awsEnv.AMIProvider, awsEnv.SecurityGroupProvider, awsEnv.CapacityReservationProvider, awsEnv.PlacementGroupProvider, awsEnv.InstanceTypeStore) - controller = interruption.NewController(env.Client, cloudProvider, fakeClock, events.NewRecorder(&record.FakeRecorder{}), sqsProvider, servicesqs.NewFromConfig(aws.Config{}), unavailableOfferingsCache, awsEnv.CapacityReservationProvider) + controller = interruption.NewController(env.Client, cloudProvider, fakeClock, events.NewRecorder(&record.FakeRecorder{}), sqsProvider, sqsapi, unavailableOfferingsCache, awsEnv.CapacityReservationProvider, awsEnv.InstanceStatusProvider) + interruption.InstanceStatusInterval = 0 }) var _ = AfterSuite(func() { @@ -103,8 +106,10 @@ var _ = AfterSuite(func() { var _ = BeforeEach(func() { ctx = coreoptions.ToContext(ctx, coretest.Options(coretest.OptionsFields{FeatureGates: coretest.FeatureGates{ReservedCapacity: lo.ToPtr(true)}})) + ctx = options.ToContext(ctx, test.Options(test.OptionsFields{InterruptionQueue: lo.ToPtr("test-cluster")})) unavailableOfferingsCache.Flush() sqsapi.Reset() + interruption.InstanceStatusUnhealthy.Reset() }) var _ = AfterEach(func() { @@ -312,12 +317,85 @@ var _ = Describe("InterruptionHandling", func() { Expect(awsEnv.CapacityReservationProvider.GetAvailableInstanceCount("cr-56fac701cc1951b03")).To(Equal(0)) }) + It("should delete the NodeClaim when an instance is unhealthy due to EC2 status checks or scheduled events", func() { + ctx = options.ToContext(ctx, test.Options(test.OptionsFields{InterruptionQueue: lo.ToPtr("")})) + awsEnv.EC2API.DescribeInstanceStatusOutput.Set(&ec2.DescribeInstanceStatusOutput{ + InstanceStatuses: []ec2types.InstanceStatus{ + { + InstanceId: lo.ToPtr(lo.Must(utils.ParseInstanceID(nodeClaim.Status.ProviderID))), + SystemStatus: &ec2types.InstanceStatusSummary{ + Status: ec2types.SummaryStatusImpaired, + Details: []ec2types.InstanceStatusDetails{ + { + Status: ec2types.StatusTypeFailed, + Name: ec2types.StatusNameReachability, + ImpairedSince: lo.ToPtr(awsEnv.Clock.Now()), + }, + }, + }, + InstanceStatus: &ec2types.InstanceStatusSummary{ + Status: ec2types.SummaryStatusInitializing, + Details: []ec2types.InstanceStatusDetails{ + { + Status: ec2types.StatusTypeInitializing, + Name: ec2types.StatusNameReachability, + }, + }, + }, + Events: []ec2types.InstanceStatusEvent{ + { + Code: ec2types.EventCodeInstanceRetirement, + }, + }, + }, + }, + }) + awsEnv.Clock.Step(time.Hour) + ExpectApplied(ctx, env.Client, nodeClaim, node) + ExpectSingletonReconciled(ctx, controller) + ExpectMetricCounterValue(metrics.NodeClaimsDisruptedTotal, 1, map[string]string{ + metrics.ReasonLabel: "instance_status_failure", + "nodepool": "default", + }) + ExpectMetricCounterValue(interruption.InstanceStatusUnhealthy, 1, map[string]string{ + "category": "SystemStatus", + }) + ExpectMetricCounterValue(interruption.InstanceStatusUnhealthy, 1, map[string]string{ + "category": "EventStatus", + }) + ExpectNotFound(ctx, env.Client, nodeClaim) + }) + It("should NOT delete the NodeClaim when an instance is unhealthy due to EBS Status only", func() { + ctx = options.ToContext(ctx, test.Options(test.OptionsFields{InterruptionQueue: lo.ToPtr("")})) + awsEnv.EC2API.DescribeInstanceStatusOutput.Set(&ec2.DescribeInstanceStatusOutput{ + InstanceStatuses: []ec2types.InstanceStatus{ + { + InstanceId: lo.ToPtr(lo.Must(utils.ParseInstanceID(nodeClaim.Status.ProviderID))), + AttachedEbsStatus: &ec2types.EbsStatusSummary{ + Status: ec2types.SummaryStatusImpaired, + Details: []ec2types.EbsStatusDetails{ + { + Status: ec2types.StatusTypeFailed, + Name: ec2types.StatusNameReachability, + ImpairedSince: lo.ToPtr(awsEnv.Clock.Now()), + }, + }, + }, + }, + }, + }) + awsEnv.Clock.Step(time.Hour) + ExpectApplied(ctx, env.Client, nodeClaim, node) + ExpectSingletonReconciled(ctx, controller) + ExpectExists(ctx, env.Client, nodeClaim) + }) }) }) var _ = Describe("Error Handling", func() { It("should send an error on polling when QueueNotExists", func() { sqsapi.ReceiveMessageBehavior.Error.Set(smithyErrWithCode("QueueDoesNotExist"), fake.MaxCalls(0)) + _ = ExpectSingletonReconcileFailed(ctx, controller) }) It("should send an error on polling when AccessDenied", func() { sqsapi.ReceiveMessageBehavior.Error.Set(smithyErrWithCode("AccessDenied"), fake.MaxCalls(0)) diff --git a/pkg/fake/ec2api.go b/pkg/fake/ec2api.go index ca3b34ddebe3..fa2b6b31bcc2 100644 --- a/pkg/fake/ec2api.go +++ b/pkg/fake/ec2api.go @@ -54,6 +54,7 @@ type EC2Behavior struct { DescribeLaunchTemplatesOutput AtomicPtr[ec2.DescribeLaunchTemplatesOutput] DescribeInstanceTypesOutput AtomicPtr[ec2.DescribeInstanceTypesOutput] DescribeInstanceTypeOfferingsOutput AtomicPtr[ec2.DescribeInstanceTypeOfferingsOutput] + DescribeInstanceStatusOutput AtomicPtr[ec2.DescribeInstanceStatusOutput] DescribeAvailabilityZonesOutput AtomicPtr[ec2.DescribeAvailabilityZonesOutput] DescribeSubnetsBehavior MockedFunction[ec2.DescribeSubnetsInput, ec2.DescribeSubnetsOutput] DescribeSecurityGroupsBehavior MockedFunction[ec2.DescribeSecurityGroupsInput, ec2.DescribeSecurityGroupsOutput] @@ -93,6 +94,7 @@ func (e *EC2API) Reset() { e.DescribeLaunchTemplatesOutput.Reset() e.DescribeInstanceTypesOutput.Reset() e.DescribeInstanceTypeOfferingsOutput.Reset() + e.DescribeInstanceStatusOutput.Reset() e.DescribeAvailabilityZonesOutput.Reset() e.DescribeSubnetsBehavior.Reset() e.DescribeSecurityGroupsBehavior.Reset() @@ -653,3 +655,14 @@ func (e *EC2API) RunInstances(ctx context.Context, input *ec2.RunInstancesInput, }, nil }) } + +func (e *EC2API) DescribeInstanceStatus(ctx context.Context, input *ec2.DescribeInstanceStatusInput, optFns ...func(*ec2.Options)) (*ec2.DescribeInstanceStatusOutput, error) { + if !e.NextError.IsNil() { + defer e.NextError.Reset() + return nil, e.NextError.Get() + } + if !e.DescribeInstanceStatusOutput.IsNil() { + return e.DescribeInstanceStatusOutput.Clone(), nil + } + return &ec2.DescribeInstanceStatusOutput{}, nil +} diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index 62fae13133cb..975822000b19 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -57,6 +57,7 @@ import ( "github.com/aws/karpenter-provider-aws/pkg/providers/capacityreservation" "github.com/aws/karpenter-provider-aws/pkg/providers/instance" "github.com/aws/karpenter-provider-aws/pkg/providers/instanceprofile" + "github.com/aws/karpenter-provider-aws/pkg/providers/instancestatus" "github.com/aws/karpenter-provider-aws/pkg/providers/instancetype" "github.com/aws/karpenter-provider-aws/pkg/providers/launchtemplate" "github.com/aws/karpenter-provider-aws/pkg/providers/placementgroup" @@ -90,6 +91,7 @@ type Operator struct { VersionProvider *version.DefaultProvider InstanceTypesProvider *instancetype.DefaultProvider InstanceProvider instance.Provider + InstanceStatusProvider *instancestatus.DefaultProvider SSMProvider ssmp.Provider CapacityReservationProvider capacityreservation.Provider PlacementGroupProvider placementgroup.Provider @@ -210,6 +212,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont placementGroupProvider, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval), ) + instanceStatusProvider := instancestatus.NewDefaultProvider(ec2api, operator.Clock) // Setup field indexers on instanceID -- specifically for the interruption controller if options.FromContext(ctx).InterruptionQueue != "" { @@ -232,6 +235,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont PricingProvider: pricingProvider, InstanceTypesProvider: instanceTypeProvider, InstanceProvider: instanceProvider, + InstanceStatusProvider: instanceStatusProvider, SSMProvider: ssmProvider, CapacityReservationProvider: capacityReservationProvider, PlacementGroupProvider: placementGroupProvider, diff --git a/pkg/providers/instancestatus/instancestatus.go b/pkg/providers/instancestatus/instancestatus.go new file mode 100644 index 000000000000..41af1dff2814 --- /dev/null +++ b/pkg/providers/instancestatus/instancestatus.go @@ -0,0 +1,175 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package instancestatus + +import ( + "context" + "fmt" + "slices" + "time" + + "github.com/aws/aws-sdk-go-v2/service/ec2" + ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types" + "github.com/samber/lo" + "k8s.io/utils/clock" + + sdk "github.com/aws/karpenter-provider-aws/pkg/aws" +) + +type Category string + +const ( + InstanceStatus = Category("InstanceStatus") + SystemStatus = Category("SystemStatus") + // EventStatus surfaces scheduled maintenance events. These are also consumed via EventBridge + // in the Interruption controller when an SQS queue is configured. The handling of maintenance events is + // currently primitive where we treat all events as instance degradation with an involuntary replacement. + EventStatus = Category("EventStatus") + // EBSStatus check failures are currently ignored until we can differentiate which volumes affect the node vs pods w/ PVCs + EBSStatus = Category("EBSStatus") +) + +var ( + UnhealthyThreshold = 120 * time.Second +) + +type Provider interface { + List(context.Context) ([]HealthStatus, error) +} + +type DefaultProvider struct { + ec2api sdk.EC2API + clk clock.Clock +} + +type HealthStatus struct { + InstanceID string + Overall ec2types.SummaryStatus + ImpairedSince time.Time + Details []Details +} + +type Details struct { + Category Category + Name string + ImpairedSince time.Time + Status ec2types.StatusType +} + +func NewDefaultProvider(ec2API sdk.EC2API, clk clock.Clock) *DefaultProvider { + return &DefaultProvider{ + ec2api: ec2API, + clk: clk, + } +} + +func (p DefaultProvider) List(ctx context.Context) ([]HealthStatus, error) { + var statuses []ec2types.InstanceStatus + pager := ec2.NewDescribeInstanceStatusPaginator(p.ec2api, &ec2.DescribeInstanceStatusInput{}) + + for pager.HasMorePages() { + out, err := pager.NextPage(ctx) + if err != nil { + return nil, fmt.Errorf("failed describing ec2 instance status checks, %w", err) + } + statuses = append(statuses, out.InstanceStatuses...) + } + + var healthStatuses []HealthStatus + for _, statusChecks := range statuses { + healthStatus := p.newHealthStatus(statusChecks) + // Filter out statuses that we do not consider unhealthy or do not want to handle right now + healthStatus.Details = lo.Filter(healthStatus.Details, func(details Details, _ int) bool { + if details.Status != ec2types.StatusTypeFailed { + return false + } + // ignore EBS health checks for now + if details.Category == EBSStatus { + return false + } + // Do not evaluate against the unhealthy threshold when its a scheduled maintenance event. + // Scheduled maintenance events often have a future scheduled time which makes a threshold + // difficult to utilize. We take the stance that if there is a scheduled maintenance event, + // then there is something wrong with the underlying host that warrants vacating immediately. + // This matches how we process scheduled maintenance events from EventBridge. + if details.Category == EventStatus { + return true + } + return p.clk.Since(details.ImpairedSince) >= UnhealthyThreshold + }) + if len(healthStatus.Details) == 0 { + continue + } + healthStatus.ImpairedSince = slices.MinFunc(healthStatus.Details, func(a, b Details) int { + return a.ImpairedSince.Compare(b.ImpairedSince) + }).ImpairedSince + healthStatuses = append(healthStatuses, healthStatus) + } + return healthStatuses, nil +} + +// newHealthStatus constructs a more consumable version of Health Status Details from the different status checks +func (p DefaultProvider) newHealthStatus(statusChecks ec2types.InstanceStatus) HealthStatus { + healthStatus := HealthStatus{ + InstanceID: *statusChecks.InstanceId, + Overall: ec2types.SummaryStatusImpaired, + } + if statusChecks.InstanceStatus != nil { + healthStatus.Details = append(healthStatus.Details, lo.Map(statusChecks.InstanceStatus.Details, func(details ec2types.InstanceStatusDetails, _ int) Details { + return p.newDetails(details, InstanceStatus) + })...) + } + if statusChecks.SystemStatus != nil { + healthStatus.Details = append(healthStatus.Details, lo.Map(statusChecks.SystemStatus.Details, func(details ec2types.InstanceStatusDetails, _ int) Details { + return p.newDetails(details, SystemStatus) + })...) + } + if statusChecks.AttachedEbsStatus != nil { + healthStatus.Details = append(healthStatus.Details, lo.Map(statusChecks.AttachedEbsStatus.Details, func(details ec2types.EbsStatusDetails, _ int) Details { + return p.newDetails(details, EBSStatus) + })...) + } + healthStatus.Details = append(healthStatus.Details, lo.Map(statusChecks.Events, func(details ec2types.InstanceStatusEvent, _ int) Details { + return p.newDetails(details, EventStatus) + })...) + return healthStatus +} + +func (p DefaultProvider) newDetails(details any, category Category) Details { + if ec2Details, ok := details.(ec2types.InstanceStatusDetails); ok { + return Details{ + Category: category, + Name: string(ec2Details.Name), + Status: ec2Details.Status, + ImpairedSince: lo.FromPtr(ec2Details.ImpairedSince), + } + } + if ec2Events, ok := details.(ec2types.InstanceStatusEvent); ok { + return Details{ + Category: category, + Name: string(ec2Events.Code), + // treat all scheduled maintenance events as failures + Status: ec2types.StatusTypeFailed, + ImpairedSince: p.clk.Now(), + } + } + ebsDetails := details.(ec2types.EbsStatusDetails) + return Details{ + Category: category, + Name: string(ebsDetails.Name), + Status: ebsDetails.Status, + ImpairedSince: lo.FromPtr(ebsDetails.ImpairedSince), + } +} diff --git a/pkg/providers/instancestatus/suite_test.go b/pkg/providers/instancestatus/suite_test.go new file mode 100644 index 000000000000..39157bba7f83 --- /dev/null +++ b/pkg/providers/instancestatus/suite_test.go @@ -0,0 +1,178 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package instancestatus_test + +import ( + "context" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/service/ec2" + ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types" + "github.com/samber/lo" + coretest "sigs.k8s.io/karpenter/pkg/test" + + "github.com/aws/karpenter-provider-aws/pkg/operator/options" + "github.com/aws/karpenter-provider-aws/pkg/providers/instancestatus" + "github.com/aws/karpenter-provider-aws/pkg/test" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "sigs.k8s.io/karpenter/pkg/test/v1alpha1" + . "sigs.k8s.io/karpenter/pkg/utils/testing" +) + +var ctx context.Context +var env *coretest.Environment +var awsEnv *test.Environment + +func TestAPIs(t *testing.T) { + ctx = TestContextWithLogger(t) + RegisterFailHandler(Fail) + RunSpecs(t, "InstanceStatusProvider") +} + +var _ = BeforeSuite(func() { + env = coretest.NewEnvironment(coretest.WithCRDs(v1alpha1.CRDs...)) + ctx = options.ToContext(ctx, test.Options()) + awsEnv = test.NewEnvironment(ctx, env) +}) + +var _ = Describe("Instance Status Provider", func() { + + BeforeEach(func() { + awsEnv.Clock.SetTime(time.Time{}) + statuses := []ec2types.InstanceStatus{ + { + InstanceId: lo.ToPtr("i-0123456789"), + InstanceStatus: &ec2types.InstanceStatusSummary{ + Status: ec2types.SummaryStatusImpaired, + Details: []ec2types.InstanceStatusDetails{ + { + Status: ec2types.StatusTypeFailed, + Name: ec2types.StatusNameReachability, + ImpairedSince: lo.ToPtr(awsEnv.Clock.Now()), + }, + }, + }, + SystemStatus: &ec2types.InstanceStatusSummary{ + Status: ec2types.SummaryStatusImpaired, + Details: []ec2types.InstanceStatusDetails{ + { + Status: ec2types.StatusTypeFailed, + Name: ec2types.StatusNameReachability, + ImpairedSince: lo.ToPtr(awsEnv.Clock.Now()), + }, + }, + }, + AttachedEbsStatus: &ec2types.EbsStatusSummary{ + Status: ec2types.SummaryStatusImpaired, + Details: []ec2types.EbsStatusDetails{ + { + Status: ec2types.StatusTypeFailed, + Name: ec2types.StatusNameReachability, + ImpairedSince: lo.ToPtr(awsEnv.Clock.Now()), + }, + }, + }, + Events: []ec2types.InstanceStatusEvent{ + { + Code: ec2types.EventCodeInstanceRetirement, + }, + }, + }, + } + awsEnv.EC2API.DescribeInstanceStatusOutput.Set(&ec2.DescribeInstanceStatusOutput{ + InstanceStatuses: statuses, + }) + }) + Context("List and Aggregate", func() { + It("should return all impairment details", func() { + impairedTime := awsEnv.Clock.Now() + awsEnv.Clock.Step(1 * time.Hour) + statuses, err := awsEnv.InstanceStatusProvider.List(ctx) + Expect(err).ToNot(HaveOccurred()) + Expect(statuses).To(HaveLen(1)) + Expect(statuses[0].InstanceID).To(Equal("i-0123456789")) + Expect(statuses[0].Overall).To(Equal(ec2types.SummaryStatusImpaired)) + Expect(statuses[0].Details).To(ContainElements( + instancestatus.Details{ + Category: instancestatus.InstanceStatus, + Name: string(ec2types.StatusNameReachability), + Status: ec2types.StatusTypeFailed, + ImpairedSince: impairedTime, + }, + instancestatus.Details{ + Category: instancestatus.SystemStatus, + Name: string(ec2types.StatusNameReachability), + Status: ec2types.StatusTypeFailed, + ImpairedSince: impairedTime, + }, + instancestatus.Details{ + Category: instancestatus.EventStatus, + Name: string(ec2types.EventCodeInstanceRetirement), + Status: ec2types.StatusTypeFailed, + ImpairedSince: awsEnv.Clock.Now(), + }, + )) + }) + It("should not return healthy statuses", func() { + awsEnv.EC2API.DescribeInstanceStatusOutput.Set(&ec2.DescribeInstanceStatusOutput{ + InstanceStatuses: []ec2types.InstanceStatus{ + { + InstanceId: lo.ToPtr("i-0123456789"), + SystemStatus: &ec2types.InstanceStatusSummary{ + Status: ec2types.SummaryStatusInitializing, + }, + InstanceStatus: &ec2types.InstanceStatusSummary{ + Status: ec2types.SummaryStatusInsufficientData, + }, + AttachedEbsStatus: &ec2types.EbsStatusSummary{ + Status: ec2types.SummaryStatusInitializing, + }, + }, + }, + }) + statuses, err := awsEnv.InstanceStatusProvider.List(ctx) + Expect(err).ToNot(HaveOccurred()) + Expect(statuses).To(HaveLen(0)) + }) + It("should return scheduled maintenance events even when other statuses are healthy", func() { + awsEnv.EC2API.DescribeInstanceStatusOutput.Set(&ec2.DescribeInstanceStatusOutput{ + InstanceStatuses: []ec2types.InstanceStatus{ + { + InstanceId: lo.ToPtr("i-0123456789"), + SystemStatus: &ec2types.InstanceStatusSummary{ + Status: ec2types.SummaryStatusInitializing, + }, + InstanceStatus: &ec2types.InstanceStatusSummary{ + Status: ec2types.SummaryStatusInsufficientData, + }, + Events: []ec2types.InstanceStatusEvent{ + { + Code: ec2types.EventCodeInstanceRetirement, + }, + }, + }, + }, + }) + statuses, err := awsEnv.InstanceStatusProvider.List(ctx) + Expect(err).ToNot(HaveOccurred()) + Expect(statuses).To(HaveLen(1)) + Expect(statuses[0].Details).To(HaveLen(1)) + Expect(statuses[0].Details[0].Category).To(Equal(instancestatus.EventStatus)) + }) + }) +}) diff --git a/pkg/providers/sqs/sqs.go b/pkg/providers/sqs/sqs.go index 7a81b799dab4..7ffa94bae73b 100644 --- a/pkg/providers/sqs/sqs.go +++ b/pkg/providers/sqs/sqs.go @@ -104,7 +104,7 @@ func (p *DefaultProvider) DeleteSQSMessage(ctx context.Context, msg *sqstypes.Me return nil } -func NewSQSProvider(ctx context.Context, sqsapi *sqs.Client) (Provider, error) { +func NewSQSProvider(ctx context.Context, sqsapi sdk.SQSAPI) (Provider, error) { out, err := sqsapi.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{QueueName: lo.ToPtr(options.FromContext(ctx).InterruptionQueue)}) if err != nil { return nil, err diff --git a/pkg/test/environment.go b/pkg/test/environment.go index 36ec27492b21..73abe3ccbdf7 100644 --- a/pkg/test/environment.go +++ b/pkg/test/environment.go @@ -35,6 +35,7 @@ import ( "github.com/aws/karpenter-provider-aws/pkg/providers/capacityreservation" "github.com/aws/karpenter-provider-aws/pkg/providers/instance" "github.com/aws/karpenter-provider-aws/pkg/providers/instanceprofile" + "github.com/aws/karpenter-provider-aws/pkg/providers/instancestatus" "github.com/aws/karpenter-provider-aws/pkg/providers/instancetype" "github.com/aws/karpenter-provider-aws/pkg/providers/launchtemplate" "github.com/aws/karpenter-provider-aws/pkg/providers/placementgroup" @@ -97,6 +98,7 @@ type Environment struct { InstanceTypesResolver *instancetype.DefaultResolver InstanceTypesProvider *instancetype.DefaultProvider InstanceProvider *instance.DefaultProvider + InstanceStatusProvider *instancestatus.DefaultProvider SubnetProvider *subnet.DefaultProvider SecurityGroupProvider *securitygroup.DefaultProvider InstanceProfileProvider *instanceprofile.DefaultProvider @@ -166,6 +168,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment // Instance type updates are hydrated asynchronously after this by controllers. lo.Must0(instanceTypesProvider.UpdateInstanceTypes(ctx)) lo.Must0(instanceTypesProvider.UpdateInstanceTypeOfferings(ctx)) + instanceStatusProvider := instancestatus.NewDefaultProvider(ec2api, clock) launchTemplateProvider := launchtemplate.NewDefaultProvider( ctx, launchTemplateCache, @@ -236,6 +239,7 @@ func NewEnvironment(ctx context.Context, env *coretest.Environment) *Environment InstanceTypesResolver: instanceTypesResolver, InstanceTypesProvider: instanceTypesProvider, InstanceProvider: instanceProvider, + InstanceStatusProvider: instanceStatusProvider, SubnetProvider: subnetProvider, SecurityGroupProvider: securityGroupProvider, LaunchTemplateProvider: launchTemplateProvider, diff --git a/test/suites/interruption/suite_test.go b/test/suites/interruption/suite_test.go index d4a5683a5101..072a2a9411a9 100644 --- a/test/suites/interruption/suite_test.go +++ b/test/suites/interruption/suite_test.go @@ -267,6 +267,42 @@ var _ = Describe("Interruption", func() { env.EventuallyExpectNotFound(node) env.EventuallyExpectHealthyPodCount(selector, 1) }) + It("should terminate the node when receiving an instance status failure", func() { + numPods := 1 + dep := coretest.Deployment(coretest.DeploymentOptions{ + Replicas: int32(numPods), + PodOptions: coretest.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"app": "my-app"}, + }, + TerminationGracePeriodSeconds: lo.ToPtr(int64(0)), + }, + }) + selector := labels.SelectorFromSet(dep.Spec.Selector.MatchLabels) + + // schedule the interface to go down after 70 seconds. + // this has to be above a minute so that the instance status check initializes to healthy. + nodeClass.Spec.UserData = lo.ToPtr(`#!/usr/bin/env bash +( + sleep 30 + IFACE=$(ip route show default | awk '{print $5}' | head -n1) + ip link set dev "$IFACE" down +) >>/var/log/disable-net.log 2>&1 &`) + + env.ExpectCreated(nodeClass, nodePool, dep) + + env.EventuallyExpectHealthyPodCount(selector, numPods) + env.ExpectCreatedNodeCount("==", 1) + + node := env.Monitor.CreatedNodes()[0] + + Eventually(func(g Gomega) { + g.Expect(env.Client.Get(env.Context, client.ObjectKeyFromObject(node), node)).To(Succeed()) + g.Expect(!node.DeletionTimestamp.IsZero()).To(BeTrue()) + }).WithTimeout(15 * time.Minute).Should(Succeed()) + env.EventuallyExpectNotFound(node) + env.EventuallyExpectHealthyPodCount(selector, 1) + }) }) func scheduledChangeMessage(region, accountID, involvedInstanceID string) scheduledchange.Message { diff --git a/website/content/en/preview/concepts/disruption.md b/website/content/en/preview/concepts/disruption.md index d44fc55fa969..ceea8c4b333a 100644 --- a/website/content/en/preview/concepts/disruption.md +++ b/website/content/en/preview/concepts/disruption.md @@ -196,10 +196,11 @@ Doing so can result in partially drained nodes stuck in the cluster, driving up If interruption-handling is enabled, Karpenter will watch for upcoming involuntary interruption events that would cause disruption to your workloads. These interruption events include: -* Spot Interruption Warnings -* Scheduled Change Health Events (Maintenance Events) -* Instance Terminating Events -* Instance Stopping Events +* Spot Interruption Warnings +* Scheduled Change Health Events (Maintenance Events) +* Instance Terminating Events +* Instance Stopping Events +* Instance Status Check Failures When Karpenter detects one of these events will occur to your nodes, it automatically taints, drains, and terminates the node(s) ahead of the interruption event to give the maximum amount of time for workload cleanup prior to compute disruption. This enables scenarios where the `terminationGracePeriod` for your workloads may be long or cleanup for your workloads is critical, and you want enough time to be able to gracefully clean-up your pods. @@ -211,9 +212,17 @@ Karpenter publishes Kubernetes events to the node for all events listed above in If you require handling for Spot Rebalance Recommendations, you can use the [AWS Node Termination Handler (NTH)](https://github.com/aws/aws-node-termination-handler) alongside Karpenter; however, note that the AWS Node Termination Handler cordons and drains nodes on rebalance recommendations, potentially causing more node churn in the cluster than with interruptions alone. Further information can be found in the [Troubleshooting Guide]({{< ref "../troubleshooting#aws-node-termination-handler-nth-interactions" >}}). {{% /alert %}} -Karpenter enables this feature by watching an SQS queue which receives critical events from AWS services which may affect your nodes. Karpenter requires that an SQS queue be provisioned and EventBridge rules and targets be added that forward interruption events from AWS services to the SQS queue. Karpenter provides details for provisioning this infrastructure in the [CloudFormation template in the Getting Started Guide](../../getting-started/getting-started-with-karpenter/#create-the-karpenter-infrastructure-and-iam-roles). +Karpenter handles most interruption events by watching an SQS queue which receives critical events from AWS services which may affect your nodes. Karpenter requires that an SQS queue be provisioned and EventBridge rules and targets be added that forward interruption events from AWS services to the SQS queue. Karpenter provides details for provisioning this infrastructure in the [CloudFormation template in the Getting Started Guide](../../getting-started/getting-started-with-karpenter/#create-the-karpenter-infrastructure-and-iam-roles). -To enable interruption handling, configure the `--interruption-queue` CLI argument with the name of the interruption queue provisioned to handle interruption events. +To enable full interruption handling, configure the `--interruption-queue` CLI argument with the name of the interruption queue provisioned to handle interruption events. + +Additionally, Karpenter utilizes the [EC2 DescribeInstanceStatus](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/monitoring-system-instance-status-check.html) API to check for unhealthy EC2 instances managed by Karpenter. The status checks Karpenter responds to are: + +* System Status - surfaces failures in the underlying physical host (hardware or software) +* Instance Status - surfaces failures in the virtual machine +* Scheduled Maintenance Events - surfaces upcoming maintenance events that may affect the instance + +These status checks do not require the `--interruption-queue` to be configured, just EC2 DescribeInstanceStatus IAM permissions. ### Node Auto Repair diff --git a/website/content/en/preview/getting-started/getting-started-with-karpenter/cloudformation.yaml b/website/content/en/preview/getting-started/getting-started-with-karpenter/cloudformation.yaml index 5f794c389e8d..fcdba0a48c0e 100644 --- a/website/content/en/preview/getting-started/getting-started-with-karpenter/cloudformation.yaml +++ b/website/content/en/preview/getting-started/getting-started-with-karpenter/cloudformation.yaml @@ -350,6 +350,14 @@ Resources: "Effect": "Allow", "Resource": "arn:${AWS::Partition}:iam::${AWS::AccountId}:instance-profile/*", "Action": "iam:GetInstanceProfile" + "Resource": "arn:${AWS::Partition}:eks:${AWS::Region}:${AWS::AccountId}:cluster/${ClusterName}", + "Action": "eks:DescribeCluster" + }, + { + "Sid": "AllowUnscopedEC2DescribeInstanceStatus", + "Effect": "Allow", + "Resource": "*", + "Action": "ec2:DescribeInstanceStatus" } ] }