Skip to content

Commit 7a513b5

Browse files
authored
Merge pull request kubernetes#88440 from smarterclayton/container_success_fix
Ensure Kubelet always reports terminating pod container status
2 parents 7d6d790 + 8bc5cb0 commit 7a513b5

File tree

9 files changed

+361
-24
lines changed

9 files changed

+361
-24
lines changed

pkg/kubelet/container/helpers.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ type RuntimeHelper interface {
6161
// ShouldContainerBeRestarted checks whether a container needs to be restarted.
6262
// TODO(yifan): Think about how to refactor this.
6363
func ShouldContainerBeRestarted(container *v1.Container, pod *v1.Pod, podStatus *PodStatus) bool {
64+
// Once a pod has been marked deleted, it should not be restarted
65+
if pod.DeletionTimestamp != nil {
66+
return false
67+
}
6468
// Get latest container status.
6569
status := podStatus.FindContainerStatusByName(container.Name)
6670
// If the container was never started before, we should start it.

pkg/kubelet/container/helpers_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package container
1919
import (
2020
"reflect"
2121
"testing"
22+
"time"
2223

2324
"github.com/google/go-cmp/cmp"
2425
"github.com/stretchr/testify/assert"
@@ -449,6 +450,8 @@ func TestShouldContainerBeRestarted(t *testing.T) {
449450
v1.RestartPolicyOnFailure,
450451
v1.RestartPolicyAlways,
451452
}
453+
454+
// test policies
452455
expected := map[string][]bool{
453456
"no-history": {true, true, true},
454457
"alive": {false, false, false},
@@ -467,6 +470,27 @@ func TestShouldContainerBeRestarted(t *testing.T) {
467470
}
468471
}
469472
}
473+
474+
// test deleted pod
475+
pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
476+
expected = map[string][]bool{
477+
"no-history": {false, false, false},
478+
"alive": {false, false, false},
479+
"succeed": {false, false, false},
480+
"failed": {false, false, false},
481+
"unknown": {false, false, false},
482+
}
483+
for _, c := range pod.Spec.Containers {
484+
for i, policy := range policies {
485+
pod.Spec.RestartPolicy = policy
486+
e := expected[c.Name][i]
487+
r := ShouldContainerBeRestarted(&c, pod, podStatus)
488+
if r != e {
489+
t.Errorf("Restart for container %q with restart policy %q expected %t, got %t",
490+
c.Name, policy, e, r)
491+
}
492+
}
493+
}
470494
}
471495

472496
func TestHasPrivilegedContainer(t *testing.T) {

pkg/kubelet/kubelet.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2001,18 +2001,22 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
20012001
}
20022002

20032003
// dispatchWork starts the asynchronous sync of the pod in a pod worker.
2004-
// If the pod is terminated, dispatchWork
2004+
// If the pod has completed termination, dispatchWork will perform no action.
20052005
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
2006-
if kl.podIsTerminated(pod) {
2007-
if pod.DeletionTimestamp != nil {
2008-
// If the pod is in a terminated state, there is no pod worker to
2009-
// handle the work item. Check if the DeletionTimestamp has been
2010-
// set, and force a status update to trigger a pod deletion request
2011-
// to the apiserver.
2012-
kl.statusManager.TerminatePod(pod)
2013-
}
2006+
// check whether we are ready to delete the pod from the API server (all status up to date)
2007+
containersTerminal, podWorkerTerminal := kl.podAndContainersAreTerminal(pod)
2008+
if pod.DeletionTimestamp != nil && containersTerminal {
2009+
klog.V(4).Infof("Pod %q has completed execution and should be deleted from the API server: %s", format.Pod(pod), syncType)
2010+
kl.statusManager.TerminatePod(pod)
20142011
return
20152012
}
2013+
2014+
// optimization: avoid invoking the pod worker if no further changes are possible to the pod definition
2015+
if podWorkerTerminal {
2016+
klog.V(4).Infof("Pod %q has completed, ignoring remaining sync work: %s", format.Pod(pod), syncType)
2017+
return
2018+
}
2019+
20162020
// Run the sync in an async worker.
20172021
kl.podWorkers.UpdatePod(&UpdatePodOptions{
20182022
Pod: pod,

pkg/kubelet/kubelet_pods.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -865,8 +865,9 @@ func (kl *Kubelet) getPullSecretsForPod(pod *v1.Pod) []v1.Secret {
865865
return pullSecrets
866866
}
867867

868-
// podIsTerminated returns true if pod is in the terminated state ("Failed" or "Succeeded").
869-
func (kl *Kubelet) podIsTerminated(pod *v1.Pod) bool {
868+
// podAndContainersAreTerminal reports when the specified pod has no running containers or is no longer accepting
869+
// spec changes.
870+
func (kl *Kubelet) podAndContainersAreTerminal(pod *v1.Pod) (containersTerminal, podWorkerTerminal bool) {
870871
// Check the cached pod status which was set after the last sync.
871872
status, ok := kl.statusManager.GetPodStatus(pod.UID)
872873
if !ok {
@@ -875,11 +876,28 @@ func (kl *Kubelet) podIsTerminated(pod *v1.Pod) bool {
875876
// restarted.
876877
status = pod.Status
877878
}
878-
return status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(status.ContainerStatuses))
879+
// A pod transitions into failed or succeeded from either container lifecycle (RestartNever container
880+
// fails) or due to external events like deletion or eviction. A terminal pod *should* have no running
881+
// containers, but to know that the pod has completed its lifecycle you must wait for containers to also
882+
// be terminal.
883+
containersTerminal = notRunning(status.ContainerStatuses)
884+
// The kubelet must accept config changes from the pod spec until it has reached a point where changes would
885+
// have no effect on any running container.
886+
podWorkerTerminal = status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && containersTerminal)
887+
return
888+
}
889+
890+
// podIsTerminated returns true if the provided pod is in a terminal phase ("Failed", "Succeeded") or
891+
// has been deleted and has no running containers. This corresponds to when a pod must accept changes to
892+
// its pod spec (e.g. terminating containers allow grace period to be shortened).
893+
func (kl *Kubelet) podIsTerminated(pod *v1.Pod) bool {
894+
_, podWorkerTerminal := kl.podAndContainersAreTerminal(pod)
895+
return podWorkerTerminal
879896
}
880897

881-
// IsPodTerminated returns true if the pod with the provided UID is in a terminated state ("Failed" or "Succeeded")
882-
// or if the pod has been deleted or removed
898+
// IsPodTerminated returns true if the pod with the provided UID is in a terminal phase ("Failed",
899+
// "Succeeded") or has been deleted and has no running containers. This corresponds to when a pod must
900+
// accept changes to its pod spec (e.g. terminating containers allow grace period to be shortened).
883901
func (kl *Kubelet) IsPodTerminated(uid types.UID) bool {
884902
pod, podFound := kl.podManager.GetPodByUID(uid)
885903
if !podFound {

pkg/kubelet/status/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ go_test(
5454
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
5555
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
5656
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
57+
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
5758
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
5859
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
5960
"//staging/src/k8s.io/client-go/testing:go_default_library",

pkg/kubelet/status/status_manager.go

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -160,13 +160,20 @@ func (m *manager) Start() {
160160
syncTicker := time.Tick(syncPeriod)
161161
// syncPod and syncBatch share the same go routine to avoid sync races.
162162
go wait.Forever(func() {
163-
select {
164-
case syncRequest := <-m.podStatusChannel:
165-
klog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
166-
syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
167-
m.syncPod(syncRequest.podUID, syncRequest.status)
168-
case <-syncTicker:
169-
m.syncBatch()
163+
for {
164+
select {
165+
case syncRequest := <-m.podStatusChannel:
166+
klog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
167+
syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
168+
m.syncPod(syncRequest.podUID, syncRequest.status)
169+
case <-syncTicker:
170+
klog.V(5).Infof("Status Manager: syncing batch")
171+
// remove any entries in the status channel since the batch will handle them
172+
for i := len(m.podStatusChannel); i > 0; i-- {
173+
<-m.podStatusChannel
174+
}
175+
m.syncBatch()
176+
}
170177
}
171178
}, 0)
172179
}
@@ -314,21 +321,39 @@ func findContainerStatus(status *v1.PodStatus, containerID string) (containerSta
314321
func (m *manager) TerminatePod(pod *v1.Pod) {
315322
m.podStatusesLock.Lock()
316323
defer m.podStatusesLock.Unlock()
324+
325+
// ensure that all containers have a terminated state - because we do not know whether the container
326+
// was successful, always report an error
317327
oldStatus := &pod.Status
318328
if cachedStatus, ok := m.podStatuses[pod.UID]; ok {
319329
oldStatus = &cachedStatus.status
320330
}
321331
status := *oldStatus.DeepCopy()
322332
for i := range status.ContainerStatuses {
333+
if status.ContainerStatuses[i].State.Terminated != nil || status.ContainerStatuses[i].State.Waiting != nil {
334+
continue
335+
}
323336
status.ContainerStatuses[i].State = v1.ContainerState{
324-
Terminated: &v1.ContainerStateTerminated{},
337+
Terminated: &v1.ContainerStateTerminated{
338+
Reason: "ContainerStatusUnknown",
339+
Message: "The container could not be located when the pod was terminated",
340+
ExitCode: 137,
341+
},
325342
}
326343
}
327344
for i := range status.InitContainerStatuses {
345+
if status.InitContainerStatuses[i].State.Terminated != nil || status.InitContainerStatuses[i].State.Waiting != nil {
346+
continue
347+
}
328348
status.InitContainerStatuses[i].State = v1.ContainerState{
329-
Terminated: &v1.ContainerStateTerminated{},
349+
Terminated: &v1.ContainerStateTerminated{
350+
Reason: "ContainerStatusUnknown",
351+
Message: "The container could not be located when the pod was terminated",
352+
ExitCode: 137,
353+
},
330354
}
331355
}
356+
332357
m.updateStatusInternal(pod, status, true)
333358
}
334359

pkg/kubelet/status/status_manager_test.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,12 @@ import (
2727

2828
"github.com/stretchr/testify/assert"
2929

30-
"k8s.io/api/core/v1"
30+
v1 "k8s.io/api/core/v1"
3131
"k8s.io/apimachinery/pkg/api/errors"
3232
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3333
"k8s.io/apimachinery/pkg/runtime"
3434
"k8s.io/apimachinery/pkg/runtime/schema"
35+
"k8s.io/apimachinery/pkg/util/diff"
3536
clientset "k8s.io/client-go/kubernetes"
3637
"k8s.io/client-go/kubernetes/fake"
3738
core "k8s.io/client-go/testing"
@@ -569,6 +570,16 @@ func TestTerminatePod(t *testing.T) {
569570
t.Logf("update the pod's status to Failed. TerminatePod should preserve this status update.")
570571
firstStatus := getRandomPodStatus()
571572
firstStatus.Phase = v1.PodFailed
573+
firstStatus.InitContainerStatuses = []v1.ContainerStatus{
574+
{Name: "init-test-1"},
575+
{Name: "init-test-2", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "InitTest", ExitCode: 0}}},
576+
{Name: "init-test-3", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "InitTest", ExitCode: 3}}},
577+
}
578+
firstStatus.ContainerStatuses = []v1.ContainerStatus{
579+
{Name: "test-1"},
580+
{Name: "test-2", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "Test", ExitCode: 2}}},
581+
{Name: "test-3", State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "Test", ExitCode: 0}}},
582+
}
572583
syncer.SetPodStatus(testPod, firstStatus)
573584

574585
t.Logf("set the testPod to a pod with Phase running, to simulate a stale pod")
@@ -586,6 +597,26 @@ func TestTerminatePod(t *testing.T) {
586597
assert.False(t, newStatus.InitContainerStatuses[i].State.Terminated == nil, "expected init containers to be terminated")
587598
}
588599

600+
expectUnknownState := v1.ContainerState{Terminated: &v1.ContainerStateTerminated{Reason: "ContainerStatusUnknown", Message: "The container could not be located when the pod was terminated", ExitCode: 137}}
601+
if !reflect.DeepEqual(newStatus.InitContainerStatuses[0].State, expectUnknownState) {
602+
t.Errorf("terminated container state not defaulted: %s", diff.ObjectReflectDiff(newStatus.InitContainerStatuses[0].State, expectUnknownState))
603+
}
604+
if !reflect.DeepEqual(newStatus.InitContainerStatuses[1].State, firstStatus.InitContainerStatuses[1].State) {
605+
t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses)
606+
}
607+
if !reflect.DeepEqual(newStatus.InitContainerStatuses[2].State, firstStatus.InitContainerStatuses[2].State) {
608+
t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses)
609+
}
610+
if !reflect.DeepEqual(newStatus.ContainerStatuses[0].State, expectUnknownState) {
611+
t.Errorf("terminated container state not defaulted: %s", diff.ObjectReflectDiff(newStatus.ContainerStatuses[0].State, expectUnknownState))
612+
}
613+
if !reflect.DeepEqual(newStatus.ContainerStatuses[1].State, firstStatus.ContainerStatuses[1].State) {
614+
t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses)
615+
}
616+
if !reflect.DeepEqual(newStatus.ContainerStatuses[2].State, firstStatus.ContainerStatuses[2].State) {
617+
t.Errorf("existing terminated container state not preserved: %#v", newStatus.ContainerStatuses)
618+
}
619+
589620
t.Logf("we expect the previous status update to be preserved.")
590621
assert.Equal(t, newStatus.Phase, firstStatus.Phase)
591622
assert.Equal(t, newStatus.Message, firstStatus.Message)

test/e2e/node/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ go_library(
3737
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
3838
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
3939
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
40+
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
4041
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
4142
"//test/e2e/framework:go_default_library",
4243
"//test/e2e/framework/job:go_default_library",

0 commit comments

Comments
 (0)