diff --git a/cns/fakes/nmagentclientfake.go b/cns/fakes/nmagentclientfake.go index 67988d98e6..a5359ca006 100644 --- a/cns/fakes/nmagentclientfake.go +++ b/cns/fakes/nmagentclientfake.go @@ -14,9 +14,10 @@ import ( // NMAgentClientFake can be used to query to VM Host info. type NMAgentClientFake struct { - SupportedAPIsF func(context.Context) ([]string, error) - GetNCVersionListF func(context.Context) (nmagent.NCVersionList, error) - GetHomeAzF func(context.Context) (nmagent.AzResponse, error) + SupportedAPIsF func(context.Context) ([]string, error) + GetNCVersionListF func(context.Context) (nmagent.NCVersionList, error) + GetHomeAzF func(context.Context) (nmagent.AzResponse, error) + GetInterfaceIPInfoF func(ctx context.Context) (nmagent.Interfaces, error) } func (n *NMAgentClientFake) SupportedAPIs(ctx context.Context) ([]string, error) { @@ -30,3 +31,7 @@ func (n *NMAgentClientFake) GetNCVersionList(ctx context.Context) (nmagent.NCVer func (n *NMAgentClientFake) GetHomeAz(ctx context.Context) (nmagent.AzResponse, error) { return n.GetHomeAzF(ctx) } + +func (n *NMAgentClientFake) GetInterfaceIPInfo(ctx context.Context) (nmagent.Interfaces, error) { + return n.GetInterfaceIPInfoF(ctx) +} diff --git a/cns/restserver/helper_for_nodesubnet_test.go b/cns/restserver/helper_for_nodesubnet_test.go new file mode 100644 index 0000000000..5719757429 --- /dev/null +++ b/cns/restserver/helper_for_nodesubnet_test.go @@ -0,0 +1,89 @@ +package restserver + +import ( + "context" + "net/netip" + "testing" + + "github.com/Azure/azure-container-networking/cns" + "github.com/Azure/azure-container-networking/cns/common" + "github.com/Azure/azure-container-networking/cns/fakes" + "github.com/Azure/azure-container-networking/cns/nodesubnet" + acn "github.com/Azure/azure-container-networking/common" + "github.com/Azure/azure-container-networking/nmagent" + "github.com/Azure/azure-container-networking/store" +) + +// GetRestServiceObjectForNodeSubnetTest creates a new HTTPRestService object for use in nodesubnet unit tests. +func GetRestServiceObjectForNodeSubnetTest(t *testing.T, generator CNIConflistGenerator) *HTTPRestService { + config := &common.ServiceConfig{ + Name: "test", + Version: "1.0", + ChannelMode: "AzureHost", + Store: store.NewMockStore("test"), + } + interfaces := nmagent.Interfaces{ + Entries: []nmagent.Interface{ + { + MacAddress: nmagent.MACAddress{0x00, 0x0D, 0x3A, 0xF9, 0xDC, 0xA6}, + IsPrimary: true, + InterfaceSubnets: []nmagent.InterfaceSubnet{ + { + Prefix: "10.0.0.0/24", + IPAddress: []nmagent.NodeIP{ + { + Address: nmagent.IPAddress(netip.AddrFrom4([4]byte{10, 0, 0, 4})), + IsPrimary: true, + }, + { + Address: nmagent.IPAddress(netip.AddrFrom4([4]byte{10, 0, 0, 52})), + IsPrimary: false, + }, + { + Address: nmagent.IPAddress(netip.AddrFrom4([4]byte{10, 0, 0, 63})), + IsPrimary: false, + }, + { + Address: nmagent.IPAddress(netip.AddrFrom4([4]byte{10, 0, 0, 45})), + IsPrimary: false, + }, + }, + }, + }, + }, + }, + } + + svc, err := cns.NewService(config.Name, config.Version, config.ChannelMode, config.Store) + if err != nil { + return nil + } + + svc.SetOption(acn.OptCnsURL, "") + svc.SetOption(acn.OptCnsPort, "") + err = svc.Initialize(config) + if err != nil { + return nil + } + + t.Cleanup(func() { svc.Uninitialize() }) + + return &HTTPRestService{ + Service: svc, + cniConflistGenerator: generator, + state: &httpRestServiceState{}, + PodIPConfigState: make(map[string]cns.IPConfigurationStatus), + PodIPIDByPodInterfaceKey: make(map[string][]string), + nma: &fakes.NMAgentClientFake{ + GetInterfaceIPInfoF: func(_ context.Context) (nmagent.Interfaces, error) { + return interfaces, nil + }, + }, + wscli: &fakes.WireserverClientFake{}, + } +} + +// GetNodesubnetIPFetcher gets the nodesubnetIPFetcher from the HTTPRestService. +func (service *HTTPRestService) GetNodesubnetIPFetcher() *nodesubnet.IPFetcher { + return service.nodesubnetIPFetcher +} diff --git a/cns/restserver/nodesubnet.go b/cns/restserver/nodesubnet.go new file mode 100644 index 0000000000..177d4266bc --- /dev/null +++ b/cns/restserver/nodesubnet.go @@ -0,0 +1,64 @@ +package restserver + +import ( + "context" + "net/netip" + + "github.com/Azure/azure-container-networking/cns" + "github.com/Azure/azure-container-networking/cns/logger" + nodesubnet "github.com/Azure/azure-container-networking/cns/nodesubnet" + "github.com/Azure/azure-container-networking/cns/types" + "github.com/pkg/errors" +) + +var _ nodesubnet.IPConsumer = &HTTPRestService{} + +// UpdateIPsForNodeSubnet updates the IP pool of HTTPRestService with newly fetched secondary IPs +func (service *HTTPRestService) UpdateIPsForNodeSubnet(secondaryIPs []netip.Addr) error { + secondaryIPStrs := make([]string, len(secondaryIPs)) + for i, ip := range secondaryIPs { + secondaryIPStrs[i] = ip.String() + } + + networkContainerRequest := nodesubnet.CreateNodeSubnetNCRequest(secondaryIPStrs) + + code, msg := service.saveNetworkContainerGoalState(*networkContainerRequest) + if code != types.Success { + return errors.Errorf("failed to save fetched ips. code: %d, message %s", code, msg) + } + + logger.Debugf("IP change processed successfully") + + // saved NC successfully. UpdateIPsForNodeSubnet is called only when IPs are fetched from NMAgent. + // We now have IPs to serve IPAM requests. Generate conflist to indicate CNS is ready + service.MustGenerateCNIConflistOnce() + return nil +} + +// InitializeNodeSubnet prepares CNS for serving NodeSubnet requests. +// It sets the orchestrator type to KubernetesCRD, reconciles the initial +// CNS state from the statefile, then creates an IP fetcher. +func (service *HTTPRestService) InitializeNodeSubnet(ctx context.Context, podInfoByIPProvider cns.PodInfoByIPProvider) error { + // set orchestrator type + orchestrator := cns.SetOrchestratorTypeRequest{ + OrchestratorType: cns.KubernetesCRD, + } + service.SetNodeOrchestrator(&orchestrator) + + if podInfoByIPProvider == nil { + logger.Printf("PodInfoByIPProvider is nil, this usually means no saved endpoint state. Skipping reconciliation") + } else if _, err := nodesubnet.ReconcileInitialCNSState(ctx, service, podInfoByIPProvider); err != nil { + return errors.Wrap(err, "reconcile initial CNS state") + } + // statefile (if any) is reconciled. Initialize the IP fetcher. Start the IP fetcher only after the service is started, + // because starting the IP fetcher will generate conflist, which should be done only once we are ready to respond to IPAM requests. + service.nodesubnetIPFetcher = nodesubnet.NewIPFetcher(service.nma, service, 0, 0, logger.Log) + + return nil +} + +// StartNodeSubnet starts the IP fetcher for NodeSubnet. This will cause secondary IPs to be fetched periodically. +// After the first successful fetch, conflist will be generated to indicate CNS is ready. +func (service *HTTPRestService) StartNodeSubnet(ctx context.Context) { + service.nodesubnetIPFetcher.Start(ctx) +} diff --git a/cns/restserver/nodesubnet_test.go b/cns/restserver/nodesubnet_test.go new file mode 100644 index 0000000000..6c5b5890cb --- /dev/null +++ b/cns/restserver/nodesubnet_test.go @@ -0,0 +1,144 @@ +package restserver_test + +import ( + "context" + "net" + "testing" + + "github.com/Azure/azure-container-networking/cns/cnireconciler" + "github.com/Azure/azure-container-networking/cns/logger" + "github.com/Azure/azure-container-networking/cns/restserver" + "github.com/Azure/azure-container-networking/cns/types" + "github.com/Azure/azure-container-networking/store" +) + +// getMockStore creates a mock KeyValueStore with some endpoint state +func getMockStore() store.KeyValueStore { + mockStore := store.NewMockStore("") + endpointState := map[string]*restserver.EndpointInfo{ + "12e65d89e58cb23c784e97840cf76866bfc9902089bdc8e87e9f64032e312b0b": { + PodName: "coredns-54b69f46b8-ldmwr", + PodNamespace: "kube-system", + IfnameToIPMap: map[string]*restserver.IPInfo{ + "eth0": { + IPv4: []net.IPNet{ + { + IP: net.IPv4(10, 0, 0, 52), + Mask: net.CIDRMask(24, 32), + }, + }, + }, + }, + }, + "1fc5176913a3a1a7facfb823dde3b4ded404041134fef4f4a0c8bba140fc0413": { + PodName: "load-test-7f7d49687d-wxc9p", + PodNamespace: "load-test", + IfnameToIPMap: map[string]*restserver.IPInfo{ + "eth0": { + IPv4: []net.IPNet{ + { + IP: net.IPv4(10, 0, 0, 63), + Mask: net.CIDRMask(24, 32), + }, + }, + }, + }, + }, + } + + err := mockStore.Write(restserver.EndpointStoreKey, endpointState) + if err != nil { + return nil + } + return mockStore +} + +// Mock implementation of CNIConflistGenerator +type MockCNIConflistGenerator struct { + GenerateCalled chan struct{} +} + +func (m *MockCNIConflistGenerator) Generate() error { + close(m.GenerateCalled) + return nil +} + +func (m *MockCNIConflistGenerator) Close() error { + // Implement the Close method logic here if needed + return nil +} + +// TestNodeSubnet tests initialization of NodeSubnet with endpoint info, and verfies that +// the conflist is generated after fetching secondary IPs +func TestNodeSubnet(t *testing.T) { + podInfoByIPProvider, err := cnireconciler.NewCNSPodInfoProvider(getMockStore()) + if err != nil { + t.Fatalf("NewCNSPodInfoProvider returned an error: %v", err) + } + + // create a real HTTPRestService object + mockCNIConflistGenerator := &MockCNIConflistGenerator{ + GenerateCalled: make(chan struct{}), + } + service := restserver.GetRestServiceObjectForNodeSubnetTest(t, mockCNIConflistGenerator) + ctx, cancel := testContext(t) + defer cancel() + + err = service.InitializeNodeSubnet(ctx, podInfoByIPProvider) + if err != nil { + t.Fatalf("InitializeNodeSubnet returned an error: %v", err) + } + + expectedIPs := map[string]types.IPState{ + "10.0.0.52": types.Assigned, + "10.0.0.63": types.Assigned, + } + + checkIPassignment(t, service, expectedIPs) + + service.StartNodeSubnet(ctx) + + if service.GetNodesubnetIPFetcher() == nil { + t.Fatal("NodeSubnetIPFetcher is not initialized") + } + + select { + case <-ctx.Done(): + t.Errorf("test context done - %s", ctx.Err()) + return + case <-mockCNIConflistGenerator.GenerateCalled: + break + } + + expectedIPs["10.0.0.45"] = types.Available + checkIPassignment(t, service, expectedIPs) +} + +// checkIPassignment checks whether the IP assignment state in the HTTPRestService object matches expectation +func checkIPassignment(t *testing.T, service *restserver.HTTPRestService, expectedIPs map[string]types.IPState) { + if len(service.PodIPConfigState) != len(expectedIPs) { + t.Fatalf("expected 2 entries in PodIPConfigState, got %d", len(service.PodIPConfigState)) + } + + for ip := range service.GetPodIPConfigState() { + config := service.GetPodIPConfigState()[ip] + if assignmentState, exists := expectedIPs[ip]; !exists { + t.Fatalf("unexpected IP %s in PodIPConfigState", ip) + } else if config.GetState() != assignmentState { + t.Fatalf("expected state 'Assigned' for IP %s, got %s", ip, config.GetState()) + } + } +} + +// testContext creates a context from the provided testing.T that will be +// canceled if the test suite is terminated. +func testContext(t *testing.T) (context.Context, context.CancelFunc) { + if deadline, ok := t.Deadline(); ok { + return context.WithDeadline(context.Background(), deadline) + } + return context.WithCancel(context.Background()) +} + +func init() { + logger.InitLogger("testlogs", 0, 0, "./") +} diff --git a/cns/restserver/restserver.go b/cns/restserver/restserver.go index c74580f29c..30a2b1e8d6 100644 --- a/cns/restserver/restserver.go +++ b/cns/restserver/restserver.go @@ -13,6 +13,7 @@ import ( "github.com/Azure/azure-container-networking/cns/dockerclient" "github.com/Azure/azure-container-networking/cns/logger" "github.com/Azure/azure-container-networking/cns/networkcontainers" + "github.com/Azure/azure-container-networking/cns/nodesubnet" "github.com/Azure/azure-container-networking/cns/routes" "github.com/Azure/azure-container-networking/cns/types" "github.com/Azure/azure-container-networking/cns/types/bounded" @@ -40,6 +41,7 @@ type nmagentClient interface { SupportedAPIs(context.Context) ([]string, error) GetNCVersionList(context.Context) (nma.NCVersionList, error) GetHomeAz(context.Context) (nma.AzResponse, error) + GetInterfaceIPInfo(ctx context.Context) (nma.Interfaces, error) } type wireserverProxy interface { @@ -76,6 +78,7 @@ type HTTPRestService struct { IPConfigsHandlerMiddleware cns.IPConfigsHandlerMiddleware PnpIDByMacAddress map[string]string imdsClient imdsClient + nodesubnetIPFetcher *nodesubnet.IPFetcher } type CNIConflistGenerator interface { diff --git a/cns/service/main.go b/cns/service/main.go index 5219ed25de..8073271a69 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -736,7 +736,6 @@ func main() { } imdsClient := imds.NewClient() - httpRemoteRestService, err := restserver.NewHTTPRestService(&config, wsclient, &wsProxy, nmaClient, endpointStateStore, conflistGenerator, homeAzMonitor, imdsClient) if err != nil { @@ -871,6 +870,32 @@ func main() { } } + // AzureHost channelmode indicates Nodesubnet. IPs are to be fetched from NMagent. + if config.ChannelMode == cns.AzureHost { + if !cnsconfig.ManageEndpointState { + logger.Errorf("ManageEndpointState must be set to true for AzureHost mode") + return + } + + // If cns manageendpointstate is true, then cns maintains its own state and reconciles from it. + // in this case, cns maintains state with containerid as key and so in-memory cache can lookup + // and update based on container id. + cns.GlobalPodInfoScheme = cns.InfraIDPodInfoScheme + + var podInfoByIPProvider cns.PodInfoByIPProvider + podInfoByIPProvider, err = getPodInfoByIPProvider(rootCtx, cnsconfig, httpRemoteRestService, nil, "") + if err != nil { + logger.Errorf("[Azure CNS] Failed to get PodInfoByIPProvider: %v", err) + return + } + + err = httpRemoteRestService.InitializeNodeSubnet(rootCtx, podInfoByIPProvider) + if err != nil { + logger.Errorf("[Azure CNS] Failed to initialize node subnet: %v", err) + return + } + } + // Initialize multi-tenant controller if the CNS is running in MultiTenantCRD mode. // It must be started before we start HTTPRemoteRestService. if config.ChannelMode == cns.MultiTenantCRD { @@ -909,6 +934,7 @@ func main() { } // if user provides cns url by -c option, then only start HTTP remote server using this url + logger.Printf("[Azure CNS] Start HTTP Remote server") if httpRemoteRestService != nil { if cnsconfig.EnablePprof { @@ -1019,6 +1045,13 @@ func main() { }(privateEndpoint, infravnet, nodeID) } + if config.ChannelMode == cns.AzureHost { + // at this point, rest service is running. We can now start serving new requests. So call StartNodeSubnet, which + // will fetch secondary IPs and generate conflist. Do not move this all before rest service start - this will cause + // CNI to start sending requests, and if the service doesn't start successfully, the requests will fail. + httpRemoteRestService.StartNodeSubnet(rootCtx) + } + // mark the service as "ready" close(readyCh) // block until process exiting @@ -1249,40 +1282,11 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn } } - var podInfoByIPProvider cns.PodInfoByIPProvider - switch { - case cnsconfig.ManageEndpointState: - logger.Printf("Initializing from self managed endpoint store") - podInfoByIPProvider, err = cnireconciler.NewCNSPodInfoProvider(httpRestServiceImplementation.EndpointStateStore) // get reference to endpoint state store from rest server - if err != nil { - if errors.Is(err, store.ErrKeyNotFound) { - logger.Printf("[Azure CNS] No endpoint state found, skipping initializing CNS state") - } else { - return errors.Wrap(err, "failed to create CNS PodInfoProvider") - } - } - case cnsconfig.InitializeFromCNI: - logger.Printf("Initializing from CNI") - podInfoByIPProvider, err = cnireconciler.NewCNIPodInfoProvider() - if err != nil { - return errors.Wrap(err, "failed to create CNI PodInfoProvider") - } - default: - logger.Printf("Initializing from Kubernetes") - podInfoByIPProvider = cns.PodInfoByIPProviderFunc(func() (map[string]cns.PodInfo, error) { - pods, err := clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{ //nolint:govet // ignore err shadow - FieldSelector: "spec.nodeName=" + nodeName, - }) - if err != nil { - return nil, errors.Wrap(err, "failed to list Pods for PodInfoProvider") - } - podInfo, err := cns.KubePodsToPodInfoByIP(pods.Items) - if err != nil { - return nil, errors.Wrap(err, "failed to convert Pods to PodInfoByIP") - } - return podInfo, nil - }) + podInfoByIPProvider, err := getPodInfoByIPProvider(ctx, cnsconfig, httpRestServiceImplementation, clientset, nodeName) + if err != nil { + return errors.Wrap(err, "failed to initialize ip state") } + // create scoped kube clients. directcli, err := client.New(kubeConfig, client.Options{Scheme: nodenetworkconfig.Scheme}) if err != nil { @@ -1505,6 +1509,51 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn return nil } +// getPodInfoByIPProvider returns a PodInfoByIPProvider that reads endpoint state from the configured source +func getPodInfoByIPProvider( + ctx context.Context, + cnsconfig *configuration.CNSConfig, + httpRestServiceImplementation *restserver.HTTPRestService, + clientset *kubernetes.Clientset, + nodeName string, +) (podInfoByIPProvider cns.PodInfoByIPProvider, err error) { + switch { + case cnsconfig.ManageEndpointState: + logger.Printf("Initializing from self managed endpoint store") + podInfoByIPProvider, err = cnireconciler.NewCNSPodInfoProvider(httpRestServiceImplementation.EndpointStateStore) // get reference to endpoint state store from rest server + if err != nil { + if errors.Is(err, store.ErrKeyNotFound) { + logger.Printf("[Azure CNS] No endpoint state found, skipping initializing CNS state") + } else { + return podInfoByIPProvider, errors.Wrap(err, "failed to create CNS PodInfoProvider") + } + } + case cnsconfig.InitializeFromCNI: + logger.Printf("Initializing from CNI") + podInfoByIPProvider, err = cnireconciler.NewCNIPodInfoProvider() + if err != nil { + return podInfoByIPProvider, errors.Wrap(err, "failed to create CNI PodInfoProvider") + } + default: + logger.Printf("Initializing from Kubernetes") + podInfoByIPProvider = cns.PodInfoByIPProviderFunc(func() (map[string]cns.PodInfo, error) { + pods, err := clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{ //nolint:govet // ignore err shadow + FieldSelector: "spec.nodeName=" + nodeName, + }) + if err != nil { + return nil, errors.Wrap(err, "failed to list Pods for PodInfoProvider") + } + podInfo, err := cns.KubePodsToPodInfoByIP(pods.Items) + if err != nil { + return nil, errors.Wrap(err, "failed to convert Pods to PodInfoByIP") + } + return podInfo, nil + }) + } + + return podInfoByIPProvider, nil +} + // createOrUpdateNodeInfoCRD polls imds to learn the VM Unique ID and then creates or updates the NodeInfo CRD // with that vm unique ID func createOrUpdateNodeInfoCRD(ctx context.Context, restConfig *rest.Config, node *corev1.Node) error {