Commit e526a27

Merge pull request kubernetes#116388 from mxpv/shutdown
Clean/refactor node shutdown manager
2 parents aa8f287 + 449f86b

7 files changed: +515 -458 lines changed


pkg/kubelet/kubelet.go

Lines changed: 2 additions & 2 deletions
@@ -930,7 +930,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 		util.SetNodeOwnerFunc(klet.heartbeatClient, string(klet.nodeName)))

 	// setup node shutdown manager
-	shutdownManager, shutdownAdmitHandler := nodeshutdown.NewManager(&nodeshutdown.Config{
+	shutdownManager := nodeshutdown.NewManager(&nodeshutdown.Config{
 		Logger:        logger,
 		ProbeManager:  klet.probeManager,
 		VolumeManager: klet.volumeManager,
@@ -949,7 +949,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 	if err != nil {
 		return nil, fmt.Errorf("create user namespace manager: %w", err)
 	}
-	klet.admitHandlers.AddPodAdmitHandler(shutdownAdmitHandler)
+	klet.admitHandlers.AddPodAdmitHandler(shutdownManager)

 	// Finally, put the most recent version of the config on the Kubelet, so
 	// people can see how it was configured.
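
The reason a single value suffices here: the Manager interface now embeds lifecycle.PodAdmitHandler (see the nodeshutdown_manager.go hunk further down), so the object the kubelet stores as its shutdown manager can also be registered for pod admission directly. A minimal, self-contained sketch of that embedding pattern, using illustrative names rather than the kubelet's real types:

package main

import "fmt"

// AdmitHandler and Manager are stand-ins for lifecycle.PodAdmitHandler and
// nodeshutdown.Manager; the point is only the embedded-interface shape.
type AdmitHandler interface {
	Admit(pod string) bool
}

type Manager interface {
	AdmitHandler // the manager is itself an admit handler
	Start() error
}

type manager struct{}

func (manager) Admit(pod string) bool { return true }
func (manager) Start() error          { return nil }

func main() {
	var m Manager = manager{}

	// Previously two values were returned (a manager plus a separate admit
	// handler); now the same value satisfies both interfaces.
	var handlers []AdmitHandler
	handlers = append(handlers, m)

	fmt.Println(handlers[0].Admit("example-pod"), m.Start() == nil)
}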

pkg/kubelet/kubelet_test.go

Lines changed: 2 additions & 2 deletions
@@ -351,7 +351,7 @@ func newTestKubeletWithImageList(
 	kubelet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)

 	// setup shutdown manager
-	shutdownManager, shutdownAdmitHandler := nodeshutdown.NewManager(&nodeshutdown.Config{
+	shutdownManager := nodeshutdown.NewManager(&nodeshutdown.Config{
 		Logger:       logger,
 		ProbeManager: kubelet.probeManager,
 		Recorder:     fakeRecorder,
@@ -363,7 +363,7 @@ func newTestKubeletWithImageList(
 		ShutdownGracePeriodCriticalPods: 0,
 	})
 	kubelet.shutdownManager = shutdownManager
-	kubelet.admitHandlers.AddPodAdmitHandler(shutdownAdmitHandler)
+	kubelet.admitHandlers.AddPodAdmitHandler(shutdownManager)

 	// Add this as cleanup predicate pod admitter
 	kubelet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(kubelet.getNodeAnyWay, lifecycle.NewAdmissionFailureHandlerStub(), kubelet.containerManager.UpdatePluginResources))

pkg/kubelet/nodeshutdown/nodeshutdown_manager.go

Lines changed: 218 additions & 0 deletions
@@ -17,11 +17,19 @@ limitations under the License.
 package nodeshutdown

 import (
+	"context"
+	"fmt"
+	"sort"
+	"sync"
 	"time"

 	v1 "k8s.io/api/core/v1"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/klog/v2"
+	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
+	"k8s.io/kubernetes/pkg/apis/scheduling"
+	"k8s.io/kubernetes/pkg/features"
 	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
 	"k8s.io/kubernetes/pkg/kubelet/eviction"
 	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
@@ -32,6 +40,8 @@ import (

 // Manager interface provides methods for Kubelet to manage node shutdown.
 type Manager interface {
+	lifecycle.PodAdmitHandler
+
 	Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult
 	Start() error
 	ShutdownStatus() error
@@ -71,3 +81,211 @@ func (managerStub) Start() error {
 func (managerStub) ShutdownStatus() error {
 	return nil
 }
+
+const (
+	nodeShutdownReason  = "Terminated"
+	nodeShutdownMessage = "Pod was terminated in response to imminent node shutdown."
+)
+
+// podManager is responsible for killing active pods by priority.
+type podManager struct {
+	logger                           klog.Logger
+	shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority
+	clock                            clock.Clock
+	killPodFunc                      eviction.KillPodFunc
+	volumeManager                    volumemanager.VolumeManager
+}
+
+func newPodManager(conf *Config) *podManager {
+	shutdownGracePeriodByPodPriority := conf.ShutdownGracePeriodByPodPriority
+
+	// Migration from the original configuration
+	if !utilfeature.DefaultFeatureGate.Enabled(features.GracefulNodeShutdownBasedOnPodPriority) ||
+		len(shutdownGracePeriodByPodPriority) == 0 {
+		shutdownGracePeriodByPodPriority = migrateConfig(conf.ShutdownGracePeriodRequested, conf.ShutdownGracePeriodCriticalPods)
+	}
+
+	// Sort by priority from low to high
+	sort.Slice(shutdownGracePeriodByPodPriority, func(i, j int) bool {
+		return shutdownGracePeriodByPodPriority[i].Priority < shutdownGracePeriodByPodPriority[j].Priority
+	})
+
+	if conf.Clock == nil {
+		conf.Clock = clock.RealClock{}
+	}
+
+	return &podManager{
+		logger:                           conf.Logger,
+		shutdownGracePeriodByPodPriority: shutdownGracePeriodByPodPriority,
+		clock:                            conf.Clock,
+		killPodFunc:                      conf.KillPodFunc,
+		volumeManager:                    conf.VolumeManager,
+	}
+}
+
+// killPods terminates pods by priority.
+func (m *podManager) killPods(activePods []*v1.Pod) error {
+	groups := groupByPriority(m.shutdownGracePeriodByPodPriority, activePods)
+	for _, group := range groups {
+		// If there are no pods in a particular range,
+		// then do not wait for pods in that priority range.
+		if len(group.Pods) == 0 {
+			continue
+		}
+
+		var wg sync.WaitGroup
+		wg.Add(len(group.Pods))
+		for _, pod := range group.Pods {
+			go func(pod *v1.Pod, group podShutdownGroup) {
+				defer wg.Done()
+
+				gracePeriodOverride := group.ShutdownGracePeriodSeconds
+
+				// If the pod's spec specifies a termination gracePeriod which is less than the gracePeriodOverride calculated, use the pod spec termination gracePeriod.
+				if pod.Spec.TerminationGracePeriodSeconds != nil && *pod.Spec.TerminationGracePeriodSeconds <= gracePeriodOverride {
+					gracePeriodOverride = *pod.Spec.TerminationGracePeriodSeconds
+				}
+
+				m.logger.V(1).Info("Shutdown manager killing pod with gracePeriod", "pod", klog.KObj(pod), "gracePeriod", gracePeriodOverride)
+
+				if err := m.killPodFunc(pod, false, &gracePeriodOverride, func(status *v1.PodStatus) {
+					// set the pod status to failed (unless it was already in a successful terminal phase)
+					if status.Phase != v1.PodSucceeded {
+						status.Phase = v1.PodFailed
+					}
+					status.Message = nodeShutdownMessage
+					status.Reason = nodeShutdownReason
+					podutil.UpdatePodCondition(status, &v1.PodCondition{
+						Type:    v1.DisruptionTarget,
+						Status:  v1.ConditionTrue,
+						Reason:  v1.PodReasonTerminationByKubelet,
+						Message: nodeShutdownMessage,
+					})
+				}); err != nil {
+					m.logger.V(1).Info("Shutdown manager failed killing pod", "pod", klog.KObj(pod), "err", err)
+				} else {
+					m.logger.V(1).Info("Shutdown manager finished killing pod", "pod", klog.KObj(pod))
+				}
+			}(pod, group)
+		}
+
+		// This duration determines how long the shutdown manager will wait for the pods in this group
+		// to terminate before proceeding to the next group.
+		var groupTerminationWaitDuration = time.Duration(group.ShutdownGracePeriodSeconds) * time.Second
+		var (
+			doneCh         = make(chan struct{})
+			timer          = m.clock.NewTimer(groupTerminationWaitDuration)
+			ctx, ctxCancel = context.WithTimeout(context.Background(), groupTerminationWaitDuration)
+		)
+		go func() {
+			defer close(doneCh)
+			defer ctxCancel()
+			wg.Wait()
+			// The signal to kill a Pod was sent successfully to all the pods,
+			// let's wait until all the volumes are unmounted from all the pods before
+			// continuing to the next group. This is done so that the CSI Driver (assuming
+			// that it's part of the highest group) has a chance to perform unmounts.
+			if err := m.volumeManager.WaitForAllPodsUnmount(ctx, group.Pods); err != nil {
+				var podIdentifiers []string
+				for _, pod := range group.Pods {
+					podIdentifiers = append(podIdentifiers, fmt.Sprintf("%s/%s", pod.Namespace, pod.Name))
+				}
+
+				// Waiting for volume teardown is done on a best-effort basis;
+				// report an error and continue.
+				//
+				// Depending on the user-provided kubelet configuration value,
+				// either the `timer` will tick and we'll continue to shut down the next group, or
+				// WaitForAllPodsUnmount will time out, in which case this goroutine
+				// will close doneCh and we'll continue to shut down the next group.
+				m.logger.Error(err, "Failed while waiting for all the volumes belonging to Pods in this group to unmount", "pods", podIdentifiers)
+			}
+		}()

+		select {
+		case <-doneCh:
+			timer.Stop()
+			m.logger.V(1).Info("Done waiting for all pods in group to terminate", "gracePeriod", group.ShutdownGracePeriodSeconds, "priority", group.Priority)
+		case <-timer.C():
+			ctxCancel()
+			m.logger.V(1).Info("Shutdown manager pod killing time out", "gracePeriod", group.ShutdownGracePeriodSeconds, "priority", group.Priority)
+		}
+	}
+
+	return nil
+}
+
+func (m *podManager) periodRequested() time.Duration {
+	var sum int64
+	for _, period := range m.shutdownGracePeriodByPodPriority {
+		sum += period.ShutdownGracePeriodSeconds
+	}
+	return time.Duration(sum) * time.Second
+}
+
+func migrateConfig(shutdownGracePeriodRequested, shutdownGracePeriodCriticalPods time.Duration) []kubeletconfig.ShutdownGracePeriodByPodPriority {
+	if shutdownGracePeriodRequested == 0 {
+		return nil
+	}
+	defaultPriority := shutdownGracePeriodRequested - shutdownGracePeriodCriticalPods
+	if defaultPriority < 0 {
+		return nil
+	}
+	criticalPriority := shutdownGracePeriodRequested - defaultPriority
+	if criticalPriority < 0 {
+		return nil
+	}
+	return []kubeletconfig.ShutdownGracePeriodByPodPriority{
+		{
+			Priority:                   scheduling.DefaultPriorityWhenNoDefaultClassExists,
+			ShutdownGracePeriodSeconds: int64(defaultPriority / time.Second),
+		},
+		{
+			Priority:                   scheduling.SystemCriticalPriority,
+			ShutdownGracePeriodSeconds: int64(criticalPriority / time.Second),
+		},
+	}
+}
+
+type podShutdownGroup struct {
+	kubeletconfig.ShutdownGracePeriodByPodPriority
+	Pods []*v1.Pod
+}
+
+func groupByPriority(shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority, pods []*v1.Pod) []podShutdownGroup {
+	groups := make([]podShutdownGroup, 0, len(shutdownGracePeriodByPodPriority))
+	for _, period := range shutdownGracePeriodByPodPriority {
+		groups = append(groups, podShutdownGroup{
+			ShutdownGracePeriodByPodPriority: period,
+		})
+	}
+
+	for _, pod := range pods {
+		var priority int32
+		if pod.Spec.Priority != nil {
+			priority = *pod.Spec.Priority
+		}
+
+		// Find the group index according to the priority.
+		index := sort.Search(len(groups), func(i int) bool {
+			return groups[i].Priority >= priority
+		})
+
+		// 1. Pods higher than the highest configured priority fall into the highest group.
+		// 2. Pods lower than the lowest configured priority fall into the lowest group.
+		// 3. Pods between two boundaries fall into the lower group, i.e. when
+		//    groups[index-1].Priority <= pod priority < groups[index].Priority
+		//    we pick the lower one (index-1).
+		if index == len(groups) {
+			index = len(groups) - 1
+		} else if index < 0 {
+			index = 0
+		} else if index > 0 && groups[index].Priority > priority {
+			index--
+		}
+
+		groups[index].Pods = append(groups[index].Pods, pod)
+	}
+	return groups
+}
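
For readers tracing migrateConfig: the legacy two-field configuration is split into two priority buckets, ordinary pods getting shutdownGracePeriod minus shutdownGracePeriodCriticalPods and system-critical pods getting the remainder. A standalone sketch of that arithmetic, with the scheduling constants hard-coded as stand-ins (0 for scheduling.DefaultPriorityWhenNoDefaultClassExists and 2000000000 for scheduling.SystemCriticalPriority); the helper name and example values are illustrative, not part of the diff:

package main

import (
	"fmt"
	"time"
)

// bucket mirrors kubeletconfig.ShutdownGracePeriodByPodPriority in shape only.
type bucket struct {
	Priority                   int32
	ShutdownGracePeriodSeconds int64
}

// migrate reproduces the migrateConfig arithmetic outside the kubelet.
func migrate(requested, critical time.Duration) []bucket {
	if requested == 0 {
		return nil
	}
	defaultPeriod := requested - critical
	if defaultPeriod < 0 {
		return nil
	}
	return []bucket{
		{Priority: 0, ShutdownGracePeriodSeconds: int64(defaultPeriod / time.Second)},
		{Priority: 2000000000, ShutdownGracePeriodSeconds: int64(critical / time.Second)},
	}
}

func main() {
	// e.g. shutdownGracePeriod=30s, shutdownGracePeriodCriticalPods=10s
	// => ordinary pods get 20s, critical pods get the remaining 10s.
	fmt.Println(migrate(30*time.Second, 10*time.Second)) // [{0 20} {2000000000 10}]
}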
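
Similarly, groupByPriority places each pod in the highest-priority bucket whose Priority does not exceed the pod's priority, clamping anything outside the configured range to the nearest bucket. A standalone sketch of that bucketing rule with simplified stand-in types (pods are plain strings here):

package main

import (
	"fmt"
	"sort"
)

// group is a simplified stand-in for podShutdownGroup.
type group struct {
	Priority int32
	Pods     []string
}

// place applies the same index search and clamping as groupByPriority.
func place(groups []group, name string, priority int32) {
	index := sort.Search(len(groups), func(i int) bool {
		return groups[i].Priority >= priority
	})
	if index == len(groups) {
		index = len(groups) - 1 // above the highest bucket: clamp down
	} else if index > 0 && groups[index].Priority > priority {
		index-- // between two buckets: pick the lower one
	}
	groups[index].Pods = append(groups[index].Pods, name)
}

func main() {
	// Buckets sorted by priority, e.g. produced by the legacy-config migration.
	groups := []group{{Priority: 0}, {Priority: 2000000000}}

	place(groups, "web", 100)             // lands in the Priority 0 bucket
	place(groups, "batch", -5)            // below the lowest bucket, clamped up
	place(groups, "system", 2000000000)   // exact match with the top bucket
	place(groups, "critical", 2000000001) // above the top bucket, clamped down

	for _, g := range groups {
		fmt.Println(g.Priority, g.Pods) // 0 [web batch] / 2000000000 [system critical]
	}
}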
