Skip to content

Commit cab6615

Browse files
authored
Azure CNS fix reconcile bug (#809)
Handle the race scenario where the HTTP listener is started before the CNS state is initialized. During initialization/reconciliation, CNS gets the CRD and recreates the NC request with SecondaryIPs, then gets the list of pods and sets those IPs as Allocated. If the HTTP listener is already started, CNI will start requesting IPs from CNS, and CNS will start allocating them right after they are set to Available, resulting in double allocations. The calls are therefore reordered so that the HTTP listener is started only after CNS is initialized. Also fixed some logging and avoided double release during failure in the IPAM pool monitor.
1 parent b93e4cc commit cab6615

File tree

14 files changed

+296
-114
lines changed

14 files changed

+296
-114
lines changed

cns/NetworkContainerContract.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,11 @@ type IPConfigRequest struct {
219219
OrchestratorContext json.RawMessage
220220
}
221221

222+
func (i IPConfigRequest) String() string {
223+
return fmt.Sprintf("[IPConfigRequest: DesiredIPAddress %s, OrchestratorContext %s]",
224+
i.DesiredIPAddress, string(i.OrchestratorContext))
225+
}
226+
222227
// IPConfigResponse is used in CNS IPAM mode as a response to CNI ADD
223228
type IPConfigResponse struct {
224229
PodIpInfo PodIpInfo

cns/cnsclient/cnsclient_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,15 @@ func TestMain(m *testing.M) {
137137
}
138138

139139
if httpRestService != nil {
140+
err = httpRestService.Init(&config)
141+
if err != nil {
142+
logger.Errorf("Failed to initialize HttpService, err:%v.\n", err)
143+
return
144+
}
145+
140146
err = httpRestService.Start(&config)
141147
if err != nil {
142-
logger.Errorf("Failed to start CNS, err:%v.\n", err)
148+
logger.Errorf("Failed to start HttpService, err:%v.\n", err)
143149
return
144150
}
145151
}

cns/common/service.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type Service struct {
2424

2525
// ServiceAPI defines base interface.
2626
type ServiceAPI interface {
27+
Init(*ServiceConfig) error
2728
Start(*ServiceConfig) error
2829
Stop()
2930
GetOption(string) interface{}

cns/fakes/cnsfake.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,4 +306,9 @@ func (fake *HTTPServiceFake) Start(*common.ServiceConfig) error {
306306
return nil
307307
}
308308

309+
310+
func (fake *HTTPServiceFake) Init(*common.ServiceConfig) error {
311+
return nil
312+
}
313+
309314
func (fake *HTTPServiceFake) Stop() {}

cns/fakes/requestcontrollerfake.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,18 @@ func (rc *RequestControllerFake) CarveIPConfigsAndAddToStatusAndCNS(numberOfIPCo
6262
return cnsIPConfigs
6363
}
6464

65+
func (rc *RequestControllerFake) InitRequestController() error {
66+
return nil
67+
}
68+
6569
func (rc *RequestControllerFake) StartRequestController(exitChan <-chan struct{}) error {
6670
return nil
6771
}
6872

73+
func (rc *RequestControllerFake) IsStarted() bool {
74+
return true
75+
}
76+
6977
func (rc *RequestControllerFake) UpdateCRDSpec(cntxt context.Context, desiredSpec nnc.NodeNetworkConfigSpec) error {
7078
rc.cachedCRD.Spec = desiredSpec
7179

cns/ipampoolmonitor/ipampoolmonitor.go

Lines changed: 58 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,23 @@ type CNSIPAMPoolMonitor struct {
1616
pendingRelease bool
1717

1818
cachedNNC nnc.NodeNetworkConfig
19+
updatingIpsNotInUseCount int
1920
scalarUnits nnc.Scaler
2021

21-
cns cns.HTTPService
22+
httpService cns.HTTPService
2223
rc requestcontroller.RequestController
2324
MinimumFreeIps int64
2425
MaximumFreeIps int64
2526

2627
mu sync.RWMutex
2728
}
2829

29-
func NewCNSIPAMPoolMonitor(cns cns.HTTPService, rc requestcontroller.RequestController) *CNSIPAMPoolMonitor {
30+
func NewCNSIPAMPoolMonitor(httpService cns.HTTPService, rc requestcontroller.RequestController) *CNSIPAMPoolMonitor {
31+
logger.Printf("NewCNSIPAMPoolMonitor: Create IPAM Pool Monitor")
3032
return &CNSIPAMPoolMonitor{
31-
pendingRelease: false,
32-
cns: cns,
33-
rc: rc,
33+
pendingRelease: false,
34+
httpService: httpService,
35+
rc: rc,
3436
}
3537
}
3638

@@ -52,7 +54,7 @@ func (pm *CNSIPAMPoolMonitor) Start(ctx context.Context, poolMonitorRefreshMilli
5254
for {
5355
select {
5456
case <-ctx.Done():
55-
return fmt.Errorf("CNS IPAM Pool Monitor received cancellation signal")
57+
return fmt.Errorf("[ipam-pool-monitor] CNS IPAM Pool Monitor received cancellation signal")
5658
case <-ticker.C:
5759
err := pm.Reconcile()
5860
if err != nil {
@@ -63,14 +65,14 @@ func (pm *CNSIPAMPoolMonitor) Start(ctx context.Context, poolMonitorRefreshMilli
6365
}
6466

6567
func (pm *CNSIPAMPoolMonitor) Reconcile() error {
66-
cnsPodIPConfigCount := len(pm.cns.GetPodIPConfigState())
67-
pendingProgramCount := len(pm.cns.GetPendingProgramIPConfigs()) // TODO: add pending program count to real cns
68-
allocatedPodIPCount := len(pm.cns.GetAllocatedIPConfigs())
69-
pendingReleaseIPCount := len(pm.cns.GetPendingReleaseIPConfigs())
70-
availableIPConfigCount := len(pm.cns.GetAvailableIPConfigs()) // TODO: add pending allocation count to real cns
68+
cnsPodIPConfigCount := len(pm.httpService.GetPodIPConfigState())
69+
pendingProgramCount := len(pm.httpService.GetPendingProgramIPConfigs()) // TODO: add pending program count to real cns
70+
allocatedPodIPCount := len(pm.httpService.GetAllocatedIPConfigs())
71+
pendingReleaseIPCount := len(pm.httpService.GetPendingReleaseIPConfigs())
72+
availableIPConfigCount := len(pm.httpService.GetAvailableIPConfigs()) // TODO: add pending allocation count to real cns
7173
freeIPConfigCount := pm.cachedNNC.Spec.RequestedIPCount - int64(allocatedPodIPCount)
7274

73-
msg := fmt.Sprintf("Pool Size: %v, Goal Size: %v, BatchSize: %v, MinFree: %v, MaxFree:%v, Allocated: %v, Available: %v, Pending Release: %v, Free: %v, Pending Program: %v",
75+
msg := fmt.Sprintf("[ipam-pool-monitor] Pool Size: %v, Goal Size: %v, BatchSize: %v, MinFree: %v, MaxFree:%v, Allocated: %v, Available: %v, Pending Release: %v, Free: %v, Pending Program: %v",
7476
cnsPodIPConfigCount, pm.cachedNNC.Spec.RequestedIPCount, pm.scalarUnits.BatchSize, pm.MinimumFreeIps, pm.MaximumFreeIps, allocatedPodIPCount, availableIPConfigCount, pendingReleaseIPCount, freeIPConfigCount, pendingProgramCount)
7577

7678
switch {
@@ -82,7 +84,7 @@ func (pm *CNSIPAMPoolMonitor) Reconcile() error {
8284
// pod count is decreasing
8385
case freeIPConfigCount > pm.MaximumFreeIps:
8486
logger.Printf("[ipam-pool-monitor] Decreasing pool size...%s ", msg)
85-
return pm.decreasePoolSize()
87+
return pm.decreasePoolSize(pendingReleaseIPCount)
8688

8789
// CRD has reconciled CNS state, and target spec is now the same size as the state
8890
// free to remove the IP's from the CRD
@@ -100,8 +102,8 @@ func (pm *CNSIPAMPoolMonitor) Reconcile() error {
100102
}
101103

102104
func (pm *CNSIPAMPoolMonitor) increasePoolSize() error {
103-
pm.mu.Lock()
104105
defer pm.mu.Unlock()
106+
pm.mu.Lock()
105107

106108
var err error
107109
var tempNNCSpec nnc.NodeNetworkConfigSpec
@@ -111,58 +113,79 @@ func (pm *CNSIPAMPoolMonitor) increasePoolSize() error {
111113
}
112114

113115
tempNNCSpec.RequestedIPCount += pm.scalarUnits.BatchSize
114-
logger.Printf("[ipam-pool-monitor] Increasing pool size, Current Pool Size: %v, Updated Requested IP Count: %v, Pods with IP's:%v, ToBeDeleted Count: %v", len(pm.cns.GetPodIPConfigState()), tempNNCSpec.RequestedIPCount, len(pm.cns.GetAllocatedIPConfigs()), len(tempNNCSpec.IPsNotInUse))
116+
logger.Printf("[ipam-pool-monitor] Increasing pool size, Current Pool Size: %v, Updated Requested IP Count: %v, Pods with IP's:%v, ToBeDeleted Count: %v", len(pm.httpService.GetPodIPConfigState()), tempNNCSpec.RequestedIPCount, len(pm.httpService.GetAllocatedIPConfigs()), len(tempNNCSpec.IPsNotInUse))
115117

116118
err = pm.rc.UpdateCRDSpec(context.Background(), tempNNCSpec)
117119
if err != nil {
118120
// caller will retry to update the CRD again
119121
return err
120122
}
121123

124+
logger.Printf("[ipam-pool-monitor] Increasing pool size: UpdateCRDSpec succeeded for spec %+v", tempNNCSpec)
122125
// save the updated state to cachedSpec
123126
pm.cachedNNC.Spec = tempNNCSpec
124127
return nil
125128
}
126129

127-
func (pm *CNSIPAMPoolMonitor) decreasePoolSize() error {
128-
pm.mu.Lock()
130+
func (pm *CNSIPAMPoolMonitor) decreasePoolSize(existingPendingReleaseIPCount int) error {
129131
defer pm.mu.Unlock()
132+
pm.mu.Lock()
130133

131134
// mark n number of IP's as pending
132-
pendingIpAddresses, err := pm.cns.MarkIPAsPendingRelease(int(pm.scalarUnits.BatchSize))
133-
if err != nil {
134-
return err
135-
}
135+
var err error
136+
var newIpsMarkedAsPending bool
137+
var pendingIpAddresses map[string]cns.IPConfigurationStatus
138+
if pm.updatingIpsNotInUseCount == 0 ||
139+
pm.updatingIpsNotInUseCount < existingPendingReleaseIPCount {
140+
logger.Printf("[ipam-pool-monitor] Marking IPs as PendingRelease, ipsToBeReleasedCount %d", int(pm.scalarUnits.BatchSize))
141+
pendingIpAddresses, err = pm.httpService.MarkIPAsPendingRelease(int(pm.scalarUnits.BatchSize))
142+
if err != nil {
143+
return err
144+
}
136145

137-
totalIpsSetForRelease := len(pendingIpAddresses)
138-
logger.Printf("[ipam-pool-monitor] Releasing IPCount in this batch %d", totalIpsSetForRelease)
146+
newIpsMarkedAsPending = true
147+
}
139148

140149
var tempNNCSpec nnc.NodeNetworkConfigSpec
141150
tempNNCSpec, err = pm.createNNCSpecForCRD(false)
142151
if err != nil {
143152
return err
144153
}
145154

146-
tempNNCSpec.RequestedIPCount -= int64(totalIpsSetForRelease)
147-
logger.Printf("[ipam-pool-monitor] Decreasing pool size, Current Pool Size: %v, Requested IP Count: %v, Pods with IP's: %v, ToBeDeleted Count: %v", len(pm.cns.GetPodIPConfigState()), tempNNCSpec.RequestedIPCount, len(pm.cns.GetAllocatedIPConfigs()), len(tempNNCSpec.IPsNotInUse))
155+
if newIpsMarkedAsPending {
156+
// cache the updatingPendingRelease so that we dont re-set new IPs to PendingRelease in case UpdateCRD call fails
157+
pm.updatingIpsNotInUseCount = len(tempNNCSpec.IPsNotInUse)
158+
}
159+
160+
logger.Printf("[ipam-pool-monitor] Releasing IPCount in this batch %d, updatingPendingIpsNotInUse count %d", len(pendingIpAddresses), pm.updatingIpsNotInUseCount)
161+
162+
tempNNCSpec.RequestedIPCount -= int64(len(pendingIpAddresses))
163+
logger.Printf("[ipam-pool-monitor] Decreasing pool size, Current Pool Size: %v, Requested IP Count: %v, Pods with IP's: %v, ToBeDeleted Count: %v", len(pm.httpService.GetPodIPConfigState()), tempNNCSpec.RequestedIPCount, len(pm.httpService.GetAllocatedIPConfigs()), len(tempNNCSpec.IPsNotInUse))
148164

149165
err = pm.rc.UpdateCRDSpec(context.Background(), tempNNCSpec)
150166
if err != nil {
151167
// caller will retry to update the CRD again
152168
return err
153169
}
154170

171+
logger.Printf("[ipam-pool-monitor] Decreasing pool size: UpdateCRDSpec succeeded for spec %+v", tempNNCSpec)
172+
155173
// save the updated state to cachedSpec
156174
pm.cachedNNC.Spec = tempNNCSpec
157175
pm.pendingRelease = true
176+
177+
// clear the updatingPendingIpsNotInUse, as we have Updated the CRD
178+
logger.Printf("[ipam-pool-monitor] cleaning the updatingPendingIpsNotInUse, existing length %d", pm.updatingIpsNotInUseCount)
179+
pm.updatingIpsNotInUseCount = 0
180+
158181
return nil
159182
}
160183

161184
// if cns pending ip release map is empty, request controller has already reconciled the CNS state,
162185
// so we can remove it from our cache and remove the IP's from the CRD
163186
func (pm *CNSIPAMPoolMonitor) cleanPendingRelease() error {
164-
pm.mu.Lock()
165187
defer pm.mu.Unlock()
188+
pm.mu.Lock()
166189

167190
var err error
168191
var tempNNCSpec nnc.NodeNetworkConfigSpec
@@ -177,6 +200,9 @@ func (pm *CNSIPAMPoolMonitor) cleanPendingRelease() error {
177200
return err
178201
}
179202

203+
logger.Printf("[ipam-pool-monitor] cleanPendingRelease: UpdateCRDSpec succeeded for spec %+v", tempNNCSpec)
204+
205+
180206
// save the updated state to cachedSpec
181207
pm.cachedNNC.Spec = tempNNCSpec
182208
pm.pendingRelease = false
@@ -197,7 +223,7 @@ func (pm *CNSIPAMPoolMonitor) createNNCSpecForCRD(resetNotInUseList bool) (nnc.N
197223
spec.IPsNotInUse = make([]string, 0)
198224
} else {
199225
// Get All Pending IPs from CNS and populate it again.
200-
pendingIps := pm.cns.GetPendingReleaseIPConfigs()
226+
pendingIps := pm.httpService.GetPendingReleaseIPConfigs()
201227
for _, pendingIp := range pendingIps {
202228
spec.IPsNotInUse = append(spec.IPsNotInUse, pendingIp.ID)
203229
}
@@ -208,14 +234,18 @@ func (pm *CNSIPAMPoolMonitor) createNNCSpecForCRD(resetNotInUseList bool) (nnc.N
208234

209235
// UpdatePoolLimitsTransacted called by request controller on reconcile to set the batch size limits
210236
func (pm *CNSIPAMPoolMonitor) Update(scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec) error {
211-
pm.mu.Lock()
212237
defer pm.mu.Unlock()
238+
pm.mu.Lock()
239+
213240
pm.scalarUnits = scalar
214241

215242
pm.MinimumFreeIps = int64(float64(pm.scalarUnits.BatchSize) * (float64(pm.scalarUnits.RequestThresholdPercent) / 100))
216243
pm.MaximumFreeIps = int64(float64(pm.scalarUnits.BatchSize) * (float64(pm.scalarUnits.ReleaseThresholdPercent) / 100))
217244

218245
pm.cachedNNC.Spec = spec
219246

247+
logger.Printf("[ipam-pool-monitor] Update spec %+v, pm.MinimumFreeIps %d, pm.MaximumFreeIps %d",
248+
pm.cachedNNC.Spec, pm.MinimumFreeIps, pm.MaximumFreeIps)
249+
220250
return nil
221251
}

cns/requestcontroller/kubecontroller/crdreconciler.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,25 @@ func (r *CrdReconciler) Reconcile(request reconcile.Request) (reconcile.Result,
4141
}
4242

4343
logger.Printf("[cns-rc] CRD Spec: %v", nodeNetConfig.Spec)
44-
logger.Printf("[cns-rc] CRD Status: %v", nodeNetConfig.Status)
44+
4545

4646
// If there are no network containers, don't hand it off to CNS
4747
if len(nodeNetConfig.Status.NetworkContainers) == 0 {
48+
logger.Errorf("[cns-rc] Empty NetworkContainers")
4849
return reconcile.Result{}, nil
4950
}
5051

52+
networkContainer := nodeNetConfig.Status.NetworkContainers[0]
53+
logger.Printf("[cns-rc] CRD Status: NcId: [%s], Version: [%d], podSubnet: [%s], Subnet CIDR: [%s], " +
54+
"Gateway Addr: [%s], Primary IP: [%s], SecondaryIpsCount: [%d]",
55+
networkContainer.ID,
56+
networkContainer.Version,
57+
networkContainer.SubnetName,
58+
networkContainer.SubnetAddressSpace,
59+
networkContainer.DefaultGateway,
60+
networkContainer.PrimaryIP,
61+
len(networkContainer.IPAssignments))
62+
5163
// Otherwise, create NC request and hand it off to CNS
5264
ncRequest, err = CRDStatusToNCRequest(nodeNetConfig.Status)
5365
if err != nil {

cns/requestcontroller/kubecontroller/crdrequestcontroller.go

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"os"
8+
"sync"
89

910
"github.com/Azure/azure-container-networking/cns"
1011
"github.com/Azure/azure-container-networking/cns/cnsclient"
@@ -42,6 +43,9 @@ type crdRequestController struct {
4243
CNSClient cnsclient.APIClient
4344
nodeName string //name of node running this program
4445
Reconciler *CrdReconciler
46+
initialized bool
47+
Started bool
48+
lock sync.Mutex
4549
}
4650

4751
// GetKubeConfig precedence
@@ -139,20 +143,38 @@ func NewCrdRequestController(restService *restserver.HTTPRestService, kubeconfig
139143
return &crdRequestController, nil
140144
}
141145

146+
// InitRequestController will initialize/reconcile the CNS state
147+
func (crdRC *crdRequestController) InitRequestController() error {
148+
logger.Printf("InitRequestController")
149+
150+
defer crdRC.lock.Unlock()
151+
crdRC.lock.Lock()
152+
153+
if err := crdRC.initCNS(); err != nil {
154+
logger.Errorf("[cns-rc] Error initializing cns state: %v", err)
155+
return err
156+
}
157+
158+
crdRC.initialized = true
159+
return nil
160+
}
161+
142162
// StartRequestController starts the Reconciler loop which watches for CRD status updates
143163
// Blocks until SIGINT or SIGTERM is received
144164
// Notifies exitChan when kill signal received
145165
func (crdRC *crdRequestController) StartRequestController(exitChan <-chan struct{}) error {
146-
var (
147-
err error
148-
)
166+
logger.Printf("StartRequestController")
149167

150-
logger.Printf("Initializing CNS state")
151-
if err = crdRC.initCNS(); err != nil {
152-
logger.Errorf("[cns-rc] Error initializing cns state: %v", err)
153-
return err
168+
crdRC.lock.Lock()
169+
if crdRC.initialized != true {
170+
crdRC.lock.Unlock()
171+
return fmt.Errorf("Failed to start requestController, state is not initialized [%v]", crdRC)
154172
}
155173

174+
// Setting the started state
175+
crdRC.Started = true
176+
crdRC.lock.Unlock()
177+
156178
logger.Printf("Starting reconcile loop")
157179
if err := crdRC.mgr.Start(exitChan); err != nil {
158180
if crdRC.isNotDefined(err) {
@@ -166,6 +188,13 @@ func (crdRC *crdRequestController) StartRequestController(exitChan <-chan struct
166188
return nil
167189
}
168190

191+
// return if RequestController is started
192+
func (crdRC *crdRequestController) IsStarted() bool {
193+
defer crdRC.lock.Unlock()
194+
crdRC.lock.Lock()
195+
return crdRC.Started
196+
}
197+
169198
// InitCNS initializes cns by passing pods and a createnetworkcontainerrequest
170199
func (crdRC *crdRequestController) initCNS() error {
171200
var (
@@ -242,7 +271,7 @@ func (crdRC *crdRequestController) initCNS() error {
242271
}
243272

244273
// UpdateCRDSpec updates the CRD spec
245-
func (crdRC *crdRequestController) UpdateCRDSpec(cntxt context.Context, crdSpec nnc.NodeNetworkConfigSpec) error {
274+
func (crdRC *crdRequestController) UpdateCRDSpec(cntxt context.Context, crdSpec nnc.NodeNetworkConfigSpec) error {
246275
nodeNetworkConfig, err := crdRC.getNodeNetConfig(cntxt, crdRC.nodeName, k8sNamespace)
247276
if err != nil {
248277
logger.Errorf("[cns-rc] Error getting CRD when updating spec %v", err)

cns/requestcontroller/requestcontrollerintreface.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88

99
// RequestController interface for cns to interact with the request controller
1010
type RequestController interface {
11+
InitRequestController() error
1112
StartRequestController(exitChan <-chan struct{}) error
1213
UpdateCRDSpec(cntxt context.Context, crdSpec nnc.NodeNetworkConfigSpec) error
14+
IsStarted() bool
1315
}

0 commit comments

Comments
 (0)