49 changes: 39 additions & 10 deletions cmd/kured/main.go
@@ -70,7 +70,10 @@ var (
messageTemplateDrain string
messageTemplateReboot string
messageTemplateUncordon string
podSelectors []string
blockingPodSelectors []string
inhibitingPodSelectors []string
inhibitingNodeAnnotations []string
blockingNodeAnnotations []string
rebootCommand string
rebootSignal int
logFormat string
@@ -180,8 +183,14 @@ func main() {
"message template used to notify about a node being drained")
flag.StringVar(&messageTemplateReboot, "message-template-reboot", "Rebooting node %s",
"message template used to notify about a node being rebooted")
flag.StringArrayVar(&podSelectors, "blocking-pod-selector", nil,
flag.StringArrayVar(&blockingPodSelectors, "blocking-pod-selector", nil,
"label selector identifying pods whose presence should prevent reboots")
flag.StringArrayVar(&inhibitingPodSelectors, "inhibiting-pod-selector", nil,
"label selector identifying pods whose presence should prevent final reboots not draining of nodes")
flag.StringArrayVar(&blockingNodeAnnotations, "blocking-node-annotation", nil,
"node annotation whose presence should prevent node reboots")
flag.StringArrayVar(&inhibitingNodeAnnotations, "inhibiting-node-annotation", nil,
"node annotation whose presence should prevent final reboots not draining of nodes")
flag.StringSliceVar(&rebootDays, "reboot-days", timewindow.EveryDay,
"schedule reboot on these days")
flag.StringVar(&rebootStart, "start-time", "0:00",
@@ -225,7 +234,7 @@ func main() {
log.Infof("PreferNoSchedule taint: %s", preferNoScheduleTaintName)

// This should be printed from the blocker list instead of only the blocking pod selectors
log.Infof("Blocking Pod Selectors: %v", podSelectors)
log.Infof("Blocking Pod Selectors: %v", blockingPodSelectors)

log.Infof("Reboot period %v", period)
log.Infof("Concurrency: %v", concurrency)
@@ -266,9 +275,24 @@ func main() {
if prometheusURL != "" {
blockCheckers = append(blockCheckers, blockers.NewPrometheusBlockingChecker(papi.Config{Address: prometheusURL}, alertFilter.Regexp, alertFiringOnly, alertFilterMatchOnly))
}
if podSelectors != nil {
blockCheckers = append(blockCheckers, blockers.NewKubernetesBlockingChecker(client, nodeID, podSelectors))
if blockingPodSelectors != nil {
blockCheckers = append(blockCheckers, blockers.NewKubernetesBlockingChecker(client, nodeID, blockingPodSelectors))
}
if blockingNodeAnnotations != nil {
blockCheckers = append(blockCheckers, blockers.NewNodeBlockingChecker(client, nodeID, blockingNodeAnnotations))
}

// These prevent the rebooter from rebooting the node; the node will still be drained.
// This is useful for cases in which you want to wait for a condition that is only met after draining the node.
var inhibitingBlockCheckers []blockers.RebootBlocker
if inhibitingNodeAnnotations != nil {
log.Info("Setup inhibiting blocker for node annotations")
inhibitingBlockCheckers = append(inhibitingBlockCheckers, blockers.NewNodeBlockingChecker(client, nodeID, inhibitingNodeAnnotations))
}
if inhibitingPodSelectors != nil {
inhibitingBlockCheckers = append(inhibitingBlockCheckers, blockers.NewKubernetesBlockingChecker(client, nodeID, inhibitingPodSelectors))
}

log.Infof("Lock Annotation: %s/%s:%s", dsNamespace, dsName, lockAnnotation)
if lockTTL > 0 {
log.Infof("Lock TTL set, lock will expire after: %v", lockTTL)
@@ -282,7 +306,7 @@ func main() {
}
lock := daemonsetlock.New(client, nodeID, dsNamespace, dsName, lockAnnotation, lockTTL, concurrency, lockReleaseDelay)

go rebootAsRequired(nodeID, rebooter, rebootChecker, blockCheckers, window, lock, client)
go rebootAsRequired(nodeID, rebooter, rebootChecker, blockCheckers, inhibitingBlockCheckers, window, lock, client)
go maintainRebootRequiredMetric(nodeID, rebootChecker)

http.Handle("/metrics", promhttp.Handler())
@@ -553,7 +577,7 @@ func updateNodeLabels(client *kubernetes.Clientset, node *v1.Node, labels []stri
}
}

func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.Checker, blockCheckers []blockers.RebootBlocker, window *timewindow.TimeWindow, lock daemonsetlock.Lock, client *kubernetes.Clientset) {
func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.Checker, blockCheckers []blockers.RebootBlocker, inhibitingBlockCheckers []blockers.RebootBlocker, window *timewindow.TimeWindow, lock daemonsetlock.Lock, client *kubernetes.Clientset) {

source := rand.NewSource(time.Now().UnixNano())
tick := delaytick.New(source, 1*time.Minute)
@@ -652,12 +676,11 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.
}
}

var rebootRequiredBlockCondition string
if blockers.RebootBlocked(blockCheckers...) {
rebootRequiredBlockCondition = ", but blocked at this time"
log.Info("Reboot required, but blocked at this time")
continue
}
log.Infof("Reboot required%s", rebootRequiredBlockCondition)
log.Info("Reboot required")

holding, _, err := lock.Holding()
if err != nil {
@@ -691,6 +714,11 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.
}
}

if blockers.RebootBlocked(inhibitingBlockCheckers...) {
log.Info("Reboot required, but blocked by inhibiting blockers")
continue
}

if rebootDelay > 0 {
log.Infof("Delaying reboot for %v", rebootDelay)
time.Sleep(rebootDelay)
@@ -701,6 +729,7 @@ func rebootAsRequired(nodeID string, rebooter reboot.Rebooter, checker checkers.
log.Warnf("Error notifying: %v", err)
}
}

log.Infof("Triggering reboot for node %v", nodeID)

err = rebooter.Reboot()
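For reviewers, a minimal sketch (not part of the diff) of where the two checker lists are consulted in rebootAsRequired: blocking checkers run before the lock is acquired and the node is drained, while inhibiting checkers run only after the drain, immediately before the reboot command. The package name and the module import path below are assumptions.

```go
// Package sketch illustrates, outside this PR, how the two checker lists
// added above are consulted at different points of the reboot cycle.
package sketch

import (
	log "github.com/sirupsen/logrus"

	"github.com/kubereboot/kured/pkg/blockers" // module path assumed
)

// rebootCycle is a simplified, hypothetical stand-in for one iteration of
// rebootAsRequired, reduced to the two blocking decisions touched by this PR.
func rebootCycle(blockCheckers, inhibitingBlockCheckers []blockers.RebootBlocker) {
	// Blocking checkers: evaluated before the lock is acquired and the node is
	// drained; a hit skips the whole cycle, so the node stays schedulable.
	if blockers.RebootBlocked(blockCheckers...) {
		log.Info("Reboot required, but blocked at this time")
		return
	}

	// ... acquire the daemonset lock, cordon and drain the node ...

	// Inhibiting checkers: evaluated after the drain, immediately before the
	// reboot command, so the node waits in a drained state until they clear.
	if blockers.RebootBlocked(inhibitingBlockCheckers...) {
		log.Info("Reboot required, but blocked by inhibiting blockers")
		return
	}

	// ... trigger the reboot ...
}
```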
49 changes: 49 additions & 0 deletions pkg/blockers/nodeannotation.go
@@ -0,0 +1,49 @@
package blockers

import (
"context"

log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

// Compile-time checks to ensure the type implements the interface
var (
_ RebootBlocker = (*NodeBlockingChecker)(nil)
)

// NodeBlockingChecker contains info for connecting
// to k8s, and can tell whether a reboot should be blocked based on node annotations
type NodeBlockingChecker struct {
// client used to contact kubernetes API
client *kubernetes.Clientset
nodeName string
// list of node annotations whose presence blocks the reboot
filter []string
}

// NewNodeBlockingChecker returns a checker that blocks reboots while any of the
// given annotations is present on the named node.
func NewNodeBlockingChecker(client *kubernetes.Clientset, nodename string, nodeAnnotations []string) *NodeBlockingChecker {
return &NodeBlockingChecker{
client: client,
nodeName: nodename,
filter: nodeAnnotations,
}
}

// IsBlocked for the NodeBlockingChecker checks whether any of the configured
// annotations is present on the node, preventing the reboot. It will warn in the logs about blocking, but does not return an error.
func (kb *NodeBlockingChecker) IsBlocked() bool {
node, err := kb.client.CoreV1().Nodes().Get(context.TODO(), kb.nodeName, metav1.GetOptions{})
if err != nil {
log.Warnf("Reboot blocked: node query error: %v", err)
return true
}
for _, annotation := range kb.filter {
if _, exists := node.Annotations[annotation]; exists {
log.Warnf("Reboot blocked: node annotation %s exists.", annotation)
return true
}
}
return false
}
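As a usage illustration (not part of this diff), the new checker wires in the same way as the existing pod-selector checker; the node name, the annotation key, and the module import path below are assumptions for the sketch.

```go
package main

import (
	log "github.com/sirupsen/logrus"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"

	"github.com/kubereboot/kured/pkg/blockers" // module path assumed
)

func main() {
	// In-cluster config, matching how kured itself runs as a DaemonSet.
	config, err := rest.InClusterConfig()
	if err != nil {
		log.Fatalf("error building in-cluster config: %v", err)
	}
	client, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatalf("error building kubernetes client: %v", err)
	}

	// Blocks while the annotation "example.com/hold-reboot" (any value) is
	// present on node "node-1"; the key and node name are illustrative only.
	checker := blockers.NewNodeBlockingChecker(client, "node-1", []string{"example.com/hold-reboot"})
	if blockers.RebootBlocked(checker) {
		log.Info("reboot held by node annotation")
	}
}
```

Note that IsBlocked treats a node query error as blocked, so a transient API failure errs on the side of not rebooting.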