diff --git a/cns/restserver/internalapi.go b/cns/restserver/internalapi.go index 9f855ec64f..5a089f58cc 100644 --- a/cns/restserver/internalapi.go +++ b/cns/restserver/internalapi.go @@ -13,9 +13,7 @@ import ( "net/http/httptest" "net/netip" "reflect" - "strconv" "strings" - "time" "github.com/Azure/azure-container-networking/cns" "github.com/Azure/azure-container-networking/cns/logger" @@ -168,146 +166,6 @@ func (service *HTTPRestService) SyncNodeStatus(dncEP, infraVnet, nodeID string, return } -// SyncHostNCVersion will check NC version from NMAgent and save it as host NC version in container status. -// If NMAgent NC version got updated, CNS will refresh the pending programming IP status. -func (service *HTTPRestService) SyncHostNCVersion(ctx context.Context, channelMode string) { - service.Lock() - defer service.Unlock() - start := time.Now() - programmedNCCount, err := service.syncHostNCVersion(ctx, channelMode) - // even if we get an error, we want to write the CNI conflist if we have any NC programmed to any version - if programmedNCCount > 0 { - // This will only be done once per lifetime of the CNS process. This function is threadsafe and will panic - // if it fails, so it is safe to call in a non-preemptable goroutine. - go service.MustGenerateCNIConflistOnce() - } - if err != nil { - logger.Errorf("sync host error %v", err) - } - syncHostNCVersionCount.WithLabelValues(strconv.FormatBool(err == nil)).Inc() - syncHostNCVersionLatency.WithLabelValues(strconv.FormatBool(err == nil)).Observe(time.Since(start).Seconds()) -} - -var errNonExistentContainerStatus = errors.New("nonExistantContainerstatus") - -// syncHostVersion updates the CNS state with the latest programmed versions of NCs attached to the VM. If any NC in local CNS state -// does not match the version that DNC claims to have published, this function will call NMAgent and list the latest programmed versions of -// all NCs and update the CNS state accordingly. 
This function returns the the total number of NCs on this VM that have been programmed to -// some version, NOT the number of NCs that are up-to-date. -func (service *HTTPRestService) syncHostNCVersion(ctx context.Context, channelMode string) (int, error) { - outdatedNCs := map[string]struct{}{} - programmedNCs := map[string]struct{}{} - for idx := range service.state.ContainerStatus { - // Will open a separate PR to convert all the NC version related variable to int. Change from string to int is a pain. - localNCVersion, err := strconv.Atoi(service.state.ContainerStatus[idx].HostVersion) - if err != nil { - logger.Errorf("Received err when change containerstatus.HostVersion %s to int, err msg %v", service.state.ContainerStatus[idx].HostVersion, err) - continue - } - dncNCVersion, err := strconv.Atoi(service.state.ContainerStatus[idx].CreateNetworkContainerRequest.Version) - if err != nil { - logger.Errorf("Received err when change nc version %s in containerstatus to int, err msg %v", service.state.ContainerStatus[idx].CreateNetworkContainerRequest.Version, err) - continue - } - // host NC version is the NC version from NMAgent, if it's smaller than NC version from DNC, then append it to indicate it needs update. 
- if localNCVersion < dncNCVersion { - outdatedNCs[service.state.ContainerStatus[idx].ID] = struct{}{} - } else if localNCVersion > dncNCVersion { - logger.Errorf("NC version from NMAgent is larger than DNC, NC version from NMAgent is %d, NC version from DNC is %d", localNCVersion, dncNCVersion) - } - - if localNCVersion > -1 { - programmedNCs[service.state.ContainerStatus[idx].ID] = struct{}{} - } - } - if len(outdatedNCs) == 0 { - return len(programmedNCs), nil - } - - ncVersionListResp, err := service.nma.GetNCVersionList(ctx) - if err != nil { - return len(programmedNCs), errors.Wrap(err, "failed to get nc version list from nmagent") - } - - // Get IMDS NC versions for delegated NIC scenarios - imdsNCVersions, err := service.GetIMDSNCs(ctx) - if err != nil { - // If any of the NMA API check calls, imds calls fails assume that nma build doesn't have the latest changes and create empty map - imdsNCVersions = make(map[string]string) - } - - nmaNCs := map[string]string{} - for _, nc := range ncVersionListResp.Containers { - nmaNCs[strings.ToLower(nc.NetworkContainerID)] = nc.Version - } - - // Consolidate both nc's from NMA and IMDS calls - nmaProgrammedNCs := make(map[string]string) - for ncID, version := range nmaNCs { - nmaProgrammedNCs[ncID] = version - } - for ncID, version := range imdsNCVersions { - if _, exists := nmaProgrammedNCs[ncID]; !exists { - nmaProgrammedNCs[strings.ToLower(ncID)] = version - } else { - //nolint:staticcheck // SA1019: suppress deprecated logger.Warnf usage. Todo: legacy logger usage is consistent in cns repo. 
Migrates when all logger usage is migrated - logger.Warnf("NC %s exists in both NMA and IMDS responses, which is not expected", ncID) - } - } - hasNC.Set(float64(len(nmaProgrammedNCs))) - for ncID := range outdatedNCs { - nmaProgrammedNCVersionStr, ok := nmaProgrammedNCs[ncID] - if !ok { - // Neither NMA nor IMDS has this NC that we need programmed yet, bail out - continue - } - nmaProgrammedNCVersion, err := strconv.Atoi(nmaProgrammedNCVersionStr) - if err != nil { - logger.Errorf("failed to parse container version of %s: %s", ncID, err) - continue - } - // Check whether it exist in service state and get the related nc info - ncInfo, exist := service.state.ContainerStatus[ncID] - if !exist { - // if we marked this NC as needs update, but it no longer exists in internal state when we reach - // this point, our internal state has changed unexpectedly and we should bail out and try again. - return len(programmedNCs), errors.Wrapf(errNonExistentContainerStatus, "can't find NC with ID %s in service state, stop updating this host NC version", ncID) - } - // if the NC still exists in state and is programmed to some version (doesn't have to be latest), add it to our set of NCs that have been programmed - if nmaProgrammedNCVersion > -1 { - programmedNCs[ncID] = struct{}{} - } - - localNCVersion, err := strconv.Atoi(ncInfo.HostVersion) - if err != nil { - logger.Errorf("failed to parse host nc version string %s: %s", ncInfo.HostVersion, err) - continue - } - if localNCVersion > nmaProgrammedNCVersion { - //nolint:staticcheck // SA1019: suppress deprecated logger.Printf usage. Todo: legacy logger usage is consistent in cns repo. 
Migrates when all logger usage is migrated - logger.Errorf("NC version from consolidated sources is decreasing: have %d, got %d", localNCVersion, nmaProgrammedNCVersion) - continue - } - if channelMode == cns.CRD { - service.MarkIpsAsAvailableUntransacted(ncInfo.ID, nmaProgrammedNCVersion) - } - //nolint:staticcheck // SA1019: suppress deprecated logger.Printf usage. Todo: legacy logger usage is consistent in cns repo. Migrates when all logger usage is migrated - logger.Printf("Updating NC %s host version from %s to %s", ncID, ncInfo.HostVersion, nmaProgrammedNCVersionStr) - ncInfo.HostVersion = nmaProgrammedNCVersionStr - logger.Printf("Updated NC %s host version to %s", ncID, ncInfo.HostVersion) - service.state.ContainerStatus[ncID] = ncInfo - // if we successfully updated the NC, pop it from the needs update set. - delete(outdatedNCs, ncID) - } - // if we didn't empty out the needs update set, NMA has not programmed all the NCs we are expecting, and we - // need to return an error indicating that - if len(outdatedNCs) > 0 { - return len(programmedNCs), errors.Errorf("unable to update some NCs: %v, missing or bad response from NMA or IMDS", outdatedNCs) - } - - return len(programmedNCs), nil -} - func (service *HTTPRestService) ReconcileIPAssignment(podInfoByIP map[string]cns.PodInfo, ncReqs []*cns.CreateNetworkContainerRequest) types.ResponseCode { // index all the secondary IP configs for all the nc reqs, for easier lookup later on. 
 allSecIPsIdx := make(map[string]*cns.CreateNetworkContainerRequest) diff --git a/cns/restserver/internalapi_test.go b/cns/restserver/internalapi_test.go index 440e3b4e61..b31f226f95 100644 --- a/cns/restserver/internalapi_test.go +++ b/cns/restserver/internalapi_test.go @@ -304,7 +304,7 @@ func TestCreateAndUpdateNCWithSecondaryIPNCVersion(t *testing.T) { } } -func TestSyncHostNCVersion(t *testing.T) { +func TestStartSyncHostNCVersionLoop(t *testing.T) { // cns.KubernetesCRD has one more logic compared to other orchestrator type, so test both of them orchestratorTypes := []string{cns.Kubernetes, cns.KubernetesCRD} for _, orchestratorType := range orchestratorTypes { @@ -349,7 +349,13 @@ func TestSyncHostNCVersion(t *testing.T) { defer cleanupIMDS() // When syncing the host NC version, it will use the orchestratorType passed in. - svc.SyncHostNCVersion(context.Background(), orchestratorType) + cnsconf := configuration.CNSConfig{ + SyncHostNCVersionIntervalMs: 100, + ChannelMode: orchestratorType, + } + err := svc.StartSyncHostNCVersionLoop(t.Context(), cnsconf) + assert.NoError(t, err) + svc.Wait(t.Context()) // wait for at least one run containerStatus = svc.state.ContainerStatus[req.NetworkContainerid] if containerStatus.HostVersion != "0" { t.Errorf("Unexpected containerStatus.HostVersion %s, expected host version should be 0 in string", containerStatus.HostVersion) @@ -400,7 +406,9 @@ func TestPendingIPsGotUpdatedWhenSyncHostNCVersion(t *testing.T) { cleanup := setMockNMAgent(svc, mnma) defer cleanup() - svc.SyncHostNCVersion(context.Background(), cns.CRD) + err := svc.StartSyncHostNCVersionLoop(t.Context(), fastcnsconf) + assert.NoError(t, err) + svc.Wait(t.Context()) // wait for at least one run containerStatus = svc.state.ContainerStatus[req.NetworkContainerid] receivedSecondaryIPConfigs = containerStatus.CreateNetworkContainerRequest.SecondaryIPConfigs @@ -440,13 +448,13 @@ func TestSyncHostNCVersionErrorMissingNC(t *testing.T) { cleanup := setMockNMAgent(svc, 
mnma) defer cleanup() - _, err := svc.syncHostNCVersion(context.Background(), cns.KubernetesCRD) + _, err := svc.syncHostNCVersion(t.Context(), cns.KubernetesCRD) if err == nil { t.Errorf("Expected error when NC is missing from both NMAgent and IMDS, but got nil") } // Check that the error message contains the expected text - expectedErrorText := "unable to update some NCs" + expectedErrorText := "Have outdated NCs" if !strings.Contains(err.Error(), expectedErrorText) { t.Errorf("Expected error to contain '%s', but got: %v", expectedErrorText, err) } @@ -485,7 +493,7 @@ func TestSyncHostNCVersionLocalVersionHigher(t *testing.T) { cleanup := setMockNMAgent(svc, mnma) defer cleanup() - _, err := svc.syncHostNCVersion(context.Background(), cns.KubernetesCRD) + _, err := svc.syncHostNCVersion(t.Context(), cns.KubernetesCRD) if err != nil { t.Errorf("Expected sync to succeed, but got error: %v", err) } @@ -528,7 +536,7 @@ func TestSyncHostNCVersionLocalHigherThanDNC(t *testing.T) { // This should detect that localNCVersion (3) > dncNCVersion (1) and log error // but since there are no outdated NCs, it should return successfully - _, err := svc.syncHostNCVersion(context.Background(), cns.KubernetesCRD) + _, err := svc.syncHostNCVersion(t.Context(), cns.KubernetesCRD) if err != nil { t.Errorf("Expected no error when localNCVersion > dncNCVersion (no outdated NCs), but got: %v", err) } @@ -601,13 +609,13 @@ func TestSyncHostNCVersionIMDSAPIVersionNotSupported(t *testing.T) { defer func() { svc.imdsClient = originalIMDS }() // Test should fail because of outdated IMDS NC that can't be updated - _, err := svc.syncHostNCVersion(context.Background(), orchestratorType) + _, err := svc.syncHostNCVersion(t.Context(), orchestratorType) if err == nil { t.Errorf("Expected error when there are outdated IMDS NCs but API version is not supported, but got nil") } // Verify the error is about being unable to update NCs - expectedErrorText := "unable to update some NCs" + 
expectedErrorText := "Have outdated NCs" if !strings.Contains(err.Error(), expectedErrorText) { t.Errorf("Expected error to contain '%s', but got: %v", expectedErrorText, err) } @@ -1345,6 +1353,11 @@ func (m *mockCNIConflistGenerator) getGeneratedCount() int { return m.generatedCount } +var fastcnsconf = configuration.CNSConfig{ + SyncHostNCVersionIntervalMs: 100, + ChannelMode: cns.CRD, +} + // TestCNIConflistGenerationNewNC tests that discovering a new programmed NC in CNS state will trigger CNI conflist generation func TestCNIConflistGenerationNewNC(t *testing.T) { ncID := "some-new-nc" //nolint:goconst // value not shared across tests, can change without issue @@ -1380,9 +1393,11 @@ func TestCNIConflistGenerationNewNC(t *testing.T) { imdsClient: fakes.NewMockIMDSClient(), } - service.SyncHostNCVersion(context.Background(), cns.CRD) - // CNI conflist gen happens in goroutine so sleep for a second to let it run - time.Sleep(time.Second) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*5) + defer cancel() + err := service.StartSyncHostNCVersionLoop(ctx, fastcnsconf) + assert.NoError(t, err) + service.Wait(ctx) assert.Equal(t, 1, mockgen.getGeneratedCount()) } @@ -1421,9 +1436,11 @@ func TestCNIConflistGenerationExistingNC(t *testing.T) { imdsClient: fakes.NewMockIMDSClient(), } - service.SyncHostNCVersion(context.Background(), cns.CRD) - // CNI conflist gen happens in goroutine so sleep for a second to let it run - time.Sleep(time.Second) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*5) + defer cancel() + err := service.StartSyncHostNCVersionLoop(ctx, fastcnsconf) + assert.NoError(t, err) + service.Wait(ctx) assert.Equal(t, 1, mockgen.getGeneratedCount()) } @@ -1463,12 +1480,12 @@ func TestCNIConflistGenerationNewNCTwice(t *testing.T) { imdsClient: fakes.NewMockIMDSClient(), } - service.SyncHostNCVersion(context.Background(), cns.CRD) - // CNI conflist gen happens in goroutine so sleep for a second to let it run - 
time.Sleep(time.Second) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*5) + defer cancel() + err := service.StartSyncHostNCVersionLoop(ctx, fastcnsconf) + assert.NoError(t, err) + service.Wait(ctx) assert.Equal(t, 1, mockgen.getGeneratedCount()) - - service.SyncHostNCVersion(context.Background(), cns.CRD) // CNI conflist gen happens in goroutine so sleep for a second to let it run time.Sleep(time.Second) assert.Equal(t, 1, mockgen.getGeneratedCount()) // should still be one @@ -1502,9 +1519,11 @@ func TestCNIConflistNotGenerated(t *testing.T) { imdsClient: fakes.NewMockIMDSClient(), } - service.SyncHostNCVersion(context.Background(), cns.CRD) - // CNI conflist gen happens in goroutine so sleep for a second to let it run - time.Sleep(time.Second) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*5) + defer cancel() + err := service.StartSyncHostNCVersionLoop(ctx, fastcnsconf) + assert.NoError(t, err) + service.Wait(ctx) assert.Equal(t, 0, mockgen.getGeneratedCount()) } @@ -1545,9 +1564,11 @@ func TestCNIConflistGenerationOnNMAError(t *testing.T) { imdsClient: fakes.NewMockIMDSClient(), } - service.SyncHostNCVersion(context.Background(), cns.CRD) - // CNI conflist gen happens in goroutine so sleep for a second to let it run - time.Sleep(time.Second) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*5) + defer cancel() + err := service.StartSyncHostNCVersionLoop(ctx, fastcnsconf) + assert.NoError(t, err) + service.Wait(ctx) assert.Equal(t, 1, mockgen.getGeneratedCount()) } diff --git a/cns/restserver/nodesubnet.go b/cns/restserver/nodesubnet.go index 177d4266bc..74e910e8dd 100644 --- a/cns/restserver/nodesubnet.go +++ b/cns/restserver/nodesubnet.go @@ -29,9 +29,7 @@ func (service *HTTPRestService) UpdateIPsForNodeSubnet(secondaryIPs []netip.Addr logger.Debugf("IP change processed successfully") - // saved NC successfully. UpdateIPsForNodeSubnet is called only when IPs are fetched from NMAgent. 
- // We now have IPs to serve IPAM requests. Generate conflist to indicate CNS is ready - service.MustGenerateCNIConflistOnce() + service.ncWait.Done() return nil } @@ -59,6 +57,10 @@ func (service *HTTPRestService) InitializeNodeSubnet(ctx context.Context, podInf // StartNodeSubnet starts the IP fetcher for NodeSubnet. This will cause secondary IPs to be fetched periodically. // After the first successful fetch, conflist will be generated to indicate CNS is ready. -func (service *HTTPRestService) StartNodeSubnet(ctx context.Context) { +func (service *HTTPRestService) StartNodeSubnet(ctx context.Context) error { + if err := service.ncWait.Start(); err != nil { + return err + } service.nodesubnetIPFetcher.Start(ctx) + return nil } diff --git a/cns/restserver/nodesubnet_test.go b/cns/restserver/nodesubnet_test.go index 361f5d005b..2e935fe88f 100644 --- a/cns/restserver/nodesubnet_test.go +++ b/cns/restserver/nodesubnet_test.go @@ -96,19 +96,15 @@ func TestNodeSubnet(t *testing.T) { checkIPassignment(t, service, expectedIPs) - service.StartNodeSubnet(ctx) + if err := service.StartNodeSubnet(ctx); err != nil { + t.Fatalf("StartNodeSubnet returned an error: %v", err) + } if service.GetNodesubnetIPFetcher() == nil { t.Fatal("NodeSubnetIPFetcher is not initialized") } - select { - case <-ctx.Done(): - t.Errorf("test context done - %s", ctx.Err()) - return - case <-mockCNIConflistGenerator.GenerateCalled: - break - } + service.Wait(ctx) expectedIPs["10.0.0.45"] = types.Available checkIPassignment(t, service, expectedIPs) diff --git a/cns/restserver/restserver.go b/cns/restserver/restserver.go index f14c69d12c..77236d19c1 100644 --- a/cns/restserver/restserver.go +++ b/cns/restserver/restserver.go @@ -97,11 +97,11 @@ type HTTPRestService struct { EndpointState map[string]*EndpointInfo // key : container id EndpointStateStore store.KeyValueStore cniConflistGenerator CNIConflistGenerator - generateCNIConflistOnce sync.Once IPConfigsHandlerMiddleware 
cns.IPConfigsHandlerMiddleware PnpIDByMacAddress map[string]string imdsClient imdsClient nodesubnetIPFetcher *nodesubnet.IPFetcher + ncWait ncWait } type CNIConflistGenerator interface { @@ -382,20 +382,22 @@ func (service *HTTPRestService) Stop() { logger.Printf("[Azure CNS] Service stopped.") } -// MustGenerateCNIConflistOnce will generate the CNI conflist once if the service was initialized with -// a conflist generator. If not, this is a no-op. -func (service *HTTPRestService) MustGenerateCNIConflistOnce() { - service.generateCNIConflistOnce.Do(func() { - if err := service.cniConflistGenerator.Generate(); err != nil { - panic("unable to generate cni conflist with error: " + err.Error()) - } - - if err := service.cniConflistGenerator.Close(); err != nil { - panic("unable to close the cni conflist output stream: " + err.Error()) - } - }) -} - func (service *HTTPRestService) AttachIPConfigsHandlerMiddleware(middleware cns.IPConfigsHandlerMiddleware) { service.IPConfigsHandlerMiddleware = middleware } + +// Wait waits for nc sync state then writes out the conflist. +func (service *HTTPRestService) Wait(ctx context.Context) { + service.ncWait.Wait(ctx) + if ctx.Err() != nil { + logger.Printf("Context done before writing out conflist: %v", ctx.Err()) + return + } + if err := service.cniConflistGenerator.Generate(); err != nil { + panic("unable to generate cni conflist with error: " + err.Error()) + } + + if err := service.cniConflistGenerator.Close(); err != nil { + panic("unable to close the cni conflist output stream: " + err.Error()) + } +} diff --git a/cns/restserver/synchostnc.go b/cns/restserver/synchostnc.go new file mode 100644 index 0000000000..5592a71ef7 --- /dev/null +++ b/cns/restserver/synchostnc.go @@ -0,0 +1,228 @@ +// Copyright 2017 Microsoft. All rights reserved. 
+// MIT License + +package restserver + +import ( + "context" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/Azure/azure-container-networking/cns" + "github.com/Azure/azure-container-networking/cns/configuration" + "github.com/Azure/azure-container-networking/cns/logger" + "github.com/pkg/errors" +) + +// TODO: make this file a sub package? + +// ncWait manages making sure ncstate or nodesubnet state is ready before conflist writing. +// Basically a wait group that can only be added to once and waits with a context; +// meant to be used uninitialized and then started once the sync loop begins. +type ncWait struct { + wg sync.WaitGroup + started atomic.Bool +} + +// Start is like Add except it only allows being called once. +func (n *ncWait) Start() error { + if !n.started.CompareAndSwap(false, true) { + return errors.New("sync loop already started") + } + n.wg.Add(1) + return nil +} + +// Done is like waitgroup Done but will ignore if Start was never called. +func (n *ncWait) Done() { + if !n.started.Load() { + return //nobody ever set this up just move on. + } + n.wg.Done() +} + +// Wait blocks until Done has been called (returning immediately if Start never was) or until the context is done. +func (n *ncWait) Wait(ctx context.Context) { + done := make(chan struct{}) + go func() { + n.wg.Wait() //still fine to wait even if never started will just return immediately + close(done) + }() + + select { + case <-done: + case <-ctx.Done(): + } +} + +// StartSyncHostNCVersionLoop starts a loop that periodically checks that NCs are programmed and also notifies when at least one has been programmed +// so we can write conflist and mark cns ready. 
+func (service *HTTPRestService) StartSyncHostNCVersionLoop(ctx context.Context, cnsconfig configuration.CNSConfig) error { + if err := service.ncWait.Start(); err != nil { + return err + } + go func() { + var one sync.Once + logger.Printf("Starting SyncHostNCVersion loop.") + // Periodically poll vfp programmed NC version from NMAgent + ticker := time.NewTicker(time.Duration(cnsconfig.SyncHostNCVersionIntervalMs) * time.Millisecond) + timeout := time.Duration(cnsconfig.SyncHostNCVersionIntervalMs) * time.Millisecond + for { + if service.syncHostNCVersionWrapper(ctx, cnsconfig.ChannelMode, timeout) { + one.Do(service.ncWait.Done) + } + select { + case <-ticker.C: + if service.syncHostNCVersionWrapper(ctx, cnsconfig.ChannelMode, timeout) { + one.Do(service.ncWait.Done) + } + case <-ctx.Done(): + logger.Printf("Stopping SyncHostNCVersion loop.") + return + } + } + }() + return nil +} + +// syncHostNCVersionWrapper basically calls syncHostNCVersion but wraps it with the service lock and a timeout, and logs errors (but doesn't fail). +// Mostly exists so StartSyncHostNCVersionLoop doesn't have to repeat itself to be a do/while loop +func (service *HTTPRestService) syncHostNCVersionWrapper(ctx context.Context, channelMode string, timeout time.Duration) bool { + timedCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + service.Lock() + defer service.Unlock() + start := time.Now() + programmedNCCount, err := service.syncHostNCVersion(timedCtx, channelMode) + if err != nil { + logger.Errorf("sync host error %v", err) + } + + syncHostNCVersionCount.WithLabelValues(strconv.FormatBool(err == nil)).Inc() + //does not include time to write out conflist. 
+ syncHostNCVersionLatency.WithLabelValues(strconv.FormatBool(err == nil)).Observe(time.Since(start).Seconds()) + return programmedNCCount > 0 +} + +var errNonExistentContainerStatus = errors.New("nonExistantContainerstatus") + +// syncHostNCVersion updates the CNS state with the latest programmed versions of NCs attached to the VM. If any NC in local CNS state +// does not match the version that DNC claims to have published, this function will call NMAgent and list the latest programmed versions of +// all NCs and update the CNS state accordingly. This function returns the total number of NCs on this VM that have been programmed to +// some version, NOT the number of NCs that are up-to-date. +func (service *HTTPRestService) syncHostNCVersion(ctx context.Context, channelMode string) (int, error) { + outdatedNCs := map[string]struct{}{} + programmedNCs := map[string]struct{}{} + for idx := range service.state.ContainerStatus { + // Will open a separate PR to convert all the NC version related variable to int. Change from string to int is a pain. + localNCVersion, err := strconv.Atoi(service.state.ContainerStatus[idx].HostVersion) + if err != nil { + logger.Errorf("Received err when change containerstatus.HostVersion %s to int, err msg %v", service.state.ContainerStatus[idx].HostVersion, err) + continue + } + dncNCVersion, err := strconv.Atoi(service.state.ContainerStatus[idx].CreateNetworkContainerRequest.Version) + if err != nil { + logger.Errorf("Received err when change nc version %s in containerstatus to int, err msg %v", service.state.ContainerStatus[idx].CreateNetworkContainerRequest.Version, err) + continue + } + // host NC version is the NC version from NMAgent, if it's smaller than NC version from DNC, then append it to indicate it needs update. 
+ if localNCVersion < dncNCVersion { + outdatedNCs[service.state.ContainerStatus[idx].ID] = struct{}{} + } else if localNCVersion > dncNCVersion { + logger.Errorf("NC version from NMAgent is larger than DNC, NC version from NMAgent is %d, NC version from DNC is %d", localNCVersion, dncNCVersion) + } + + if localNCVersion > -1 { + programmedNCs[service.state.ContainerStatus[idx].ID] = struct{}{} + } + } + if len(outdatedNCs) == 0 { + return len(programmedNCs), nil + } + + ncVersionListResp, err := service.nma.GetNCVersionList(ctx) + if err != nil { + return len(programmedNCs), errors.Wrap(err, "failed to get nc version list from nmagent") + } + + // Get IMDS NC versions for delegated NIC scenarios + imdsNCVersions, err := service.GetIMDSNCs(ctx) + if err != nil { + // If any of the NMA API check calls, imds calls fails assume that nma build doesn't have the latest changes and create empty map + imdsNCVersions = make(map[string]string) + } + + nmaNCs := map[string]string{} + for _, nc := range ncVersionListResp.Containers { + nmaNCs[strings.ToLower(nc.NetworkContainerID)] = nc.Version + } + + // Consolidate both nc's from NMA and IMDS calls + nmaProgrammedNCs := make(map[string]string) + for ncID, version := range nmaNCs { + nmaProgrammedNCs[ncID] = version + } + for ncID, version := range imdsNCVersions { + if _, exists := nmaProgrammedNCs[ncID]; !exists { + nmaProgrammedNCs[strings.ToLower(ncID)] = version + } else { + //nolint:staticcheck // SA1019: suppress deprecated logger.Warnf usage. Todo: legacy logger usage is consistent in cns repo. 
Migrates when all logger usage is migrated + logger.Warnf("NC %s exists in both NMA and IMDS responses, which is not expected", ncID) + } + } + hasNC.Set(float64(len(nmaProgrammedNCs))) + for ncID := range outdatedNCs { + nmaProgrammedNCVersionStr, ok := nmaProgrammedNCs[ncID] + if !ok { + // Neither NMA nor IMDS has this NC that we need programmed yet, bail out + continue + } + nmaProgrammedNCVersion, err := strconv.Atoi(nmaProgrammedNCVersionStr) + if err != nil { + logger.Errorf("failed to parse container version of %s: %s", ncID, err) + continue + } + // Check whether it exist in service state and get the related nc info + ncInfo, exist := service.state.ContainerStatus[ncID] + if !exist { + // if we marked this NC as needs update, but it no longer exists in internal state when we reach + // this point, our internal state has changed unexpectedly and we should bail out and try again. + return len(programmedNCs), errors.Wrapf(errNonExistentContainerStatus, "can't find NC with ID %s in service state, stop updating this host NC version", ncID) + } + // if the NC still exists in state and is programmed to some version (doesn't have to be latest), add it to our set of NCs that have been programmed + if nmaProgrammedNCVersion > -1 { + programmedNCs[ncID] = struct{}{} + } + + localNCVersion, err := strconv.Atoi(ncInfo.HostVersion) + if err != nil { + logger.Errorf("failed to parse host nc version string %s: %s", ncInfo.HostVersion, err) + continue + } + if localNCVersion > nmaProgrammedNCVersion { + //nolint:staticcheck // SA1019: suppress deprecated logger.Printf usage. Todo: legacy logger usage is consistent in cns repo. 
Migrates when all logger usage is migrated + logger.Errorf("NC version from consolidated sources is decreasing: have %d, got %d", localNCVersion, nmaProgrammedNCVersion) + continue + } + if channelMode == cns.CRD { + service.MarkIpsAsAvailableUntransacted(ncInfo.ID, nmaProgrammedNCVersion) + } + //nolint:staticcheck // SA1019: suppress deprecated logger.Printf usage. Todo: legacy logger usage is consistent in cns repo. Migrates when all logger usage is migrated + logger.Printf("Updating NC %s host version from %s to %s", ncID, ncInfo.HostVersion, nmaProgrammedNCVersionStr) + ncInfo.HostVersion = nmaProgrammedNCVersionStr + logger.Printf("Updated NC %s host version to %s", ncID, ncInfo.HostVersion) + service.state.ContainerStatus[ncID] = ncInfo + // if we successfully updated the NC, pop it from the needs update set. + delete(outdatedNCs, ncID) + } + // if we didn't empty out the needs update set, NMA has not programmed all the NCs we are expecting, and we + // need to return an error indicating that + if len(outdatedNCs) > 0 { + return len(programmedNCs), errors.Errorf("Have outdated NCs: %v, Current Programmed nics from NMA/IMDS %v", outdatedNCs, programmedNCs) + } + + return len(programmedNCs), nil +} diff --git a/cns/service/main.go b/cns/service/main.go index d7b9a526d5..59f1ae4a54 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -1115,9 +1115,14 @@ func main() { // at this point, rest service is running. We can now start serving new requests. So call StartNodeSubnet, which // will fetch secondary IPs and generate conflist. Do not move this all before rest service start - this will cause // CNI to start sending requests, and if the service doesn't start successfully, the requests will fail. - httpRemoteRestService.StartNodeSubnet(rootCtx) + if err := httpRemoteRestService.StartNodeSubnet(rootCtx); err != nil { + logger.Errorf("Failed to start NodeSubnet: %v", err) + return + } } + // Wait for NC sync to complete before marking service as ready. 
+ httpRemoteRestService.Wait(rootCtx) // mark the service as "ready" close(readyCh) // block until process exiting @@ -1283,22 +1288,9 @@ func InitializeMultiTenantController(ctx context.Context, httpRestService cns.HT time.Sleep(time.Millisecond * 500) } - // TODO: do we need this to be running? - logger.Printf("Starting SyncHostNCVersion") - go func() { - // Periodically poll vfp programmed NC version from NMAgent - tickerChannel := time.Tick(time.Duration(cnsconfig.SyncHostNCVersionIntervalMs) * time.Millisecond) - for { - select { - case <-tickerChannel: - timedCtx, cancel := context.WithTimeout(ctx, time.Duration(cnsconfig.SyncHostNCVersionIntervalMs)*time.Millisecond) - httpRestServiceImpl.SyncHostNCVersion(timedCtx, cnsconfig.ChannelMode) - cancel() - case <-ctx.Done(): - return - } - } - }() + if err := httpRestServiceImpl.StartSyncHostNCVersionLoop(ctx, cnsconfig); err != nil { + return err + } return nil } @@ -1643,23 +1635,9 @@ func InitializeCRDState(ctx context.Context, z *zap.Logger, httpRestService cns. break } - go func() { - logger.Printf("Starting SyncHostNCVersion loop.") - // Periodically poll vfp programmed NC version from NMAgent - tickerChannel := time.Tick(time.Duration(cnsconfig.SyncHostNCVersionIntervalMs) * time.Millisecond) - for { - select { - case <-tickerChannel: - timedCtx, cancel := context.WithTimeout(ctx, time.Duration(cnsconfig.SyncHostNCVersionIntervalMs)*time.Millisecond) - httpRestServiceImplementation.SyncHostNCVersion(timedCtx, cnsconfig.ChannelMode) - cancel() - case <-ctx.Done(): - logger.Printf("Stopping SyncHostNCVersion loop.") - return - } - } - }() - logger.Printf("Initialized SyncHostNCVersion loop.") + if err := httpRestServiceImplementation.StartSyncHostNCVersionLoop(ctx, *cnsconfig); err != nil { + return err + } return nil }