Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion cns/ipampool/v2/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ func (m *adapter) GetStateSnapshot() cns.IpamPoolMonitorStateSnapshot {

func PodIPDemandListener(ch chan<- int) func([]v1.Pod) {
return func(pods []v1.Pod) {
ch <- len(pods)
// Filter out Pods in terminal phases (Succeeded/Failed) since they no longer
// have network sandboxes and don't contribute to IP demand
activePods := 0
for i := range pods {
if pods[i].Status.Phase != v1.PodSucceeded && pods[i].Status.Phase != v1.PodFailed {
activePods++
}
}
ch <- activePods
}
}


102 changes: 102 additions & 0 deletions cns/ipampool/v2/adapter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package v2

import (
"testing"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestPodIPDemandListener(t *testing.T) {
tests := []struct {
name string
pods []v1.Pod
expected int
}{
{
name: "empty pod list",
pods: []v1.Pod{},
expected: 0,
},
{
name: "single running pod",
pods: []v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Name: "pod1"},
Status: v1.PodStatus{Phase: v1.PodRunning},
},
},
expected: 1,
},
{
name: "multiple running pods",
pods: []v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Name: "pod1"},
Status: v1.PodStatus{Phase: v1.PodRunning},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "pod2"},
Status: v1.PodStatus{Phase: v1.PodPending},
},
},
expected: 2,
},
{
name: "mix of running and terminal pods - should exclude terminal",
pods: []v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Name: "pod1"},
Status: v1.PodStatus{Phase: v1.PodRunning},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "pod2"},
Status: v1.PodStatus{Phase: v1.PodSucceeded},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "pod3"},
Status: v1.PodStatus{Phase: v1.PodFailed},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "pod4"},
Status: v1.PodStatus{Phase: v1.PodPending},
},
},
expected: 2, // Only pod1 (Running) and pod4 (Pending) should be counted
},
{
name: "only terminal pods - should count zero",
pods: []v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Name: "pod1"},
Status: v1.PodStatus{Phase: v1.PodSucceeded},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "pod2"},
Status: v1.PodStatus{Phase: v1.PodFailed},
},
},
expected: 0,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ch := make(chan int, 1)
listener := PodIPDemandListener(ch)

listener(tt.pods)

select {
case result := <-ch:
if result != tt.expected {
t.Errorf("expected %d, got %d", tt.expected, result)
}
default:
t.Error("expected value in channel")
}
})
}
}


15 changes: 14 additions & 1 deletion cns/kubecontroller/pod/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type limiter interface {
Allow() bool
}

// NotifierFunc returns a reconcile.Func that lists Pods to get the latest
// NewNotifierFunc returns a reconcile.Func that lists Pods to get the latest
// state and notifies listeners of the resulting Pods.
// listOpts are passed to the client.List call to filter the Pod list.
// limiter is an optional rate limiter which may be used to limit the
Expand Down Expand Up @@ -80,6 +80,8 @@ func (p *watcher) NewNotifierFunc(listOpts *client.ListOptions, limiter limiter,
}
}



var hostNetworkIndexer = client.IndexerFunc(func(o client.Object) []string {
pod, ok := o.(*v1.Pod)
if !ok {
Expand All @@ -88,12 +90,23 @@ var hostNetworkIndexer = client.IndexerFunc(func(o client.Object) []string {
return []string{strconv.FormatBool(pod.Spec.HostNetwork)}
})

var statusPhaseIndexer = client.IndexerFunc(func(o client.Object) []string {
pod, ok := o.(*v1.Pod)
if !ok {
return nil
}
return []string{string(pod.Status.Phase)}
})

// SetupWithManager Sets up the reconciler with a new manager, filtering using NodeNetworkConfigFilter on nodeName.
func (p *watcher) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
p.cli = mgr.GetClient()
if err := mgr.GetFieldIndexer().IndexField(ctx, &v1.Pod{}, "spec.hostNetwork", hostNetworkIndexer); err != nil {
return errors.Wrap(err, "failed to set up hostNetwork indexer")
}
if err := mgr.GetFieldIndexer().IndexField(ctx, &v1.Pod{}, "status.phase", statusPhaseIndexer); err != nil {
return errors.Wrap(err, "failed to set up status.phase indexer")
}
if err := ctrl.NewControllerManagedBy(mgr).
For(&v1.Pod{}).
WithEventFilter(predicate.Funcs{ // we only want create/delete events
Expand Down
Loading