Skip to content
142 changes: 0 additions & 142 deletions cns/restserver/internalapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ import (
"net/http/httptest"
"net/netip"
"reflect"
"strconv"
"strings"
"time"

"github.com/Azure/azure-container-networking/cns"
"github.com/Azure/azure-container-networking/cns/logger"
Expand Down Expand Up @@ -168,146 +166,6 @@ func (service *HTTPRestService) SyncNodeStatus(dncEP, infraVnet, nodeID string,
return
}

// SyncHostNCVersion will check NC version from NMAgent and save it as host NC version in container status.
// If NMAgent NC version got updated, CNS will refresh the pending programming IP status.
func (service *HTTPRestService) SyncHostNCVersion(ctx context.Context, channelMode string) {
service.Lock()
defer service.Unlock()
start := time.Now()
programmedNCCount, err := service.syncHostNCVersion(ctx, channelMode)
// even if we get an error, we want to write the CNI conflist if we have any NC programmed to any version
if programmedNCCount > 0 {
// This will only be done once per lifetime of the CNS process. This function is threadsafe and will panic
// if it fails, so it is safe to call in a non-preemptable goroutine.
go service.MustGenerateCNIConflistOnce()
}
if err != nil {
logger.Errorf("sync host error %v", err)
}
syncHostNCVersionCount.WithLabelValues(strconv.FormatBool(err == nil)).Inc()
syncHostNCVersionLatency.WithLabelValues(strconv.FormatBool(err == nil)).Observe(time.Since(start).Seconds())
}

var errNonExistentContainerStatus = errors.New("nonExistantContainerstatus")

// syncHostVersion updates the CNS state with the latest programmed versions of NCs attached to the VM. If any NC in local CNS state
// does not match the version that DNC claims to have published, this function will call NMAgent and list the latest programmed versions of
// all NCs and update the CNS state accordingly. This function returns the the total number of NCs on this VM that have been programmed to
// some version, NOT the number of NCs that are up-to-date.
func (service *HTTPRestService) syncHostNCVersion(ctx context.Context, channelMode string) (int, error) {
outdatedNCs := map[string]struct{}{}
programmedNCs := map[string]struct{}{}
for idx := range service.state.ContainerStatus {
// Will open a separate PR to convert all the NC version related variable to int. Change from string to int is a pain.
localNCVersion, err := strconv.Atoi(service.state.ContainerStatus[idx].HostVersion)
if err != nil {
logger.Errorf("Received err when change containerstatus.HostVersion %s to int, err msg %v", service.state.ContainerStatus[idx].HostVersion, err)
continue
}
dncNCVersion, err := strconv.Atoi(service.state.ContainerStatus[idx].CreateNetworkContainerRequest.Version)
if err != nil {
logger.Errorf("Received err when change nc version %s in containerstatus to int, err msg %v", service.state.ContainerStatus[idx].CreateNetworkContainerRequest.Version, err)
continue
}
// host NC version is the NC version from NMAgent, if it's smaller than NC version from DNC, then append it to indicate it needs update.
if localNCVersion < dncNCVersion {
outdatedNCs[service.state.ContainerStatus[idx].ID] = struct{}{}
} else if localNCVersion > dncNCVersion {
logger.Errorf("NC version from NMAgent is larger than DNC, NC version from NMAgent is %d, NC version from DNC is %d", localNCVersion, dncNCVersion)
}

if localNCVersion > -1 {
programmedNCs[service.state.ContainerStatus[idx].ID] = struct{}{}
}
}
if len(outdatedNCs) == 0 {
return len(programmedNCs), nil
}

ncVersionListResp, err := service.nma.GetNCVersionList(ctx)
if err != nil {
return len(programmedNCs), errors.Wrap(err, "failed to get nc version list from nmagent")
}

// Get IMDS NC versions for delegated NIC scenarios
imdsNCVersions, err := service.GetIMDSNCs(ctx)
if err != nil {
// If any of the NMA API check calls, imds calls fails assume that nma build doesn't have the latest changes and create empty map
imdsNCVersions = make(map[string]string)
}

nmaNCs := map[string]string{}
for _, nc := range ncVersionListResp.Containers {
nmaNCs[strings.ToLower(nc.NetworkContainerID)] = nc.Version
}

// Consolidate both nc's from NMA and IMDS calls
nmaProgrammedNCs := make(map[string]string)
for ncID, version := range nmaNCs {
nmaProgrammedNCs[ncID] = version
}
for ncID, version := range imdsNCVersions {
if _, exists := nmaProgrammedNCs[ncID]; !exists {
nmaProgrammedNCs[strings.ToLower(ncID)] = version
} else {
//nolint:staticcheck // SA1019: suppress deprecated logger.Warnf usage. Todo: legacy logger usage is consistent in cns repo. Migrates when all logger usage is migrated
logger.Warnf("NC %s exists in both NMA and IMDS responses, which is not expected", ncID)
}
}
hasNC.Set(float64(len(nmaProgrammedNCs)))
for ncID := range outdatedNCs {
nmaProgrammedNCVersionStr, ok := nmaProgrammedNCs[ncID]
if !ok {
// Neither NMA nor IMDS has this NC that we need programmed yet, bail out
continue
}
nmaProgrammedNCVersion, err := strconv.Atoi(nmaProgrammedNCVersionStr)
if err != nil {
logger.Errorf("failed to parse container version of %s: %s", ncID, err)
continue
}
// Check whether it exist in service state and get the related nc info
ncInfo, exist := service.state.ContainerStatus[ncID]
if !exist {
// if we marked this NC as needs update, but it no longer exists in internal state when we reach
// this point, our internal state has changed unexpectedly and we should bail out and try again.
return len(programmedNCs), errors.Wrapf(errNonExistentContainerStatus, "can't find NC with ID %s in service state, stop updating this host NC version", ncID)
}
// if the NC still exists in state and is programmed to some version (doesn't have to be latest), add it to our set of NCs that have been programmed
if nmaProgrammedNCVersion > -1 {
programmedNCs[ncID] = struct{}{}
}

localNCVersion, err := strconv.Atoi(ncInfo.HostVersion)
if err != nil {
logger.Errorf("failed to parse host nc version string %s: %s", ncInfo.HostVersion, err)
continue
}
if localNCVersion > nmaProgrammedNCVersion {
//nolint:staticcheck // SA1019: suppress deprecated logger.Printf usage. Todo: legacy logger usage is consistent in cns repo. Migrates when all logger usage is migrated
logger.Errorf("NC version from consolidated sources is decreasing: have %d, got %d", localNCVersion, nmaProgrammedNCVersion)
continue
}
if channelMode == cns.CRD {
service.MarkIpsAsAvailableUntransacted(ncInfo.ID, nmaProgrammedNCVersion)
}
//nolint:staticcheck // SA1019: suppress deprecated logger.Printf usage. Todo: legacy logger usage is consistent in cns repo. Migrates when all logger usage is migrated
logger.Printf("Updating NC %s host version from %s to %s", ncID, ncInfo.HostVersion, nmaProgrammedNCVersionStr)
ncInfo.HostVersion = nmaProgrammedNCVersionStr
logger.Printf("Updated NC %s host version to %s", ncID, ncInfo.HostVersion)
service.state.ContainerStatus[ncID] = ncInfo
// if we successfully updated the NC, pop it from the needs update set.
delete(outdatedNCs, ncID)
}
// if we didn't empty out the needs update set, NMA has not programmed all the NCs we are expecting, and we
// need to return an error indicating that
if len(outdatedNCs) > 0 {
return len(programmedNCs), errors.Errorf("unable to update some NCs: %v, missing or bad response from NMA or IMDS", outdatedNCs)
}

return len(programmedNCs), nil
}

func (service *HTTPRestService) ReconcileIPAssignment(podInfoByIP map[string]cns.PodInfo, ncReqs []*cns.CreateNetworkContainerRequest) types.ResponseCode {
// index all the secondary IP configs for all the nc reqs, for easier lookup later on.
allSecIPsIdx := make(map[string]*cns.CreateNetworkContainerRequest)
Expand Down
46 changes: 27 additions & 19 deletions cns/restserver/internalapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func TestSyncHostNCVersionErrorMissingNC(t *testing.T) {
}

// Check that the error message contains the expected text
expectedErrorText := "unable to update some NCs"
expectedErrorText := "Have outdated NCs"
if !strings.Contains(err.Error(), expectedErrorText) {
t.Errorf("Expected error to contain '%s', but got: %v", expectedErrorText, err)
}
Expand Down Expand Up @@ -607,7 +607,7 @@ func TestSyncHostNCVersionIMDSAPIVersionNotSupported(t *testing.T) {
}

// Verify the error is about being unable to update NCs
expectedErrorText := "unable to update some NCs"
expectedErrorText := "Have outdated NCs"
if !strings.Contains(err.Error(), expectedErrorText) {
t.Errorf("Expected error to contain '%s', but got: %v", expectedErrorText, err)
}
Expand Down Expand Up @@ -1345,6 +1345,11 @@ func (m *mockCNIConflistGenerator) getGeneratedCount() int {
return m.generatedCount
}

var fastcnsconf = configuration.CNSConfig{
SyncHostNCVersionIntervalMs: 100,
ChannelMode: cns.CRD,
}

// TestCNIConflistGenerationNewNC tests that discovering a new programmed NC in CNS state will trigger CNI conflist generation
func TestCNIConflistGenerationNewNC(t *testing.T) {
ncID := "some-new-nc" //nolint:goconst // value not shared across tests, can change without issue
Expand Down Expand Up @@ -1380,9 +1385,10 @@ func TestCNIConflistGenerationNewNC(t *testing.T) {
imdsClient: fakes.NewMockIMDSClient(),
}

service.SyncHostNCVersion(context.Background(), cns.CRD)
// CNI conflist gen happens in goroutine so sleep for a second to let it run
time.Sleep(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
service.StartSyncHostNCVersionLoop(ctx, fastcnsconf)
service.Wait(ctx)
assert.Equal(t, 1, mockgen.getGeneratedCount())
}

Expand Down Expand Up @@ -1421,9 +1427,10 @@ func TestCNIConflistGenerationExistingNC(t *testing.T) {
imdsClient: fakes.NewMockIMDSClient(),
}

service.SyncHostNCVersion(context.Background(), cns.CRD)
// CNI conflist gen happens in goroutine so sleep for a second to let it run
time.Sleep(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
service.StartSyncHostNCVersionLoop(ctx, fastcnsconf)
service.Wait(ctx)
assert.Equal(t, 1, mockgen.getGeneratedCount())
}

Expand Down Expand Up @@ -1463,12 +1470,11 @@ func TestCNIConflistGenerationNewNCTwice(t *testing.T) {
imdsClient: fakes.NewMockIMDSClient(),
}

service.SyncHostNCVersion(context.Background(), cns.CRD)
// CNI conflist gen happens in goroutine so sleep for a second to let it run
time.Sleep(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
service.StartSyncHostNCVersionLoop(ctx, fastcnsconf)
service.Wait(ctx)
assert.Equal(t, 1, mockgen.getGeneratedCount())

service.SyncHostNCVersion(context.Background(), cns.CRD)
// CNI conflist gen happens in goroutine so sleep for a second to let it run
time.Sleep(time.Second)
assert.Equal(t, 1, mockgen.getGeneratedCount()) // should still be one
Expand Down Expand Up @@ -1502,9 +1508,10 @@ func TestCNIConflistNotGenerated(t *testing.T) {
imdsClient: fakes.NewMockIMDSClient(),
}

service.SyncHostNCVersion(context.Background(), cns.CRD)
// CNI conflist gen happens in goroutine so sleep for a second to let it run
time.Sleep(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
service.StartSyncHostNCVersionLoop(ctx, fastcnsconf)
service.Wait(ctx)
assert.Equal(t, 0, mockgen.getGeneratedCount())
}

Expand Down Expand Up @@ -1545,9 +1552,10 @@ func TestCNIConflistGenerationOnNMAError(t *testing.T) {
imdsClient: fakes.NewMockIMDSClient(),
}

service.SyncHostNCVersion(context.Background(), cns.CRD)
// CNI conflist gen happens in goroutine so sleep for a second to let it run
time.Sleep(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
service.StartSyncHostNCVersionLoop(ctx, fastcnsconf)
service.Wait(ctx)
assert.Equal(t, 1, mockgen.getGeneratedCount())
}

Expand Down
10 changes: 6 additions & 4 deletions cns/restserver/nodesubnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ func (service *HTTPRestService) UpdateIPsForNodeSubnet(secondaryIPs []netip.Addr

logger.Debugf("IP change processed successfully")

// saved NC successfully. UpdateIPsForNodeSubnet is called only when IPs are fetched from NMAgent.
// We now have IPs to serve IPAM requests. Generate conflist to indicate CNS is ready
service.MustGenerateCNIConflistOnce()
service.ncSyncState.NotifyReady()
return nil
}

Expand Down Expand Up @@ -59,6 +57,10 @@ func (service *HTTPRestService) InitializeNodeSubnet(ctx context.Context, podInf

// StartNodeSubnet starts the IP fetcher for NodeSubnet. This will cause secondary IPs to be fetched periodically.
// After the first successful fetch, conflist will be generated to indicate CNS is ready.
func (service *HTTPRestService) StartNodeSubnet(ctx context.Context) {
func (service *HTTPRestService) StartNodeSubnet(ctx context.Context) error {
if err := service.ncSyncState.Start(); err != nil {
return err
}
service.nodesubnetIPFetcher.Start(ctx)
return nil
}
12 changes: 4 additions & 8 deletions cns/restserver/nodesubnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,15 @@ func TestNodeSubnet(t *testing.T) {

checkIPassignment(t, service, expectedIPs)

service.StartNodeSubnet(ctx)
if err := service.StartNodeSubnet(ctx); err != nil {
t.Fatalf("StartNodeSubnet returned an error: %v", err)
}

if service.GetNodesubnetIPFetcher() == nil {
t.Fatal("NodeSubnetIPFetcher is not initialized")
}

select {
case <-ctx.Done():
t.Errorf("test context done - %s", ctx.Err())
return
case <-mockCNIConflistGenerator.GenerateCalled:
break
}
service.Wait(ctx)

expectedIPs["10.0.0.45"] = types.Available
checkIPassignment(t, service, expectedIPs)
Expand Down
32 changes: 17 additions & 15 deletions cns/restserver/restserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ type HTTPRestService struct {
EndpointState map[string]*EndpointInfo // key : container id
EndpointStateStore store.KeyValueStore
cniConflistGenerator CNIConflistGenerator
generateCNIConflistOnce sync.Once
IPConfigsHandlerMiddleware cns.IPConfigsHandlerMiddleware
PnpIDByMacAddress map[string]string
imdsClient imdsClient
nodesubnetIPFetcher *nodesubnet.IPFetcher
ncSyncState networkContainerSyncState
}

type CNIConflistGenerator interface {
Expand Down Expand Up @@ -382,20 +382,22 @@ func (service *HTTPRestService) Stop() {
logger.Printf("[Azure CNS] Service stopped.")
}

// MustGenerateCNIConflistOnce will generate the CNI conflist once if the service was initialized with
// a conflist generator. If not, this is a no-op.
func (service *HTTPRestService) MustGenerateCNIConflistOnce() {
service.generateCNIConflistOnce.Do(func() {
if err := service.cniConflistGenerator.Generate(); err != nil {
panic("unable to generate cni conflist with error: " + err.Error())
}

if err := service.cniConflistGenerator.Close(); err != nil {
panic("unable to close the cni conflist output stream: " + err.Error())
}
})
}

func (service *HTTPRestService) AttachIPConfigsHandlerMiddleware(middleware cns.IPConfigsHandlerMiddleware) {
service.IPConfigsHandlerMiddleware = middleware
}

// Wait waits for nc sync state then writes out the conflist.
func (service *HTTPRestService) Wait(ctx context.Context) {
service.ncSyncState.Wait(ctx)
if ctx.Err() != nil {
logger.Printf("Context done before writing out conflist: %v", ctx.Err())
return
}
if err := service.cniConflistGenerator.Generate(); err != nil {
panic("unable to generate cni conflist with error: " + err.Error())
}

if err := service.cniConflistGenerator.Close(); err != nil {
panic("unable to close the cni conflist output stream: " + err.Error())
}
}
Loading
Loading