Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions cns/ipampool/v2/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,23 @@ func (m *adapter) GetStateSnapshot() cns.IpamPoolMonitorStateSnapshot {

func PodIPDemandListener(ch chan<- int) func([]v1.Pod) {
return func(pods []v1.Pod) {
// Filter out Pods in terminal phases (Succeeded/Failed) since they no longer
// have network sandboxes and don't contribute to IP demand
activePods := 0
for i := range pods {
if pods[i].Status.Phase != v1.PodSucceeded && pods[i].Status.Phase != v1.PodFailed {
activePods++
}
}
ch <- activePods
}
}

// PodIPDemandListenerNoFilter counts all pods without client-side filtering.
// This is used when server-side filtering has already excluded terminal pods.
func PodIPDemandListenerNoFilter(ch chan<- int) func([]v1.Pod) {
return func(pods []v1.Pod) {
// All pods are already filtered server-side to exclude terminal phases
ch <- len(pods)
}
}
160 changes: 160 additions & 0 deletions cns/ipampool/v2/adapter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package v2

import (
"testing"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestPodIPDemandListener(t *testing.T) {
tests := []struct {
name string
pods []v1.Pod
expected int
}{
{
name: "empty pod list",
pods: []v1.Pod{},
expected: 0,
},
{
name: "single running pod",
pods: []v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Name: "pod1"},
Status: v1.PodStatus{Phase: v1.PodRunning},
},
},
expected: 1,
},
{
name: "multiple running pods",
pods: []v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Name: "pod1"},
Status: v1.PodStatus{Phase: v1.PodRunning},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "pod2"},
Status: v1.PodStatus{Phase: v1.PodPending},
},
},
expected: 2,
},
{
name: "mix of running and terminal pods - should exclude terminal",
pods: []v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Name: "pod1"},
Status: v1.PodStatus{Phase: v1.PodRunning},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "pod2"},
Status: v1.PodStatus{Phase: v1.PodSucceeded},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "pod3"},
Status: v1.PodStatus{Phase: v1.PodFailed},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "pod4"},
Status: v1.PodStatus{Phase: v1.PodPending},
},
},
expected: 2, // Only pod1 (Running) and pod4 (Pending) should be counted
},
{
name: "only terminal pods - should count zero",
pods: []v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Name: "pod1"},
Status: v1.PodStatus{Phase: v1.PodSucceeded},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "pod2"},
Status: v1.PodStatus{Phase: v1.PodFailed},
},
},
expected: 0,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ch := make(chan int, 1)
listener := PodIPDemandListener(ch)

listener(tt.pods)

select {
case result := <-ch:
if result != tt.expected {
t.Errorf("expected %d, got %d", tt.expected, result)
}
default:
t.Error("expected value in channel")
}
})
}
}

func TestPodIPDemandListenerNoFilter(t *testing.T) {
tests := []struct {
name string
pods []v1.Pod
expected int
}{
{
name: "empty pod list",
pods: []v1.Pod{},
expected: 0,
},
{
name: "single pod",
pods: []v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Name: "pod1"},
Status: v1.PodStatus{Phase: v1.PodRunning},
},
},
expected: 1,
},
{
name: "multiple pods - counts all since filtering is done server-side",
pods: []v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{Name: "pod1"},
Status: v1.PodStatus{Phase: v1.PodRunning},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "pod2"},
Status: v1.PodStatus{Phase: v1.PodPending},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "pod3"},
Status: v1.PodStatus{Phase: v1.PodUnknown},
},
},
expected: 3, // All pods counted since server-side filtering already excluded terminal pods
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ch := make(chan int, 1)
listener := PodIPDemandListenerNoFilter(ch)

listener(tt.pods)

select {
case result := <-ch:
if result != tt.expected {
t.Errorf("expected %d, got %d", tt.expected, result)
}
default:
t.Error("expected value in channel")
}
})
}
}
62 changes: 61 additions & 1 deletion cns/kubecontroller/pod/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/pkg/errors"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
Expand Down Expand Up @@ -50,7 +51,7 @@ type limiter interface {
Allow() bool
}

// NotifierFunc returns a reconcile.Func that lists Pods to get the latest
// NewNotifierFunc returns a reconcile.Func that lists Pods to get the latest
// state and notifies listeners of the resulting Pods.
// listOpts are passed to the client.List call to filter the Pod list.
// limiter is an optional rate limiter which may be used to limit the
Expand Down Expand Up @@ -80,6 +81,54 @@ func (p *watcher) NewNotifierFunc(listOpts *client.ListOptions, limiter limiter,
}
}

// NewPodIPDemandNotifierFunc returns a reconcile.Func optimized for IP demand calculation
// that uses server-side filtering to exclude terminal Pods (Succeeded/Failed) from the results.
// This avoids client-side iteration over all pods by using multiple targeted queries.
func (p *watcher) NewPodIPDemandNotifierFunc(baseListOpts *client.ListOptions, limiter limiter, listeners ...func([]v1.Pod)) reconcile.Func {
p.z.Info("adding pod IP demand notifier for listeners", zap.Int("listeners", len(listeners)))
return func(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
if !limiter.Allow() {
// rate limit exceeded, requeue
p.z.Info("rate limit exceeded")
return ctrl.Result{Requeue: true}, nil
}

// Create field selectors for active pod phases (exclude Succeeded and Failed)
activePhases := []v1.PodPhase{v1.PodRunning, v1.PodPending, v1.PodUnknown}
var allActivePods []v1.Pod

for _, phase := range activePhases {
// Clone base list options and add phase filter
listOpts := &client.ListOptions{}
if baseListOpts.FieldSelector != nil {
// Combine existing field selector with phase filter
phaseSelector := fields.SelectorFromSet(fields.Set{"status.phase": string(phase)})
combinedSelector := fields.AndSelectors(baseListOpts.FieldSelector, phaseSelector)
listOpts.FieldSelector = combinedSelector
} else {
listOpts.FieldSelector = fields.SelectorFromSet(fields.Set{"status.phase": string(phase)})
}

// Copy other options from base
listOpts.LabelSelector = baseListOpts.LabelSelector
listOpts.Namespace = baseListOpts.Namespace
listOpts.Limit = baseListOpts.Limit
listOpts.Continue = baseListOpts.Continue

podList := &v1.PodList{}
if err := p.cli.List(ctx, podList, listOpts); err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to list pods with phase %s", phase)
}
allActivePods = append(allActivePods, podList.Items...)
}

for _, l := range listeners {
l(allActivePods)
}
return ctrl.Result{}, nil
}
}

var hostNetworkIndexer = client.IndexerFunc(func(o client.Object) []string {
pod, ok := o.(*v1.Pod)
if !ok {
Expand All @@ -88,12 +137,23 @@ var hostNetworkIndexer = client.IndexerFunc(func(o client.Object) []string {
return []string{strconv.FormatBool(pod.Spec.HostNetwork)}
})

var statusPhaseIndexer = client.IndexerFunc(func(o client.Object) []string {
pod, ok := o.(*v1.Pod)
if !ok {
return nil
}
return []string{string(pod.Status.Phase)}
})

// SetupWithManager Sets up the reconciler with a new manager, filtering using NodeNetworkConfigFilter on nodeName.
func (p *watcher) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
p.cli = mgr.GetClient()
if err := mgr.GetFieldIndexer().IndexField(ctx, &v1.Pod{}, "spec.hostNetwork", hostNetworkIndexer); err != nil {
return errors.Wrap(err, "failed to set up hostNetwork indexer")
}
if err := mgr.GetFieldIndexer().IndexField(ctx, &v1.Pod{}, "status.phase", statusPhaseIndexer); err != nil {
return errors.Wrap(err, "failed to set up status.phase indexer")
}
if err := ctrl.NewControllerManagedBy(mgr).
For(&v1.Pod{}).
WithEventFilter(predicate.Funcs{ // we only want create/delete events
Expand Down
10 changes: 8 additions & 2 deletions cns/service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1576,10 +1576,16 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn
if cnsconfig.WatchPods {
pw := podctrl.New(z)
if cnsconfig.EnableIPAMv2 {
hostNetworkListOpt := &client.ListOptions{FieldSelector: fields.SelectorFromSet(fields.Set{"spec.hostNetwork": "false"})} // filter only podsubnet pods
// Create field selector to filter only podsubnet pods
fieldSet := fields.Set{
"spec.hostNetwork": "false",
}
hostNetworkSelector := fields.SelectorFromSet(fieldSet)
baseListOpts := &client.ListOptions{FieldSelector: hostNetworkSelector}
// don't relist pods more than every 500ms
limit := rate.NewLimiter(rate.Every(500*time.Millisecond), 1) //nolint:gomnd // clearly 500ms
pw.With(pw.NewNotifierFunc(hostNetworkListOpt, limit, ipampoolv2.PodIPDemandListener(ipDemandCh)))
// Use specialized notifier that filters terminal pods server-side
pw.With(pw.NewPodIPDemandNotifierFunc(baseListOpts, limit, ipampoolv2.PodIPDemandListenerNoFilter(ipDemandCh)))
}
if err := pw.SetupWithManager(ctx, manager); err != nil {
return errors.Wrapf(err, "failed to setup pod watcher with manager")
Expand Down
Binary file added service
Binary file not shown.