Skip to content

Commit 93e76f5

Browse files
authored
Merge pull request kubernetes#92442 from tedyu/grace-period-with-map
Respect grace period when removing mirror pod
2 parents c9c5c23 + a76a959 commit 93e76f5

File tree

8 files changed

+279
-37
lines changed

8 files changed

+279
-37
lines changed

pkg/kubelet/kubelet.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,6 @@ const (
134134
// MaxContainerBackOff is the max backoff period, exported for the e2e test
135135
MaxContainerBackOff = 300 * time.Second
136136

137-
// Capacity of the channel for storing pods to kill. A small number should
138-
// suffice because a goroutine is dedicated to check the channel and does
139-
// not block on anything else.
140-
podKillingChannelCapacity = 50
141-
142137
// Period for performing global cleanup tasks.
143138
housekeepingPeriod = time.Second * 2
144139

@@ -748,7 +743,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
748743
klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
749744

750745
klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
751-
klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity)
746+
klet.podKiller = NewPodKiller(klet)
752747

753748
// setup eviction manager
754749
evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock)
@@ -1036,8 +1031,8 @@ type Kubelet struct {
10361031
// Container restart Backoff
10371032
backOff *flowcontrol.Backoff
10381033

1039-
// Channel for sending pods to kill.
1040-
podKillingCh chan *kubecontainer.PodPair
1034+
// Pod killer handles pods to be killed
1035+
podKiller PodKiller
10411036

10421037
// Information about the ports which are opened by daemons on Node running this Kubelet server.
10431038
daemonEndpoints *v1.NodeDaemonEndpoints
@@ -1346,7 +1341,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
13461341

13471342
// Start a goroutine responsible for killing pods (that are not properly
13481343
// handled by pod workers).
1349-
go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)
1344+
go wait.Until(kl.podKiller.PerformPodKillingWork, 1*time.Second, wait.NeverStop)
13501345

13511346
// Start component sync loops.
13521347
kl.statusManager.Start()
@@ -1671,7 +1666,7 @@ func (kl *Kubelet) deletePod(pod *v1.Pod) error {
16711666
}
16721667
podPair := kubecontainer.PodPair{APIPod: pod, RunningPod: &runningPod}
16731668

1674-
kl.podKillingCh <- &podPair
1669+
kl.podKiller.KillPod(&podPair)
16751670
// TODO: delete the mirror pod here?
16761671

16771672
// We leave the volume/directory cleanup to the periodic cleanup routine.
@@ -2006,6 +2001,9 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
20062001
kl.handleMirrorPod(pod, start)
20072002
continue
20082003
}
2004+
if _, ok := kl.podManager.GetMirrorPodByPod(pod); ok {
2005+
kl.podKiller.MarkMirrorPodPendingTermination(pod)
2006+
}
20092007
// Deletion is allowed to fail because the periodic cleanup routine
20102008
// will trigger deletion again.
20112009
if err := kl.deletePod(pod); err != nil {

pkg/kubelet/kubelet_pods.go

Lines changed: 124 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,11 @@ import (
7171
const (
7272
managedHostsHeader = "# Kubernetes-managed hosts file.\n"
7373
managedHostsHeaderWithHostNetwork = "# Kubernetes-managed hosts file (host network).\n"
74+
75+
// Capacity of the channel for storing pods to kill. A small number should
76+
// suffice because a goroutine is dedicated to check the channel and does
77+
// not block on anything else.
78+
podKillingChannelCapacity = 50
7479
)
7580

7681
// Get a list of pods that have data directories.
@@ -1020,6 +1025,23 @@ func (kl *Kubelet) removeOrphanedPodStatuses(pods []*v1.Pod, mirrorPods []*v1.Po
10201025
kl.statusManager.RemoveOrphanedStatuses(podUIDs)
10211026
}
10221027

1028+
// deleteOrphanedMirrorPods checks whether the pod killer is done with each orphaned mirror pod.
1029+
// If pod killing is done, podManager.DeleteMirrorPod() is called to delete mirror pod
1030+
// from the API server
1031+
func (kl *Kubelet) deleteOrphanedMirrorPods() {
1032+
podFullNames := kl.podManager.GetOrphanedMirrorPodNames()
1033+
for _, podFullname := range podFullNames {
1034+
if !kl.podKiller.IsMirrorPodPendingTerminationByPodName(podFullname) {
1035+
_, err := kl.podManager.DeleteMirrorPod(podFullname, nil)
1036+
if err != nil {
1037+
klog.Errorf("encountered error when deleting mirror pod %q : %v", podFullname, err)
1038+
} else {
1039+
klog.V(3).Infof("deleted pod %q", podFullname)
1040+
}
1041+
}
1042+
}
1043+
}
1044+
10231045
// HandlePodCleanups performs a series of cleanup work, including terminating
10241046
// pod workers, killing unwanted pods, and removing orphaned volumes/pod
10251047
// directories.
@@ -1071,7 +1093,7 @@ func (kl *Kubelet) HandlePodCleanups() error {
10711093
}
10721094
for _, pod := range runningPods {
10731095
if _, found := desiredPods[pod.ID]; !found {
1074-
kl.podKillingCh <- &kubecontainer.PodPair{APIPod: nil, RunningPod: pod}
1096+
kl.podKiller.KillPod(&kubecontainer.PodPair{APIPod: nil, RunningPod: pod})
10751097
}
10761098
}
10771099

@@ -1099,24 +1121,112 @@ func (kl *Kubelet) HandlePodCleanups() error {
10991121
}
11001122

11011123
// Remove any orphaned mirror pods.
1102-
kl.podManager.DeleteOrphanedMirrorPods()
1124+
kl.deleteOrphanedMirrorPods()
11031125

11041126
// Remove any cgroups in the hierarchy for pods that are no longer running.
11051127
if kl.cgroupsPerQOS {
1106-
kl.cleanupOrphanedPodCgroups(cgroupPods, activePods)
1128+
pcm := kl.containerManager.NewPodContainerManager()
1129+
kl.cleanupOrphanedPodCgroups(pcm, cgroupPods, activePods)
11071130
}
11081131

11091132
kl.backOff.GC()
11101133
return nil
11111134
}
11121135

1113-
// podKiller launches a goroutine to kill a pod received from the channel if
1136+
// PodKiller handles requests for killing pods
1137+
type PodKiller interface {
1138+
// KillPod receives pod specifier representing the pod to kill
1139+
KillPod(pair *kubecontainer.PodPair)
1140+
// PerformPodKillingWork performs the actual pod killing work via calling CRI
1141+
// It returns after its Close() func is called and all outstanding pod killing requests are served
1142+
PerformPodKillingWork()
1143+
// Close stops this pod killer from accepting any more pod killing requests
1144+
Close()
1145+
// IsMirrorPodPendingTerminationByPodName checks whether the mirror pod for the given full pod name is pending termination
1146+
IsMirrorPodPendingTerminationByPodName(podFullname string) bool
1147+
// IsMirrorPodPendingTerminationByUID checks whether the mirror pod for the given uid is pending termination
1148+
IsMirrorPodPendingTerminationByUID(uid types.UID) bool
1149+
// MarkMirrorPodPendingTermination marks the mirror pod entering grace period of termination
1150+
MarkMirrorPodPendingTermination(pod *v1.Pod)
1151+
}
1152+
1153+
// podKillerWithChannel is an implementation of PodKiller which receives pod killing requests via channel
1154+
type podKillerWithChannel struct {
1155+
// Channel for getting pods to kill.
1156+
podKillingCh chan *kubecontainer.PodPair
1157+
// lock for synchronization between HandlePodCleanups and pod killer
1158+
podKillingLock *sync.Mutex
1159+
// mirrorPodTerminationMap keeps track of the progress of mirror pod termination
1160+
// The key is the UID of the pod and the value is the full name of the pod
1161+
mirrorPodTerminationMap map[string]string
1162+
// killPod is the func which invokes runtime to kill the pod
1163+
killPod func(pod *v1.Pod, runningPod *kubecontainer.Pod, status *kubecontainer.PodStatus, gracePeriodOverride *int64) error
1164+
}
1165+
1166+
// NewPodKiller returns a functional PodKiller
1167+
func NewPodKiller(kl *Kubelet) PodKiller {
1168+
podKiller := &podKillerWithChannel{
1169+
podKillingCh: make(chan *kubecontainer.PodPair, podKillingChannelCapacity),
1170+
podKillingLock: &sync.Mutex{},
1171+
mirrorPodTerminationMap: make(map[string]string),
1172+
killPod: kl.killPod,
1173+
}
1174+
return podKiller
1175+
}
1176+
1177+
// IsMirrorPodPendingTerminationByUID checks whether the pod for the given uid is pending termination
1178+
func (pk *podKillerWithChannel) IsMirrorPodPendingTerminationByUID(uid types.UID) bool {
1179+
pk.podKillingLock.Lock()
1180+
defer pk.podKillingLock.Unlock()
1181+
_, ok := pk.mirrorPodTerminationMap[string(uid)]
1182+
return ok
1183+
}
1184+
1185+
// IsMirrorPodPendingTerminationByPodName checks whether the given pod is in grace period of termination
1186+
func (pk *podKillerWithChannel) IsMirrorPodPendingTerminationByPodName(podFullname string) bool {
1187+
pk.podKillingLock.Lock()
1188+
defer pk.podKillingLock.Unlock()
1189+
for _, name := range pk.mirrorPodTerminationMap {
1190+
if name == podFullname {
1191+
return true
1192+
}
1193+
}
1194+
return false
1195+
}
1196+
1197+
func (pk *podKillerWithChannel) markMirrorPodTerminated(uid string) {
1198+
pk.podKillingLock.Lock()
1199+
klog.V(4).Infof("marking pod termination %q", uid)
1200+
delete(pk.mirrorPodTerminationMap, uid)
1201+
pk.podKillingLock.Unlock()
1202+
}
1203+
1204+
// MarkMirrorPodPendingTermination marks the pod entering grace period of termination
1205+
func (pk *podKillerWithChannel) MarkMirrorPodPendingTermination(pod *v1.Pod) {
1206+
fullname := kubecontainer.GetPodFullName(pod)
1207+
klog.V(3).Infof("marking pod pending termination %q", string(pod.UID))
1208+
pk.podKillingLock.Lock()
1209+
pk.mirrorPodTerminationMap[string(pod.UID)] = fullname
1210+
pk.podKillingLock.Unlock()
1211+
}
1212+
1213+
// Close closes the channel through which requests are delivered
1214+
func (pk *podKillerWithChannel) Close() {
1215+
close(pk.podKillingCh)
1216+
}
1217+
1218+
// KillPod sends pod killing request to the killer
1219+
func (pk *podKillerWithChannel) KillPod(pair *kubecontainer.PodPair) {
1220+
pk.podKillingCh <- pair
1221+
}
1222+
1223+
// PerformPodKillingWork launches a goroutine to kill a pod received from the channel if
11141224
// another goroutine isn't already in action.
1115-
func (kl *Kubelet) podKiller() {
1225+
func (pk *podKillerWithChannel) PerformPodKillingWork() {
11161226
killing := sets.NewString()
11171227
// guard for the killing set
11181228
lock := sync.Mutex{}
1119-
for podPair := range kl.podKillingCh {
1229+
for podPair := range pk.podKillingCh {
11201230
runningPod := podPair.RunningPod
11211231
apiPod := podPair.APIPod
11221232

@@ -1130,13 +1240,14 @@ func (kl *Kubelet) podKiller() {
11301240
if !exists {
11311241
go func(apiPod *v1.Pod, runningPod *kubecontainer.Pod) {
11321242
klog.V(2).Infof("Killing unwanted pod %q", runningPod.Name)
1133-
err := kl.killPod(apiPod, runningPod, nil, nil)
1243+
err := pk.killPod(apiPod, runningPod, nil, nil)
11341244
if err != nil {
11351245
klog.Errorf("Failed killing the pod %q: %v", runningPod.Name, err)
11361246
}
11371247
lock.Lock()
11381248
killing.Delete(string(runningPod.ID))
11391249
lock.Unlock()
1250+
pk.markMirrorPodTerminated(string(runningPod.ID))
11401251
}(apiPod, runningPod)
11411252
}
11421253
}
@@ -1721,13 +1832,12 @@ func (kl *Kubelet) GetPortForward(podName, podNamespace string, podUID types.UID
17211832

17221833
// cleanupOrphanedPodCgroups removes cgroups that should no longer exist.
17231834
// it reconciles the cached state of cgroupPods with the specified list of runningPods
1724-
func (kl *Kubelet) cleanupOrphanedPodCgroups(cgroupPods map[types.UID]cm.CgroupName, activePods []*v1.Pod) {
1835+
func (kl *Kubelet) cleanupOrphanedPodCgroups(pcm cm.PodContainerManager, cgroupPods map[types.UID]cm.CgroupName, activePods []*v1.Pod) {
17251836
// Add all running pods to the set that we want to preserve
17261837
podSet := sets.NewString()
17271838
for _, pod := range activePods {
17281839
podSet.Insert(string(pod.UID))
17291840
}
1730-
pcm := kl.containerManager.NewPodContainerManager()
17311841

17321842
// Iterate over all the found pods to verify if they should be running
17331843
for uid, val := range cgroupPods {
@@ -1736,6 +1846,11 @@ func (kl *Kubelet) cleanupOrphanedPodCgroups(cgroupPods map[types.UID]cm.CgroupN
17361846
continue
17371847
}
17381848

1849+
// if the pod is within termination grace period, we shouldn't cleanup the underlying cgroup
1850+
if kl.podKiller.IsMirrorPodPendingTerminationByUID(uid) {
1851+
klog.V(3).Infof("pod %q is pending termination", uid)
1852+
continue
1853+
}
17391854
// If volumes have not been unmounted/detached, do not delete the cgroup
17401855
// so any memory backed volumes don't have their charges propagated to the
17411856
// parent cgroup. If the volumes still exist, reduce the cpu shares for any

pkg/kubelet/kubelet_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ func newTestKubeletWithImageList(
279279
fakeClock := clock.NewFakeClock(time.Now())
280280
kubelet.backOff = flowcontrol.NewBackOff(time.Second, time.Minute)
281281
kubelet.backOff.Clock = fakeClock
282-
kubelet.podKillingCh = make(chan *kubecontainer.PodPair, 20)
282+
kubelet.podKiller = NewPodKiller(kubelet)
283283
kubelet.resyncInterval = 10 * time.Second
284284
kubelet.workQueue = queue.NewBasicWorkQueue(fakeClock)
285285
// Relist period does not affect the tests.

pkg/kubelet/pod/pod_manager.go

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,8 @@ type Manager interface {
7373
// this means deleting the mappings related to mirror pods. For non-
7474
// mirror pods, this means deleting from indexes for all non-mirror pods.
7575
DeletePod(pod *v1.Pod)
76-
// DeleteOrphanedMirrorPods deletes all mirror pods which do not have
77-
// associated static pods. This method sends deletion requests to the API
78-
// server, but does NOT modify the internal pod storage in basicManager.
79-
DeleteOrphanedMirrorPods()
76+
// GetOrphanedMirrorPodNames returns names of orphaned mirror pods
77+
GetOrphanedMirrorPodNames() []string
8078
// TranslatePodUID returns the actual UID of a pod. If the UID belongs to
8179
// a mirror pod, returns the UID of its static pod. Otherwise, returns the
8280
// original UID.
@@ -307,7 +305,7 @@ func (pm *basicManager) GetUIDTranslations() (podToMirror map[kubetypes.Resolved
307305
return podToMirror, mirrorToPod
308306
}
309307

310-
func (pm *basicManager) getOrphanedMirrorPodNames() []string {
308+
func (pm *basicManager) GetOrphanedMirrorPodNames() []string {
311309
pm.lock.RLock()
312310
defer pm.lock.RUnlock()
313311
var podFullNames []string
@@ -319,13 +317,6 @@ func (pm *basicManager) getOrphanedMirrorPodNames() []string {
319317
return podFullNames
320318
}
321319

322-
func (pm *basicManager) DeleteOrphanedMirrorPods() {
323-
podFullNames := pm.getOrphanedMirrorPodNames()
324-
for _, podFullName := range podFullNames {
325-
pm.MirrorClient.DeleteMirrorPod(podFullName, nil)
326-
}
327-
}
328-
329320
func (pm *basicManager) IsMirrorPodOf(mirrorPod, pod *v1.Pod) bool {
330321
// Check name and namespace first.
331322
if pod.Name != mirrorPod.Name || pod.Namespace != mirrorPod.Namespace {

pkg/kubelet/pod/pod_manager_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ func TestDeletePods(t *testing.T) {
158158
t.Fatalf("Run DeletePod() error, expected %d pods, got %d pods; ", len(expectedPods)-1, len(actualPods))
159159
}
160160

161-
orphanedMirrorPodNames := podManager.getOrphanedMirrorPodNames()
161+
orphanedMirrorPodNames := podManager.GetOrphanedMirrorPodNames()
162162
expectedOrphanedMirrorPodNameNum := 1
163163
if len(orphanedMirrorPodNames) != expectedOrphanedMirrorPodNameNum {
164164
t.Fatalf("Run getOrphanedMirrorPodNames() error, expected %d orphaned mirror pods, got %d orphaned mirror pods; ", expectedOrphanedMirrorPodNameNum, len(orphanedMirrorPodNames))

pkg/kubelet/pod/testing/mock_manager.go

Lines changed: 15 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

test/e2e_node/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ go_test(
127127
"hugepages_test.go",
128128
"image_id_test.go",
129129
"log_path_test.go",
130+
"mirror_pod_grace_period_test.go",
130131
"mirror_pod_test.go",
131132
"node_container_manager_test.go",
132133
"node_perf_test.go",

0 commit comments

Comments
 (0)