
Commit a77e247

Don't send NNC to Pool Monitor before Reconciler starts (#1236)
* remove early nnc send to pool monitor
  Signed-off-by: Evan Baker <[email protected]>
* order service starts better and add logging in initialization
  Signed-off-by: Evan Baker <[email protected]>
* wait for reconciler to start
  Signed-off-by: Evan Baker <[email protected]>
* update comments and docs
  Signed-off-by: GitHub <[email protected]>
1 parent 7b15cbd commit a77e247

File tree: 6 files changed (+92, -42 lines)

  cns/ipampool/monitor.go
  cns/restserver/internalapi.go
  cns/restserver/ipam.go
  cns/restserver/ipam_test.go
  cns/service/main.go
  cns/singletenantcontroller/reconciler.go


cns/ipampool/monitor.go (11 additions, 4 deletions)

@@ -44,7 +44,7 @@ type Monitor struct {
 	metastate   metaState
 	nnccli      nodeNetworkConfigSpecUpdater
 	httpService cns.HTTPService
-	initialized chan interface{}
+	started     chan interface{}
 	nncSource   chan v1alpha.NodeNetworkConfig
 	once        sync.Once
 }
@@ -60,11 +60,14 @@ func NewMonitor(httpService cns.HTTPService, nnccli nodeNetworkConfigSpecUpdater
 		opts:        opts,
 		httpService: httpService,
 		nnccli:      nnccli,
-		initialized: make(chan interface{}),
+		started:     make(chan interface{}),
 		nncSource:   make(chan v1alpha.NodeNetworkConfig),
 	}
 }
 
+// Start begins the Monitor's pool reconcile loop.
+// On first run, it will block until a NodeNetworkConfig is received (through a call to Update()).
+// Subsequently, it will run run once per RefreshDelay and attempt to re-reconcile the pool.
 func (pm *Monitor) Start(ctx context.Context) error {
 	logger.Printf("[ipam-pool-monitor] Starting CNS IPAM Pool Monitor")
 
@@ -78,7 +81,7 @@ func (pm *Monitor) Start(ctx context.Context) error {
 			return errors.Wrap(ctx.Err(), "pool monitor context closed")
 		case <-ticker.C: // attempt to reconcile every tick.
 			select {
-			case <-pm.initialized: // this blocks until we have initialized
+			case <-pm.started: // this blocks until we have initialized
 				// if we have initialized and enter this case, we proceed out of the select and continue to reconcile.
 			default:
 				// if we have NOT initialized and enter this case, we continue out of this iteration and let the for loop begin again.
@@ -90,7 +93,7 @@ func (pm *Monitor) Start(ctx context.Context) error {
 			pm.metastate.batch = scaler.BatchSize
 			pm.metastate.max = scaler.MaxIPCount
 			pm.metastate.minFreeCount, pm.metastate.maxFreeCount = CalculateMinFreeIPs(scaler), CalculateMaxFreeIPs(scaler)
-			pm.once.Do(func() { close(pm.initialized) }) // close the init channel the first time we receive a NodeNetworkConfig.
+			pm.once.Do(func() { close(pm.started) }) // close the init channel the first time we receive a NodeNetworkConfig.
 		}
 		// if control has flowed through the select(s) to this point, we can now reconcile.
 		err := pm.reconcile(ctx)
@@ -334,6 +337,10 @@ func (pm *Monitor) GetStateSnapshot() cns.IpamPoolMonitorStateSnapshot {
 
 // Update ingests a NodeNetworkConfig, clamping some values to ensure they are legal and then
 // pushing it to the Monitor's source channel.
+// If the Monitor has been Started but is blocking until it receives an NNC, this will start
+// the pool reconcile loop.
+// If the Monitor has not been Started, this will block until Start() is called, which will
+// immediately read this passed NNC and start the pool reconcile loop.
 func (pm *Monitor) Update(nnc *v1alpha.NodeNetworkConfig) {
 	pm.clampScaler(&nnc.Status.Scaler)
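
The monitor.go change above is the heart of the fix: the Monitor reconciles only after its started channel has been closed, and that channel is closed only when the first NodeNetworkConfig arrives over the unbuffered nncSource channel via Update(). The following is a standalone sketch of that gating pattern, not the production code: it uses simplified stand-ins (a plain string instead of v1alpha.NodeNetworkConfig, a print instead of pm.reconcile) to show how the nested select skips ticks until the first NNC lands.

// Standalone sketch of the Monitor's gating pattern (simplified types; reconcile is stubbed).
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type monitor struct {
	started   chan interface{} // closed after the first NNC is received
	nncSource chan string      // unbuffered: Update blocks until Start is reading
	once      sync.Once
}

func newMonitor() *monitor {
	return &monitor{started: make(chan interface{}), nncSource: make(chan string)}
}

// Update pushes an NNC into the source channel. If Start is not yet running, this blocks.
func (m *monitor) Update(nnc string) { m.nncSource <- nnc }

func (m *monitor) Start(ctx context.Context) error {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C: // attempt to reconcile every tick...
			select {
			case <-m.started: // ...but only once we have seen an NNC
			default:
				continue // no NNC yet: skip this tick
			}
		case nnc := <-m.nncSource:
			fmt.Println("received NNC:", nnc)
			m.once.Do(func() { close(m.started) }) // first NNC unblocks reconciling
		}
		fmt.Println("reconcile") // stand-in for pm.reconcile(ctx)
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 400*time.Millisecond)
	defer cancel()
	m := newMonitor()
	go func() { _ = m.Start(ctx) }()
	m.Update("initial NNC sent by the Reconciler") // safe: Start is already draining nncSource
	<-ctx.Done()
}

Closing a channel acts as a broadcast latch: every receive on started completes immediately once it is closed, and sync.Once keeps later NNC updates from closing it a second time.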

cns/restserver/internalapi.go (1 addition, 2 deletions)

@@ -227,7 +227,6 @@ func (service *HTTPRestService) ReconcileNCState(
 	if returnCode != types.Success {
 		return returnCode
 	}
-	service.IPAMPoolMonitor.Update(nnc)
 
 	// now parse the secondaryIP list, if it exists in PodInfo list, then assign that ip.
 	for _, secIpConfig := range ncRequest.SecondaryIPConfigs {
@@ -256,7 +255,7 @@
 		}
 	}
 
-	err := service.MarkExistingIPsAsPending(nnc.Spec.IPsNotInUse)
+	err := service.MarkExistingIPsAsPendingRelease(nnc.Spec.IPsNotInUse)
 	if err != nil {
 		logger.Errorf("[Azure CNS] Error. Failed to mark IPs as pending %v", nnc.Spec.IPsNotInUse)
 		return types.UnexpectedError

cns/restserver/ipam.go (2 additions, 2 deletions)

@@ -335,8 +335,8 @@ func (service *HTTPRestService) releaseIPConfig(podInfo cns.PodInfo) error {
 	return nil
 }
 
-// called when CNS is starting up and there are existing ipconfigs in the CRD that are marked as pending
-func (service *HTTPRestService) MarkExistingIPsAsPending(pendingIPIDs []string) error {
+// MarkExistingIPsAsPendingRelease is called when CNS is starting up and there are existing ipconfigs in the CRD that are marked as pending.
+func (service *HTTPRestService) MarkExistingIPsAsPendingRelease(pendingIPIDs []string) error {
 	service.Lock()
 	defer service.Unlock()

cns/restserver/ipam_test.go (2 additions, 2 deletions)

@@ -693,7 +693,7 @@ func TestIPAMMarkExistingIPConfigAsPending(t *testing.T) {
 
 	// mark available ip as as pending
 	pendingIPIDs := []string{testPod2GUID}
-	err = svc.MarkExistingIPsAsPending(pendingIPIDs)
+	err = svc.MarkExistingIPsAsPendingRelease(pendingIPIDs)
 	if err != nil {
 		t.Fatalf("Expected to successfully mark available ip as pending")
 	}
@@ -705,7 +705,7 @@ func TestIPAMMarkExistingIPConfigAsPending(t *testing.T) {
 
 	// attempt to mark assigned ipconfig as pending, expect fail
 	pendingIPIDs = []string{testPod1GUID}
-	err = svc.MarkExistingIPsAsPending(pendingIPIDs)
+	err = svc.MarkExistingIPsAsPendingRelease(pendingIPIDs)
 	if err == nil {
 		t.Fatalf("Expected to fail when marking assigned ip as pending")
 	}

cns/service/main.go (56 additions, 32 deletions)

@@ -820,8 +820,8 @@ type ncStateReconciler interface {
 }
 
 // TODO(rbtr) where should this live??
-// InitCNS initializes cns by passing pods and a createnetworkcontainerrequest
-func initCNS(ctx context.Context, cli nodeNetworkConfigGetter, ncReconciler ncStateReconciler, podInfoByIPProvider cns.PodInfoByIPProvider) error {
+// reconcileInitialCNSState initializes cns by passing pods and a CreateNetworkContainerRequest
+func reconcileInitialCNSState(ctx context.Context, cli nodeNetworkConfigGetter, ncReconciler ncStateReconciler, podInfoByIPProvider cns.PodInfoByIPProvider) error {
 	// Get nnc using direct client
 	nnc, err := cli.Get(ctx)
 	if err != nil {
@@ -864,8 +864,6 @@ func initCNS(ctx context.Context, cli nodeNetworkConfigGetter, ncReconciler ncSt
 
 // InitializeCRDState builds and starts the CRD controllers.
 func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cnsconfig *configuration.CNSConfig) error {
-	logger.Printf("[Azure CNS] Starting request controller")
-
 	// convert interface type to implementation type
 	httpRestServiceImplementation, ok := httpRestService.(*restserver.HTTPRestService)
 	if !ok {
@@ -880,40 +878,23 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn
 	}
 	httpRestServiceImplementation.SetNodeOrchestrator(&orchestrator)
 
+	// build default clientset.
 	kubeConfig, err := ctrl.GetConfig()
 	kubeConfig.UserAgent = fmt.Sprintf("azure-cns-%s", version)
 	if err != nil {
 		logger.Errorf("[Azure CNS] Failed to get kubeconfig for request controller: %v", err)
 		return err
 	}
-	nnccli, err := nodenetworkconfig.NewClient(kubeConfig)
+	clientset, err := kubernetes.NewForConfig(kubeConfig)
 	if err != nil {
-		return errors.Wrap(err, "failed to create NNC client")
+		return errors.Wrap(err, "failed to build clientset")
 	}
+
+	// get nodename for scoping kube requests to node.
 	nodeName, err := configuration.NodeName()
 	if err != nil {
 		return errors.Wrap(err, "failed to get NodeName")
 	}
-	// TODO(rbtr): nodename and namespace should be in the cns config
-	scopedcli := kubecontroller.NewScopedClient(nnccli, types.NamespacedName{Namespace: "kube-system", Name: nodeName})
-
-	// initialize the ipam pool monitor
-	poolOpts := ipampool.Options{
-		RefreshDelay: poolIPAMRefreshRateInMilliseconds * time.Millisecond,
-	}
-	poolMonitor := ipampool.NewMonitor(httpRestServiceImplementation, scopedcli, &poolOpts)
-	httpRestServiceImplementation.IPAMPoolMonitor = poolMonitor
-	logger.Printf("Starting IPAM Pool Monitor")
-	go func() {
-		if e := poolMonitor.Start(ctx); e != nil {
-			logger.Errorf("[Azure CNS] Failed to start pool monitor with err: %v", e)
-		}
-	}()
-
-	clientset, err := kubernetes.NewForConfig(kubeConfig)
-	if err != nil {
-		return errors.Wrap(err, "failed to build clientset")
-	}
 
 	var podInfoByIPProvider cns.PodInfoByIPProvider
 	if cnsconfig.InitializeFromCNI {
@@ -939,19 +920,49 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn
 		})
 	}
 
+	// create scoped kube clients.
+	nnccli, err := nodenetworkconfig.NewClient(kubeConfig)
+	if err != nil {
+		return errors.Wrap(err, "failed to create NNC client")
+	}
+	// TODO(rbtr): nodename and namespace should be in the cns config
+	scopedcli := kubecontroller.NewScopedClient(nnccli, types.NamespacedName{Namespace: "kube-system", Name: nodeName})
+
+	// initialize the ipam pool monitor
+	poolOpts := ipampool.Options{
+		RefreshDelay: poolIPAMRefreshRateInMilliseconds * time.Millisecond,
+	}
+	poolMonitor := ipampool.NewMonitor(httpRestServiceImplementation, scopedcli, &poolOpts)
+	httpRestServiceImplementation.IPAMPoolMonitor = poolMonitor
+
+	// reconcile initial CNS state from CNI or apiserver.
 	// apiserver nnc might not be registered or api server might be down and crashloop backof puts us outside of 5-10 minutes we have for
 	// aks addons to come up so retry a bit more aggresively here.
 	// will retry 10 times maxing out at a minute taking about 8 minutes before it gives up.
+	attempt := 0
 	err = retry.Do(func() error {
-		err = initCNS(ctx, scopedcli, httpRestServiceImplementation, podInfoByIPProvider)
+		attempt++
+		logger.Printf("reconciling initial CNS state attempt: %d", attempt)
+		err = reconcileInitialCNSState(ctx, scopedcli, httpRestServiceImplementation, podInfoByIPProvider)
 		if err != nil {
-			logger.Errorf("[Azure CNS] Failed to init cns with err: %v", err)
+			logger.Errorf("failed to reconcile initial CNS state, attempt: %d err: %v", attempt, err)
		}
 		return errors.Wrap(err, "failed to initialize CNS state")
 	}, retry.Context(ctx), retry.Delay(initCNSInitalDelay), retry.MaxDelay(time.Minute))
 	if err != nil {
 		return err
 	}
+	logger.Printf("reconciled initial CNS state after %d attempts", attempt)
+
+	// start the pool Monitor before the Reconciler, since it needs to be ready to receive an
+	// NodeNetworkConfig update by the time the Reconciler tries to send it.
+	go func() {
+		logger.Printf("Starting IPAM Pool Monitor")
+		if e := poolMonitor.Start(ctx); e != nil {
+			logger.Errorf("[Azure CNS] Failed to start pool monitor with err: %v", e)
+		}
+	}()
+	logger.Printf("initialized and started IPAM pool monitor")
 
 	// the nodeScopedCache sets Selector options on the Manager cache which are used
 	// to perform *server-side* filtering of the cached objects. This is very important
@@ -982,31 +993,42 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn
 		return errors.Wrapf(err, "failed to get node %s", nodeName)
 	}
 
-	reconciler := kubecontroller.NewReconciler(nnccli, httpRestServiceImplementation, httpRestServiceImplementation.IPAMPoolMonitor)
+	reconciler := kubecontroller.NewReconciler(nnccli, httpRestServiceImplementation, poolMonitor)
 	// pass Node to the Reconciler for Controller xref
 	if err := reconciler.SetupWithManager(manager, node); err != nil {
 		return errors.Wrapf(err, "failed to setup reconciler with manager")
 	}
 
-	// Start the RequestController which starts the reconcile loop
+	// Start the Manager which starts the reconcile loop.
+	// The Reconciler will send an initial NodeNetworkConfig update to the PoolMonitor, starting the
+	// Monitor's internal loop.
 	go func() {
+		logger.Printf("Starting NodeNetworkConfig reconciler.")
 		for {
 			if err := manager.Start(ctx); err != nil {
 				logger.Errorf("[Azure CNS] Failed to start request controller: %v", err)
 				// retry to start the request controller
 				// todo: add a CNS metric to count # of failures
 			} else {
-				logger.Printf("[Azure CNS] Exiting RequestController")
+				logger.Printf("exiting NodeNetworkConfig reconciler")
 				return
 			}
 
 			// Retry after 1sec
 			time.Sleep(time.Second)
 		}
 	}()
+	logger.Printf("initialized NodeNetworkConfig reconciler")
+	// wait for up to 10m for the Reconciler to run once.
+	timedCtx, cancel := context.WithTimeout(ctx, 10*time.Minute) //nolint:gomnd // default 10m
+	defer cancel()
+	if started := reconciler.Started(timedCtx); !started {
+		return errors.Errorf("timed out waiting for reconciler start")
+	}
+	logger.Printf("started NodeNetworkConfig reconciler")
 
-	logger.Printf("Starting SyncHostNCVersion")
 	go func() {
+		logger.Printf("starting SyncHostNCVersion loop")
 		// Periodically poll vfp programmed NC version from NMAgent
 		tickerChannel := time.Tick(time.Duration(cnsconfig.SyncHostNCVersionIntervalMs) * time.Millisecond)
 		for {
@@ -1016,10 +1038,12 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn
 				httpRestServiceImplementation.SyncHostNCVersion(timedCtx, cnsconfig.ChannelMode)
 				cancel()
 			case <-ctx.Done():
+				logger.Printf("exiting SyncHostNCVersion")
 				return
 			}
 		}
 	}()
+	logger.Printf("initialized and started SyncHostNCVersion loop")
 
 	return nil
 }
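
The main.go changes are mostly about ordering: the pool Monitor goroutine is started before the Manager, so it is already draining its NNC source channel when the Reconciler's first Reconcile pushes an update, and InitializeCRDState then blocks, bounded by a 10-minute context, until the Reconciler reports that it has run once. The sketch below shows that sequencing in isolation; the names (latch, initializeState, nncSource) are hypothetical stand-ins for the real CNS wiring, where httpRestServiceImplementation.IPAMPoolMonitor is the consumer, the NodeNetworkConfig Reconciler is the producer, and reconciler.Started(timedCtx) is the bounded wait.

// Simplified sketch (hypothetical names) of the start ordering established above:
// 1. start the consumer (pool monitor), 2. start the producer (reconciler),
// 3. block with a bounded timeout until the producer has delivered once.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type latch struct{ ch chan struct{} }

func (l latch) Started(ctx context.Context) bool {
	select {
	case <-l.ch:
		return true
	case <-ctx.Done():
		return false
	}
}

func initializeState(ctx context.Context) error {
	nncSource := make(chan string)            // unbuffered handoff, like the Monitor's source channel
	started := latch{ch: make(chan struct{})} // closed once the producer has delivered

	// 1. Start the pool-monitor-like consumer first, so the channel has a reader.
	go func() {
		for {
			select {
			case nnc := <-nncSource:
				fmt.Println("monitor received:", nnc)
			case <-ctx.Done():
				return
			}
		}
	}()

	// 2. Start the reconciler-like producer: it sends the initial NNC, then signals Started.
	go func() {
		nncSource <- "initial NNC" // would block forever if step 1 had not run yet
		close(started.ch)
	}()

	// 3. Bound the wait so a broken apiserver cannot hang startup indefinitely.
	timedCtx, cancel := context.WithTimeout(ctx, 10*time.Minute)
	defer cancel()
	if !started.Started(timedCtx) {
		return errors.New("timed out waiting for reconciler start")
	}
	fmt.Println("reconciler started; continuing initialization")
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if err := initializeState(ctx); err != nil {
		fmt.Println("init failed:", err)
	}
}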

cns/singletenantcontroller/reconciler.go (20 additions, 0 deletions)

@@ -2,6 +2,7 @@ package kubecontroller
 
 import (
 	"context"
+	"sync"
 
 	"github.com/Azure/azure-container-networking/cns"
 	"github.com/Azure/azure-container-networking/cns/logger"
@@ -37,13 +38,16 @@ type Reconciler struct {
 	cnscli             cnsClient
 	ipampoolmonitorcli ipamPoolMonitorClient
 	nnccli             nncGetter
+	started            chan interface{}
+	once               sync.Once
 }
 
 func NewReconciler(nnccli nncGetter, cnscli cnsClient, ipampipampoolmonitorcli ipamPoolMonitorClient) *Reconciler {
 	return &Reconciler{
 		cnscli:             cnscli,
 		ipampoolmonitorcli: ipampipampoolmonitorcli,
 		nnccli:             nnccli,
+		started:            make(chan interface{}),
 	}
 }
 
@@ -87,9 +91,25 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco
 	// record assigned IPs metric
 	allocatedIPs.Set(float64(len(nnc.Status.NetworkContainers[0].IPAssignments)))
 
+	// we have received and pushed an NNC update, we are "Started"
+	r.once.Do(func() { close(r.started) })
 	return reconcile.Result{}, nil
 }
 
+// Started blocks until the Reconciler has reconciled at least once,
+// then, and any time that it is called after that, it immediately returns true.
+// It accepts a cancellable Context and if the context is closed
+// before Start it will return false. Passing a closed Context after the
+// Reconciler is started is indeterminate and the response is psuedorandom.
+func (r *Reconciler) Started(ctx context.Context) bool {
+	select {
+	case <-r.started:
+		return true
+	case <-ctx.Done():
+		return false
+	}
+}
+
 // SetupWithManager Sets up the reconciler with a new manager, filtering using NodeNetworkConfigFilter on nodeName.
 func (r *Reconciler) SetupWithManager(mgr ctrl.Manager, node *v1.Node) error {
 	err := ctrl.NewControllerManagedBy(mgr).
0 commit comments

Comments
 (0)