-
Couldn't load subscription status.
- Fork 278
Add/lifecycle heartbeat #1116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add/lifecycle heartbeat #1116
Changes from 20 commits
d991814
0e4b686
1fbd7cb
d0f1ef4
df0696f
d7f8e07
a6cfd89
64e9cff
d3047a0
559adc3
7012bab
bc79eb7
75400a9
bbddcfa
029fdf7
56b3f55
7221ed2
4bcb916
4ff40d9
fe7fcc1
265828d
044fc3a
2732775
0492976
1631bb6
b41751d
9e3fe77
dbdeec1
80b88a4
9c54964
56ea41d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -617,6 +617,71 @@ extraScrapeConfigs: | | |
| In IMDS mode, metrics can be collected as follows: | ||
| - Use a `podMonitor` custom resource with the Prometheus Operator to collect metrics. | ||
|
|
||
| ## Issuing Lifecycle Heartbeats | ||
|
|
||
| You can set NTH to send heartbeats to ASG in Queue Processor mode. This allows for a much longer grace period (up to 48 hours) for termination than the maximum heartbeat timeout of two hours. | ||
|
|
||
| ### How it works | ||
|
|
||
| - When NTH receives an ASG lifecycle termination event, it starts sending heartbeats to ASG to renew the heartbeat timeout associated with the ASG's termination lifecycle hook. | ||
| - The heartbeat timeout acts as a timer that starts when the termination event begins. | ||
| - Before the timeout reaches zero, the termination process is halted at the `Terminating:Wait` stage. | ||
| - Previously, NTH couldn't issue heartbeats, limiting the maximum time for preventing termination to the maximum heartbeat timeout (7200 seconds). | ||
| - Now, the graceful termination duration can be extended up to 48 hours, limited by the global timeout. | ||
|
|
||
| ### How to use | ||
|
|
||
| - Specify values for `Heartbeat Interval` (required) and `Heartbeat Until` (optional). | ||
LikithaVemulapalli marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| ### Configurations | ||
| #### `Heartbeat Interval` | ||
| - Time period between consecutive heartbeat signals (in seconds) | ||
| - Range: 30 to 3600 seconds (30 seconds to 1 hour) | ||
| - Flag for custom resource definition by *.yaml / helm: `heartbeatInterval` | ||
| - CLI flag: `heartbeat-interval` | ||
|
|
||
| #### `Heartbeat Until` | ||
| - Duration over which heartbeat signals are sent (in seconds) | ||
| - Range: 60 to 172800 seconds (1 minute to 48 hours) | ||
| - Flag for custom resource definition by *.yaml / helm: `heartbeatUntil` | ||
| - CLI flag: `heartbeat-until` | ||
|
|
||
| #### Example Case | ||
|
|
||
| - `Heartbeat Interval`: 1000 seconds | ||
| - `Heartbeat Until`: 4500 seconds | ||
| - `Heartbeat Timeout`: 3000 seconds | ||
|
|
||
| | Time (s) | Event | Heartbeat Timeout (HT) | Heartbeat Until (HU) | Action | | ||
| |----------|-------------|------------------|----------------------|--------| | ||
| | 0 | Start | 3000 | 4500 | Termination Event Received | | ||
| | 1000 | HB1 Issued | 2000 -> 3000 | 3500 | Send Heartbeat | | ||
| | 2000 | HB2 Issued | 2000 -> 3000 | 2500 | Send Heartbeat | | ||
| | 3000 | HB3 Issued | 2000 -> 3000 | 1500 | Send Heartbeat | | ||
| | 4000 | HB4 Issued | 2000 -> 3000 | 500 | Send Heartbeat | | ||
| | 4500 | HB Expires | 2500 | 0 | Stop Heartbeats | | ||
| | 7000 | Termination | - | - | Instance Terminates | | ||
|
|
||
| Note: The instance can terminate earlier if its pods finish draining and are ready for termination. | ||
|
|
||
| ### Example Helm Command | ||
|
|
||
| ```sh | ||
| helm upgrade --install aws-node-termination-handler \ | ||
| --namespace kube-system \ | ||
| --set enableSqsTerminationDraining=true \ | ||
| --set heartbeatInterval=1000 \ | ||
| --set heartbeatUntil=4500 \ | ||
| // other inputs.. | ||
| ``` | ||
|
|
||
| ### Important Notes | ||
|
|
||
| - A lifecycle hook for instance termination is required for this feature. Longer grace periods are achieved by renewing the heartbeat timeout of the ASG's lifecycle hook. Instances terminate instantly without a hook. | ||
|
||
| - Issuing lifecycle heartbeats is only supported in Queue Processor mode. Setting `enableSqsTerminationDraining=false` and specifying heartbeat flags is prevented in Helm. Directly editing deployment settings to do this will cause NTH to fail. | ||
| - The heartbeat interval should be sufficiently smaller than the heartbeat timeout. There's a time gap between instance start and NTH start. Setting the interval just slightly smaller than or equal to the timeout causes the heartbeat timeout to expire before the heartbeat is issued. Provide enough buffer for NTH to finish initializing. | ||
| - Issuing heartbeats is part of the termination process. The maximum number of instances that NTH can handle termination concurrently is limited by the number of workers. This implies that heartbeats can only be issued for up to the number of instances specified by the `workers` flag simultaneously. | ||
|
|
||
| ## Communication | ||
| * If you've run into a bug or have a new feature request, please open an [issue](https://github.com/aws/aws-node-termination-handler/issues/new). | ||
| * You can also chat with us in the [Kubernetes Slack](https://kubernetes.slack.com) in the `#provider-aws` channel | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -112,6 +112,9 @@ const ( | |
| queueURLConfigKey = "QUEUE_URL" | ||
| completeLifecycleActionDelaySecondsKey = "COMPLETE_LIFECYCLE_ACTION_DELAY_SECONDS" | ||
| deleteSqsMsgIfNodeNotFoundKey = "DELETE_SQS_MSG_IF_NODE_NOT_FOUND" | ||
| // heartbeat | ||
| heartbeatIntervalKey = "HEARTBEAT_INTERVAL" | ||
| heartbeatUntilKey = "HEARTBEAT_UNTIL" | ||
| ) | ||
|
|
||
| // Config arguments set via CLI, environment variables, or defaults | ||
|
|
@@ -166,6 +169,8 @@ type Config struct { | |
| CompleteLifecycleActionDelaySeconds int | ||
| DeleteSqsMsgIfNodeNotFound bool | ||
| UseAPIServerCacheToListPods bool | ||
| HeartbeatInterval int | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to have some coverage around these newly added configs in config-test.go file... |
||
| HeartbeatUntil int | ||
| } | ||
|
|
||
| // ParseCliArgs parses cli arguments and uses environment variables as fallback values | ||
|
|
@@ -230,6 +235,8 @@ func ParseCliArgs() (config Config, err error) { | |
| flag.IntVar(&config.CompleteLifecycleActionDelaySeconds, "complete-lifecycle-action-delay-seconds", getIntEnv(completeLifecycleActionDelaySecondsKey, -1), "Delay completing the Autoscaling lifecycle action after a node has been drained.") | ||
| flag.BoolVar(&config.DeleteSqsMsgIfNodeNotFound, "delete-sqs-msg-if-node-not-found", getBoolEnv(deleteSqsMsgIfNodeNotFoundKey, false), "If true, delete SQS Messages from the SQS Queue if the targeted node(s) are not found.") | ||
| flag.BoolVar(&config.UseAPIServerCacheToListPods, "use-apiserver-cache", getBoolEnv(useAPIServerCache, false), "If true, leverage the k8s apiserver's index on pod's spec.nodeName to list pods on a node, instead of doing an etcd quorum read.") | ||
| flag.IntVar(&config.HeartbeatInterval, "heartbeat-interval", getIntEnv(heartbeatIntervalKey, -1), "The time period in seconds between consecutive heartbeat signals. Valid range: 30-3600 seconds (30 seconds to 1 hour).") | ||
| flag.IntVar(&config.HeartbeatUntil, "heartbeat-until", getIntEnv(heartbeatUntilKey, -1), "The duration in seconds over which heartbeat signals are sent. Valid range: 60-172800 seconds (1 minute to 48 hours).") | ||
| flag.Parse() | ||
|
|
||
| if isConfigProvided("pod-termination-grace-period", podTerminationGracePeriodConfigKey) && isConfigProvided("grace-period", gracePeriodConfigKey) { | ||
|
|
@@ -274,6 +281,26 @@ func ParseCliArgs() (config Config, err error) { | |
| panic("You must provide a node-name to the CLI or NODE_NAME environment variable.") | ||
| } | ||
|
|
||
| // heartbeat value boundary and compability check | ||
| if config.EnableSQSTerminationDraining { | ||
| if config.HeartbeatInterval != -1 && (config.HeartbeatInterval < 30 || config.HeartbeatInterval > 3600) { | ||
| return config, fmt.Errorf("invalid heartbeat-interval passed: %d Should be between 30 and 3600 seconds", config.HeartbeatInterval) | ||
| } | ||
| if config.HeartbeatUntil != -1 && (config.HeartbeatUntil < 60 || config.HeartbeatUntil > 172800) { | ||
| return config, fmt.Errorf("invalid heartbeat-until passed: %d Should be between 60 and 172800 seconds", config.HeartbeatUntil) | ||
| } | ||
| if config.HeartbeatInterval != -1 && config.HeartbeatUntil == -1 { | ||
| config.HeartbeatUntil = 172800 | ||
| log.Info().Msgf("Since heartbeat-until is not set, defaulting to %d seconds", config.HeartbeatUntil) | ||
| } else if config.HeartbeatInterval == -1 && config.HeartbeatUntil != -1 { | ||
| return config, fmt.Errorf("invalid heartbeat configuration: heartbeat-interval is required when heartbeat-until is set") | ||
| } | ||
| } else { | ||
| if config.HeartbeatInterval != -1 || config.HeartbeatUntil != -1 { | ||
LikithaVemulapalli marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return config, fmt.Errorf("currently using IMDS mode. Heartbeat is only supported for Queue Processor mode") | ||
| } | ||
| } | ||
|
|
||
| // client-go expects these to be set in env vars | ||
| os.Setenv(kubernetesServiceHostConfigKey, config.KubernetesServiceHost) | ||
| os.Setenv(kubernetesServicePortConfigKey, config.KubernetesServicePort) | ||
|
|
@@ -332,6 +359,8 @@ func (c Config) PrintJsonConfigArgs() { | |
| Str("ManagedTag", c.ManagedTag). | ||
| Bool("use_provider_id", c.UseProviderId). | ||
| Bool("use_apiserver_cache", c.UseAPIServerCacheToListPods). | ||
| Int("heartbeat_interval", c.HeartbeatInterval). | ||
| Int("heartbeat_until", c.HeartbeatUntil). | ||
| Msg("aws-node-termination-handler arguments") | ||
| } | ||
|
|
||
|
|
@@ -383,7 +412,9 @@ func (c Config) PrintHumanConfigArgs() { | |
| "\tmanaged-tag: %s,\n"+ | ||
| "\tuse-provider-id: %t,\n"+ | ||
| "\taws-endpoint: %s,\n"+ | ||
| "\tuse-apiserver-cache: %t,\n", | ||
| "\tuse-apiserver-cache: %t,\n"+ | ||
| "\theartbeat-interval: %d,\n"+ | ||
| "\theartbeat-until: %d\n", | ||
| c.DryRun, | ||
| c.NodeName, | ||
| c.PodName, | ||
|
|
@@ -424,6 +455,8 @@ func (c Config) PrintHumanConfigArgs() { | |
| c.UseProviderId, | ||
| c.AWSEndpoint, | ||
| c.UseAPIServerCacheToListPods, | ||
| c.HeartbeatInterval, | ||
| c.HeartbeatUntil, | ||
| ) | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,11 +15,14 @@ package sqsevent | |
|
|
||
| import ( | ||
| "encoding/json" | ||
| "errors" | ||
| "fmt" | ||
| "time" | ||
|
|
||
| "github.com/aws/aws-node-termination-handler/pkg/monitor" | ||
| "github.com/aws/aws-node-termination-handler/pkg/node" | ||
| "github.com/aws/aws-sdk-go/aws" | ||
| "github.com/aws/aws-sdk-go/aws/awserr" | ||
| "github.com/aws/aws-sdk-go/service/autoscaling" | ||
| "github.com/aws/aws-sdk-go/service/sqs" | ||
| "github.com/rs/zerolog/log" | ||
|
|
@@ -95,26 +98,117 @@ func (m SQSMonitor) asgTerminationToInterruptionEvent(event *EventBridgeEvent, m | |
| Description: fmt.Sprintf("ASG Lifecycle Termination event received. Instance will be interrupted at %s \n", event.getTime()), | ||
| } | ||
|
|
||
| stopHeartbeatCh := make(chan struct{}) | ||
|
|
||
| interruptionEvent.PostDrainTask = func(interruptionEvent monitor.InterruptionEvent, _ node.Node) error { | ||
|
|
||
| _, err = m.continueLifecycleAction(lifecycleDetail) | ||
| if err != nil { | ||
| return fmt.Errorf("continuing ASG termination lifecycle: %w", err) | ||
| } | ||
| log.Info().Str("lifecycleHookName", lifecycleDetail.LifecycleHookName).Str("instanceID", lifecycleDetail.EC2InstanceID).Msg("Completed ASG Lifecycle Hook") | ||
|
|
||
| close(stopHeartbeatCh) | ||
| return m.deleteMessage(message) | ||
| } | ||
|
|
||
| interruptionEvent.PreDrainTask = func(interruptionEvent monitor.InterruptionEvent, n node.Node) error { | ||
| nthConfig := n.GetNthConfig() | ||
| if nthConfig.HeartbeatInterval != -1 && nthConfig.HeartbeatUntil != -1 { | ||
LikithaVemulapalli marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| go m.checkHeartbeatTimeout(nthConfig.HeartbeatInterval, lifecycleDetail) | ||
| go m.SendHeartbeats(nthConfig.HeartbeatInterval, nthConfig.HeartbeatUntil, lifecycleDetail, stopHeartbeatCh) | ||
| } | ||
|
|
||
| err := n.TaintASGLifecycleTermination(interruptionEvent.NodeName, interruptionEvent.EventID) | ||
| if err != nil { | ||
| log.Err(err).Msgf("Unable to taint node with taint %s:%s", node.ASGLifecycleTerminationTaint, interruptionEvent.EventID) | ||
| log.Err(err).Msgf("unable to taint node with taint %s:%s", node.ASGLifecycleTerminationTaint, interruptionEvent.EventID) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| return &interruptionEvent, nil | ||
| } | ||
|
|
||
| // Compare the heartbeatInterval with the heartbeat timeout and warn if (heartbeatInterval >= heartbeat timeout) | ||
| func (m SQSMonitor) checkHeartbeatTimeout(heartbeatInterval int, lifecycleDetail *LifecycleDetail) { | ||
| input := &autoscaling.DescribeLifecycleHooksInput{ | ||
| AutoScalingGroupName: aws.String(lifecycleDetail.AutoScalingGroupName), | ||
| LifecycleHookNames: []*string{aws.String(lifecycleDetail.LifecycleHookName)}, | ||
| } | ||
|
|
||
| lifecyclehook, err := m.ASG.DescribeLifecycleHooks(input) | ||
| if err != nil { | ||
| log.Err(err).Msg("failed to describe lifecycle hook") | ||
| return | ||
| } | ||
|
|
||
| if len(lifecyclehook.LifecycleHooks) == 0 { | ||
| log.Warn(). | ||
| Str("asgName", lifecycleDetail.AutoScalingGroupName). | ||
| Str("lifecycleHookName", lifecycleDetail.LifecycleHookName). | ||
| Msg("Tried to check heartbeat timeout, but no lifecycle hook found from ASG") | ||
| return | ||
| } | ||
|
|
||
| heartbeatTimeout := int(*lifecyclehook.LifecycleHooks[0].HeartbeatTimeout) | ||
|
|
||
| if heartbeatInterval >= heartbeatTimeout { | ||
| log.Warn().Msgf("Heartbeat interval (%d seconds) is equal to or greater than the heartbeat timeout (%d seconds) for the lifecycle hook %s. The node would likely be terminated before the heartbeat is sent", heartbeatInterval, heartbeatTimeout, *lifecyclehook.LifecycleHooks[0].LifecycleHookName) | ||
|
||
| } | ||
| } | ||
|
|
||
| // Issue lifecycle heartbeats to reset the heartbeat timeout timer in ASG | ||
| func (m SQSMonitor) SendHeartbeats(heartbeatInterval int, heartbeatUntil int, lifecycleDetail *LifecycleDetail, stopCh <-chan struct{}) { | ||
| ticker := time.NewTicker(time.Duration(heartbeatInterval) * time.Second) | ||
| defer ticker.Stop() | ||
| timeout := time.After(time.Duration(heartbeatUntil) * time.Second) | ||
|
|
||
| for { | ||
| select { | ||
| case <-stopCh: | ||
LikithaVemulapalli marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return | ||
| case <-ticker.C: | ||
| err := m.recordLifecycleActionHeartbeat(lifecycleDetail) | ||
| if err != nil { | ||
| log.Err(err).Msg("invalid heartbeat target, stopping heartbeat") | ||
| return | ||
| } | ||
| case <-timeout: | ||
| log.Info().Msg("Heartbeat deadline exceeded, stopping heartbeat") | ||
| return | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func (m SQSMonitor) recordLifecycleActionHeartbeat(lifecycleDetail *LifecycleDetail) error { | ||
| input := &autoscaling.RecordLifecycleActionHeartbeatInput{ | ||
| AutoScalingGroupName: aws.String(lifecycleDetail.AutoScalingGroupName), | ||
| LifecycleHookName: aws.String(lifecycleDetail.LifecycleHookName), | ||
| LifecycleActionToken: aws.String(lifecycleDetail.LifecycleActionToken), | ||
| InstanceId: aws.String(lifecycleDetail.EC2InstanceID), | ||
| } | ||
|
|
||
| log.Info().Str("asgName", lifecycleDetail.AutoScalingGroupName). | ||
| Str("lifecycleHookName", lifecycleDetail.LifecycleHookName). | ||
| Str("lifecycleActionToken", lifecycleDetail.LifecycleActionToken). | ||
| Str("instanceID", lifecycleDetail.EC2InstanceID). | ||
| Msg("Sending lifecycle heartbeat") | ||
|
|
||
| // Stop the heartbeat if the target is invalid | ||
| _, err := m.ASG.RecordLifecycleActionHeartbeat(input) | ||
| if err != nil { | ||
| var awsErr awserr.Error | ||
| log.Warn().Err(err).Msg("Failed to send lifecycle heartbeat") | ||
| if errors.As(err, &awsErr) && awsErr.Code() == "ValidationError" { | ||
| return err | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| log.Info().Msg("Successfully sent lifecycle heartbeat") | ||
| return nil | ||
| } | ||
|
|
||
| func (m SQSMonitor) deleteMessage(message *sqs.Message) error { | ||
| errs := m.deleteMessages([]*sqs.Message{message}) | ||
| if errs != nil { | ||
|
|
@@ -123,7 +217,7 @@ func (m SQSMonitor) deleteMessage(message *sqs.Message) error { | |
| return nil | ||
| } | ||
|
|
||
| // Continues the lifecycle hook thereby indicating a successful action occured | ||
| // Continues the lifecycle hook thereby indicating a successful action occurred | ||
| func (m SQSMonitor) continueLifecycleAction(lifecycleDetail *LifecycleDetail) (*autoscaling.CompleteLifecycleActionOutput, error) { | ||
| return m.completeLifecycleAction(&autoscaling.CompleteLifecycleActionInput{ | ||
| AutoScalingGroupName: &lifecycleDetail.AutoScalingGroupName, | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.