Skip to content

Commit 1b2a04a

Browse files
authored
feat: stub CNS Pod watcher (#2112)
* feat: cns watches pods Signed-off-by: Evan Baker <[email protected]> * indirect pod reconcile for more dynamic behavior Signed-off-by: Evan Baker <[email protected]> --------- Signed-off-by: Evan Baker <[email protected]>
1 parent d17079b commit 1b2a04a

File tree

4 files changed

+109
-0
lines changed

4 files changed

+109
-0
lines changed

cns/configuration/configuration.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type CNSConfig struct {
4545
CNIConflistFilepath string
4646
MellanoxMonitorIntervalSecs int
4747
AZRSettings AZRSettings
48+
WatchPods bool
4849
}
4950

5051
type TelemetrySettings struct {

cns/kubecontroller/nodenetworkconfig/reconciler.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,9 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager, node *v1.Node) error {
164164
WithEventFilter(predicate.Funcs{
165165
// check that the generation is the same - status changes don't update generation.
166166
UpdateFunc: func(ue event.UpdateEvent) bool {
167+
if ue.ObjectOld == nil || ue.ObjectNew == nil {
168+
return false
169+
}
167170
return ue.ObjectOld.GetGeneration() == ue.ObjectNew.GetGeneration()
168171
},
169172
}).
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package podwatcher
2+
3+
import (
4+
"context"
5+
6+
"github.com/pkg/errors"
7+
v1 "k8s.io/api/core/v1"
8+
"k8s.io/apimachinery/pkg/fields"
9+
ctrl "sigs.k8s.io/controller-runtime"
10+
"sigs.k8s.io/controller-runtime/pkg/client"
11+
"sigs.k8s.io/controller-runtime/pkg/event"
12+
"sigs.k8s.io/controller-runtime/pkg/predicate"
13+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
14+
)
15+
16+
type podcli interface {
17+
List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error
18+
}
19+
20+
type podListener interface {
21+
Update([]v1.Pod)
22+
}
23+
24+
type PodWatcher struct {
25+
cli podcli
26+
listOpt client.ListOption
27+
ReconcileFuncs []reconcile.Func
28+
}
29+
30+
func New(nodename string) *PodWatcher { //nolint:revive // private struct to force constructor
31+
return &PodWatcher{
32+
listOpt: &client.ListOptions{FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodename})},
33+
}
34+
}
35+
36+
func (p *PodWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
37+
for _, f := range p.ReconcileFuncs {
38+
if _, err := f(ctx, req); err != nil {
39+
return reconcile.Result{}, errors.Wrap(err, "failed to reconcile")
40+
}
41+
}
42+
return reconcile.Result{}, nil
43+
}
44+
45+
type PodFilter func([]v1.Pod) []v1.Pod
46+
47+
var PodNetworkFilter PodFilter = func(pods []v1.Pod) []v1.Pod {
48+
var filtered []v1.Pod
49+
for _, pod := range pods {
50+
if !pod.Spec.HostNetwork {
51+
filtered = append(filtered, pod)
52+
}
53+
}
54+
return filtered
55+
}
56+
57+
func (p *PodWatcher) PodNotifierFunc(f PodFilter, listeners ...podListener) reconcile.Func {
58+
return func(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
59+
podList := &v1.PodList{}
60+
if err := p.cli.List(ctx, podList, p.listOpt); err != nil {
61+
return reconcile.Result{}, errors.Wrap(err, "failed to list pods")
62+
}
63+
pods := podList.Items
64+
if f != nil {
65+
pods = f(pods)
66+
}
67+
for _, l := range listeners {
68+
l.Update(pods)
69+
}
70+
return reconcile.Result{}, nil
71+
}
72+
}
73+
74+
// SetupWithManager Sets up the reconciler with a new manager, filtering using NodeNetworkConfigFilter on nodeName.
75+
func (p *PodWatcher) SetupWithManager(mgr ctrl.Manager) error {
76+
p.cli = mgr.GetClient()
77+
err := ctrl.NewControllerManagedBy(mgr).
78+
For(&v1.Pod{}).
79+
WithEventFilter(predicate.Funcs{ // we only want create/delete events
80+
UpdateFunc: func(event.UpdateEvent) bool {
81+
return false
82+
},
83+
GenericFunc: func(event.GenericEvent) bool {
84+
return false
85+
},
86+
}).
87+
Complete(p)
88+
if err != nil {
89+
return errors.Wrap(err, "failed to set up pod watcher with manager")
90+
}
91+
return nil
92+
}

cns/service/main.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/Azure/azure-container-networking/cns/ipampool"
3333
cssctrl "github.com/Azure/azure-container-networking/cns/kubecontroller/clustersubnetstate"
3434
nncctrl "github.com/Azure/azure-container-networking/cns/kubecontroller/nodenetworkconfig"
35+
"github.com/Azure/azure-container-networking/cns/kubecontroller/podwatcher"
3536
"github.com/Azure/azure-container-networking/cns/logger"
3637
"github.com/Azure/azure-container-networking/cns/multitenantcontroller"
3738
"github.com/Azure/azure-container-networking/cns/multitenantcontroller/multitenantoperator"
@@ -54,6 +55,7 @@ import (
5455
"github.com/avast/retry-go/v3"
5556
"github.com/pkg/errors"
5657
"go.uber.org/zap"
58+
corev1 "k8s.io/api/core/v1"
5759
apierrors "k8s.io/apimachinery/pkg/api/errors"
5860
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
5961
"k8s.io/apimachinery/pkg/fields"
@@ -1189,6 +1191,9 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn
11891191
&v1alpha.NodeNetworkConfig{}: {
11901192
Field: fields.SelectorFromSet(fields.Set{"metadata.name": nodeName}),
11911193
},
1194+
&corev1.Pod{}: {
1195+
Field: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName}),
1196+
},
11921197
},
11931198
})
11941199

@@ -1259,6 +1264,14 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn
12591264
}
12601265
}
12611266

1267+
// TODO: add pod listeners based on Swift V1 vs MT/V2 configuration
1268+
if cnsconfig.WatchPods {
1269+
pw := podwatcher.New(nodeName)
1270+
if err := pw.SetupWithManager(manager); err != nil {
1271+
return errors.Wrapf(err, "failed to setup pod watcher with manager")
1272+
}
1273+
}
1274+
12621275
// adding some routes to the root service mux
12631276
mux := httpRestServiceImplementation.Listener.GetMux()
12641277
mux.Handle("/readyz", http.StripPrefix("/readyz", &healthz.Handler{}))

0 commit comments

Comments
 (0)