diff --git a/cmd/kured/main.go b/cmd/kured/main.go
index a6b1c0e82..2586aefce 100644
--- a/cmd/kured/main.go
+++ b/cmd/kured/main.go
@@ -70,7 +70,10 @@ var (
 	messageTemplateDrain      string
 	messageTemplateReboot     string
 	messageTemplateUncordon   string
-	podSelectors              []string
+	blockingPodSelectors      []string
+	inhibitingPodSelectors    []string
+	inhibitingNodeAnnotations []string
+	blockingNodeAnnotations   []string
 	rebootCommand             string
 	rebootSignal              int
 	logFormat                 string
@@ -180,8 +183,14 @@ func main() {
 		"message template used to notify about a node being drained")
 	flag.StringVar(&messageTemplateReboot, "message-template-reboot", "Rebooting node %s",
 		"message template used to notify about a node being rebooted")
-	flag.StringArrayVar(&podSelectors, "blocking-pod-selector", nil,
+	flag.StringArrayVar(&blockingPodSelectors, "blocking-pod-selector", nil,
 		"label selector identifying pods whose presence should prevent reboots")
+	flag.StringArrayVar(&inhibitingPodSelectors, "inhibiting-pod-selector", nil,
+		"label selector identifying pods whose presence should prevent the final reboot, but not the draining of the node")
+	flag.StringArrayVar(&blockingNodeAnnotations, "blocking-node-annotation", nil,
+		"node annotation whose presence should prevent node reboots")
+	flag.StringArrayVar(&inhibitingNodeAnnotations, "inhibiting-node-annotation", nil,
+		"node annotation whose presence should prevent the final reboot, but not the draining of the node")
 	flag.StringSliceVar(&rebootDays, "reboot-days", timewindow.EveryDay,
 		"schedule reboot on these days")
 	flag.StringVar(&rebootStart, "start-time", "0:00",
@@ -225,7 +234,7 @@ func main() {
 	log.Infof("PreferNoSchedule taint: %s", preferNoScheduleTaintName)
 
 	// This should be printed from blocker list instead of only blocking pod selectors
-	log.Infof("Blocking Pod Selectors: %v", podSelectors)
+	log.Infof("Blocking Pod Selectors: %v", blockingPodSelectors)
 	log.Infof("Reboot period %v", period)
 	log.Infof("Concurrency: %v", concurrency)
 
@@ -266,9 +275,24 @@ func main() {
 	if prometheusURL != "" {
 		blockCheckers = append(blockCheckers, blockers.NewPrometheusBlockingChecker(papi.Config{Address: prometheusURL}, alertFilter.Regexp, alertFiringOnly, alertFilterMatchOnly))
 	}
-	if podSelectors != nil {
-		blockCheckers = append(blockCheckers, blockers.NewKubernetesBlockingChecker(client, nodeID, podSelectors))
+	if blockingPodSelectors != nil {
+		blockCheckers = append(blockCheckers, blockers.NewKubernetesBlockingChecker(client, nodeID, blockingPodSelectors))
 	}
+	if blockingNodeAnnotations != nil {
+		blockCheckers = append(blockCheckers, blockers.NewNodeBlockingChecker(client, nodeID, blockingNodeAnnotations))
+	}
+
+	// These prevent the rebooter from rebooting the node; the node will still be drained.
+	// This is useful for cases in which you want to wait for a condition that is only met after draining the node.
+	var inhibitingBlockCheckers []blockers.RebootBlocker
+	if inhibitingNodeAnnotations != nil {
+		log.Info("Setting up inhibiting blocker for node annotations")
+		inhibitingBlockCheckers = append(inhibitingBlockCheckers, blockers.NewNodeBlockingChecker(client, nodeID, inhibitingNodeAnnotations))
+	}
+	if inhibitingPodSelectors != nil {
+		inhibitingBlockCheckers = append(inhibitingBlockCheckers, blockers.NewKubernetesBlockingChecker(client, nodeID, inhibitingPodSelectors))
+	}
+
 	log.Infof("Lock Annotation: %s/%s:%s", dsNamespace, dsName, lockAnnotation)
 	if lockTTL > 0 {
 		log.Infof("Lock TTL set, lock will expire after: %v", lockTTL)
@@ -282,7 +306,7 @@ func main() {
 	}
 
 	lock := daemonsetlock.New(client, nodeID, dsNamespace, dsName, lockAnnotation, lockTTL, concurrency, lockReleaseDelay)
-	go rebootAsRequired(nodeID, rebooter, rebootChecker, blockCheckers, window, lock, client)
+	go rebootAsRequired(nodeID, rebooter, rebootChecker, blockCheckers, inhibitingBlockCheckers, window, lock, client)
 	go maintainRebootRequiredMetric(nodeID, rebootChecker)
 
 	http.Handle("/metrics", promhttp.Handler())
@@ -553,7 +577,7 @@ func updateNodeLabels(client *kubernetes.Clientset, node *v1.Node, labels []stri
 	}
 }
 
-func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.Checker, blockCheckers []blockers.RebootBlocker, window *timewindow.TimeWindow, lock daemonsetlock.Lock, client *kubernetes.Clientset) {
+func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.Checker, blockCheckers []blockers.RebootBlocker, inhibitingBlockCheckers []blockers.RebootBlocker, window *timewindow.TimeWindow, lock daemonsetlock.Lock, client *kubernetes.Clientset) {
 	source := rand.NewSource(time.Now().UnixNano())
 	tick := delaytick.New(source, 1*time.Minute)
 
@@ -652,12 +676,11 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.
 			}
 		}
 
-		var rebootRequiredBlockCondition string
 		if blockers.RebootBlocked(blockCheckers...) {
-			rebootRequiredBlockCondition = ", but blocked at this time"
+			log.Info("Reboot required, but blocked at this time")
 			continue
 		}
-		log.Infof("Reboot required%s", rebootRequiredBlockCondition)
+		log.Info("Reboot required")
 
 		holding, _, err := lock.Holding()
 		if err != nil {
@@ -691,6 +714,11 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.
 			}
 		}
+		if blockers.RebootBlocked(inhibitingBlockCheckers...) {
+			log.Info("Reboot required, but blocked by inhibiting blockers")
+			continue
+		}
+
 		if rebootDelay > 0 {
 			log.Infof("Delaying reboot for %v", rebootDelay)
 			time.Sleep(rebootDelay)
 		}
@@ -701,6 +729,7 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.
log.Warnf("Error notifying: %v", err) } } + log.Infof("Triggering reboot for node %v", nodeID) err = rebooter.Reboot() diff --git a/pkg/blockers/nodeannotation.go b/pkg/blockers/nodeannotation.go new file mode 100644 index 000000000..d3bddfeca --- /dev/null +++ b/pkg/blockers/nodeannotation.go @@ -0,0 +1,49 @@ +package blockers + +import ( + "context" + + log "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" +) + +// Compile-time checks to ensure the type implements the interface +var ( + _ RebootBlocker = (*NodeBlockingChecker)(nil) +) + +// NodeBlockingChecker contains info for connecting +// to k8s, and can give info about whether a reboot should be blocked +type NodeBlockingChecker struct { + // client used to contact kubernetes API + client *kubernetes.Clientset + nodeName string + // lised used to filter pods (podSelector) + filter []string +} + +func NewNodeBlockingChecker(client *kubernetes.Clientset, nodename string, nodeAnnotations []string) *NodeBlockingChecker { + return &NodeBlockingChecker{ + client: client, + nodeName: nodename, + filter: nodeAnnotations, + } +} + +// IsBlocked for the NodeBlockingChecker will check if a pod, for the node, is preventing +// the reboot. It will warn in the logs about blocking, but does not return an error. +func (kb *NodeBlockingChecker) IsBlocked() bool { + node, err := kb.client.CoreV1().Nodes().Get(context.TODO(), kb.nodeName, metav1.GetOptions{}) + if err != nil { + log.Warnf("Reboot blocked: node query error: %v", err) + return true + } + for _, annotation := range kb.filter { + if _, exists := node.Annotations[annotation]; exists { + log.Warnf("Reboot blocked: node annotation %s exists.", annotation) + return true + } + } + return false +}