diff --git a/cns/service/aks.go b/cns/service/aks.go new file mode 100644 index 0000000000..bafe74cd41 --- /dev/null +++ b/cns/service/aks.go @@ -0,0 +1,620 @@ +// AKS specific initialization flows +// nolint // it's not worth it +package main + +import ( + "context" + "fmt" + "time" + + "github.com/Azure/azure-container-networking/cns" + "github.com/Azure/azure-container-networking/cns/configuration" + "github.com/Azure/azure-container-networking/cns/deviceplugin" + "github.com/Azure/azure-container-networking/cns/imds" + "github.com/Azure/azure-container-networking/cns/ipampool" + "github.com/Azure/azure-container-networking/cns/ipampool/metrics" + ipampoolv2 "github.com/Azure/azure-container-networking/cns/ipampool/v2" + cssctrl "github.com/Azure/azure-container-networking/cns/kubecontroller/clustersubnetstate" + mtpncctrl "github.com/Azure/azure-container-networking/cns/kubecontroller/multitenantpodnetworkconfig" + nncctrl "github.com/Azure/azure-container-networking/cns/kubecontroller/nodenetworkconfig" + podctrl "github.com/Azure/azure-container-networking/cns/kubecontroller/pod" + "github.com/Azure/azure-container-networking/cns/logger" + "github.com/Azure/azure-container-networking/cns/middlewares" + "github.com/Azure/azure-container-networking/cns/multitenantcontroller" + "github.com/Azure/azure-container-networking/cns/multitenantcontroller/multitenantoperator" + "github.com/Azure/azure-container-networking/cns/restserver" + cnstypes "github.com/Azure/azure-container-networking/cns/types" + "github.com/Azure/azure-container-networking/crd" + "github.com/Azure/azure-container-networking/crd/clustersubnetstate" + cssv1alpha1 "github.com/Azure/azure-container-networking/crd/clustersubnetstate/api/v1alpha1" + "github.com/Azure/azure-container-networking/crd/multitenancy" + mtv1alpha1 "github.com/Azure/azure-container-networking/crd/multitenancy/api/v1alpha1" + "github.com/Azure/azure-container-networking/crd/nodenetworkconfig" + "github.com/Azure/azure-container-networking/crd/nodenetworkconfig/api/v1alpha" + "github.com/avast/retry-go/v4" + "github.com/go-logr/zapr" + "github.com/google/go-cmp/cmp" + "github.com/pkg/errors" + "go.uber.org/zap" + "golang.org/x/time/rate" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + kuberuntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + ctrlmgr "sigs.k8s.io/controller-runtime/pkg/manager" + ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics/server" +) + +type cniConflistScenario string + +const ( + scenarioV4Overlay cniConflistScenario = "v4overlay" + scenarioDualStackOverlay cniConflistScenario = "dualStackOverlay" + scenarioOverlay cniConflistScenario = "overlay" + scenarioCilium cniConflistScenario = "cilium" + scenarioSWIFT cniConflistScenario = "swift" +) + +type nodeNetworkConfigGetter interface { + Get(context.Context) (*v1alpha.NodeNetworkConfig, error) +} + +type ipamStateReconciler interface { + ReconcileIPAMStateForSwift(ncRequests []*cns.CreateNetworkContainerRequest, podInfoByIP map[string]cns.PodInfo, nnc *v1alpha.NodeNetworkConfig) cnstypes.ResponseCode +} + +// TODO(rbtr) where should this live?? 
+// reconcileInitialCNSState initializes CNS state by passing the existing Pod state and a CreateNetworkContainerRequest per NC to the IPAM reconciler
+func reconcileInitialCNSState(ctx context.Context, cli nodeNetworkConfigGetter, ipamReconciler ipamStateReconciler, podInfoByIPProvider cns.PodInfoByIPProvider) error {
+ // Get the NNC using the direct client
+ nnc, err := cli.Get(ctx)
+ if err != nil {
+ if crd.IsNotDefined(err) {
+ return errors.Wrap(err, "failed to init CNS state: NNC CRD is not defined")
+ }
+ if apierrors.IsNotFound(err) {
+ return errors.Wrap(err, "failed to init CNS state: NNC not found")
+ }
+ return errors.Wrap(err, "failed to init CNS state: failed to get NNC CRD")
+ }
+
+ logger.Printf("Retrieved NNC: %+v", nnc)
+ if !nnc.DeletionTimestamp.IsZero() {
+ return errors.New("failed to init CNS state: NNC is being deleted")
+ }
+
+ // If there are no NCs, we can't initialize our state and we should fail out.
+ if len(nnc.Status.NetworkContainers) == 0 {
+ return errors.New("failed to init CNS state: no NCs found in NNC CRD")
+ }
+
+ // Get the previous PodInfo state from podInfoByIPProvider
+ podInfoByIP, err := podInfoByIPProvider.PodInfoByIP()
+ if err != nil {
+ return errors.Wrap(err, "provider failed to provide PodInfoByIP")
+ }
+
+ ncReqs := make([]*cns.CreateNetworkContainerRequest, len(nnc.Status.NetworkContainers))
+
+ // For each NC, we need to create a CreateNetworkContainerRequest and use it to rebuild our state.
+ for i := range nnc.Status.NetworkContainers {
+ var (
+ ncRequest *cns.CreateNetworkContainerRequest
+ err error
+ )
+ switch nnc.Status.NetworkContainers[i].AssignmentMode { //nolint:exhaustive // skipping dynamic case
+ case v1alpha.Static:
+ ncRequest, err = nncctrl.CreateNCRequestFromStaticNC(nnc.Status.NetworkContainers[i])
+ default: // For backward compatibility, default is treated as Dynamic.
+ ncRequest, err = nncctrl.CreateNCRequestFromDynamicNC(nnc.Status.NetworkContainers[i])
+ }
+
+ if err != nil {
+ return errors.Wrapf(err, "failed to convert NNC status to network container request, "+
+ "assignmentMode: %s", nnc.Status.NetworkContainers[i].AssignmentMode)
+ }
+
+ ncReqs[i] = ncRequest
+ }
+
+ // Reconcile the CNS IPAM state from the assembled NC requests and the previous Pod info.
+ if err := restserver.ResponseCodeToError(ipamReconciler.ReconcileIPAMStateForSwift(ncReqs, podInfoByIP, nnc)); err != nil {
+ return errors.Wrap(err, "failed to reconcile CNS IPAM state")
+ }
+
+ return nil
+}
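The narrow nodeNetworkConfigGetter and ipamStateReconciler interfaces keep reconcileInitialCNSState decoupled from the concrete clients, which also makes its error paths easy to exercise with fakes. A minimal test sketch under that assumption; fakeNNCGetter and the test body are illustrative, not part of this patch:

```go
package main

import (
	"context"
	"testing"

	"github.com/Azure/azure-container-networking/crd/nodenetworkconfig/api/v1alpha"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// fakeNNCGetter satisfies nodeNetworkConfigGetter and always fails with NotFound.
type fakeNNCGetter struct{}

func (fakeNNCGetter) Get(context.Context) (*v1alpha.NodeNetworkConfig, error) {
	return nil, apierrors.NewNotFound(schema.GroupResource{Resource: "nodenetworkconfigs"}, "node-1")
}

func TestReconcileInitialCNSStateNNCNotFound(t *testing.T) {
	// The Get error short-circuits before the reconciler or the Pod info
	// provider are consulted, so nil is acceptable for both here.
	err := reconcileInitialCNSState(context.Background(), fakeNNCGetter{}, nil, nil)
	if err == nil {
		t.Fatal("expected a NotFound NNC to surface as an initialization error")
	}
}
```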
+
+// initializeCRDState builds and starts the CRD controllers.
+//
+//nolint:gocyclo // legacy
+func initializeCRDState(ctx context.Context, z *zap.Logger, httpRestService cns.HTTPService, cnsconfig *configuration.CNSConfig) error {
+ // convert interface type to implementation type
+ httpRestServiceImplementation, ok := httpRestService.(*restserver.HTTPRestService)
+ if !ok {
+ logger.Errorf("[Azure CNS] Failed to convert interface httpRestService to implementation: %v", httpRestService)
+ return fmt.Errorf("[Azure CNS] Failed to convert interface httpRestService to implementation: %v",
+ httpRestService)
+ }
+
+ // Set orchestrator type
+ orchestrator := cns.SetOrchestratorTypeRequest{
+ OrchestratorType: cns.KubernetesCRD,
+ }
+ httpRestServiceImplementation.SetNodeOrchestrator(&orchestrator)
+
+ // build default clientset.
+ kubeConfig, err := ctrl.GetConfig()
+ if err != nil {
+ logger.Errorf("[Azure CNS] Failed to get kubeconfig for request controller: %v", err)
+ return errors.Wrap(err, "failed to get kubeconfig")
+ }
+ kubeConfig.UserAgent = fmt.Sprintf("azure-cns-%s", version)
+
+ clientset, err := kubernetes.NewForConfig(kubeConfig)
+ if err != nil {
+ return errors.Wrap(err, "failed to build clientset")
+ }
+
+ // get nodename for scoping kube requests to node.
+ nodeName, err := configuration.NodeName()
+ if err != nil {
+ return errors.Wrap(err, "failed to get NodeName")
+ }
+
+ node, err := clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
+ if err != nil {
+ return errors.Wrapf(err, "failed to get node %s", nodeName)
+ }
+
+ // check the Node labels for Swift V2
+ if _, ok := node.Labels[configuration.LabelNodeSwiftV2]; ok {
+ cnsconfig.EnableSwiftV2 = true
+ cnsconfig.WatchPods = true
+ if nodeInfoErr := createOrUpdateNodeInfoCRD(ctx, kubeConfig, node); nodeInfoErr != nil {
+ return errors.Wrap(nodeInfoErr, "error creating or updating nodeinfo crd")
+ }
+ }
+
+ // perform state migration from CNI in case CNS is set to manage the endpoint state and has empty state
+ if cnsconfig.EnableStateMigration && !httpRestServiceImplementation.EndpointStateStore.Exists() {
+ if err = PopulateCNSEndpointState(httpRestServiceImplementation.EndpointStateStore); err != nil {
+ return errors.Wrap(err, "failed to create CNS EndpointState From CNI")
+ }
+ // endpoint state needs to be loaded in memory so the subsequent Delete calls remove the state and release the IPs.
+ if err = httpRestServiceImplementation.EndpointStateStore.Read(restserver.EndpointStoreKey, &httpRestServiceImplementation.EndpointState); err != nil {
+ return errors.Wrap(err, "failed to restore endpoint state")
+ }
+ }
+
+ podInfoByIPProvider, err := getPodInfoByIPProvider(ctx, cnsconfig, httpRestServiceImplementation, clientset, nodeName)
+ if err != nil {
+ return errors.Wrap(err, "failed to initialize ip state")
+ }
+
+ // create scoped kube clients.
+ directcli, err := client.New(kubeConfig, client.Options{Scheme: nodenetworkconfig.Scheme})
+ if err != nil {
+ return errors.Wrap(err, "failed to create ctrl client")
+ }
+ directnnccli := nodenetworkconfig.NewClient(directcli)
+ if err != nil {
+ return errors.Wrap(err, "failed to create NNC client")
+ }
+ // TODO(rbtr): nodename and namespace should be in the cns config
+ directscopedcli := nncctrl.NewScopedClient(directnnccli, types.NamespacedName{Namespace: "kube-system", Name: nodeName})
+
+ logger.Printf("Reconciling initial CNS state")
+ // the apiserver NNC might not be registered, or the apiserver might be down, and crashloop backoff could put us
+ // outside the 5-10 minutes we have for AKS addons to come up, so retry a bit more aggressively here.
+ // this retries indefinitely, backing off to a max delay of one minute, until it succeeds or the context is
+ // canceled (see the standalone sketch below).
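For the retry behavior the comment above describes, a self-contained sketch: retry.UntilSucceeded() removes retry-go's default attempt cap, so retry.Do returns only on success or context cancellation, while MaxDelay bounds the growing backoff. The 100ms delay is a stand-in for initCNSInitalDelay:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/avast/retry-go/v4"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	attempt := 0
	err := retry.Do(func() error {
		attempt++
		if attempt < 4 {
			// Simulate the apiserver/NNC not being ready yet.
			return errors.New("NNC not registered yet")
		}
		return nil
	},
		retry.Context(ctx),                // stop retrying once ctx is canceled
		retry.Delay(100*time.Millisecond), // stand-in initial delay
		retry.MaxDelay(time.Minute),       // cap on the growing backoff
		retry.UntilSucceeded(),            // no attempt cap
	)
	fmt.Printf("succeeded after %d attempts, err: %v\n", attempt, err)
}
```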
+ attempt := 0 + _ = retry.Do(func() error { + attempt++ + logger.Printf("reconciling initial CNS state attempt: %d", attempt) + err = reconcileInitialCNSState(ctx, directscopedcli, httpRestServiceImplementation, podInfoByIPProvider) + if err != nil { + logger.Errorf("failed to reconcile initial CNS state, attempt: %d err: %v", attempt, err) + nncInitFailure.Inc() + } + return errors.Wrap(err, "failed to initialize CNS state") + }, retry.Context(ctx), retry.Delay(initCNSInitalDelay), retry.MaxDelay(time.Minute), retry.UntilSucceeded()) + logger.Printf("reconciled initial CNS state after %d attempts", attempt) + hasNNCInitialized.Set(1) + scheme := kuberuntime.NewScheme() + if err := corev1.AddToScheme(scheme); err != nil { //nolint:govet // intentional shadow + return errors.Wrap(err, "failed to add corev1 to scheme") + } + if err = v1alpha.AddToScheme(scheme); err != nil { + return errors.Wrap(err, "failed to add nodenetworkconfig/v1alpha to scheme") + } + if err = cssv1alpha1.AddToScheme(scheme); err != nil { + return errors.Wrap(err, "failed to add clustersubnetstate/v1alpha1 to scheme") + } + if err = mtv1alpha1.AddToScheme(scheme); err != nil { + return errors.Wrap(err, "failed to add multitenantpodnetworkconfig/v1alpha1 to scheme") + } + + // Set Selector options on the Manager cache which are used + // to perform *server-side* filtering of the cached objects. This is very important + // for high node/pod count clusters, as it keeps us from watching objects at the + // whole cluster scope when we are only interested in the Node's scope. + cacheOpts := cache.Options{ + Scheme: scheme, + ByObject: map[client.Object]cache.ByObject{ + &v1alpha.NodeNetworkConfig{}: { + Namespaces: map[string]cache.Config{ + "kube-system": {FieldSelector: fields.SelectorFromSet(fields.Set{"metadata.name": nodeName})}, + }, + }, + }, + } + + if cnsconfig.WatchPods { + cacheOpts.ByObject[&corev1.Pod{}] = cache.ByObject{ + Field: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName}), + } + } + + if cnsconfig.EnableSubnetScarcity { + cacheOpts.ByObject[&cssv1alpha1.ClusterSubnetState{}] = cache.ByObject{ + Namespaces: map[string]cache.Config{ + "kube-system": {}, + }, + } + } + + managerOpts := ctrlmgr.Options{ + Scheme: scheme, + Metrics: ctrlmetrics.Options{BindAddress: "0"}, + Cache: cacheOpts, + Logger: zapr.NewLogger(z), + } + + manager, err := ctrl.NewManager(kubeConfig, managerOpts) + if err != nil { + return errors.Wrap(err, "failed to create manager") + } + + // this cachedscopedclient is built using the Manager's cached client, which is + // NOT SAFE TO USE UNTIL THE MANAGER IS STARTED! + // This is okay because it is only used to build the IPAMPoolMonitor, which does not + // attempt to use the client until it has received a NodeNetworkConfig to update, and + // that can only happen once the Manager has started and the NodeNetworkConfig + // reconciler has pushed the Monitor a NodeNetworkConfig. 
+ cachedscopedcli := nncctrl.NewScopedClient(nodenetworkconfig.NewClient(manager.GetClient()), types.NamespacedName{Namespace: "kube-system", Name: nodeName})
+
+ // Build the IPAM Pool monitor
+ var poolMonitor cns.IPAMPoolMonitor
+ cssCh := make(chan cssv1alpha1.ClusterSubnetState)
+ ipDemandCh := make(chan int)
+ if cnsconfig.EnableIPAMv2 {
+ cssSrc := func(context.Context) ([]cssv1alpha1.ClusterSubnetState, error) { return nil, nil }
+ if cnsconfig.EnableSubnetScarcity {
+ cssSrc = clustersubnetstate.NewClient(manager.GetClient()).List
+ }
+ nncCh := make(chan v1alpha.NodeNetworkConfig)
+ pmv2 := ipampoolv2.NewMonitor(z, httpRestServiceImplementation, cachedscopedcli, ipDemandCh, nncCh, cssCh)
+ obs := metrics.NewLegacyMetricsObserver(httpRestService.GetPodIPConfigState, cachedscopedcli.Get, cssSrc)
+ pmv2.WithLegacyMetricsObserver(obs)
+ poolMonitor = pmv2.AsV1(nncCh)
+ } else {
+ poolOpts := ipampool.Options{
+ RefreshDelay: poolIPAMRefreshRateInMilliseconds * time.Millisecond,
+ }
+ poolMonitor = ipampool.NewMonitor(httpRestServiceImplementation, cachedscopedcli, cssCh, &poolOpts)
+ }
+
+ // Start building the NNC Reconciler
+
+ // get CNS Node IP to compare NC Node IP with this Node IP to ensure NCs were created for this node
+ nodeIP := configuration.NodeIP()
+ nncReconciler := nncctrl.NewReconciler(httpRestServiceImplementation, poolMonitor, nodeIP)
+ // pass Node to the Reconciler for Controller xref
+ // IPAMv1 - reconcile only status changes (where generation doesn't change).
+ // IPAMv2 - reconcile all updates.
+ filterGenerationChange := !cnsconfig.EnableIPAMv2
+ if err := nncReconciler.SetupWithManager(manager, node, filterGenerationChange); err != nil { //nolint:govet // intentional shadow
+ return errors.Wrapf(err, "failed to setup nnc reconciler with manager")
+ }
+
+ if cnsconfig.EnableSubnetScarcity {
+ // ClusterSubnetState reconciler
+ cssReconciler := cssctrl.New(cssCh)
+ if err := cssReconciler.SetupWithManager(manager); err != nil {
+ return errors.Wrapf(err, "failed to setup css reconciler with manager")
+ }
+ }
+
+ // TODO: add pod listeners based on Swift V1 vs MT/V2 configuration
+ if cnsconfig.WatchPods {
+ pw := podctrl.New(z)
+ if cnsconfig.EnableIPAMv2 {
+ hostNetworkListOpt := &client.ListOptions{FieldSelector: fields.SelectorFromSet(fields.Set{"spec.hostNetwork": "false"})} // filter only podsubnet pods
+ // don't relist pods more than every 500ms
+ limit := rate.NewLimiter(rate.Every(500*time.Millisecond), 1) //nolint:gomnd // clearly 500ms
+ pw.With(pw.NewNotifierFunc(hostNetworkListOpt, limit, ipampoolv2.PodIPDemandListener(ipDemandCh)))
+ }
+ if err := pw.SetupWithManager(ctx, manager); err != nil {
+ return errors.Wrapf(err, "failed to setup pod watcher with manager")
+ }
+ }
+
+ if cnsconfig.EnableSwiftV2 {
+ if err := mtpncctrl.SetupWithManager(manager); err != nil {
+ return errors.Wrapf(err, "failed to setup mtpnc reconciler with manager")
+ }
+ // if SWIFT v2 is enabled on CNS, attach multitenant middleware to rest service
+ // switch here for AKS(K8s) swiftv2 middleware to process IP configs requests
+ swiftV2Middleware := &middlewares.K8sSWIFTv2Middleware{Cli: manager.GetClient()}
+ httpRestService.AttachIPConfigsHandlerMiddleware(swiftV2Middleware)
+ }
+
+ // start the pool Monitor before the Reconciler, since it needs to be ready to receive a
+ // NodeNetworkConfig update by the time the Reconciler tries to send it (see the sketch below).
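The ordering constraint in the comment above is an unbuffered-channel handoff: the monitor must already be receiving before the reconciler sends, or the send blocks. A stripped-down sketch of the pattern, with a stand-in payload type in place of the real NodeNetworkConfig:

```go
package main

import "fmt"

// update stands in for the NodeNetworkConfig payload the reconciler pushes.
type update struct{ generation int64 }

func main() {
	nncCh := make(chan update) // unbuffered, like the monitor's NNC channel
	done := make(chan struct{})

	// Start the consumer (the pool monitor) first...
	go func() {
		for u := range nncCh {
			fmt.Println("monitor observed generation", u.generation)
		}
		close(done)
	}()

	// ...so that the producer (the NNC reconciler) does not block forever.
	nncCh <- update{generation: 1}
	close(nncCh)
	<-done
}
```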
+ go func() {
+ logger.Printf("Starting IPAM Pool Monitor")
+ if e := poolMonitor.Start(ctx); e != nil {
+ logger.Errorf("[Azure CNS] Failed to start pool monitor with err: %v", e)
+ }
+ }()
+ logger.Printf("initialized and started IPAM pool monitor")
+
+ // Start the Manager which starts the reconcile loop.
+ // The Reconciler will send an initial NodeNetworkConfig update to the PoolMonitor, starting the
+ // Monitor's internal loop.
+ go func() {
+ logger.Printf("Starting controller-manager.")
+ for {
+ if err := manager.Start(ctx); err != nil {
+ logger.Errorf("Failed to start controller-manager: %v", err)
+ // retry to start the request controller
+ // inc the managerStartFailures metric for failure tracking
+ managerStartFailures.Inc()
+ } else {
+ logger.Printf("Stopped controller-manager.")
+ return
+ }
+ time.Sleep(time.Second) // TODO(rbtr): make this exponential backoff
+ }
+ }()
+ logger.Printf("Initialized controller-manager.")
+ for {
+ logger.Printf("Waiting for NodeNetworkConfig reconciler to start.")
+ // wait for the Reconciler to run once on an NNC that was made for this Node.
+ // the nncReadyCtx has a timeout of 15 minutes, after which we consider the
+ // NNC Reconciler stuck or failed, log, and retry.
+ nncReadyCtx, cancel := context.WithTimeout(ctx, 15*time.Minute) // nolint // it will time out and not leak
+ if started, err := nncReconciler.Started(nncReadyCtx); !started {
+ logger.Errorf("NNC reconciler has not started, does the NNC exist? err: %v", err)
+ nncReconcilerStartFailures.Inc()
+ continue
+ }
+ logger.Printf("NodeNetworkConfig reconciler has started.")
+ cancel()
+ break
+ }
+
+ go func() {
+ logger.Printf("Starting SyncHostNCVersion loop.")
+ // Periodically poll the VFP-programmed NC version from NMAgent
+ tickerChannel := time.Tick(time.Duration(cnsconfig.SyncHostNCVersionIntervalMs) * time.Millisecond)
+ for {
+ select {
+ case <-tickerChannel:
+ timedCtx, cancel := context.WithTimeout(ctx, time.Duration(cnsconfig.SyncHostNCVersionIntervalMs)*time.Millisecond)
+ httpRestServiceImplementation.SyncHostNCVersion(timedCtx, cnsconfig.ChannelMode)
+ cancel()
+ case <-ctx.Done():
+ logger.Printf("Stopping SyncHostNCVersion loop.")
+ return
+ }
+ }
+ }()
+ logger.Printf("Initialized SyncHostNCVersion loop.")
+ return nil
+}
+
+// createOrUpdateNodeInfoCRD queries IMDS to learn the VM unique ID and then creates or updates the NodeInfo CRD
+// with that VM unique ID
+func createOrUpdateNodeInfoCRD(ctx context.Context, restConfig *rest.Config, node *corev1.Node) error {
+ imdsCli := imds.NewClient()
+ vmUniqueID, err := imdsCli.GetVMUniqueID(ctx)
+ if err != nil {
+ return errors.Wrap(err, "error getting vm unique ID from imds")
+ }
+
+ directcli, err := client.New(restConfig, client.Options{Scheme: multitenancy.Scheme})
+ if err != nil {
+ return errors.Wrap(err, "failed to create ctrl client")
+ }
+
+ nodeInfoCli := multitenancy.NodeInfoClient{
+ Cli: directcli,
+ }
+
+ nodeInfo := &mtv1alpha1.NodeInfo{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: node.Name,
+ },
+ Spec: mtv1alpha1.NodeInfoSpec{
+ VMUniqueID: vmUniqueID,
+ },
+ }
+
+ if err := controllerutil.SetOwnerReference(node, nodeInfo, multitenancy.Scheme); err != nil {
+ return errors.Wrap(err, "failed to set nodeinfo owner reference to node")
+ }
+
+ if err := nodeInfoCli.CreateOrUpdate(ctx, nodeInfo, "azure-cns"); err != nil {
+ return errors.Wrap(err, "error ensuring nodeinfo CRD exists and is up-to-date")
+ }
+
+ return nil
+}
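createOrUpdateNodeInfoCRD ties the NodeInfo object to its Node through an owner reference, so the NodeInfo can be garbage-collected along with the Node. A hedged sketch of just that wiring, using two ConfigMaps as stand-ins for Node and NodeInfo so it runs with only the core scheme:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

func main() {
	scheme := runtime.NewScheme()
	if err := corev1.AddToScheme(scheme); err != nil {
		panic(err)
	}

	owner := &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "owner", Namespace: "default", UID: "uid-1"}}
	owned := &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "owned", Namespace: "default"}}

	// SetOwnerReference resolves the owner's GroupVersionKind from the scheme
	// and appends it to the owned object's metadata.ownerReferences.
	if err := controllerutil.SetOwnerReference(owner, owned, scheme); err != nil {
		panic(err)
	}
	fmt.Printf("ownerReferences: %+v\n", owned.OwnerReferences)
}
```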
+func initializeMultiTenantController(ctx context.Context, httpRestService cns.HTTPService, cnsconfig configuration.CNSConfig) error {
+ var multiTenantController multitenantcontroller.RequestController
+ kubeConfig, err := ctrl.GetConfig()
+ if err != nil {
+ return err
+ }
+ // set the UserAgent only after the error check; kubeConfig is nil on error
+ kubeConfig.UserAgent = fmt.Sprintf("azure-cns-%s", version)
+
+ // convert interface type to implementation type
+ httpRestServiceImpl, ok := httpRestService.(*restserver.HTTPRestService)
+ if !ok {
+ logger.Errorf("Failed to convert interface httpRestService to implementation: %v", httpRestService)
+ return fmt.Errorf("Failed to convert interface httpRestService to implementation: %v",
+ httpRestService)
+ }
+
+ // Set orchestrator type
+ orchestrator := cns.SetOrchestratorTypeRequest{
+ OrchestratorType: cns.Kubernetes,
+ }
+ httpRestServiceImpl.SetNodeOrchestrator(&orchestrator)
+
+ // Create multiTenantController.
+ multiTenantController, err = multitenantoperator.New(httpRestServiceImpl, kubeConfig)
+ if err != nil {
+ logger.Errorf("Failed to create multiTenantController: %v", err)
+ return err
+ }
+
+ // Wait for multiTenantController to start.
+ go func() {
+ for {
+ if err := multiTenantController.Start(ctx); err != nil {
+ logger.Errorf("Failed to start multiTenantController: %v", err)
+ } else {
+ logger.Printf("Exiting multiTenantController")
+ return
+ }
+
+ // Retry after 1sec
+ time.Sleep(time.Second)
+ }
+ }()
+ for {
+ if multiTenantController.IsStarted() {
+ logger.Printf("MultiTenantController is started")
+ break
+ }
+
+ logger.Printf("Waiting for multiTenantController to start...")
+ time.Sleep(time.Millisecond * 500)
+ }
+
+ // TODO: do we need this to be running?
+ logger.Printf("Starting SyncHostNCVersion")
+ go func() {
+ // Periodically poll the VFP-programmed NC version from NMAgent
+ tickerChannel := time.Tick(time.Duration(cnsconfig.SyncHostNCVersionIntervalMs) * time.Millisecond)
+ for {
+ select {
+ case <-tickerChannel:
+ timedCtx, cancel := context.WithTimeout(ctx, time.Duration(cnsconfig.SyncHostNCVersionIntervalMs)*time.Millisecond)
+ httpRestServiceImpl.SyncHostNCVersion(timedCtx, cnsconfig.ChannelMode)
+ cancel()
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
+ return nil
+}
+
+// Poll the NodeInfo CRD until its status is set, then update the PluginManager
+func pollNodeInfoCRDAndUpdatePlugin(ctx context.Context, zlog *zap.Logger, pluginManager *deviceplugin.PluginManager) error {
+ kubeConfig, err := ctrl.GetConfig()
+ if err != nil {
+ logger.Errorf("Failed to get kubeconfig for request controller: %v", err)
+ return errors.Wrap(err, "failed to get kubeconfig")
+ }
+ kubeConfig.UserAgent = "azure-cns-" + version
+
+ clientset, err := kubernetes.NewForConfig(kubeConfig)
+ if err != nil {
+ return errors.Wrap(err, "failed to build clientset")
+ }
+
+ nodeName, err := configuration.NodeName()
+ if err != nil {
+ return errors.Wrap(err, "failed to get NodeName")
+ }
+
+ node, err := clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
+ if err != nil {
+ return errors.Wrapf(err, "failed to get node %s", nodeName)
+ }
+
+ // check the Node labels for Swift V2
+ if _, ok := node.Labels[configuration.LabelNodeSwiftV2]; !ok {
+ zlog.Info("Node is not labeled for Swift V2, skipping polling nodeinfo crd")
+ return nil
+ }
+
+ directcli, err := client.New(kubeConfig, client.Options{Scheme: multitenancy.Scheme})
+ if err != nil {
+ return errors.Wrap(err, "failed to create ctrl client")
+ }
+
+ nodeInfoCli := multitenancy.NodeInfoClient{
+ Cli: directcli,
+ }
+
+ ticker := time.NewTicker(defaultNodeInfoCRDPollInterval)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case 
<-ctx.Done(): + zlog.Info("Polling context canceled, exiting") + return nil + case <-ticker.C: + // Fetch the CRD status + nodeInfo, err := nodeInfoCli.Get(ctx, node.Name) + if err != nil { + zlog.Error("Error fetching nodeinfo CRD", zap.Error(err)) + return errors.Wrap(err, "failed to get nodeinfo crd") + } + + // Check if the status is set + if !cmp.Equal(nodeInfo.Status, mtv1alpha1.NodeInfoStatus{}) && len(nodeInfo.Status.DeviceInfos) > 0 { + // Create a map to count devices by type + deviceCounts := map[mtv1alpha1.DeviceType]int{ + mtv1alpha1.DeviceTypeVnetNIC: 0, + mtv1alpha1.DeviceTypeInfiniBandNIC: 0, + } + + // Aggregate device counts from the CRD + for _, deviceInfo := range nodeInfo.Status.DeviceInfos { + switch deviceInfo.DeviceType { + case mtv1alpha1.DeviceTypeVnetNIC, mtv1alpha1.DeviceTypeInfiniBandNIC: + deviceCounts[deviceInfo.DeviceType]++ + default: + zlog.Error("Unknown device type", zap.String("deviceType", string(deviceInfo.DeviceType))) + } + } + + // Update the plugin manager with device counts + for deviceType, count := range deviceCounts { + pluginManager.TrackDevices(deviceType, count) + } + + // Exit polling loop once the CRD status is successfully processed + return nil + } + } + } +} diff --git a/cns/service/args.go b/cns/service/args.go new file mode 100644 index 0000000000..786cf04009 --- /dev/null +++ b/cns/service/args.go @@ -0,0 +1,219 @@ +// nolint +package main + +import ( + "os" + + acn "github.com/Azure/azure-container-networking/common" + "github.com/Azure/azure-container-networking/log" + "github.com/Azure/azure-container-networking/platform" +) + +// Command line arguments for CNS. +var args = acn.ArgumentList{ + { + Name: acn.OptEnvironment, + Shorthand: acn.OptEnvironmentAlias, + Description: "Set the operating environment", + Type: "string", + DefaultValue: acn.OptEnvironmentAzure, + ValueMap: map[string]interface{}{ + acn.OptEnvironmentAzure: 0, + acn.OptEnvironmentMAS: 0, + acn.OptEnvironmentFileIpam: 0, + }, + }, + { + Name: acn.OptAPIServerURL, + Shorthand: acn.OptAPIServerURLAlias, + Description: "Set the API server URL", + Type: "string", + DefaultValue: "", + }, + { + Name: acn.OptLogLevel, + Shorthand: acn.OptLogLevelAlias, + Description: "Set the logging level", + Type: "int", + DefaultValue: acn.OptLogLevelInfo, + ValueMap: map[string]interface{}{ + acn.OptLogLevelInfo: log.LevelInfo, + acn.OptLogLevelDebug: log.LevelDebug, + }, + }, + { + Name: acn.OptLogTarget, + Shorthand: acn.OptLogTargetAlias, + Description: "Set the logging target", + Type: "int", + DefaultValue: acn.OptLogTargetFile, + ValueMap: map[string]interface{}{ + acn.OptLogTargetSyslog: log.TargetSyslog, + acn.OptLogTargetStderr: log.TargetStderr, + acn.OptLogTargetFile: log.TargetLogfile, + acn.OptLogStdout: log.TargetStdout, + acn.OptLogMultiWrite: log.TargetStdOutAndLogFile, + }, + }, + { + Name: acn.OptLogLocation, + Shorthand: acn.OptLogLocationAlias, + Description: "Set the directory location where logs will be saved", + Type: "string", + DefaultValue: "", + }, + { + Name: acn.OptIpamQueryUrl, + Shorthand: acn.OptIpamQueryUrlAlias, + Description: "Set the IPAM query URL", + Type: "string", + DefaultValue: "", + }, + { + Name: acn.OptIpamQueryInterval, + Shorthand: acn.OptIpamQueryIntervalAlias, + Description: "Set the IPAM plugin query interval", + Type: "int", + DefaultValue: "", + }, + { + Name: acn.OptCnsURL, + Shorthand: acn.OptCnsURLAlias, + Description: "Set the URL for CNS to listen on", + Type: "string", + DefaultValue: "", + }, + { + Name: acn.OptCnsPort, + 
Shorthand: acn.OptCnsPortAlias, + Description: "Set the URL port for CNS to listen on", + Type: "string", + DefaultValue: "", + }, + { + Name: acn.OptVersion, + Shorthand: acn.OptVersionAlias, + Description: "Print version information", + Type: "bool", + DefaultValue: false, + }, + { + Name: acn.OptStoreFileLocation, + Shorthand: acn.OptStoreFileLocationAlias, + Description: "Set store file absolute path", + Type: "string", + DefaultValue: platform.CNMRuntimePath, + }, + { + Name: acn.OptNetPluginPath, + Shorthand: acn.OptNetPluginPathAlias, + Description: "Set network plugin binary absolute path to parent (of azure-vnet and azure-vnet-ipam)", + Type: "string", + DefaultValue: platform.K8SCNIRuntimePath, + }, + { + Name: acn.OptNetPluginConfigFile, + Shorthand: acn.OptNetPluginConfigFileAlias, + Description: "Set network plugin configuration file absolute path", + Type: "string", + DefaultValue: platform.K8SNetConfigPath + string(os.PathSeparator) + defaultCNINetworkConfigFileName, + }, + { + Name: acn.OptCreateDefaultExtNetworkType, + Shorthand: acn.OptCreateDefaultExtNetworkTypeAlias, + Description: "Create default external network for windows platform with the specified type (l2bridge)", + Type: "string", + DefaultValue: "", + }, + { + Name: acn.OptTelemetry, + Shorthand: acn.OptTelemetryAlias, + Description: "Set to false to disable telemetry. This is deprecated in favor of cns_config.json", + Type: "bool", + DefaultValue: true, + }, + { + Name: acn.OptHttpConnectionTimeout, + Shorthand: acn.OptHttpConnectionTimeoutAlias, + Description: "Set HTTP connection timeout in seconds to be used by http client in CNS", + Type: "int", + DefaultValue: "5", + }, + { + Name: acn.OptHttpResponseHeaderTimeout, + Shorthand: acn.OptHttpResponseHeaderTimeoutAlias, + Description: "Set HTTP response header timeout in seconds to be used by http client in CNS", + Type: "int", + DefaultValue: "120", + }, + { + Name: acn.OptPrivateEndpoint, + Shorthand: acn.OptPrivateEndpointAlias, + Description: "Set private endpoint", + Type: "string", + DefaultValue: "", + }, + { + Name: acn.OptInfrastructureNetworkID, + Shorthand: acn.OptInfrastructureNetworkIDAlias, + Description: "Set infrastructure network ID", + Type: "string", + DefaultValue: "", + }, + { + Name: acn.OptNodeID, + Shorthand: acn.OptNodeIDAlias, + Description: "Set node name/ID", + Type: "string", + DefaultValue: "", + }, + { + Name: acn.OptManaged, + Shorthand: acn.OptManagedAlias, + Description: "Set to true to enable managed mode. This is deprecated in favor of cns_config.json", + Type: "bool", + DefaultValue: false, + }, + { + Name: acn.OptDebugCmd, + Shorthand: acn.OptDebugCmdAlias, + Description: "Debug flag to retrieve IPconfigs, available values: assigned, available, all", + Type: "string", + DefaultValue: "", + }, + { + Name: acn.OptDebugArg, + Shorthand: acn.OptDebugArgAlias, + Description: "Argument flag to be paired with the 'debugcmd' flag.", + Type: "string", + DefaultValue: "", + }, + { + Name: acn.OptCNSConfigPath, + Shorthand: acn.OptCNSConfigPathAlias, + Description: "Path to cns config file", + Type: "string", + DefaultValue: "", + }, + { + Name: acn.OptTelemetryService, + Shorthand: acn.OptTelemetryServiceAlias, + Description: "Flag to start telemetry service to receive telemetry events from CNI. 
Default, disabled.", + Type: "bool", + DefaultValue: false, + }, + { + Name: acn.OptCNIConflistFilepath, + Shorthand: acn.OptCNIConflistFilepathAlias, + Description: "Filepath to write CNI conflist when CNI conflist generation is enabled", + Type: "string", + DefaultValue: "", + }, + { + Name: acn.OptCNIConflistScenario, + Shorthand: acn.OptCNIConflistScenarioAlias, + Description: "Scenario to generate CNI conflist for", + Type: "string", + DefaultValue: "", + }, +} diff --git a/cns/service/main.go b/cns/service/main.go index ff79090d8b..91f7242c38 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -4,7 +4,6 @@ package main import ( - "bytes" "context" "encoding/json" "fmt" @@ -13,7 +12,6 @@ import ( "os" "os/signal" "runtime" - "strconv" "strings" "syscall" "time" @@ -32,35 +30,18 @@ import ( "github.com/Azure/azure-container-networking/cns/healthserver" "github.com/Azure/azure-container-networking/cns/hnsclient" "github.com/Azure/azure-container-networking/cns/imds" - "github.com/Azure/azure-container-networking/cns/ipampool" - "github.com/Azure/azure-container-networking/cns/ipampool/metrics" - ipampoolv2 "github.com/Azure/azure-container-networking/cns/ipampool/v2" - cssctrl "github.com/Azure/azure-container-networking/cns/kubecontroller/clustersubnetstate" - mtpncctrl "github.com/Azure/azure-container-networking/cns/kubecontroller/multitenantpodnetworkconfig" - nncctrl "github.com/Azure/azure-container-networking/cns/kubecontroller/nodenetworkconfig" - podctrl "github.com/Azure/azure-container-networking/cns/kubecontroller/pod" "github.com/Azure/azure-container-networking/cns/logger" loggerv2 "github.com/Azure/azure-container-networking/cns/logger/v2" "github.com/Azure/azure-container-networking/cns/metric" "github.com/Azure/azure-container-networking/cns/middlewares" - "github.com/Azure/azure-container-networking/cns/multitenantcontroller" - "github.com/Azure/azure-container-networking/cns/multitenantcontroller/multitenantoperator" "github.com/Azure/azure-container-networking/cns/restserver" restserverv2 "github.com/Azure/azure-container-networking/cns/restserver/v2" cnipodprovider "github.com/Azure/azure-container-networking/cns/stateprovider/cni" cnspodprovider "github.com/Azure/azure-container-networking/cns/stateprovider/cns" - cnstypes "github.com/Azure/azure-container-networking/cns/types" "github.com/Azure/azure-container-networking/cns/wireserver" acn "github.com/Azure/azure-container-networking/common" - "github.com/Azure/azure-container-networking/crd" - "github.com/Azure/azure-container-networking/crd/clustersubnetstate" - cssv1alpha1 "github.com/Azure/azure-container-networking/crd/clustersubnetstate/api/v1alpha1" - "github.com/Azure/azure-container-networking/crd/multitenancy" mtv1alpha1 "github.com/Azure/azure-container-networking/crd/multitenancy/api/v1alpha1" - "github.com/Azure/azure-container-networking/crd/nodenetworkconfig" - "github.com/Azure/azure-container-networking/crd/nodenetworkconfig/api/v1alpha" acnfs "github.com/Azure/azure-container-networking/internal/fs" - "github.com/Azure/azure-container-networking/log" "github.com/Azure/azure-container-networking/nmagent" "github.com/Azure/azure-container-networking/platform" "github.com/Azure/azure-container-networking/processlock" @@ -68,26 +49,10 @@ import ( "github.com/Azure/azure-container-networking/store" "github.com/Azure/azure-container-networking/telemetry" "github.com/avast/retry-go/v4" - "github.com/go-logr/zapr" - "github.com/google/go-cmp/cmp" "github.com/pkg/errors" "go.uber.org/zap" - 
"golang.org/x/time/rate" - corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - kuberuntime "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/healthz" - ctrlmgr "sigs.k8s.io/controller-runtime/pkg/manager" - ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics/server" ) const ( @@ -118,236 +83,14 @@ const ( initialIBNICCount = 0 ) -type cniConflistScenario string - -const ( - scenarioV4Overlay cniConflistScenario = "v4overlay" - scenarioDualStackOverlay cniConflistScenario = "dualStackOverlay" - scenarioOverlay cniConflistScenario = "overlay" - scenarioCilium cniConflistScenario = "cilium" - scenarioSWIFT cniConflistScenario = "swift" -) - var ( rootCtx context.Context rootErrCh chan error ) -// Version is populated by make during build. -var version string - // endpointStorePath is used to create the path for EdnpointState file. var endpointStorePath string -// Command line arguments for CNS. -var args = acn.ArgumentList{ - { - Name: acn.OptEnvironment, - Shorthand: acn.OptEnvironmentAlias, - Description: "Set the operating environment", - Type: "string", - DefaultValue: acn.OptEnvironmentAzure, - ValueMap: map[string]interface{}{ - acn.OptEnvironmentAzure: 0, - acn.OptEnvironmentMAS: 0, - acn.OptEnvironmentFileIpam: 0, - }, - }, - { - Name: acn.OptAPIServerURL, - Shorthand: acn.OptAPIServerURLAlias, - Description: "Set the API server URL", - Type: "string", - DefaultValue: "", - }, - { - Name: acn.OptLogLevel, - Shorthand: acn.OptLogLevelAlias, - Description: "Set the logging level", - Type: "int", - DefaultValue: acn.OptLogLevelInfo, - ValueMap: map[string]interface{}{ - acn.OptLogLevelInfo: log.LevelInfo, - acn.OptLogLevelDebug: log.LevelDebug, - }, - }, - { - Name: acn.OptLogTarget, - Shorthand: acn.OptLogTargetAlias, - Description: "Set the logging target", - Type: "int", - DefaultValue: acn.OptLogTargetFile, - ValueMap: map[string]interface{}{ - acn.OptLogTargetSyslog: log.TargetSyslog, - acn.OptLogTargetStderr: log.TargetStderr, - acn.OptLogTargetFile: log.TargetLogfile, - acn.OptLogStdout: log.TargetStdout, - acn.OptLogMultiWrite: log.TargetStdOutAndLogFile, - }, - }, - { - Name: acn.OptLogLocation, - Shorthand: acn.OptLogLocationAlias, - Description: "Set the directory location where logs will be saved", - Type: "string", - DefaultValue: "", - }, - { - Name: acn.OptIpamQueryUrl, - Shorthand: acn.OptIpamQueryUrlAlias, - Description: "Set the IPAM query URL", - Type: "string", - DefaultValue: "", - }, - { - Name: acn.OptIpamQueryInterval, - Shorthand: acn.OptIpamQueryIntervalAlias, - Description: "Set the IPAM plugin query interval", - Type: "int", - DefaultValue: "", - }, - { - Name: acn.OptCnsURL, - Shorthand: acn.OptCnsURLAlias, - Description: "Set the URL for CNS to listen on", - Type: "string", - DefaultValue: "", - }, - { - Name: acn.OptCnsPort, - Shorthand: acn.OptCnsPortAlias, - Description: "Set the URL port for CNS to listen on", - Type: "string", - DefaultValue: "", - }, - { - Name: acn.OptVersion, - Shorthand: acn.OptVersionAlias, - Description: "Print version information", - Type: "bool", - DefaultValue: false, - }, - { - Name: acn.OptStoreFileLocation, - 
Shorthand: acn.OptStoreFileLocationAlias, - Description: "Set store file absolute path", - Type: "string", - DefaultValue: platform.CNMRuntimePath, - }, - { - Name: acn.OptNetPluginPath, - Shorthand: acn.OptNetPluginPathAlias, - Description: "Set network plugin binary absolute path to parent (of azure-vnet and azure-vnet-ipam)", - Type: "string", - DefaultValue: platform.K8SCNIRuntimePath, - }, - { - Name: acn.OptNetPluginConfigFile, - Shorthand: acn.OptNetPluginConfigFileAlias, - Description: "Set network plugin configuration file absolute path", - Type: "string", - DefaultValue: platform.K8SNetConfigPath + string(os.PathSeparator) + defaultCNINetworkConfigFileName, - }, - { - Name: acn.OptCreateDefaultExtNetworkType, - Shorthand: acn.OptCreateDefaultExtNetworkTypeAlias, - Description: "Create default external network for windows platform with the specified type (l2bridge)", - Type: "string", - DefaultValue: "", - }, - { - Name: acn.OptTelemetry, - Shorthand: acn.OptTelemetryAlias, - Description: "Set to false to disable telemetry. This is deprecated in favor of cns_config.json", - Type: "bool", - DefaultValue: true, - }, - { - Name: acn.OptHttpConnectionTimeout, - Shorthand: acn.OptHttpConnectionTimeoutAlias, - Description: "Set HTTP connection timeout in seconds to be used by http client in CNS", - Type: "int", - DefaultValue: "5", - }, - { - Name: acn.OptHttpResponseHeaderTimeout, - Shorthand: acn.OptHttpResponseHeaderTimeoutAlias, - Description: "Set HTTP response header timeout in seconds to be used by http client in CNS", - Type: "int", - DefaultValue: "120", - }, - { - Name: acn.OptPrivateEndpoint, - Shorthand: acn.OptPrivateEndpointAlias, - Description: "Set private endpoint", - Type: "string", - DefaultValue: "", - }, - { - Name: acn.OptInfrastructureNetworkID, - Shorthand: acn.OptInfrastructureNetworkIDAlias, - Description: "Set infrastructure network ID", - Type: "string", - DefaultValue: "", - }, - { - Name: acn.OptNodeID, - Shorthand: acn.OptNodeIDAlias, - Description: "Set node name/ID", - Type: "string", - DefaultValue: "", - }, - { - Name: acn.OptManaged, - Shorthand: acn.OptManagedAlias, - Description: "Set to true to enable managed mode. This is deprecated in favor of cns_config.json", - Type: "bool", - DefaultValue: false, - }, - { - Name: acn.OptDebugCmd, - Shorthand: acn.OptDebugCmdAlias, - Description: "Debug flag to retrieve IPconfigs, available values: assigned, available, all", - Type: "string", - DefaultValue: "", - }, - { - Name: acn.OptDebugArg, - Shorthand: acn.OptDebugArgAlias, - Description: "Argument flag to be paired with the 'debugcmd' flag.", - Type: "string", - DefaultValue: "", - }, - { - Name: acn.OptCNSConfigPath, - Shorthand: acn.OptCNSConfigPathAlias, - Description: "Path to cns config file", - Type: "string", - DefaultValue: "", - }, - { - Name: acn.OptTelemetryService, - Shorthand: acn.OptTelemetryServiceAlias, - Description: "Flag to start telemetry service to receive telemetry events from CNI. 
Default, disabled.", - Type: "bool", - DefaultValue: false, - }, - { - Name: acn.OptCNIConflistFilepath, - Shorthand: acn.OptCNIConflistFilepathAlias, - Description: "Filepath to write CNI conflist when CNI conflist generation is enabled", - Type: "string", - DefaultValue: "", - }, - { - Name: acn.OptCNIConflistScenario, - Shorthand: acn.OptCNIConflistScenarioAlias, - Description: "Scenario to generate CNI conflist for", - Type: "string", - DefaultValue: "", - }, -} - // init() is executed before main() whenever this package is imported // to do pre-run setup of things like exit signal handling and building // the root context. @@ -380,96 +123,10 @@ func init() { }() } -// Prints description and version information. -func printVersion() { - fmt.Printf("Azure Container Network Service\n") - fmt.Printf("Version %v\n", version) -} - -// NodeInterrogator is functionality necessary to read information about nodes. -// It is intended to be strictly read-only. -type NodeInterrogator interface { - SupportedAPIs(context.Context) ([]string, error) -} - type httpDoer interface { Do(req *http.Request) (*http.Response, error) } -// RegisterNode - Tries to register node with DNC when CNS is started in managed DNC mode -func registerNode(ctx context.Context, httpClient httpDoer, httpRestService cns.HTTPService, dncEP, infraVnet, nodeID string, ni NodeInterrogator) error { - logger.Printf("[Azure CNS] Registering node %s with Infrastructure Network: %s PrivateEndpoint: %s", nodeID, infraVnet, dncEP) - - var ( - numCPU = runtime.NumCPU() - url = fmt.Sprintf(acn.RegisterNodeURLFmt, dncEP, infraVnet, nodeID, dncApiVersion) - nodeRegisterRequest cns.NodeRegisterRequest - ) - - nodeRegisterRequest.NumCores = numCPU - supportedApis, retErr := ni.SupportedAPIs(context.TODO()) - - if retErr != nil { - return errors.Wrap(retErr, fmt.Sprintf("[Azure CNS] Failed to retrieve SupportedApis from NMagent of node %s with Infrastructure Network: %s PrivateEndpoint: %s", - nodeID, infraVnet, dncEP)) - } - - // To avoid any null-pointer de-referencing errors. - if supportedApis == nil { - supportedApis = []string{} - } - - nodeRegisterRequest.NmAgentSupportedApis = supportedApis - - // CNS tries to register Node for maximum of an hour. - err := retry.Do(func() error { - return errors.Wrap(sendRegisterNodeRequest(ctx, httpClient, httpRestService, nodeRegisterRequest, url), "failed to sendRegisterNodeRequest") - }, retry.Delay(acn.FiveSeconds), retry.Attempts(maxRetryNodeRegister), retry.DelayType(retry.FixedDelay)) - - return errors.Wrap(err, fmt.Sprintf("[Azure CNS] Failed to register node %s after maximum reties for an hour with Infrastructure Network: %s PrivateEndpoint: %s", - nodeID, infraVnet, dncEP)) -} - -// sendRegisterNodeRequest func helps in registering the node until there is an error. 
-func sendRegisterNodeRequest(ctx context.Context, httpClient httpDoer, httpRestService cns.HTTPService, nodeRegisterRequest cns.NodeRegisterRequest, registerURL string) error { - var body bytes.Buffer - err := json.NewEncoder(&body).Encode(nodeRegisterRequest) - if err != nil { - logger.Errorf("Failed to register node while encoding json failed with non-retryable err %v", err) - return errors.Wrap(retry.Unrecoverable(err), "failed to sendRegisterNodeRequest") - } - - request, err := http.NewRequestWithContext(ctx, http.MethodPost, registerURL, &body) - if err != nil { - return errors.Wrap(err, "failed to build request") - } - - request.Header.Set("Content-Type", "application/json") - response, err := httpClient.Do(request) - if err != nil { - return errors.Wrap(err, "http request failed") - } - - defer response.Body.Close() - - if response.StatusCode != http.StatusOK { - err = fmt.Errorf("[Azure CNS] Failed to register node, DNC replied with http status code %s", strconv.Itoa(response.StatusCode)) - logger.Errorf(err.Error()) - return errors.Wrap(err, "failed to sendRegisterNodeRequest") - } - - var req cns.SetOrchestratorTypeRequest - err = json.NewDecoder(response.Body).Decode(&req) - if err != nil { - logger.Errorf("decoding Node Register response json failed with err %v", err) - return errors.Wrap(err, "failed to sendRegisterNodeRequest") - } - httpRestService.SetNodeOrchestrator(&req) - - logger.Printf("[Azure CNS] Node Registered") - return nil -} - func startTelemetryService(ctx context.Context) { var config aitelemetry.AIConfig @@ -870,7 +527,7 @@ func main() { logger.Printf("Set GlobalPodInfoScheme %v (InitializeFromCNI=%t)", cns.GlobalPodInfoScheme, cnsconfig.InitializeFromCNI) - err = InitializeCRDState(rootCtx, z, httpRemoteRestService, cnsconfig) + err = initializeCRDState(rootCtx, z, httpRemoteRestService, cnsconfig) if err != nil { logger.Errorf("Failed to start CRD Controller, err:%v.\n", err) return @@ -920,7 +577,7 @@ func main() { // Initialize multi-tenant controller if the CNS is running in MultiTenantCRD mode. // It must be started before we start HTTPRemoteRestService. 
if config.ChannelMode == cns.MultiTenantCRD { - err = InitializeMultiTenantController(rootCtx, httpRemoteRestService, *cnsconfig) + err = initializeMultiTenantController(rootCtx, httpRemoteRestService, *cnsconfig) if err != nil { logger.Errorf("Failed to start multiTenantController, err:%v.\n", err) return @@ -1144,524 +801,6 @@ func main() { logger.Close() } -// Poll CRD until it's set and update PluginManager -func pollNodeInfoCRDAndUpdatePlugin(ctx context.Context, zlog *zap.Logger, pluginManager *deviceplugin.PluginManager) error { - kubeConfig, err := ctrl.GetConfig() - if err != nil { - logger.Errorf("Failed to get kubeconfig for request controller: %v", err) - return errors.Wrap(err, "failed to get kubeconfig") - } - kubeConfig.UserAgent = "azure-cns-" + version - - clientset, err := kubernetes.NewForConfig(kubeConfig) - if err != nil { - return errors.Wrap(err, "failed to build clientset") - } - - nodeName, err := configuration.NodeName() - if err != nil { - return errors.Wrap(err, "failed to get NodeName") - } - - node, err := clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) - if err != nil { - return errors.Wrapf(err, "failed to get node %s", nodeName) - } - - // check the Node labels for Swift V2 - if _, ok := node.Labels[configuration.LabelNodeSwiftV2]; !ok { - zlog.Info("Node is not labeled for Swift V2, skipping polling nodeinfo crd") - return nil - } - - directcli, err := client.New(kubeConfig, client.Options{Scheme: multitenancy.Scheme}) - if err != nil { - return errors.Wrap(err, "failed to create ctrl client") - } - - nodeInfoCli := multitenancy.NodeInfoClient{ - Cli: directcli, - } - - ticker := time.NewTicker(defaultNodeInfoCRDPollInterval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - zlog.Info("Polling context canceled, exiting") - return nil - case <-ticker.C: - // Fetch the CRD status - nodeInfo, err := nodeInfoCli.Get(ctx, node.Name) - if err != nil { - zlog.Error("Error fetching nodeinfo CRD", zap.Error(err)) - return errors.Wrap(err, "failed to get nodeinfo crd") - } - - // Check if the status is set - if !cmp.Equal(nodeInfo.Status, mtv1alpha1.NodeInfoStatus{}) && len(nodeInfo.Status.DeviceInfos) > 0 { - // Create a map to count devices by type - deviceCounts := map[mtv1alpha1.DeviceType]int{ - mtv1alpha1.DeviceTypeVnetNIC: 0, - mtv1alpha1.DeviceTypeInfiniBandNIC: 0, - } - - // Aggregate device counts from the CRD - for _, deviceInfo := range nodeInfo.Status.DeviceInfos { - switch deviceInfo.DeviceType { - case mtv1alpha1.DeviceTypeVnetNIC, mtv1alpha1.DeviceTypeInfiniBandNIC: - deviceCounts[deviceInfo.DeviceType]++ - default: - zlog.Error("Unknown device type", zap.String("deviceType", string(deviceInfo.DeviceType))) - } - } - - // Update the plugin manager with device counts - for deviceType, count := range deviceCounts { - pluginManager.TrackDevices(deviceType, count) - } - - // Exit polling loop once the CRD status is successfully processed - return nil - } - } - } -} - -func InitializeMultiTenantController(ctx context.Context, httpRestService cns.HTTPService, cnsconfig configuration.CNSConfig) error { - var multiTenantController multitenantcontroller.RequestController - kubeConfig, err := ctrl.GetConfig() - kubeConfig.UserAgent = fmt.Sprintf("azure-cns-%s", version) - if err != nil { - return err - } - - // convert interface type to implementation type - httpRestServiceImpl, ok := httpRestService.(*restserver.HTTPRestService) - if !ok { - logger.Errorf("Failed to convert interface httpRestService to implementation: %v", 
httpRestService) - return fmt.Errorf("Failed to convert interface httpRestService to implementation: %v", - httpRestService) - } - - // Set orchestrator type - orchestrator := cns.SetOrchestratorTypeRequest{ - OrchestratorType: cns.Kubernetes, - } - httpRestServiceImpl.SetNodeOrchestrator(&orchestrator) - - // Create multiTenantController. - multiTenantController, err = multitenantoperator.New(httpRestServiceImpl, kubeConfig) - if err != nil { - logger.Errorf("Failed to create multiTenantController:%v", err) - return err - } - - // Wait for multiTenantController to start. - go func() { - for { - if err := multiTenantController.Start(ctx); err != nil { - logger.Errorf("Failed to start multiTenantController: %v", err) - } else { - logger.Printf("Exiting multiTenantController") - return - } - - // Retry after 1sec - time.Sleep(time.Second) - } - }() - for { - if multiTenantController.IsStarted() { - logger.Printf("MultiTenantController is started") - break - } - - logger.Printf("Waiting for multiTenantController to start...") - time.Sleep(time.Millisecond * 500) - } - - // TODO: do we need this to be running? - logger.Printf("Starting SyncHostNCVersion") - go func() { - // Periodically poll vfp programmed NC version from NMAgent - tickerChannel := time.Tick(time.Duration(cnsconfig.SyncHostNCVersionIntervalMs) * time.Millisecond) - for { - select { - case <-tickerChannel: - timedCtx, cancel := context.WithTimeout(ctx, time.Duration(cnsconfig.SyncHostNCVersionIntervalMs)*time.Millisecond) - httpRestServiceImpl.SyncHostNCVersion(timedCtx, cnsconfig.ChannelMode) - cancel() - case <-ctx.Done(): - return - } - } - }() - - return nil -} - -type nodeNetworkConfigGetter interface { - Get(context.Context) (*v1alpha.NodeNetworkConfig, error) -} - -type ipamStateReconciler interface { - ReconcileIPAMStateForSwift(ncRequests []*cns.CreateNetworkContainerRequest, podInfoByIP map[string]cns.PodInfo, nnc *v1alpha.NodeNetworkConfig) cnstypes.ResponseCode -} - -// TODO(rbtr) where should this live?? -// reconcileInitialCNSState initializes cns by passing pods and a CreateNetworkContainerRequest -func reconcileInitialCNSState(ctx context.Context, cli nodeNetworkConfigGetter, ipamReconciler ipamStateReconciler, podInfoByIPProvider cns.PodInfoByIPProvider) error { - // Get nnc using direct client - nnc, err := cli.Get(ctx) - if err != nil { - if crd.IsNotDefined(err) { - return errors.Wrap(err, "failed to init CNS state: NNC CRD is not defined") - } - if apierrors.IsNotFound(err) { - return errors.Wrap(err, "failed to init CNS state: NNC not found") - } - return errors.Wrap(err, "failed to init CNS state: failed to get NNC CRD") - } - - logger.Printf("Retrieved NNC: %+v", nnc) - if !nnc.DeletionTimestamp.IsZero() { - return errors.New("failed to init CNS state: NNC is being deleted") - } - - // If there are no NCs, we can't initialize our state and we should fail out. - if len(nnc.Status.NetworkContainers) == 0 { - return errors.New("failed to init CNS state: no NCs found in NNC CRD") - } - - // Get previous PodInfo state from podInfoByIPProvider - podInfoByIP, err := podInfoByIPProvider.PodInfoByIP() - if err != nil { - return errors.Wrap(err, "provider failed to provide PodInfoByIP") - } - - ncReqs := make([]*cns.CreateNetworkContainerRequest, len(nnc.Status.NetworkContainers)) - - // For each NC, we need to create a CreateNetworkContainerRequest and use it to rebuild our state. 
- for i := range nnc.Status.NetworkContainers { - var ( - ncRequest *cns.CreateNetworkContainerRequest - err error - ) - switch nnc.Status.NetworkContainers[i].AssignmentMode { //nolint:exhaustive // skipping dynamic case - case v1alpha.Static: - ncRequest, err = nncctrl.CreateNCRequestFromStaticNC(nnc.Status.NetworkContainers[i]) - default: // For backward compatibility, default will be treated as Dynamic too. - ncRequest, err = nncctrl.CreateNCRequestFromDynamicNC(nnc.Status.NetworkContainers[i]) - } - - if err != nil { - return errors.Wrapf(err, "failed to convert NNC status to network container request, "+ - "assignmentMode: %s", nnc.Status.NetworkContainers[i].AssignmentMode) - } - - ncReqs[i] = ncRequest - } - - // Call cnsclient init cns passing those two things. - if err := restserver.ResponseCodeToError(ipamReconciler.ReconcileIPAMStateForSwift(ncReqs, podInfoByIP, nnc)); err != nil { - return errors.Wrap(err, "failed to reconcile CNS IPAM state") - } - - return nil -} - -// InitializeCRDState builds and starts the CRD controllers. -// -//nolint:gocyclo // legacy -func InitializeCRDState(ctx context.Context, z *zap.Logger, httpRestService cns.HTTPService, cnsconfig *configuration.CNSConfig) error { - // convert interface type to implementation type - httpRestServiceImplementation, ok := httpRestService.(*restserver.HTTPRestService) - if !ok { - logger.Errorf("[Azure CNS] Failed to convert interface httpRestService to implementation: %v", httpRestService) - return fmt.Errorf("[Azure CNS] Failed to convert interface httpRestService to implementation: %v", - httpRestService) - } - - // Set orchestrator type - orchestrator := cns.SetOrchestratorTypeRequest{ - OrchestratorType: cns.KubernetesCRD, - } - httpRestServiceImplementation.SetNodeOrchestrator(&orchestrator) - - // build default clientset. - kubeConfig, err := ctrl.GetConfig() - if err != nil { - logger.Errorf("[Azure CNS] Failed to get kubeconfig for request controller: %v", err) - return errors.Wrap(err, "failed to get kubeconfig") - } - kubeConfig.UserAgent = fmt.Sprintf("azure-cns-%s", version) - - clientset, err := kubernetes.NewForConfig(kubeConfig) - if err != nil { - return errors.Wrap(err, "failed to build clientset") - } - - // get nodename for scoping kube requests to node. - nodeName, err := configuration.NodeName() - if err != nil { - return errors.Wrap(err, "failed to get NodeName") - } - - node, err := clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) - if err != nil { - return errors.Wrapf(err, "failed to get node %s", nodeName) - } - - // check the Node labels for Swift V2 - if _, ok := node.Labels[configuration.LabelNodeSwiftV2]; ok { - cnsconfig.EnableSwiftV2 = true - cnsconfig.WatchPods = true - if nodeInfoErr := createOrUpdateNodeInfoCRD(ctx, kubeConfig, node); nodeInfoErr != nil { - return errors.Wrap(nodeInfoErr, "error creating or updating nodeinfo crd") - } - } - - // perform state migration from CNI in case CNS is set to manage the endpoint state and has emty state - if cnsconfig.EnableStateMigration && !httpRestServiceImplementation.EndpointStateStore.Exists() { - if err = PopulateCNSEndpointState(httpRestServiceImplementation.EndpointStateStore); err != nil { - return errors.Wrap(err, "failed to create CNS EndpointState From CNI") - } - // endpoint state needs tobe loaded in memory so the subsequent Delete calls remove the state and release the IPs. 
- if err = httpRestServiceImplementation.EndpointStateStore.Read(restserver.EndpointStoreKey, &httpRestServiceImplementation.EndpointState); err != nil { - return errors.Wrap(err, "failed to restore endpoint state") - } - } - - podInfoByIPProvider, err := getPodInfoByIPProvider(ctx, cnsconfig, httpRestServiceImplementation, clientset, nodeName) - if err != nil { - return errors.Wrap(err, "failed to initialize ip state") - } - - // create scoped kube clients. - directcli, err := client.New(kubeConfig, client.Options{Scheme: nodenetworkconfig.Scheme}) - if err != nil { - return errors.Wrap(err, "failed to create ctrl client") - } - directnnccli := nodenetworkconfig.NewClient(directcli) - if err != nil { - return errors.Wrap(err, "failed to create NNC client") - } - // TODO(rbtr): nodename and namespace should be in the cns config - directscopedcli := nncctrl.NewScopedClient(directnnccli, types.NamespacedName{Namespace: "kube-system", Name: nodeName}) - - logger.Printf("Reconciling initial CNS state") - // apiserver nnc might not be registered or api server might be down and crashloop backof puts us outside of 5-10 minutes we have for - // aks addons to come up so retry a bit more aggresively here. - // will retry 10 times maxing out at a minute taking about 8 minutes before it gives up. - attempt := 0 - _ = retry.Do(func() error { - attempt++ - logger.Printf("reconciling initial CNS state attempt: %d", attempt) - err = reconcileInitialCNSState(ctx, directscopedcli, httpRestServiceImplementation, podInfoByIPProvider) - if err != nil { - logger.Errorf("failed to reconcile initial CNS state, attempt: %d err: %v", attempt, err) - nncInitFailure.Inc() - } - return errors.Wrap(err, "failed to initialize CNS state") - }, retry.Context(ctx), retry.Delay(initCNSInitalDelay), retry.MaxDelay(time.Minute), retry.UntilSucceeded()) - logger.Printf("reconciled initial CNS state after %d attempts", attempt) - hasNNCInitialized.Set(1) - scheme := kuberuntime.NewScheme() - if err := corev1.AddToScheme(scheme); err != nil { //nolint:govet // intentional shadow - return errors.Wrap(err, "failed to add corev1 to scheme") - } - if err = v1alpha.AddToScheme(scheme); err != nil { - return errors.Wrap(err, "failed to add nodenetworkconfig/v1alpha to scheme") - } - if err = cssv1alpha1.AddToScheme(scheme); err != nil { - return errors.Wrap(err, "failed to add clustersubnetstate/v1alpha1 to scheme") - } - if err = mtv1alpha1.AddToScheme(scheme); err != nil { - return errors.Wrap(err, "failed to add multitenantpodnetworkconfig/v1alpha1 to scheme") - } - - // Set Selector options on the Manager cache which are used - // to perform *server-side* filtering of the cached objects. This is very important - // for high node/pod count clusters, as it keeps us from watching objects at the - // whole cluster scope when we are only interested in the Node's scope. 
-	cacheOpts := cache.Options{
-		Scheme: scheme,
-		ByObject: map[client.Object]cache.ByObject{
-			&v1alpha.NodeNetworkConfig{}: {
-				Namespaces: map[string]cache.Config{
-					"kube-system": {FieldSelector: fields.SelectorFromSet(fields.Set{"metadata.name": nodeName})},
-				},
-			},
-		},
-	}
-
-	if cnsconfig.WatchPods {
-		cacheOpts.ByObject[&corev1.Pod{}] = cache.ByObject{
-			Field: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName}),
-		}
-	}
-
-	if cnsconfig.EnableSubnetScarcity {
-		cacheOpts.ByObject[&cssv1alpha1.ClusterSubnetState{}] = cache.ByObject{
-			Namespaces: map[string]cache.Config{
-				"kube-system": {},
-			},
-		}
-	}
-
-	managerOpts := ctrlmgr.Options{
-		Scheme:  scheme,
-		Metrics: ctrlmetrics.Options{BindAddress: "0"},
-		Cache:   cacheOpts,
-		Logger:  zapr.NewLogger(z),
-	}
-
-	manager, err := ctrl.NewManager(kubeConfig, managerOpts)
-	if err != nil {
-		return errors.Wrap(err, "failed to create manager")
-	}
-
-	// this cachedscopedcli is built using the Manager's cached client, which is
-	// NOT SAFE TO USE UNTIL THE MANAGER IS STARTED!
-	// This is okay because it is only used to build the IPAMPoolMonitor, which does not
-	// attempt to use the client until it has received a NodeNetworkConfig to update, and
-	// that can only happen once the Manager has started and the NodeNetworkConfig
-	// reconciler has pushed the Monitor a NodeNetworkConfig.
-	cachedscopedcli := nncctrl.NewScopedClient(nodenetworkconfig.NewClient(manager.GetClient()), types.NamespacedName{Namespace: "kube-system", Name: nodeName})
-
-	// Build the IPAM Pool monitor
-	var poolMonitor cns.IPAMPoolMonitor
-	cssCh := make(chan cssv1alpha1.ClusterSubnetState)
-	ipDemandCh := make(chan int)
-	if cnsconfig.EnableIPAMv2 {
-		cssSrc := func(context.Context) ([]cssv1alpha1.ClusterSubnetState, error) { return nil, nil }
-		if cnsconfig.EnableSubnetScarcity {
-			cssSrc = clustersubnetstate.NewClient(manager.GetClient()).List
-		}
-		nncCh := make(chan v1alpha.NodeNetworkConfig)
-		pmv2 := ipampoolv2.NewMonitor(z, httpRestServiceImplementation, cachedscopedcli, ipDemandCh, nncCh, cssCh)
-		obs := metrics.NewLegacyMetricsObserver(httpRestService.GetPodIPConfigState, cachedscopedcli.Get, cssSrc)
-		pmv2.WithLegacyMetricsObserver(obs)
-		poolMonitor = pmv2.AsV1(nncCh)
-	} else {
-		poolOpts := ipampool.Options{
-			RefreshDelay: poolIPAMRefreshRateInMilliseconds * time.Millisecond,
-		}
-		poolMonitor = ipampool.NewMonitor(httpRestServiceImplementation, cachedscopedcli, cssCh, &poolOpts)
-	}
-
-	// Start building the NNC Reconciler
-
-	// get this Node's IP so the reconciler can compare it against NC Node IPs to verify that the NCs were created for this Node
-	nodeIP := configuration.NodeIP()
-	nncReconciler := nncctrl.NewReconciler(httpRestServiceImplementation, poolMonitor, nodeIP)
-	// pass the Node in to the Reconciler so the Controller can cross-reference it
-	// IPAMv1 - reconcile only status changes (where generation doesn't change).
-	// IPAMv2 - reconcile all updates.
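-	// metadata.generation increments only on spec changes, so an update event whose generation
-	// is unchanged is a status-only update; filtering out generation changes is how IPAMv1
-	// restricts itself to those.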
-	filterGenerationChange := !cnsconfig.EnableIPAMv2
-	if err := nncReconciler.SetupWithManager(manager, node, filterGenerationChange); err != nil { //nolint:govet // intentional shadow
-		return errors.Wrapf(err, "failed to setup nnc reconciler with manager")
-	}
-
-	if cnsconfig.EnableSubnetScarcity {
-		// ClusterSubnetState reconciler
-		cssReconciler := cssctrl.New(cssCh)
-		if err := cssReconciler.SetupWithManager(manager); err != nil {
-			return errors.Wrapf(err, "failed to setup css reconciler with manager")
-		}
-	}
-
-	// TODO: add pod listeners based on Swift V1 vs MT/V2 configuration
-	if cnsconfig.WatchPods {
-		pw := podctrl.New(z)
-		if cnsconfig.EnableIPAMv2 {
-			hostNetworkListOpt := &client.ListOptions{FieldSelector: fields.SelectorFromSet(fields.Set{"spec.hostNetwork": "false"})} // filter only podsubnet pods
-			// don't relist pods more than every 500ms
-			limit := rate.NewLimiter(rate.Every(500*time.Millisecond), 1) //nolint:gomnd // clearly 500ms
-			pw.With(pw.NewNotifierFunc(hostNetworkListOpt, limit, ipampoolv2.PodIPDemandListener(ipDemandCh)))
-		}
-		if err := pw.SetupWithManager(ctx, manager); err != nil {
-			return errors.Wrapf(err, "failed to setup pod watcher with manager")
-		}
-	}
-
-	if cnsconfig.EnableSwiftV2 {
-		if err := mtpncctrl.SetupWithManager(manager); err != nil {
-			return errors.Wrapf(err, "failed to setup mtpnc reconciler with manager")
-		}
-		// SWIFT V2 is enabled on CNS: attach the multitenant middleware to the rest service
-		// so the AKS (K8s) SWIFT V2 middleware processes IP config requests
-		swiftV2Middleware := &middlewares.K8sSWIFTv2Middleware{Cli: manager.GetClient()}
-		httpRestService.AttachIPConfigsHandlerMiddleware(swiftV2Middleware)
-	}
-
-	// start the pool Monitor before the Reconciler, since it needs to be ready to receive a
-	// NodeNetworkConfig update by the time the Reconciler tries to send it.
-	go func() {
-		logger.Printf("Starting IPAM Pool Monitor")
-		if e := poolMonitor.Start(ctx); e != nil {
-			logger.Errorf("[Azure CNS] Failed to start pool monitor with err: %v", e)
-		}
-	}()
-	logger.Printf("initialized and started IPAM pool monitor")
-
-	// Start the Manager which starts the reconcile loop.
-	// The Reconciler will send an initial NodeNetworkConfig update to the PoolMonitor, starting the
-	// Monitor's internal loop.
-	go func() {
-		logger.Printf("Starting controller-manager.")
-		for {
-			if err := manager.Start(ctx); err != nil {
-				logger.Errorf("Failed to start controller-manager: %v", err)
-				// retry starting the request controller;
-				// inc the managerStartFailures metric for failure tracking
-				managerStartFailures.Inc()
-			} else {
-				logger.Printf("Stopped controller-manager.")
-				return
-			}
-			time.Sleep(time.Second) // TODO(rbtr): make this exponential backoff
-		}
-	}()
-	logger.Printf("Initialized controller-manager.")
-	for {
-		logger.Printf("Waiting for NodeNetworkConfig reconciler to start.")
-		// wait for the Reconciler to run once on an NNC that was made for this Node.
-		// the nncReadyCtx has a timeout of 15 minutes, after which we consider the
-		// NNC Reconciler stuck/failed, log, and retry.
-		nncReadyCtx, cancel := context.WithTimeout(ctx, 15*time.Minute) // nolint // it will time out and not leak
-		if started, err := nncReconciler.Started(nncReadyCtx); !started {
-			logger.Errorf("NNC reconciler has not started, does the NNC exist? err: %v", err)
-			nncReconcilerStartFailures.Inc()
-			continue
-		}
-		logger.Printf("NodeNetworkConfig reconciler has started.")
-		cancel()
-		break
-	}
-
-	go func() {
-		logger.Printf("Starting SyncHostNCVersion loop.")
-		// Periodically poll the VFP-programmed NC version from NMAgent
-		tickerChannel := time.Tick(time.Duration(cnsconfig.SyncHostNCVersionIntervalMs) * time.Millisecond)
-		for {
-			select {
-			case <-tickerChannel:
-				timedCtx, cancel := context.WithTimeout(ctx, time.Duration(cnsconfig.SyncHostNCVersionIntervalMs)*time.Millisecond)
-				httpRestServiceImplementation.SyncHostNCVersion(timedCtx, cnsconfig.ChannelMode)
-				cancel()
-			case <-ctx.Done():
-				logger.Printf("Stopping SyncHostNCVersion loop.")
-				return
-			}
-		}
-	}()
-	logger.Printf("Initialized SyncHostNCVersion loop.")
-	return nil
-}
-
 // getPodInfoByIPProvider returns a PodInfoByIPProvider that reads endpoint state from the configured source
 func getPodInfoByIPProvider(
 	ctx context.Context,
@@ -1691,44 +830,6 @@ func getPodInfoByIPProvider(
 	return podInfoByIPProvider, nil
 }
-
-// createOrUpdateNodeInfoCRD polls IMDS to learn the VM unique ID and then creates or updates the NodeInfo CRD
-// with that VM unique ID
-func createOrUpdateNodeInfoCRD(ctx context.Context, restConfig *rest.Config, node *corev1.Node) error {
-	imdsCli := imds.NewClient()
-	vmUniqueID, err := imdsCli.GetVMUniqueID(ctx)
-	if err != nil {
-		return errors.Wrap(err, "error getting vm unique ID from imds")
-	}
-
-	directcli, err := client.New(restConfig, client.Options{Scheme: multitenancy.Scheme})
-	if err != nil {
-		return errors.Wrap(err, "failed to create ctrl client")
-	}
-
-	nodeInfoCli := multitenancy.NodeInfoClient{
-		Cli: directcli,
-	}
-
-	nodeInfo := &mtv1alpha1.NodeInfo{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: node.Name,
-		},
-		Spec: mtv1alpha1.NodeInfoSpec{
-			VMUniqueID: vmUniqueID,
-		},
-	}
-
-	if err := controllerutil.SetOwnerReference(node, nodeInfo, multitenancy.Scheme); err != nil {
-		return errors.Wrap(err, "failed to set nodeinfo owner reference to node")
-	}
-
-	if err := nodeInfoCli.CreateOrUpdate(ctx, nodeInfo, "azure-cns"); err != nil {
-		return errors.Wrap(err, "error ensuring nodeinfo CRD exists and is up-to-date")
-	}
-
-	return nil
-}
-
 // PopulateCNSEndpointState initializes the CNS Endpoint State by migrating the CNI state.
 func PopulateCNSEndpointState(endpointStateStore store.KeyValueStore) error {
 	logger.Printf("State Migration is enabled")
diff --git a/cns/service/managed.go b/cns/service/managed.go
new file mode 100644
index 0000000000..acb197bfcb
--- /dev/null
+++ b/cns/service/managed.go
@@ -0,0 +1,98 @@
+// nolint
+package main
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"runtime"
+
+	"github.com/Azure/azure-container-networking/cns"
+	"github.com/Azure/azure-container-networking/cns/logger"
+	acn "github.com/Azure/azure-container-networking/common"
+	"github.com/avast/retry-go/v4"
+	"github.com/pkg/errors"
+)
+
+// NodeInterrogator is functionality necessary to read information about nodes.
+// It is intended to be strictly read-only.
+type NodeInterrogator interface {
+	SupportedAPIs(context.Context) ([]string, error)
+}
+
+// registerNode tries to register the node with DNC when CNS is started in managed DNC mode.
+func registerNode(ctx context.Context, httpClient httpDoer, httpRestService cns.HTTPService, dncEP, infraVnet, nodeID string, ni NodeInterrogator) error {
+	logger.Printf("[Azure CNS] Registering node %s with Infrastructure Network: %s PrivateEndpoint: %s", nodeID, infraVnet, dncEP)
+
+	var (
+		numCPU              = runtime.NumCPU()
+		url                 = fmt.Sprintf(acn.RegisterNodeURLFmt, dncEP, infraVnet, nodeID, dncApiVersion)
+		nodeRegisterRequest cns.NodeRegisterRequest
+	)
+
+	nodeRegisterRequest.NumCores = numCPU
+	supportedApis, retErr := ni.SupportedAPIs(ctx)
+
+	if retErr != nil {
+		return errors.Wrapf(retErr, "[Azure CNS] Failed to retrieve SupportedApis from NMAgent for node %s with Infrastructure Network: %s PrivateEndpoint: %s",
+			nodeID, infraVnet, dncEP)
+	}
+
+	// Replace a nil slice to avoid any nil-pointer dereferencing errors downstream.
+	if supportedApis == nil {
+		supportedApis = []string{}
+	}
+
+	nodeRegisterRequest.NmAgentSupportedApis = supportedApis
+
+	// CNS tries to register the node for a maximum of an hour.
+	err := retry.Do(func() error {
+		return errors.Wrap(sendRegisterNodeRequest(ctx, httpClient, httpRestService, nodeRegisterRequest, url), "failed to sendRegisterNodeRequest")
+	}, retry.Delay(acn.FiveSeconds), retry.Attempts(maxRetryNodeRegister), retry.DelayType(retry.FixedDelay))
+
+	return errors.Wrapf(err, "[Azure CNS] Failed to register node %s after maximum retries for an hour with Infrastructure Network: %s PrivateEndpoint: %s",
+		nodeID, infraVnet, dncEP)
+}
+
+// sendRegisterNodeRequest posts a node registration request to DNC and, on success,
+// applies the orchestrator type from the response to the rest service.
+func sendRegisterNodeRequest(ctx context.Context, httpClient httpDoer, httpRestService cns.HTTPService, nodeRegisterRequest cns.NodeRegisterRequest, registerURL string) error {
+	var body bytes.Buffer
+	err := json.NewEncoder(&body).Encode(nodeRegisterRequest)
+	if err != nil {
+		logger.Errorf("Failed to register node: encoding the request json failed with non-retryable err %v", err)
+		return errors.Wrap(retry.Unrecoverable(err), "failed to sendRegisterNodeRequest")
+	}
+
+	request, err := http.NewRequestWithContext(ctx, http.MethodPost, registerURL, &body)
+	if err != nil {
+		return errors.Wrap(err, "failed to build request")
+	}
+
+	request.Header.Set("Content-Type", "application/json")
+	response, err := httpClient.Do(request)
+	if err != nil {
+		return errors.Wrap(err, "http request failed")
+	}
+
+	defer response.Body.Close()
+
+	if response.StatusCode != http.StatusOK {
+		err = fmt.Errorf("[Azure CNS] Failed to register node, DNC replied with http status code %d", response.StatusCode)
+		logger.Errorf("%v", err)
+		return errors.Wrap(err, "failed to sendRegisterNodeRequest")
+	}
+
+	var req cns.SetOrchestratorTypeRequest
+	err = json.NewDecoder(response.Body).Decode(&req)
+	if err != nil {
+		logger.Errorf("decoding Node Register response json failed with err %v", err)
+		return errors.Wrap(err, "failed to sendRegisterNodeRequest")
+	}
+	httpRestService.SetNodeOrchestrator(&req)
+
+	logger.Printf("[Azure CNS] Node Registered")
+	return nil
+}
diff --git a/cns/service/version.go b/cns/service/version.go
new file mode 100644
index 0000000000..dcbb41e004
--- /dev/null
+++ b/cns/service/version.go
@@ -0,0 +1,12 @@
+package main
+
+import "fmt"
+
+// version is populated by make at build time.
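+//
+// The value is typically injected via the linker; an illustrative (not verbatim
+// from the Makefile) invocation:
+//
+//	go build -ldflags "-X main.version=v1.2.3" ./cns/service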
+var version string
+
+// printVersion prints the service description and version information.
+func printVersion() {
+	fmt.Printf("Azure Container Network Service\n")
+	fmt.Printf("Version %v\n", version)
+}
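+
+// With the version linked in as above, printVersion would emit:
+//
+//	Azure Container Network Service
+//	Version v1.2.3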