Skip to content

Commit 76e7531

Browse files
authored
feat: [NPM] Support pod grace period for v2 (#1095)
* Use official k8s equals functions for label instead of our own implementation * Update v2 pod controller to support graceful shutdown of pod
1 parent cc68c34 commit 76e7531

File tree

5 files changed

+183
-141
lines changed

5 files changed

+183
-141
lines changed

npm/pkg/controlplane/controllers/v1/podController.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
coreinformer "k8s.io/client-go/informers/core/v1"
2222
corelisters "k8s.io/client-go/listers/core/v1"
2323

24+
k8slabels "k8s.io/apimachinery/pkg/labels"
2425
"k8s.io/client-go/tools/cache"
2526
"k8s.io/client-go/util/workqueue"
2627
"k8s.io/klog"
@@ -88,7 +89,7 @@ func (nPod *NpmPod) noUpdate(podObj *corev1.Pod) bool {
8889
nPod.Name == podObj.ObjectMeta.Name &&
8990
nPod.Phase == podObj.Status.Phase &&
9091
nPod.PodIP == podObj.Status.PodIP &&
91-
util.IsSameLabels(nPod.Labels, podObj.ObjectMeta.Labels) &&
92+
k8slabels.Equals(nPod.Labels, podObj.ObjectMeta.Labels) &&
9293
// TODO(jungukcho) to avoid using DeepEqual for ContainerPorts,
9394
// it needs a precise sorting. Will optimize it later if needed.
9495
reflect.DeepEqual(nPod.ContainerPorts, getContainerPortList(podObj))

npm/pkg/controlplane/controllers/v2/podController_test.go

Lines changed: 153 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
dpmocks "github.com/Azure/azure-container-networking/npm/pkg/dataplane/mocks"
1414
gomock "github.com/golang/mock/gomock"
1515
"github.com/stretchr/testify/assert"
16+
"github.com/stretchr/testify/require"
1617
corev1 "k8s.io/api/core/v1"
1718
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1819
"k8s.io/apimachinery/pkg/runtime"
@@ -85,7 +86,7 @@ func (f *podFixture) newPodController(_ chan struct{}) {
8586
// f.kubeInformer.Start(stopCh)
8687
}
8788

88-
func createPod(name, ns, rv, podIP string, labels map[string]string, isHostNewtwork bool, podPhase corev1.PodPhase) *corev1.Pod {
89+
func createPod(name, ns, rv, podIP string, labels map[string]string, isHostNetwork bool, podPhase corev1.PodPhase) *corev1.Pod {
8990
return &corev1.Pod{
9091
ObjectMeta: metav1.ObjectMeta{
9192
Name: name,
@@ -94,7 +95,7 @@ func createPod(name, ns, rv, podIP string, labels map[string]string, isHostNewtw
9495
ResourceVersion: rv,
9596
},
9697
Spec: corev1.PodSpec{
97-
HostNetwork: isHostNewtwork,
98+
HostNetwork: isHostNetwork,
9899
Containers: []corev1.Container{
99100
{
100101
Ports: []corev1.ContainerPort{
@@ -734,6 +735,88 @@ func TestHasValidPodIP(t *testing.T) {
734735
}
735736
}
736737

738+
func TestIsCompletePod(t *testing.T) {
739+
var zeroGracePeriod int64
740+
var defaultGracePeriod int64 = 30
741+
742+
type podState struct {
743+
phase corev1.PodPhase
744+
deletionTimestamp *metav1.Time
745+
deletionGracePeriodSeconds *int64
746+
}
747+
748+
tests := []struct {
749+
name string
750+
podState podState
751+
expectedCompletedPod bool
752+
}{
753+
754+
{
755+
name: "pod is in running status",
756+
podState: podState{
757+
phase: corev1.PodRunning,
758+
deletionTimestamp: nil,
759+
deletionGracePeriodSeconds: nil,
760+
},
761+
expectedCompletedPod: false,
762+
},
763+
{
764+
name: "pod is in completely terminating states after graceful shutdown period",
765+
podState: podState{
766+
phase: corev1.PodRunning,
767+
deletionTimestamp: &metav1.Time{},
768+
deletionGracePeriodSeconds: &zeroGracePeriod,
769+
},
770+
expectedCompletedPod: true,
771+
},
772+
{
773+
name: "pod is in terminating states, but in graceful shutdown period",
774+
podState: podState{
775+
phase: corev1.PodRunning,
776+
deletionTimestamp: &metav1.Time{},
777+
deletionGracePeriodSeconds: &defaultGracePeriod,
778+
},
779+
expectedCompletedPod: false,
780+
},
781+
{
782+
name: "pod is in PodSucceeded status",
783+
podState: podState{
784+
phase: corev1.PodSucceeded,
785+
deletionTimestamp: nil,
786+
deletionGracePeriodSeconds: nil,
787+
},
788+
expectedCompletedPod: true,
789+
},
790+
{
791+
name: "pod is in PodFailed status",
792+
podState: podState{
793+
phase: corev1.PodSucceeded,
794+
deletionTimestamp: nil,
795+
deletionGracePeriodSeconds: nil,
796+
},
797+
expectedCompletedPod: true,
798+
},
799+
}
800+
801+
for _, tt := range tests {
802+
tt := tt
803+
t.Run(tt.name, func(t *testing.T) {
804+
t.Parallel()
805+
corev1Pod := &corev1.Pod{
806+
ObjectMeta: metav1.ObjectMeta{
807+
DeletionTimestamp: tt.podState.deletionTimestamp,
808+
DeletionGracePeriodSeconds: tt.podState.deletionGracePeriodSeconds,
809+
},
810+
Status: corev1.PodStatus{
811+
Phase: tt.podState.phase,
812+
},
813+
}
814+
isPodCompleted := isCompletePod(corev1Pod)
815+
require.Equal(t, tt.expectedCompletedPod, isPodCompleted)
816+
})
817+
}
818+
}
819+
737820
// Extra unit test which is not quite related to PodController,
738821
// but help to understand how workqueue works to make event handler logic lock-free.
739822
// If the same key is queued into the workqueue multiple times,
@@ -768,3 +851,71 @@ func TestWorkQueue(t *testing.T) {
768851
}
769852
}
770853
}
854+
855+
func TestNPMPodNoUpdate(t *testing.T) {
856+
type podInfo struct {
857+
podName string
858+
ns string
859+
rv string
860+
podIP string
861+
labels map[string]string
862+
isHostNetwork bool
863+
podPhase corev1.PodPhase
864+
}
865+
866+
labels := map[string]string{
867+
"app": "test-pod",
868+
}
869+
870+
tests := []struct {
871+
name string
872+
podInfo
873+
updatingNPMPod bool
874+
expectedNoUpdate bool
875+
}{
876+
{
877+
"Required update of NPMPod given Pod",
878+
podInfo{
879+
podName: "test-pod-1",
880+
ns: "test-namespace",
881+
rv: "0",
882+
podIP: "1.2.3.4",
883+
labels: labels,
884+
isHostNetwork: NonHostNetwork,
885+
podPhase: corev1.PodRunning,
886+
},
887+
false,
888+
false,
889+
},
890+
{
891+
"No required update of NPMPod given Pod",
892+
podInfo{
893+
podName: "test-pod-2",
894+
ns: "test-namespace",
895+
rv: "0",
896+
podIP: "1.2.3.4",
897+
labels: labels,
898+
isHostNetwork: NonHostNetwork,
899+
podPhase: corev1.PodRunning,
900+
},
901+
true,
902+
true,
903+
},
904+
}
905+
906+
for _, tt := range tests {
907+
tt := tt
908+
t.Run(tt.name, func(t *testing.T) {
909+
t.Parallel()
910+
corev1Pod := createPod(tt.podName, tt.ns, tt.rv, tt.podIP, tt.labels, tt.isHostNetwork, tt.podPhase)
911+
npmPod := newNpmPod(corev1Pod)
912+
if tt.updatingNPMPod {
913+
npmPod.appendLabels(corev1Pod.Labels, appendToExistingLabels)
914+
npmPod.updateNpmPodAttributes(corev1Pod)
915+
npmPod.appendContainerPorts(corev1Pod)
916+
}
917+
noUpdate := npmPod.noUpdate(corev1Pod)
918+
require.Equal(t, tt.expectedNoUpdate, noUpdate)
919+
})
920+
}
921+
}

npm/pkg/controlplane/controllers/v2/podcontroller.go

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/pkg/errors"
1717
corev1 "k8s.io/api/core/v1"
1818
apierrors "k8s.io/apimachinery/pkg/api/errors"
19+
k8slabels "k8s.io/apimachinery/pkg/labels"
1920
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2021
"k8s.io/apimachinery/pkg/util/wait"
2122
coreinformer "k8s.io/client-go/informers/core/v1"
@@ -83,6 +84,18 @@ func (nPod *NpmPod) updateNpmPodAttributes(podObj *corev1.Pod) {
8384
}
8485
}
8586

87+
// noUpdate evaluates whether NpmPod is required to be update given podObj.
88+
func (nPod *NpmPod) noUpdate(podObj *corev1.Pod) bool {
89+
return nPod.Namespace == podObj.ObjectMeta.Namespace &&
90+
nPod.Name == podObj.ObjectMeta.Name &&
91+
nPod.Phase == podObj.Status.Phase &&
92+
nPod.PodIP == podObj.Status.PodIP &&
93+
k8slabels.Equals(nPod.Labels, podObj.ObjectMeta.Labels) &&
94+
// TODO(jungukcho) to avoid using DeepEqual for ContainerPorts,
95+
// it needs a precise sorting. Will optimize it later if needed.
96+
reflect.DeepEqual(nPod.ContainerPorts, getContainerPortList(podObj))
97+
}
98+
8699
type PodController struct {
87100
podLister corelisters.PodLister
88101
workqueue workqueue.RateLimitingInterface
@@ -169,7 +182,8 @@ func (c *PodController) addPod(obj interface{}) {
169182
}
170183
podObj, _ := obj.(*corev1.Pod)
171184

172-
// If newPodObj status is either corev1.PodSucceeded or corev1.PodFailed or DeletionTimestamp is set, do not need to add it into workqueue.
185+
// Check whether this pod needs to be enqueued.
186+
// If the pod is in a completely terminated state, it is not enqueued, to avoid unnecessary computation.
173187
if isCompletePod(podObj) {
174188
return
175189
}
@@ -333,7 +347,9 @@ func (c *PodController) syncPod(key string) error {
333347
return err
334348
}
335349

336-
// If newPodObj status is either corev1.PodSucceeded or corev1.PodFailed or DeletionTimestamp is set, start clean-up the lastly applied states.
350+
// If this pod is in a completely terminated state (which means the pod has been gracefully shut down),
351+
// NPM starts cleaning up the last applied states, even on update events.
352+
// This proactive clean-up helps avoid leaving a stale pod object behind in case the delete event is missed.
337353
if isCompletePod(pod) {
338354
if err = c.cleanUpDeletedPod(key); err != nil {
339355
return fmt.Errorf("Error: %w when when pod is in completed state", err)
@@ -346,7 +362,7 @@ func (c *PodController) syncPod(key string) error {
346362
// if pod does not have different states against lastly applied states stored in cachedNpmPod,
347363
// podController does not need to reconcile this update.
348364
// in this updatePod event, newPod was updated with states which PodController does not need to reconcile.
349-
if isInvalidPodUpdate(cachedNpmPod, pod) {
365+
if cachedNpmPod.noUpdate(pod) {
350366
return nil
351367
}
352368
}
@@ -619,13 +635,20 @@ func (c *PodController) manageNamedPortIpsets(portList []corev1.ContainerPort, p
619635
return nil
620636
}
621637

638+
// isCompletePod evaluates whether this pod is completely in terminated states,
639+
// which means pod is gracefully shutdown.
622640
func isCompletePod(podObj *corev1.Pod) bool {
623-
if podObj.DeletionTimestamp != nil {
641+
// DeletionTimestamp and DeletionGracePeriodSeconds in pod are not nil,
642+
// which means pod is expected to be deleted and
643+
// DeletionGracePeriodSeconds value is zero, which means the pod is gracefully terminated.
644+
if podObj.DeletionTimestamp != nil && podObj.DeletionGracePeriodSeconds != nil && *podObj.DeletionGracePeriodSeconds == 0 {
624645
return true
625646
}
626647

627-
// K8s categorizes Succeeded and Failed pods as a terminated pod and will not restart them
648+
// K8s categorizes Succeeded and Failed pods as a terminated pod and will not restart them.
628649
// So NPM will ignore adding these pods
650+
// TODO(jungukcho): what are the values of DeletionTimestamp and podObj.DeletionGracePeriodSeconds
651+
// in either below status?
629652
if podObj.Status.Phase == corev1.PodSucceeded || podObj.Status.Phase == corev1.PodFailed {
630653
return true
631654
}
@@ -647,15 +670,3 @@ func getContainerPortList(podObj *corev1.Pod) []corev1.ContainerPort {
647670
}
648671
return portList
649672
}
650-
651-
// (TODO): better naming?
652-
func isInvalidPodUpdate(npmPod *NpmPod, newPodObj *corev1.Pod) bool {
653-
return npmPod.Namespace == newPodObj.ObjectMeta.Namespace &&
654-
npmPod.Name == newPodObj.ObjectMeta.Name &&
655-
npmPod.Phase == newPodObj.Status.Phase &&
656-
npmPod.PodIP == newPodObj.Status.PodIP &&
657-
newPodObj.ObjectMeta.DeletionTimestamp == nil &&
658-
newPodObj.ObjectMeta.DeletionGracePeriodSeconds == nil &&
659-
reflect.DeepEqual(npmPod.Labels, newPodObj.ObjectMeta.Labels) &&
660-
reflect.DeepEqual(npmPod.ContainerPorts, getContainerPortList(newPodObj))
661-
}

npm/util/util.go

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -338,18 +338,3 @@ func CompareSlices(list1, list2 []string) bool {
338338
func SliceToString(list []string) string {
339339
return strings.Join(list, SetPolicyDelimiter)
340340
}
341-
342-
// IsSameLabels return if all pairs of key and value in two maps are same.
343-
// Otherwise, it returns false.
344-
func IsSameLabels(labelA, labelB map[string]string) bool {
345-
if len(labelA) != len(labelB) {
346-
return false
347-
}
348-
349-
for labelKey, labelVal := range labelA {
350-
if val, exist := labelB[labelKey]; !exist || labelVal != val {
351-
return false
352-
}
353-
}
354-
return true
355-
}

0 commit comments

Comments
 (0)