Skip to content

Commit c5f7dcd

Browse files
authored
Cns fixipampoolmonitor errorhandling (#806)
* temp changes * Fixed error handling in IPAMPoolMonitor * incorporate feedback * Fixed the String() implementation for struct * Fixed error message * Fixed logging * Fixed regressions
1 parent 08f0006 commit c5f7dcd

File tree

5 files changed

+153
-80
lines changed

5 files changed

+153
-80
lines changed

cns/api.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package cns
66
import (
77
"context"
88
"encoding/json"
9+
"fmt"
910

1011
"github.com/Azure/azure-container-networking/cns/common"
1112
nnc "github.com/Azure/azure-container-networking/nodenetworkconfig/api/v1alpha"
@@ -56,6 +57,11 @@ type IPConfigurationStatus struct {
5657
OrchestratorContext json.RawMessage
5758
}
5859

60+
func (i IPConfigurationStatus) String() string {
61+
return fmt.Sprintf("IPConfigurationStatus: Id: [%s], NcId: [%s], IpAddress: [%s], State: [%s], OrchestratorContext: [%s]",
62+
i.ID, i.NCID, i.IPAddress, i.State, string(i.OrchestratorContext))
63+
}
64+
5965
// SetEnvironmentRequest describes the Request to set the environment in CNS.
6066
type SetEnvironmentRequest struct {
6167
Location string

cns/cnsclient/cli.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ var (
3838
)
3939

4040
func HandleCNSClientCommands(cmd, arg string) error {
41-
4241
cnsIPAddress := os.Getenv(envCNSIPAddress)
4342
cnsPort := os.Getenv(envCNSPort)
4443

cns/ipampoolmonitor/ipampoolmonitor.go

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -104,41 +104,58 @@ func (pm *CNSIPAMPoolMonitor) increasePoolSize() error {
104104
defer pm.mu.Unlock()
105105

106106
var err error
107-
pm.cachedNNC.Spec.RequestedIPCount += pm.scalarUnits.BatchSize
107+
var tempNNCSpec nnc.NodeNetworkConfigSpec
108+
tempNNCSpec, err = pm.createNNCSpecForCRD(false)
109+
if err != nil {
110+
return err
111+
}
108112

109-
// pass nil map to CNStoCRDSpec because we don't want to modify the to be deleted ipconfigs
110-
pm.cachedNNC.Spec, err = pm.createNNCSpecForCRD(false)
113+
tempNNCSpec.RequestedIPCount += pm.scalarUnits.BatchSize
114+
logger.Printf("[ipam-pool-monitor] Increasing pool size, Current Pool Size: %v, Updated Requested IP Count: %v, Pods with IP's:%v, ToBeDeleted Count: %v", len(pm.cns.GetPodIPConfigState()), tempNNCSpec.RequestedIPCount, len(pm.cns.GetAllocatedIPConfigs()), len(tempNNCSpec.IPsNotInUse))
115+
116+
err = pm.rc.UpdateCRDSpec(context.Background(), tempNNCSpec)
111117
if err != nil {
118+
// caller will retry to update the CRD again
112119
return err
113120
}
114121

115-
logger.Printf("[ipam-pool-monitor] Increasing pool size, Current Pool Size: %v, Requested IP Count: %v, Pods with IP's:%v, ToBeDeleted Count: %v", len(pm.cns.GetPodIPConfigState()), pm.cachedNNC.Spec.RequestedIPCount, len(pm.cns.GetAllocatedIPConfigs()), len(pm.cachedNNC.Spec.IPsNotInUse))
116-
return pm.rc.UpdateCRDSpec(context.Background(), pm.cachedNNC.Spec)
122+
// save the updated state to cachedSpec
123+
pm.cachedNNC.Spec = tempNNCSpec
124+
return nil
117125
}
118126

119127
func (pm *CNSIPAMPoolMonitor) decreasePoolSize() error {
120128
pm.mu.Lock()
121129
defer pm.mu.Unlock()
122130

123-
// TODO: Better handling here for negatives
124-
pm.cachedNNC.Spec.RequestedIPCount -= pm.scalarUnits.BatchSize
125-
126131
// mark n number of IP's as pending
127132
pendingIpAddresses, err := pm.cns.MarkIPAsPendingRelease(int(pm.scalarUnits.BatchSize))
128133
if err != nil {
129134
return err
130135
}
131136

132-
logger.Printf("[ipam-pool-monitor] Updated Requested count %v, Releasing ips: %+v", pm.cachedNNC.Spec.RequestedIPCount, pendingIpAddresses)
137+
totalIpsSetForRelease := len(pendingIpAddresses)
138+
logger.Printf("[ipam-pool-monitor] Releasing IPCount in this batch %d", totalIpsSetForRelease)
133139

134-
// convert the pending IP addresses to a spec
135-
pm.cachedNNC.Spec, err = pm.createNNCSpecForCRD(false)
140+
var tempNNCSpec nnc.NodeNetworkConfigSpec
141+
tempNNCSpec, err = pm.createNNCSpecForCRD(false)
136142
if err != nil {
137143
return err
138144
}
145+
146+
tempNNCSpec.RequestedIPCount -= int64(totalIpsSetForRelease)
147+
logger.Printf("[ipam-pool-monitor] Decreasing pool size, Current Pool Size: %v, Requested IP Count: %v, Pods with IP's: %v, ToBeDeleted Count: %v", len(pm.cns.GetPodIPConfigState()), tempNNCSpec.RequestedIPCount, len(pm.cns.GetAllocatedIPConfigs()), len(tempNNCSpec.IPsNotInUse))
148+
149+
err = pm.rc.UpdateCRDSpec(context.Background(), tempNNCSpec)
150+
if err != nil {
151+
// caller will retry to update the CRD again
152+
return err
153+
}
154+
155+
// save the updated state to cachedSpec
156+
pm.cachedNNC.Spec = tempNNCSpec
139157
pm.pendingRelease = true
140-
logger.Printf("[ipam-pool-monitor] Decreasing pool size, Current Pool Size: %v, Requested IP Count: %v, Pods with IP's: %v, ToBeDeleted Count: %v", len(pm.cns.GetPodIPConfigState()), pm.cachedNNC.Spec.RequestedIPCount, len(pm.cns.GetAllocatedIPConfigs()), len(pm.cachedNNC.Spec.IPsNotInUse))
141-
return pm.rc.UpdateCRDSpec(context.Background(), pm.cachedNNC.Spec)
158+
return nil
142159
}
143160

144161
// if cns pending ip release map is empty, request controller has already reconciled the CNS state,
@@ -148,13 +165,22 @@ func (pm *CNSIPAMPoolMonitor) cleanPendingRelease() error {
148165
defer pm.mu.Unlock()
149166

150167
var err error
151-
pm.cachedNNC.Spec, err = pm.createNNCSpecForCRD(true)
168+
var tempNNCSpec nnc.NodeNetworkConfigSpec
169+
tempNNCSpec, err = pm.createNNCSpecForCRD(true)
152170
if err != nil {
153-
logger.Printf("[ipam-pool-monitor] Failed to translate ")
171+
return err
154172
}
155173

174+
err = pm.rc.UpdateCRDSpec(context.Background(), tempNNCSpec)
175+
if err != nil {
176+
// caller will retry to update the CRD again
177+
return err
178+
}
179+
180+
// save the updated state to cachedSpec
181+
pm.cachedNNC.Spec = tempNNCSpec
156182
pm.pendingRelease = false
157-
return pm.rc.UpdateCRDSpec(context.Background(), pm.cachedNNC.Spec)
183+
return nil
158184
}
159185

160186
// CNSToCRDSpec translates CNS's map of Ips to be released and requested ip count into a CRD Spec

cns/restserver/ipam.go

Lines changed: 102 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -92,40 +92,57 @@ func (service *HTTPRestService) releaseIPConfigHandler(w http.ResponseWriter, r
9292
return
9393
}
9494

95-
// MarkIPAsPendingRelease will mark IPs to pending release state.
96-
func (service *HTTPRestService) MarkIPAsPendingRelease(numberToMark int) (map[string]cns.IPConfigurationStatus, error) {
97-
allReleasedIPs := make(map[string]cns.IPConfigurationStatus)
98-
// Ensure PendingProgramming IPs will be release before Available ones.
99-
ipStateTypes := [2]string{cns.PendingProgramming, cns.Available}
100-
95+
// MarkIPAsPendingRelease will set the IPs which are in PendingProgramming or Available to PendingRelease state
96+
// It will try to update [totalIpsToRelease] number of ips.
97+
func (service *HTTPRestService) MarkIPAsPendingRelease(totalIpsToRelease int) (map[string]cns.IPConfigurationStatus, error) {
98+
pendingReleasedIps := make(map[string]cns.IPConfigurationStatus)
10199
service.Lock()
102100
defer service.Unlock()
103-
for _, ipStateType := range ipStateTypes {
104-
pendingReleaseIPs := service.markSpecificIPTypeAsPending(numberToMark, ipStateType)
105-
for uuid, pependingReleaseIP := range pendingReleaseIPs {
106-
allReleasedIPs[uuid] = pependingReleaseIP
107-
}
108-
numberToMark -= len(pendingReleaseIPs)
109-
if numberToMark == 0 {
110-
return allReleasedIPs, nil
111-
}
112-
}
113-
return nil, fmt.Errorf("Failed to mark %d IP's as pending, only marked %d IP's", numberToMark, len(allReleasedIPs))
114-
}
115101

116-
func (service *HTTPRestService) markSpecificIPTypeAsPending(numberToMark int, ipStateType string) map[string]cns.IPConfigurationStatus {
117-
pendingReleaseIPs := make(map[string]cns.IPConfigurationStatus)
118-
for uuid, mutableIPConfig := range service.PodIPConfigState {
119-
if mutableIPConfig.State == ipStateType {
120-
mutableIPConfig.State = cns.PendingRelease
121-
service.PodIPConfigState[uuid] = mutableIPConfig
122-
pendingReleaseIPs[uuid] = mutableIPConfig
123-
if len(pendingReleaseIPs) == numberToMark {
124-
return pendingReleaseIPs
102+
for uuid, existingIpConfig := range service.PodIPConfigState {
103+
if existingIpConfig.State == cns.PendingProgramming {
104+
updatedIpConfig, err := service.updateIPConfigState(uuid, cns.PendingRelease, existingIpConfig.OrchestratorContext)
105+
if err != nil {
106+
return nil, err
107+
}
108+
109+
pendingReleasedIps[uuid] = updatedIpConfig
110+
if len(pendingReleasedIps) == totalIpsToRelease {
111+
return pendingReleasedIps, nil
125112
}
126-
}
113+
}
127114
}
128-
return pendingReleaseIPs
115+
116+
// if not all expected IPs are set to PendingRelease, then check the Available IPs
117+
for uuid, existingIpConfig := range service.PodIPConfigState {
118+
if existingIpConfig.State == cns.Available {
119+
updatedIpConfig, err := service.updateIPConfigState(uuid, cns.PendingRelease, existingIpConfig.OrchestratorContext)
120+
if err != nil {
121+
return nil, err
122+
}
123+
124+
pendingReleasedIps[uuid] = updatedIpConfig
125+
126+
if len(pendingReleasedIps) == totalIpsToRelease {
127+
return pendingReleasedIps, nil
128+
}
129+
}
130+
}
131+
132+
logger.Printf("[MarkIPAsPendingRelease] Set total ips to PendingRelease %d, expected %d", len(pendingReleasedIps), totalIpsToRelease)
133+
return pendingReleasedIps, nil
134+
}
135+
136+
func (service *HTTPRestService) updateIPConfigState(ipId string, updatedState string, orchestratorContext json.RawMessage) (cns.IPConfigurationStatus, error) {
137+
if ipConfig, found := service.PodIPConfigState[ipId]; found {
138+
logger.Printf("[updateIPConfigState] Changing IpId [%s] state to [%s], orchestratorContext [%s]. Current config [%+v]", ipId, updatedState, string(orchestratorContext), ipConfig)
139+
ipConfig.State = updatedState
140+
ipConfig.OrchestratorContext = orchestratorContext
141+
service.PodIPConfigState[ipId] = ipConfig
142+
return ipConfig, nil
143+
}
144+
145+
return cns.IPConfigurationStatus{}, fmt.Errorf("[updateIPConfigState] Failed to update state %s for the IPConfig. ID %s not found PodIPConfigState", updatedState, ipId)
129146
}
130147

131148
// MarkIpsAsAvailableUntransacted will update pending programming IPs to available if NMAgent side's programmed nc version keep up with nc version.
@@ -142,12 +159,15 @@ func (service *HTTPRestService) MarkIpsAsAvailableUntransacted(ncID string, newH
142159
if ipConfigStatus, exist := service.PodIPConfigState[uuid]; !exist {
143160
logger.Errorf("IP %s with uuid as %s exist in service state Secondary IP list but can't find in PodIPConfigState", ipConfigStatus.IPAddress, uuid)
144161
} else if ipConfigStatus.State == cns.PendingProgramming && secondaryIPConfigs.NCVersion <= newHostNCVersion {
145-
ipConfigStatus.State = cns.Available
146-
service.PodIPConfigState[uuid] = ipConfigStatus
162+
_, err := service.updateIPConfigState(uuid, cns.Available, nil)
163+
if err != nil {
164+
logger.Errorf("Error updating IPConfig [%+v] state to Available, err: %+v", ipConfigStatus, err)
165+
}
166+
147167
// Following 2 sentence assign new host version to secondary ip config.
148168
secondaryIPConfigs.NCVersion = newHostNCVersion
149169
ncInfo.CreateNetworkContainerRequest.SecondaryIPConfigs[uuid] = secondaryIPConfigs
150-
logger.Printf("Change ip %s with uuid %s from pending programming to %s, current secondary ip configs is %v", ipConfigStatus.IPAddress, uuid, cns.Available,
170+
logger.Printf("Change ip %s with uuid %s from pending programming to %s, current secondary ip configs is %+v", ipConfigStatus.IPAddress, uuid, cns.Available,
151171
ncInfo.CreateNetworkContainerRequest.SecondaryIPConfigs[uuid])
152172
}
153173
}
@@ -273,21 +293,25 @@ func filterIPConfigMap(toBeAdded map[string]cns.IPConfigurationStatus, f func(cn
273293
}
274294

275295
//SetIPConfigAsAllocated takes a lock of the service, and sets the ipconfig in the CNS state as allocated, does not take a lock
276-
func (service *HTTPRestService) setIPConfigAsAllocated(ipconfig cns.IPConfigurationStatus, podInfo cns.KubernetesPodInfo, marshalledOrchestratorContext json.RawMessage) cns.IPConfigurationStatus {
277-
ipconfig.State = cns.Allocated
278-
ipconfig.OrchestratorContext = marshalledOrchestratorContext
296+
func (service *HTTPRestService) setIPConfigAsAllocated(ipconfig cns.IPConfigurationStatus, podInfo cns.KubernetesPodInfo, marshalledOrchestratorContext json.RawMessage) (cns.IPConfigurationStatus, error) {
297+
ipconfig, err := service.updateIPConfigState(ipconfig.ID, cns.Allocated, marshalledOrchestratorContext)
298+
if err != nil {
299+
return cns.IPConfigurationStatus{}, err
300+
}
301+
279302
service.PodIPIDByOrchestratorContext[podInfo.GetOrchestratorContextKey()] = ipconfig.ID
280-
service.PodIPConfigState[ipconfig.ID] = ipconfig
281-
return service.PodIPConfigState[ipconfig.ID]
303+
return ipconfig, nil
282304
}
283305

284306
//SetIPConfigAsAllocated and sets the ipconfig in the CNS state as allocated, does not take a lock
285-
func (service *HTTPRestService) setIPConfigAsAvailable(ipconfig cns.IPConfigurationStatus, podInfo cns.KubernetesPodInfo) cns.IPConfigurationStatus {
286-
ipconfig.State = cns.Available
287-
ipconfig.OrchestratorContext = nil
288-
service.PodIPConfigState[ipconfig.ID] = ipconfig
307+
func (service *HTTPRestService) setIPConfigAsAvailable(ipconfig cns.IPConfigurationStatus, podInfo cns.KubernetesPodInfo) (cns.IPConfigurationStatus, error) {
308+
ipconfig, err := service.updateIPConfigState(ipconfig.ID, cns.Available, nil)
309+
if err != nil {
310+
return cns.IPConfigurationStatus{}, err
311+
}
312+
289313
delete(service.PodIPIDByOrchestratorContext, podInfo.GetOrchestratorContextKey())
290-
return service.PodIPConfigState[ipconfig.ID]
314+
return ipconfig, nil
291315
}
292316

293317
////SetIPConfigAsAllocated takes a lock of the service, and sets the ipconfig in the CNS stateas Available
@@ -300,15 +324,18 @@ func (service *HTTPRestService) releaseIPConfig(podInfo cns.KubernetesPodInfo) e
300324
ipID := service.PodIPIDByOrchestratorContext[podInfo.GetOrchestratorContextKey()]
301325
if ipID != "" {
302326
if ipconfig, isExist := service.PodIPConfigState[ipID]; isExist {
303-
service.setIPConfigAsAvailable(ipconfig, podInfo)
304-
logger.Printf("Released IP %+v for pod %+v", ipconfig.IPAddress, podInfo)
327+
_, err := service.setIPConfigAsAvailable(ipconfig, podInfo)
328+
if err != nil {
329+
return fmt.Errorf("[releaseIPConfig] failed to mark IPConfig [%+v] as Available. err: %v", ipconfig, err)
330+
}
331+
logger.Printf("[releaseIPConfig] Released IP %+v for pod %+v", ipconfig.IPAddress, podInfo)
305332

306333
} else {
307-
logger.Errorf("Failed to get release ipconfig. Pod to IPID exists, but IPID to IPConfig doesn't exist, CNS State potentially corrupt")
308-
return fmt.Errorf("releaseIPConfig failed. Pod to IPID exists, but IPID to IPConfig doesn't exist, CNS State potentially corrupt")
334+
logger.Errorf("[releaseIPConfig] Failed to get release ipconfig. Pod to IPID exists, but IPID to IPConfig doesn't exist, CNS State potentially corrupt")
335+
return fmt.Errorf("[releaseIPConfig] releaseIPConfig failed. Pod to IPID exists, but IPID to IPConfig doesn't exist, CNS State potentially corrupt")
309336
}
310337
} else {
311-
logger.Errorf("SetIPConfigAsAvailable failed to release, no allocation found for pod")
338+
logger.Errorf("[releaseIPConfig] SetIPConfigAsAvailable failed to release, no allocation found for pod [%+v]", podInfo)
312339
return nil
313340
}
314341
return nil
@@ -325,6 +352,7 @@ func (service *HTTPRestService) MarkExistingIPsAsPending(pendingIPIDs []string)
325352
return fmt.Errorf("Failed to mark IP [%v] as pending, currently allocated", id)
326353
}
327354

355+
logger.Printf("[MarkExistingIPsAsPending]: Marking IP [%+v] to PendingRelease", ipconfig)
328356
ipconfig.State = cns.PendingRelease
329357
service.PodIPConfigState[id] = ipconfig
330358
} else {
@@ -363,27 +391,35 @@ func (service *HTTPRestService) AllocateDesiredIPConfig(podInfo cns.KubernetesPo
363391
defer service.Unlock()
364392

365393
found := false
366-
for _, ipState := range service.PodIPConfigState {
367-
if ipState.IPAddress == desiredIPAddress {
368-
if ipState.State == cns.Allocated {
394+
for _, ipConfig := range service.PodIPConfigState {
395+
if ipConfig.IPAddress == desiredIPAddress {
396+
if ipConfig.State == cns.Allocated {
369397
// This IP has already been allocated, if it is allocated to same pod, then return the same
370398
// IPconfiguration
371-
if bytes.Equal(orchestratorContext, ipState.OrchestratorContext) == true {
399+
if bytes.Equal(orchestratorContext, ipConfig.OrchestratorContext) == true {
400+
logger.Printf("[AllocateDesiredIPConfig]: IP Config [%+v] is already allocated to this Pod [%+v]", ipConfig, podInfo)
372401
found = true
373402
} else {
374403
var pInfo cns.KubernetesPodInfo
375-
json.Unmarshal(ipState.OrchestratorContext, &pInfo)
376-
return podIpInfo, fmt.Errorf("Desired IP is already allocated %+v to Pod: %+v, requested for pod %+v", ipState, pInfo, podInfo)
404+
err := json.Unmarshal(ipConfig.OrchestratorContext, &pInfo)
405+
if err != nil {
406+
return podIpInfo, fmt.Errorf("[AllocateDesiredIPConfig] Failed to unmarshal IPState [%+v] OrchestratorContext, err: %v", ipConfig, err)
407+
}
408+
return podIpInfo, fmt.Errorf("[AllocateDesiredIPConfig] Desired IP is already allocated %+v to Pod: %+v, requested for pod %+v", ipConfig, pInfo, podInfo)
409+
}
410+
} else if ipConfig.State == cns.Available {
411+
_, err := service.setIPConfigAsAllocated(ipConfig, podInfo, orchestratorContext)
412+
if err != nil {
413+
return podIpInfo, err
377414
}
378-
} else if ipState.State == cns.Available {
379-
service.setIPConfigAsAllocated(ipState, podInfo, orchestratorContext)
415+
380416
found = true
381417
} else {
382-
return podIpInfo, fmt.Errorf("Desired IP is not available %+v", ipState)
418+
return podIpInfo, fmt.Errorf("[AllocateDesiredIPConfig] Desired IP is not available %+v", ipConfig)
383419
}
384420

385421
if found {
386-
err := service.populateIpConfigInfoUntransacted(ipState, &podIpInfo)
422+
err := service.populateIpConfigInfoUntransacted(ipConfig, &podIpInfo)
387423
return podIpInfo, err
388424
}
389425
}
@@ -399,10 +435,16 @@ func (service *HTTPRestService) AllocateAnyAvailableIPConfig(podInfo cns.Kuberne
399435

400436
for _, ipState := range service.PodIPConfigState {
401437
if ipState.State == cns.Available {
402-
err := service.populateIpConfigInfoUntransacted(ipState, &podIpInfo)
403-
if err == nil {
404-
service.setIPConfigAsAllocated(ipState, podInfo, orchestratorContext)
438+
_, err := service.setIPConfigAsAllocated(ipState, podInfo, orchestratorContext)
439+
if err != nil {
440+
return podIpInfo, err
405441
}
442+
443+
err = service.populateIpConfigInfoUntransacted(ipState, &podIpInfo)
444+
if err != nil {
445+
return podIpInfo, err
446+
}
447+
406448
return podIpInfo, err
407449
}
408450
}

cns/restserver/ipam_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -580,10 +580,10 @@ func TestIPAMMarkIPCountAsPending(t *testing.T) {
580580
t.Fatalf("Unexpected failure releasing IP: %+v", err)
581581
}
582582

583-
// Try to release IP when no IP can be released. It should return error and ips will be nil
583+
// Try to release IP when no IP can be released. It will not return error and return 0 IPs
584584
ips, err = svc.MarkIPAsPendingRelease(1)
585-
if err == nil || ips != nil {
586-
t.Fatalf("We are expecting err and ips should be nil, however, return these IP %v", ips)
585+
if err != nil || len(ips) != 0 {
586+
t.Fatalf("We are not either expecting err [%v] or ips as non empty [%v]", err, ips)
587587
}
588588
}
589589

0 commit comments

Comments
 (0)