diff --git a/multicluster/cmd/service-mirror/main.go b/multicluster/cmd/service-mirror/main.go index 7b995fdfaf833..3ef0a1004ecca 100644 --- a/multicluster/cmd/service-mirror/main.go +++ b/multicluster/cmd/service-mirror/main.go @@ -53,6 +53,7 @@ func Main(args []string) { repairPeriod := cmd.Duration("endpoint-refresh-period", 1*time.Minute, "frequency to refresh endpoint resolution") enableHeadlessSvc := cmd.Bool("enable-headless-services", false, "toggle support for headless service mirroring") enableNamespaceCreation := cmd.Bool("enable-namespace-creation", false, "toggle support for namespace creation") + enableEndpointSlices := cmd.Bool("enable-endpoint-slices", true, "Use EndpointSlice resources instead of Endpoints") enablePprof := cmd.Bool("enable-pprof", false, "Enable pprof endpoints on the admin server") localMirror := cmd.Bool("local-mirror", false, "watch the local cluster for federated service members") federatedServiceSelector := cmd.String("federated-service-selector", k8s.DefaultFederatedServiceSelector, "Selector (label query) for federated service members in the local cluster") @@ -92,14 +93,23 @@ func Main(args []string) { // controllerK8sAPI is used by the cluster watcher to manage // mirror resources such as services, namespaces, and endpoints. 
+ // Build the list of resources to watch based on configuration + localAPIResources := []controllerK8s.APIResource{ + controllerK8s.NS, + controllerK8s.Svc, + } + if *enableEndpointSlices { + localAPIResources = append(localAPIResources, controllerK8s.ES) + } else { + localAPIResources = append(localAPIResources, controllerK8s.Endpoint) + } + controllerK8sAPI, err := controllerK8s.InitializeAPI( rootCtx, *kubeConfigPath, false, "local", - controllerK8s.NS, - controllerK8s.Svc, - controllerK8s.Endpoint, + localAPIResources..., ) if err != nil { log.Fatalf("Failed to initialize K8s API: %s", err) @@ -153,7 +163,7 @@ func Main(args []string) { ExcludedLabels: excludedLabelList, }, } - err = startLocalClusterWatcher(ctx, *namespace, controllerK8sAPI, linksAPI, *requeueLimit, *repairPeriod, *enableHeadlessSvc, *enableNamespaceCreation, link) + err = startLocalClusterWatcher(ctx, *namespace, controllerK8sAPI, linksAPI, *requeueLimit, *repairPeriod, *enableHeadlessSvc, *enableNamespaceCreation, *enableEndpointSlices, link) if err != nil { log.Fatalf("Failed to start local cluster watcher: %s", err) } @@ -200,7 +210,7 @@ func Main(args []string) { if err != nil { log.Errorf("Failed to load remote cluster credentials: %s", err) } - err = restartClusterWatcher(ctx, link, *namespace, *probeSvc, creds, controllerK8sAPI, linksAPI, *requeueLimit, *repairPeriod, metrics, *enableHeadlessSvc, *enableNamespaceCreation) + err = restartClusterWatcher(ctx, link, *namespace, *probeSvc, creds, controllerK8sAPI, linksAPI, *requeueLimit, *repairPeriod, metrics, *enableHeadlessSvc, *enableNamespaceCreation, *enableEndpointSlices) if err != nil { // failed to restart cluster watcher; give a bit of slack // and requeue the link to give it another try @@ -329,6 +339,7 @@ func restartClusterWatcher( metrics servicemirror.ProbeMetricVecs, enableHeadlessSvc bool, enableNamespaceCreation bool, + enableEndpointSlices bool, ) error { cleanupWorkers() @@ -352,7 +363,14 @@ func restartClusterWatcher( 
if err != nil { return fmt.Errorf("unable to parse kube config: %w", err) } - remoteAPI, err := controllerK8s.InitializeAPIForConfig(ctx, cfg, false, link.Spec.TargetClusterName, controllerK8s.Svc, controllerK8s.Endpoint) + // Build the list of resources to watch based on configuration + remoteAPIResources := []controllerK8s.APIResource{controllerK8s.Svc} + if enableEndpointSlices { + remoteAPIResources = append(remoteAPIResources, controllerK8s.ES) + } else { + remoteAPIResources = append(remoteAPIResources, controllerK8s.Endpoint) + } + remoteAPI, err := controllerK8s.InitializeAPIForConfig(ctx, cfg, false, link.Spec.TargetClusterName, remoteAPIResources...) if err != nil { return fmt.Errorf("cannot initialize api for target cluster %s: %w", link.Spec.TargetClusterName, err) } @@ -369,6 +387,7 @@ func restartClusterWatcher( ch, enableHeadlessSvc, enableNamespaceCreation, + enableEndpointSlices, ) if err != nil { return fmt.Errorf("unable to create cluster watcher: %w", err) @@ -391,6 +410,7 @@ func startLocalClusterWatcher( repairPeriod time.Duration, enableHeadlessSvc bool, enableNamespaceCreation bool, + enableEndpointSlices bool, link v1alpha3.Link, ) error { cw, err := servicemirror.NewRemoteClusterServiceWatcher( @@ -406,6 +426,7 @@ func startLocalClusterWatcher( make(chan bool), enableHeadlessSvc, enableNamespaceCreation, + enableEndpointSlices, ) if err != nil { return fmt.Errorf("unable to create cluster watcher: %w", err) diff --git a/multicluster/service-mirror/cluster_watcher.go b/multicluster/service-mirror/cluster_watcher.go index 8a7a1d415f556..6bc1833614cbd 100644 --- a/multicluster/service-mirror/cluster_watcher.go +++ b/multicluster/service-mirror/cluster_watcher.go @@ -18,6 +18,7 @@ import ( "github.com/prometheus/client_golang/prometheus" logging "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 
"k8s.io/apimachinery/pkg/labels" @@ -64,6 +65,7 @@ type ( liveness chan bool headlessServicesEnabled bool namespaceCreationEnabled bool + enableEndpointSlices bool informerHandlers } @@ -71,6 +73,7 @@ type ( informerHandlers struct { svcHandler cache.ResourceEventHandlerRegistration epHandler cache.ResourceEventHandlerRegistration + esHandler cache.ResourceEventHandlerRegistration nsHandler cache.ResourceEventHandlerRegistration } @@ -156,6 +159,19 @@ type ( OnUpdateEndpointsCalled struct { ep *corev1.Endpoints } + + // OnAddEndpointSliceCalled is issued when the onAdd function of the + // EndpointSlice shared informer is called + OnAddEndpointSliceCalled struct { + es *discoveryv1.EndpointSlice + } + + // OnUpdateEndpointSliceCalled is issued when the onUpdate function of the + // EndpointSlice shared informer is called + OnUpdateEndpointSliceCalled struct { + es *discoveryv1.EndpointSlice + } + // OnDeleteCalled is issued when the onDelete function of the // shared informer is called OnDeleteCalled struct { @@ -199,6 +215,7 @@ func NewRemoteClusterServiceWatcher( liveness chan bool, enableHeadlessSvc bool, enableNamespaceCreation bool, + enableEndpointSlices bool, ) (*RemoteClusterServiceWatcher, error) { _, err := remoteAPI.Client.Discovery().ServerVersion() if err != nil { @@ -235,6 +252,7 @@ func NewRemoteClusterServiceWatcher( liveness: liveness, headlessServicesEnabled: enableHeadlessSvc, namespaceCreationEnabled: enableNamespaceCreation, + enableEndpointSlices: enableEndpointSlices, // always instantiate the gatewayAlive=true to prevent unexpected service fail fast gatewayAlive: true, }, nil @@ -1085,11 +1103,15 @@ func (rcsw *RemoteClusterServiceWatcher) handleLocalNamespaceAdded(ns *corev1.Na } // isEmptyService returns true if any of these conditions are true: -// - svc's Endpoint is not found -// - svc's Endpoint has no Subsets (happens when there's no associated Pod) +// - svc's Endpoint/EndpointSlice is not found +// - svc's Endpoint has no Subsets / 
EndpointSlice has no endpoints (happens when there's no associated Pod) // - svc's Endpoint has Subsets, but none have addresses (only notReadyAddresses, -// when the pod is not ready yet) +// when the pod is not ready yet) / EndpointSlice has no ready endpoints func (rcsw *RemoteClusterServiceWatcher) isEmptyService(svc *corev1.Service) (bool, error) { + if rcsw.enableEndpointSlices { + return rcsw.isEmptyServiceES(svc) + } + ep, err := rcsw.remoteAPIClient.Endpoint().Lister().Endpoints(svc.Namespace).Get(svc.Name) if err != nil { if kerrors.IsNotFound(err) { @@ -1121,6 +1143,71 @@ func (rcsw *RemoteClusterServiceWatcher) isEmptyEndpoints(ep *corev1.Endpoints) return true } +// getEndpointSliceServiceID extracts the service name from an EndpointSlice +// by checking the kubernetes.io/service-name label or OwnerReferences +func getEndpointSliceServiceID(es *discoveryv1.EndpointSlice) (string, string, error) { + if svc, ok := es.Labels[discoveryv1.LabelServiceName]; ok { + return es.Namespace, svc, nil + } + for _, ref := range es.OwnerReferences { + if ref.Kind == "Service" && ref.Name != "" { + return es.Namespace, ref.Name, nil + } + } + return "", "", fmt.Errorf("EndpointSlice [%s/%s] has no service reference", es.Namespace, es.Name) +} + +// isEmptyEndpointSlice returns true if the EndpointSlice has no ready endpoints +func (rcsw *RemoteClusterServiceWatcher) isEmptyEndpointSlice(es *discoveryv1.EndpointSlice) bool { + if len(es.Endpoints) == 0 { + rcsw.log.Debugf("endpointslice %s/%s has no endpoints", es.Namespace, es.Name) + return true + } + for _, ep := range es.Endpoints { + if ep.Conditions.Ready != nil && *ep.Conditions.Ready { + return false + } + } + rcsw.log.Debugf("endpointslice %s/%s has no ready endpoints", es.Namespace, es.Name) + return true +} + +// isEmptyServiceES checks if a service has any ready endpoints using EndpointSlices +func (rcsw *RemoteClusterServiceWatcher) isEmptyServiceES(svc *corev1.Service) (bool, error) { + matchLabels := 
map[string]string{discoveryv1.LabelServiceName: svc.Name} + selector := labels.Set(matchLabels).AsSelector() + + slices, err := rcsw.remoteAPIClient.ES().Lister().EndpointSlices(svc.Namespace).List(selector) + if err != nil { + if kerrors.IsNotFound(err) { + rcsw.log.Debugf("no endpointslices found for service %s/%s", svc.Namespace, svc.Name) + return true, nil + } + return true, err + } + + if len(slices) == 0 { + rcsw.log.Debugf("no endpointslices found for service %s/%s", svc.Namespace, svc.Name) + return true, nil + } + + for _, slice := range slices { + if !rcsw.isEmptyEndpointSlice(slice) { + return false, nil + } + } + return true, nil +} + +// isHeadlessEndpointSlice checks if an EndpointSlice belongs to a headless service +func isHeadlessEndpointSlice(es *discoveryv1.EndpointSlice, log *logging.Entry) bool { + if _, found := es.Labels[corev1.IsHeadlessService]; !found { + log.Debugf("skipped processing EndpointSlice %s/%s: missing %s label", es.Namespace, es.Name, corev1.IsHeadlessService) + return false + } + return true +} + func (rcsw *RemoteClusterServiceWatcher) createGatewayEndpoints(ctx context.Context, exportedService *corev1.Service) error { empty, err := rcsw.isEmptyService(exportedService) if err != nil { @@ -1296,10 +1383,14 @@ func (rcsw *RemoteClusterServiceWatcher) processNextEvent(ctx context.Context) ( err = rcsw.createOrUpdateService(ev.svc) case *OnAddEndpointsCalled: err = rcsw.handleCreateOrUpdateEndpoints(ctx, ev.ep) + case *OnAddEndpointSliceCalled: + err = rcsw.handleCreateOrUpdateEndpointSlice(ctx, ev.es) case *OnUpdateCalled: err = rcsw.createOrUpdateService(ev.svc) case *OnUpdateEndpointsCalled: err = rcsw.handleCreateOrUpdateEndpoints(ctx, ev.ep) + case *OnUpdateEndpointSliceCalled: + err = rcsw.handleCreateOrUpdateEndpointSlice(ctx, ev.es) case *OnDeleteCalled: rcsw.handleOnDelete(ev.svc) case *RemoteServiceExported: @@ -1401,48 +1492,97 @@ func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error { return err } 
- rcsw.epHandler, err = rcsw.remoteAPIClient.Endpoint().Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - // AddFunc only relevant for exported headless endpoints - AddFunc: func(obj interface{}) { - ep, ok := obj.(*corev1.Endpoints) - if !ok { - rcsw.log.Errorf("error processing endpoints object: got %#v, expected *corev1.Endpoints", ep) - return - } + // Register either EndpointSlice or Endpoints informer based on configuration + if rcsw.enableEndpointSlices { + rcsw.log.Debugf("Watching EndpointSlice resources") + rcsw.esHandler, err = rcsw.remoteAPIClient.ES().Informer().AddEventHandler( + cache.ResourceEventHandlerFuncs{ + // AddFunc only relevant for exported headless endpoints + AddFunc: func(obj interface{}) { + es, ok := obj.(*discoveryv1.EndpointSlice) + if !ok { + rcsw.log.Errorf("error processing EndpointSlice object: got %#v, expected *discoveryv1.EndpointSlice", obj) + return + } - if !rcsw.isExported(ep.Labels) { - rcsw.log.Debugf("skipped processing endpoints object %s/%s: missing %s label", ep.Namespace, ep.Name, consts.DefaultExportedServiceSelector) - return - } + if !rcsw.isExported(es.Labels) { + rcsw.log.Debugf("skipped processing EndpointSlice object %s/%s: missing export label", es.Namespace, es.Name) + return + } - if !isHeadlessEndpoints(ep, rcsw.log) { - return - } + if !isHeadlessEndpointSlice(es, rcsw.log) { + return + } - rcsw.eventsQueue.Add(&OnAddEndpointsCalled{obj.(*corev1.Endpoints)}) + rcsw.eventsQueue.Add(&OnAddEndpointSliceCalled{es}) + }, + // UpdateFunc relevant for all kind of exported endpoints + UpdateFunc: func(_, new interface{}) { + esNew, ok := new.(*discoveryv1.EndpointSlice) + if !ok { + rcsw.log.Errorf("error processing EndpointSlice object: got %#v, expected *discoveryv1.EndpointSlice", new) + return + } + if !rcsw.isExported(esNew.Labels) { + rcsw.log.Debugf("skipped processing EndpointSlice object %s/%s: missing export label", esNew.Namespace, esNew.Name) + return + } + if 
rcsw.isRemoteDiscovery(esNew.Labels) { + rcsw.log.Debugf("skipped processing EndpointSlice object %s/%s (service labeled for remote-discovery mode)", esNew.Namespace, esNew.Name) + return + } + rcsw.eventsQueue.Add(&OnUpdateEndpointSliceCalled{esNew}) + }, }, - // AddFunc relevant for all kind of exported endpoints - UpdateFunc: func(_, new interface{}) { - epNew, ok := new.(*corev1.Endpoints) - if !ok { - rcsw.log.Errorf("error processing endpoints object: got %#v, expected *corev1.Endpoints", epNew) - return - } - if !rcsw.isExported(epNew.Labels) { - rcsw.log.Debugf("skipped processing endpoints object %s/%s: missing %s label", epNew.Namespace, epNew.Name, consts.DefaultExportedServiceSelector) - return - } - if rcsw.isRemoteDiscovery(epNew.Labels) { - rcsw.log.Debugf("skipped processing endpoints object %s/%s (service labeled for remote-discovery mode)", epNew.Namespace, epNew.Name) - return - } - rcsw.eventsQueue.Add(&OnUpdateEndpointsCalled{epNew}) + ) + if err != nil { + return err + } + } else { + rcsw.log.Debugf("Watching Endpoints resources") + rcsw.epHandler, err = rcsw.remoteAPIClient.Endpoint().Informer().AddEventHandler( + cache.ResourceEventHandlerFuncs{ + // AddFunc only relevant for exported headless endpoints + AddFunc: func(obj interface{}) { + ep, ok := obj.(*corev1.Endpoints) + if !ok { + rcsw.log.Errorf("error processing endpoints object: got %#v, expected *corev1.Endpoints", ep) + return + } + + if !rcsw.isExported(ep.Labels) { + rcsw.log.Debugf("skipped processing endpoints object %s/%s: missing %s label", ep.Namespace, ep.Name, consts.DefaultExportedServiceSelector) + return + } + + if !isHeadlessEndpoints(ep, rcsw.log) { + return + } + + rcsw.eventsQueue.Add(&OnAddEndpointsCalled{obj.(*corev1.Endpoints)}) + }, + // UpdateFunc relevant for all kind of exported endpoints + UpdateFunc: func(_, new interface{}) { + epNew, ok := new.(*corev1.Endpoints) + if !ok { + rcsw.log.Errorf("error processing endpoints object: got %#v, expected 
*corev1.Endpoints", epNew) + return + } + if !rcsw.isExported(epNew.Labels) { + rcsw.log.Debugf("skipped processing endpoints object %s/%s: missing %s label", epNew.Namespace, epNew.Name, consts.DefaultExportedServiceSelector) + return + } + if rcsw.isRemoteDiscovery(epNew.Labels) { + rcsw.log.Debugf("skipped processing endpoints object %s/%s (service labeled for remote-discovery mode)", epNew.Namespace, epNew.Name) + return + } + rcsw.eventsQueue.Add(&OnUpdateEndpointsCalled{epNew}) + }, }, - }, - ) - if err != nil { - return err + ) + if err != nil { + return err + } } rcsw.nsHandler, err = rcsw.localAPIClient.NS().Informer().AddEventHandler( @@ -1505,7 +1645,12 @@ func (rcsw *RemoteClusterServiceWatcher) Stop(cleanupState bool) { } if rcsw.epHandler != nil { if err := rcsw.remoteAPIClient.Endpoint().Informer().RemoveEventHandler(rcsw.epHandler); err != nil { - rcsw.log.Warnf("error removing service informer handler: %s", err) + rcsw.log.Warnf("error removing endpoints informer handler: %s", err) + } + } + if rcsw.esHandler != nil { + if err := rcsw.remoteAPIClient.ES().Informer().RemoveEventHandler(rcsw.esHandler); err != nil { + rcsw.log.Warnf("error removing endpointslice informer handler: %s", err) } } if rcsw.nsHandler != nil { diff --git a/multicluster/service-mirror/cluster_watcher_endpointslice_test.go b/multicluster/service-mirror/cluster_watcher_endpointslice_test.go new file mode 100644 index 0000000000000..3282dbf45f1ec --- /dev/null +++ b/multicluster/service-mirror/cluster_watcher_endpointslice_test.go @@ -0,0 +1,887 @@ +package servicemirror + +import ( + "context" + "testing" + + "github.com/linkerd/linkerd2/controller/gen/apis/link/v1alpha3" + "github.com/linkerd/linkerd2/controller/k8s" + consts "github.com/linkerd/linkerd2/pkg/k8s" + logging "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/workqueue" +) + +// Helper 
functions for EndpointSlice testing + +func remoteHeadlessEndpointSlice(name, serviceName, namespace, resourceVersion, address string, ports []discoveryv1.EndpointPort) *discoveryv1.EndpointSlice { + hostname := "pod-0" + ready := true + return &discoveryv1.EndpointSlice{ + TypeMeta: metav1.TypeMeta{ + Kind: "EndpointSlice", + APIVersion: "discovery.k8s.io/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + ResourceVersion: resourceVersion, + Labels: map[string]string{ + discoveryv1.LabelServiceName: serviceName, + corev1.IsHeadlessService: "", + consts.DefaultExportedServiceSelector: "true", + }, + }, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{address}, + Hostname: &hostname, + Conditions: discoveryv1.EndpointConditions{Ready: &ready}, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod-0", + Namespace: namespace, + ResourceVersion: resourceVersion, + }, + }, + }, + Ports: ports, + } +} + +func remoteHeadlessEndpointSliceUpdate(name, serviceName, namespace, resourceVersion, address string, ports []discoveryv1.EndpointPort) *discoveryv1.EndpointSlice { + hostname0 := "pod-0" + hostname1 := "pod-1" + ready := true + return &discoveryv1.EndpointSlice{ + TypeMeta: metav1.TypeMeta{ + Kind: "EndpointSlice", + APIVersion: "discovery.k8s.io/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + ResourceVersion: resourceVersion, + Labels: map[string]string{ + discoveryv1.LabelServiceName: serviceName, + corev1.IsHeadlessService: "", + consts.DefaultExportedServiceSelector: "true", + }, + }, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{address}, + Hostname: &hostname0, + Conditions: discoveryv1.EndpointConditions{Ready: &ready}, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod-0", + Namespace: namespace, + ResourceVersion: resourceVersion, + }, + }, + { + 
Addresses: []string{address}, + Hostname: &hostname1, + Conditions: discoveryv1.EndpointConditions{Ready: &ready}, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod-1", + Namespace: namespace, + ResourceVersion: resourceVersion, + }, + }, + }, + Ports: ports, + } +} + +func endpointSlicePorts(ports []corev1.ServicePort) []discoveryv1.EndpointPort { + result := make([]discoveryv1.EndpointPort, 0, len(ports)) + for _, p := range ports { + port := p.Port + protocol := p.Protocol + name := p.Name + result = append(result, discoveryv1.EndpointPort{ + Name: &name, + Protocol: &protocol, + Port: &port, + }) + } + return result +} + +type endpointSliceTestEnvironment struct { + events []interface{} + remoteResources []string + localResources []string + link v1alpha3.Link +} + +func (te *endpointSliceTestEnvironment) runEnvironment(watcherQueue workqueue.TypedRateLimitingInterface[any]) (*k8s.API, error) { + remoteAPI, err := k8s.NewFakeAPI(te.remoteResources...) + if err != nil { + return nil, err + } + localAPI, err := k8s.NewFakeAPIWithL5dClient(te.localResources...) 
+ if err != nil { + return nil, err + } + linksAPI := k8s.NewNamespacedAPI(nil, nil, localAPI.L5dClient, "default", "local", k8s.Link) + remoteAPI.Sync(nil) + localAPI.Sync(nil) + linksAPI.Sync(nil) + + watcher := RemoteClusterServiceWatcher{ + link: &te.link, + remoteAPIClient: remoteAPI, + localAPIClient: localAPI, + linksAPIClient: linksAPI, + stopper: nil, + log: logging.WithFields(logging.Fields{"cluster": clusterName}), + eventsQueue: watcherQueue, + requeueLimit: 0, + gatewayAlive: true, + headlessServicesEnabled: true, + enableEndpointSlices: true, + } + + for _, ev := range te.events { + watcherQueue.Add(ev) + } + + for range te.events { + watcher.processNextEvent(context.Background()) + } + + localAPI.Sync(nil) + remoteAPI.Sync(nil) + + return localAPI, nil +} + +func TestEndpointSliceHelperFunctions(t *testing.T) { + t.Run("getEndpointSliceServiceID extracts service name from labels", func(t *testing.T) { + es := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-abc123", + Namespace: "test-ns", + Labels: map[string]string{ + discoveryv1.LabelServiceName: "my-service", + }, + }, + } + + ns, name, err := getEndpointSliceServiceID(es) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if ns != "test-ns" { + t.Errorf("expected namespace 'test-ns', got '%s'", ns) + } + if name != "my-service" { + t.Errorf("expected name 'my-service', got '%s'", name) + } + }) + + t.Run("getEndpointSliceServiceID extracts service name from ownerReferences", func(t *testing.T) { + es := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-abc123", + Namespace: "test-ns", + OwnerReferences: []metav1.OwnerReference{ + { + Kind: "Service", + Name: "owner-service", + }, + }, + }, + } + + ns, name, err := getEndpointSliceServiceID(es) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if ns != "test-ns" { + t.Errorf("expected namespace 'test-ns', got '%s'", ns) + } + if name != "owner-service" { + 
t.Errorf("expected name 'owner-service', got '%s'", name) + } + }) + + t.Run("getEndpointSliceServiceID returns error when no service reference", func(t *testing.T) { + es := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "orphan-slice", + Namespace: "test-ns", + }, + } + + _, _, err := getEndpointSliceServiceID(es) + if err == nil { + t.Fatal("expected error, got nil") + } + }) + + t.Run("isHeadlessEndpointSlice returns true for headless", func(t *testing.T) { + es := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + corev1.IsHeadlessService: "", + }, + }, + } + + result := isHeadlessEndpointSlice(es, logging.NewEntry(logging.New())) + if !result { + t.Error("expected true for headless EndpointSlice") + } + }) + + t.Run("isHeadlessEndpointSlice returns false for non-headless", func(t *testing.T) { + es := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{}, + }, + } + + result := isHeadlessEndpointSlice(es, logging.NewEntry(logging.New())) + if result { + t.Error("expected false for non-headless EndpointSlice") + } + }) + + t.Run("shouldExportAsHeadlessServiceFromSlice returns true with hostname", func(t *testing.T) { + hostname := "pod-0" + es := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + discoveryv1.LabelServiceName: "test-service", + }, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Hostname: &hostname, + }, + }, + } + + result := shouldExportAsHeadlessServiceFromSlice(es, logging.NewEntry(logging.New())) + if !result { + t.Error("expected true when hostname is present") + } + }) + + t.Run("shouldExportAsHeadlessServiceFromSlice returns false without hostname", func(t *testing.T) { + es := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + discoveryv1.LabelServiceName: "test-service", + }, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"10.0.0.1"}, + 
}, + }, + } + + result := shouldExportAsHeadlessServiceFromSlice(es, logging.NewEntry(logging.New())) + if result { + t.Error("expected false when no hostname is present") + } + }) +} + +func TestEndpointSliceEmptinessCheck(t *testing.T) { + t.Run("isEmptyEndpointSlice returns true for empty endpoints", func(t *testing.T) { + es := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-slice", + Namespace: "test-ns", + }, + Endpoints: []discoveryv1.Endpoint{}, + } + + watcher := &RemoteClusterServiceWatcher{ + log: logging.WithFields(logging.Fields{"test": "true"}), + } + + if !watcher.isEmptyEndpointSlice(es) { + t.Error("expected empty EndpointSlice to be detected as empty") + } + }) + + t.Run("isEmptyEndpointSlice returns true when all endpoints not ready", func(t *testing.T) { + ready := false + es := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-slice", + Namespace: "test-ns", + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"10.0.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: &ready}, + }, + }, + } + + watcher := &RemoteClusterServiceWatcher{ + log: logging.WithFields(logging.Fields{"test": "true"}), + } + + if !watcher.isEmptyEndpointSlice(es) { + t.Error("expected EndpointSlice with no ready endpoints to be detected as empty") + } + }) + + t.Run("isEmptyEndpointSlice returns false when endpoint is ready", func(t *testing.T) { + ready := true + es := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-slice", + Namespace: "test-ns", + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"10.0.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: &ready}, + }, + }, + } + + watcher := &RemoteClusterServiceWatcher{ + log: logging.WithFields(logging.Fields{"test": "true"}), + } + + if watcher.isEmptyEndpointSlice(es) { + t.Error("expected EndpointSlice with ready endpoint to not be detected as empty") + } + }) +} + +func 
TestEndpointSliceHeadlessServiceMirroring(t *testing.T) { + servicePorts := []corev1.ServicePort{ + {Name: "port1", Protocol: "TCP", Port: 555}, + {Name: "port2", Protocol: "TCP", Port: 666}, + } + esPorts := endpointSlicePorts(servicePorts) + + createHeadlessServiceWithES := &endpointSliceTestEnvironment{ + events: []interface{}{ + &RemoteServiceExported{ + service: remoteHeadlessService("service-one", "ns1", "111", map[string]string{ + consts.DefaultExportedServiceSelector: "true", + }, servicePorts), + }, + &OnAddEndpointSliceCalled{ + es: remoteHeadlessEndpointSlice("service-one-abc", "service-one", "ns1", "112", "192.0.0.1", esPorts), + }, + }, + remoteResources: []string{ + asYaml(gateway("existing-gateway", "existing-namespace", "222", "192.0.2.129", "gateway", 889, "gateway-identity", 123456, "/probe1", "120s")), + asYaml(remoteHeadlessService("service-one", "ns1", "111", map[string]string{ + consts.DefaultExportedServiceSelector: "true", + }, servicePorts)), + asYaml(remoteHeadlessEndpointSlice("service-one-abc", "service-one", "ns1", "112", "192.0.0.1", esPorts)), + }, + localResources: []string{ + asYaml(namespace("ns1")), + }, + link: v1alpha3.Link{ + Spec: v1alpha3.LinkSpec{ + TargetClusterName: clusterName, + TargetClusterDomain: clusterDomain, + GatewayIdentity: "gateway-identity", + GatewayAddress: "192.0.2.129", + GatewayPort: "889", + ProbeSpec: defaultProbeSpec, + Selector: defaultSelector, + RemoteDiscoverySelector: defaultRemoteDiscoverySelector, + }, + }, + } + + t.Run("create headless service from EndpointSlice", func(t *testing.T) { + queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]()) + localAPI, err := createHeadlessServiceWithES.runEnvironment(queue) + if err != nil { + t.Fatalf("error running environment: %v", err) + } + + // Check that mirror service was created + mirrorSvc, err := localAPI.Client.CoreV1().Services("ns1").Get(context.Background(), "service-one-remote", metav1.GetOptions{}) + 
if err != nil { + t.Fatalf("failed to get mirror service: %v", err) + } + if mirrorSvc.Spec.ClusterIP != corev1.ClusterIPNone { + t.Errorf("expected headless mirror service, got ClusterIP=%s", mirrorSvc.Spec.ClusterIP) + } + }) +} + +func TestEndpointSliceUpdateEvent(t *testing.T) { + servicePorts := []corev1.ServicePort{ + {Name: "port1", Protocol: "TCP", Port: 555}, + {Name: "port2", Protocol: "TCP", Port: 666}, + } + esPorts := endpointSlicePorts(servicePorts) + + updateHeadlessServiceWithES := &endpointSliceTestEnvironment{ + events: []interface{}{ + &OnUpdateEndpointSliceCalled{ + es: remoteHeadlessEndpointSliceUpdate("service-one-abc", "service-one", "ns1", "113", "192.0.0.1", esPorts), + }, + }, + remoteResources: []string{ + asYaml(gateway("gateway", "gateway-ns", "222", "192.0.2.127", "mc-gateway", 888, "", defaultProbePort, defaultProbePath, defaultProbePeriod)), + asYaml(remoteHeadlessService("service-one", "ns1", "111", map[string]string{ + consts.DefaultExportedServiceSelector: "true", + }, servicePorts)), + asYaml(remoteHeadlessEndpointSliceUpdate("service-one-abc", "service-one", "ns1", "113", "192.0.0.1", esPorts)), + }, + localResources: []string{ + asYaml(namespace("ns1")), + asYaml(headlessMirrorService("service-one-remote", "ns1", "111", nil, servicePorts)), + asYaml(endpointMirrorService("pod-0", "service-one-remote", "ns1", "112", nil, servicePorts)), + asYaml(headlessMirrorEndpoints("service-one-remote", "ns1", nil, "gateway-identity", nil)), + asYaml(endpointMirrorEndpoints("service-one-remote", "ns1", nil, "pod-0", "192.0.2.127", "gateway-identity", nil)), + }, + link: v1alpha3.Link{ + Spec: v1alpha3.LinkSpec{ + TargetClusterName: clusterName, + TargetClusterDomain: clusterDomain, + GatewayIdentity: "gateway-identity", + GatewayAddress: "192.0.2.127", + GatewayPort: "888", + ProbeSpec: defaultProbeSpec, + Selector: defaultSelector, + RemoteDiscoverySelector: defaultRemoteDiscoverySelector, + }, + }, + } + + t.Run("update headless service 
from EndpointSlice with new host", func(t *testing.T) { + queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]()) + localAPI, err := updateHeadlessServiceWithES.runEnvironment(queue) + if err != nil { + t.Fatalf("error running environment: %v", err) + } + + // Check that new endpoint mirror service was created for pod-1 + _, err = localAPI.Client.CoreV1().Services("ns1").Get(context.Background(), "pod-1-remote", metav1.GetOptions{}) + if err != nil { + t.Fatalf("failed to get endpoint mirror service for pod-1: %v", err) + } + }) +} + +// Helper to create a non-headless EndpointSlice for testing +func remoteEndpointSlice(name, serviceName, namespace, resourceVersion string, addresses []string, ports []discoveryv1.EndpointPort) *discoveryv1.EndpointSlice { + ready := true + endpoints := make([]discoveryv1.Endpoint, 0, len(addresses)) + for i, addr := range addresses { + endpoints = append(endpoints, discoveryv1.Endpoint{ + Addresses: []string{addr}, + Conditions: discoveryv1.EndpointConditions{Ready: &ready}, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod-" + string(rune('0'+i)), + Namespace: namespace, + }, + }) + } + return &discoveryv1.EndpointSlice{ + TypeMeta: metav1.TypeMeta{ + Kind: "EndpointSlice", + APIVersion: "discovery.k8s.io/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + ResourceVersion: resourceVersion, + Labels: map[string]string{ + discoveryv1.LabelServiceName: serviceName, + consts.DefaultExportedServiceSelector: "true", + }, + }, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: endpoints, + Ports: ports, + } +} + +// Helper to create an empty EndpointSlice +func emptyEndpointSlice(name, serviceName, namespace, resourceVersion string, ports []discoveryv1.EndpointPort) *discoveryv1.EndpointSlice { + return &discoveryv1.EndpointSlice{ + TypeMeta: metav1.TypeMeta{ + Kind: "EndpointSlice", + APIVersion: "discovery.k8s.io/v1", + }, + ObjectMeta: 
metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + ResourceVersion: resourceVersion, + Labels: map[string]string{ + discoveryv1.LabelServiceName: serviceName, + consts.DefaultExportedServiceSelector: "true", + }, + }, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{}, + Ports: ports, + } +} + +func TestIsEmptyServiceES(t *testing.T) { + servicePorts := []corev1.ServicePort{ + {Name: "port1", Protocol: "TCP", Port: 8080}, + } + esPorts := endpointSlicePorts(servicePorts) + + t.Run("returns true when no EndpointSlices exist", func(t *testing.T) { + remoteAPI, err := k8s.NewFakeAPI( + asYaml(remoteService("test-svc", "test-ns", "111", map[string]string{ + consts.DefaultExportedServiceSelector: "true", + }, servicePorts)), + ) + if err != nil { + t.Fatalf("error creating fake API: %v", err) + } + remoteAPI.Sync(nil) + + watcher := &RemoteClusterServiceWatcher{ + remoteAPIClient: remoteAPI, + log: logging.WithFields(logging.Fields{"test": "true"}), + } + + svc, _ := remoteAPI.Svc().Lister().Services("test-ns").Get("test-svc") + empty, err := watcher.isEmptyServiceES(svc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !empty { + t.Error("expected service with no EndpointSlices to be empty") + } + }) + + t.Run("returns true when EndpointSlice has no endpoints", func(t *testing.T) { + remoteAPI, err := k8s.NewFakeAPI( + asYaml(remoteService("test-svc", "test-ns", "111", map[string]string{ + consts.DefaultExportedServiceSelector: "true", + }, servicePorts)), + asYaml(emptyEndpointSlice("test-svc-abc", "test-svc", "test-ns", "112", esPorts)), + ) + if err != nil { + t.Fatalf("error creating fake API: %v", err) + } + remoteAPI.Sync(nil) + + watcher := &RemoteClusterServiceWatcher{ + remoteAPIClient: remoteAPI, + log: logging.WithFields(logging.Fields{"test": "true"}), + } + + svc, _ := remoteAPI.Svc().Lister().Services("test-ns").Get("test-svc") + empty, err := watcher.isEmptyServiceES(svc) + if err != nil { + 
t.Fatalf("unexpected error: %v", err) + } + if !empty { + t.Error("expected service with empty EndpointSlice to be empty") + } + }) + + t.Run("returns false when EndpointSlice has ready endpoints", func(t *testing.T) { + remoteAPI, err := k8s.NewFakeAPI( + asYaml(remoteService("test-svc", "test-ns", "111", map[string]string{ + consts.DefaultExportedServiceSelector: "true", + }, servicePorts)), + asYaml(remoteEndpointSlice("test-svc-abc", "test-svc", "test-ns", "112", []string{"10.0.0.1"}, esPorts)), + ) + if err != nil { + t.Fatalf("error creating fake API: %v", err) + } + remoteAPI.Sync(nil) + + watcher := &RemoteClusterServiceWatcher{ + remoteAPIClient: remoteAPI, + log: logging.WithFields(logging.Fields{"test": "true"}), + } + + svc, _ := remoteAPI.Svc().Lister().Services("test-ns").Get("test-svc") + empty, err := watcher.isEmptyServiceES(svc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if empty { + t.Error("expected service with ready endpoints to not be empty") + } + }) + + t.Run("returns false when any EndpointSlice has ready endpoints (multiple slices)", func(t *testing.T) { + remoteAPI, err := k8s.NewFakeAPI( + asYaml(remoteService("test-svc", "test-ns", "111", map[string]string{ + consts.DefaultExportedServiceSelector: "true", + }, servicePorts)), + asYaml(emptyEndpointSlice("test-svc-abc", "test-svc", "test-ns", "112", esPorts)), + asYaml(remoteEndpointSlice("test-svc-def", "test-svc", "test-ns", "113", []string{"10.0.0.1"}, esPorts)), + ) + if err != nil { + t.Fatalf("error creating fake API: %v", err) + } + remoteAPI.Sync(nil) + + watcher := &RemoteClusterServiceWatcher{ + remoteAPIClient: remoteAPI, + log: logging.WithFields(logging.Fields{"test": "true"}), + } + + svc, _ := remoteAPI.Svc().Lister().Services("test-ns").Get("test-svc") + empty, err := watcher.isEmptyServiceES(svc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if empty { + t.Error("expected service to not be empty when at least one EndpointSlice has 
endpoints") + } + }) +} + +func TestIsEmptyServiceDispatch(t *testing.T) { + servicePorts := []corev1.ServicePort{ + {Name: "port1", Protocol: "TCP", Port: 8080}, + } + esPorts := endpointSlicePorts(servicePorts) + + t.Run("uses EndpointSlice when enableEndpointSlices is true", func(t *testing.T) { + remoteAPI, err := k8s.NewFakeAPI( + asYaml(remoteService("test-svc", "test-ns", "111", map[string]string{ + consts.DefaultExportedServiceSelector: "true", + }, servicePorts)), + asYaml(remoteEndpointSlice("test-svc-abc", "test-svc", "test-ns", "112", []string{"10.0.0.1"}, esPorts)), + ) + if err != nil { + t.Fatalf("error creating fake API: %v", err) + } + remoteAPI.Sync(nil) + + watcher := &RemoteClusterServiceWatcher{ + remoteAPIClient: remoteAPI, + log: logging.WithFields(logging.Fields{"test": "true"}), + enableEndpointSlices: true, + } + + svc, _ := remoteAPI.Svc().Lister().Services("test-ns").Get("test-svc") + empty, err := watcher.isEmptyService(svc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if empty { + t.Error("expected service to not be empty when using EndpointSlice mode") + } + }) + + t.Run("uses Endpoints when enableEndpointSlices is false", func(t *testing.T) { + remoteAPI, err := k8s.NewFakeAPI( + asYaml(remoteService("test-svc", "test-ns", "111", map[string]string{ + consts.DefaultExportedServiceSelector: "true", + }, servicePorts)), + asYaml(endpoints("test-svc", "test-ns", nil, "10.0.0.1", "", nil)), + ) + if err != nil { + t.Fatalf("error creating fake API: %v", err) + } + remoteAPI.Sync(nil) + + watcher := &RemoteClusterServiceWatcher{ + remoteAPIClient: remoteAPI, + log: logging.WithFields(logging.Fields{"test": "true"}), + enableEndpointSlices: false, + } + + svc, _ := remoteAPI.Svc().Lister().Services("test-ns").Get("test-svc") + empty, err := watcher.isEmptyService(svc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if empty { + t.Error("expected service to not be empty when using Endpoints mode") + } + }) 
+} + +func TestEndpointSliceWithNotReadyEndpoints(t *testing.T) { + t.Run("isEmptyEndpointSlice returns true when Ready is nil", func(t *testing.T) { + es := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-slice", + Namespace: "test-ns", + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"10.0.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: nil}, + }, + }, + } + + watcher := &RemoteClusterServiceWatcher{ + log: logging.WithFields(logging.Fields{"test": "true"}), + } + + // When Ready is nil, the endpoint is considered not ready + if !watcher.isEmptyEndpointSlice(es) { + t.Error("expected EndpointSlice with nil Ready condition to be considered empty") + } + }) + + t.Run("isEmptyEndpointSlice returns false with mixed ready states", func(t *testing.T) { + ready := true + notReady := false + es := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-slice", + Namespace: "test-ns", + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"10.0.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: &notReady}, + }, + { + Addresses: []string{"10.0.0.2"}, + Conditions: discoveryv1.EndpointConditions{Ready: &ready}, + }, + }, + } + + watcher := &RemoteClusterServiceWatcher{ + log: logging.WithFields(logging.Fields{"test": "true"}), + } + + if watcher.isEmptyEndpointSlice(es) { + t.Error("expected EndpointSlice with at least one ready endpoint to not be empty") + } + }) +} + +func TestEndpointSliceNonHeadlessService(t *testing.T) { + servicePorts := []corev1.ServicePort{ + {Name: "http", Protocol: "TCP", Port: 80}, + } + esPorts := endpointSlicePorts(servicePorts) + + t.Run("non-headless EndpointSlice triggers emptiness check", func(t *testing.T) { + // Create an EndpointSlice without headless label + es := &discoveryv1.EndpointSlice{ + TypeMeta: metav1.TypeMeta{ + Kind: "EndpointSlice", + APIVersion: "discovery.k8s.io/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "svc-abc", + 
Namespace: "test-ns", + Labels: map[string]string{ + discoveryv1.LabelServiceName: "svc", + consts.DefaultExportedServiceSelector: "true", + // No corev1.IsHeadlessService label + }, + }, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"10.0.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: func() *bool { b := true; return &b }()}, + }, + }, + Ports: esPorts, + } + + // Verify it's not considered headless + if isHeadlessEndpointSlice(es, logging.NewEntry(logging.New())) { + t.Error("expected EndpointSlice without headless label to not be considered headless") + } + }) +} + +func TestEndpointSliceMultipleAddresses(t *testing.T) { + t.Run("EndpointSlice with multiple addresses per endpoint", func(t *testing.T) { + ready := true + hostname := "pod-0" + es := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-slice", + Namespace: "test-ns", + Labels: map[string]string{ + discoveryv1.LabelServiceName: "test-svc", + }, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"10.0.0.1", "10.0.0.2"}, // Multiple addresses + Hostname: &hostname, + Conditions: discoveryv1.EndpointConditions{Ready: &ready}, + }, + }, + } + + // Verify shouldExportAsHeadlessServiceFromSlice works with multiple addresses + result := shouldExportAsHeadlessServiceFromSlice(es, logging.NewEntry(logging.New())) + if !result { + t.Error("expected true for EndpointSlice with hostname, regardless of number of addresses") + } + }) +} + +func TestEndpointSliceEmptyHostname(t *testing.T) { + t.Run("shouldExportAsHeadlessServiceFromSlice returns false for empty string hostname", func(t *testing.T) { + emptyHostname := "" + es := &discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + discoveryv1.LabelServiceName: "test-service", + }, + }, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{"10.0.0.1"}, + Hostname: &emptyHostname, + }, + }, + } + + result := 
shouldExportAsHeadlessServiceFromSlice(es, logging.NewEntry(logging.New())) + if result { + t.Error("expected false when hostname is empty string") + } + }) +} + +func TestWatcherEnableEndpointSlicesField(t *testing.T) { + t.Run("watcher correctly stores enableEndpointSlices flag", func(t *testing.T) { + watcherWithES := &RemoteClusterServiceWatcher{ + enableEndpointSlices: true, + log: logging.WithFields(logging.Fields{"test": "true"}), + } + + watcherWithoutES := &RemoteClusterServiceWatcher{ + enableEndpointSlices: false, + log: logging.WithFields(logging.Fields{"test": "true"}), + } + + if !watcherWithES.enableEndpointSlices { + t.Error("expected enableEndpointSlices to be true") + } + + if watcherWithoutES.enableEndpointSlices { + t.Error("expected enableEndpointSlices to be false") + } + }) +} diff --git a/multicluster/service-mirror/cluster_watcher_headless_es.go b/multicluster/service-mirror/cluster_watcher_headless_es.go new file mode 100644 index 0000000000000..36a195da436ab --- /dev/null +++ b/multicluster/service-mirror/cluster_watcher_headless_es.go @@ -0,0 +1,442 @@ +package servicemirror + +import ( + "context" + "fmt" + + consts "github.com/linkerd/linkerd2/pkg/k8s" + logging "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" +) + +// handleCreateOrUpdateEndpointSlice processes EndpointSlice objects for exported +// services. When an EndpointSlice object is created or updated in the remote +// cluster, it will be processed here in order to reconcile the local cluster +// state with the remote cluster state. 
+func (rcsw *RemoteClusterServiceWatcher) handleCreateOrUpdateEndpointSlice( + ctx context.Context, + exportedEndpointSlice *discoveryv1.EndpointSlice, +) error { + ns, svcName, err := getEndpointSliceServiceID(exportedEndpointSlice) + if err != nil { + return err + } + + exportedService, err := rcsw.remoteAPIClient.Svc().Lister().Services(ns).Get(svcName) + if err != nil { + rcsw.log.Debugf("failed to retrieve exported service %s/%s when updating its mirror endpoints: %v", ns, svcName, err) + return fmt.Errorf("error retrieving exported service %s/%s: %w", ns, svcName, err) + } + + if isHeadlessEndpointSlice(exportedEndpointSlice, rcsw.log) { + if rcsw.headlessServicesEnabled { + return rcsw.createOrUpdateHeadlessEndpointsFromSlice(ctx, exportedService, exportedEndpointSlice) + } + return nil + } + + // For non-headless services, handle emptiness check + return rcsw.handleEndpointSliceEmptiness(ctx, exportedService, exportedEndpointSlice) +} + +// handleEndpointSliceEmptiness handles updates to EndpointSlices to check if they've +// become empty/filled since their creation, in order to empty/fill the +// mirrored endpoints as well +func (rcsw *RemoteClusterServiceWatcher) handleEndpointSliceEmptiness( + ctx context.Context, + exportedService *corev1.Service, + exportedEndpointSlice *discoveryv1.EndpointSlice, +) error { + localServiceName := rcsw.mirrorServiceName(exportedService.Name) + ep, err := rcsw.localAPIClient.Endpoint().Lister().Endpoints(exportedService.Namespace).Get(localServiceName) + if err != nil { + return RetryableError{[]error{err}} + } + + // Check if the service is empty using all EndpointSlices + serviceEmpty, err := rcsw.isEmptyServiceES(exportedService) + if err != nil { + return RetryableError{[]error{err}} + } + + localEmpty := rcsw.isEmptyEndpoints(ep) + + if (localEmpty && serviceEmpty) || (!localEmpty && !serviceEmpty) { + return nil + } + + rcsw.log.Infof("Updating subsets for mirror endpoint %s/%s", exportedService.Namespace, 
exportedService.Name) + if serviceEmpty { + ep.Subsets = []corev1.EndpointSubset{} + } else { + gatewayAddresses, err := rcsw.resolveGatewayAddress() + if err != nil { + return err + } + ports, err := rcsw.getEndpointsPorts(exportedService) + if err != nil { + return err + } + ep.Subsets = []corev1.EndpointSubset{ + { + Addresses: gatewayAddresses, + Ports: ports, + }, + } + } + return rcsw.updateMirrorEndpoints(ctx, ep) +} + +// createOrUpdateHeadlessEndpointsFromSlice processes EndpointSlice objects for +// exported headless services. This is the EndpointSlice equivalent of +// createOrUpdateHeadlessEndpoints. +func (rcsw *RemoteClusterServiceWatcher) createOrUpdateHeadlessEndpointsFromSlice( + ctx context.Context, + exportedService *corev1.Service, + exportedEndpointSlice *discoveryv1.EndpointSlice, +) error { + // Check whether the endpoints should be processed for a headless exported + // service. If the exported service does not have any ports exposed, then + // neither will its corresponding endpoint mirrors, it should not be created + // as a headless mirror. + if len(exportedService.Spec.Ports) == 0 { + rcsw.recorder.Event(exportedService, corev1.EventTypeNormal, eventTypeSkipped, "Skipped mirroring service: object spec has no exposed ports") + rcsw.log.Infof("Skipped creating headless mirror for %s/%s: service object spec has no exposed ports", exportedService.Namespace, exportedService.Name) + return nil + } + + mirrorServiceName := rcsw.mirrorServiceName(exportedService.Name) + mirrorService, err := rcsw.localAPIClient.Svc().Lister().Services(exportedService.Namespace).Get(mirrorServiceName) + if err != nil { + if !kerrors.IsNotFound(err) { + return err + } + + // If the mirror service does not exist, create it, either as clusterIP + // or as headless. 
+ mirrorService, err = rcsw.createRemoteHeadlessServiceFromSlice(ctx, exportedService, exportedEndpointSlice) + if err != nil { + return err + } + } + + headlessMirrorEpName := rcsw.mirrorServiceName(exportedService.Name) + headlessMirrorEndpoints, err := rcsw.localAPIClient.Endpoint().Lister().Endpoints(exportedService.Namespace).Get(headlessMirrorEpName) + if err != nil { + if !kerrors.IsNotFound(err) { + return err + } + + if mirrorService.Spec.ClusterIP != corev1.ClusterIPNone { + return rcsw.createGatewayEndpoints(ctx, exportedService) + } + + // Create endpoint mirrors for headless mirror + if err := rcsw.createHeadlessMirrorEndpointsFromSlice(ctx, exportedService, exportedEndpointSlice); err != nil { + rcsw.log.Debugf("failed to create headless mirrors for EndpointSlice %s/%s: %v", exportedEndpointSlice.Namespace, exportedEndpointSlice.Name, err) + return err + } + + return nil + } + + // Past this point, we do not want to process a mirror service that is not + // headless. We want to process only endpoints for headless mirrors. 
+ if mirrorService.Spec.ClusterIP != corev1.ClusterIPNone { + return nil + } + + mirrorEndpoints := headlessMirrorEndpoints.DeepCopy() + endpointMirrors := make(map[string]struct{}) + newSubsets := make([]corev1.EndpointSubset, 0) + + // Get all EndpointSlices for this service to aggregate + matchLabels := map[string]string{discoveryv1.LabelServiceName: exportedService.Name} + selector := labels.Set(matchLabels).AsSelector() + allSlices, err := rcsw.remoteAPIClient.ES().Lister().EndpointSlices(exportedService.Namespace).List(selector) + if err != nil { + return RetryableError{[]error{err}} + } + + // Build ports from the service + ports := make([]corev1.EndpointPort, 0, len(exportedService.Spec.Ports)) + for _, port := range exportedService.Spec.Ports { + ports = append(ports, corev1.EndpointPort{ + Name: port.Name, + Protocol: port.Protocol, + Port: port.Port, + }) + } + + // Process all EndpointSlices for this service + newAddresses := make([]corev1.EndpointAddress, 0) + for _, slice := range allSlices { + for _, endpoint := range slice.Endpoints { + hostname := "" + if endpoint.Hostname != nil { + hostname = *endpoint.Hostname + } + if hostname == "" { + continue + } + + // Skip endpoints that are not ready + if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready { + continue + } + + endpointMirrorName := rcsw.mirrorServiceName(hostname) + endpointMirrorService, err := rcsw.localAPIClient.Svc().Lister().Services(exportedService.Namespace).Get(endpointMirrorName) + if err != nil { + if !kerrors.IsNotFound(err) { + return err + } + // If the error is 'NotFound' then the Endpoint Mirror service + // does not exist, so create it. 
+ endpointMirrorService, err = rcsw.createEndpointMirrorService(ctx, hostname, slice.ResourceVersion, endpointMirrorName, exportedService) + if err != nil { + return err + } + } + + endpointMirrors[endpointMirrorName] = struct{}{} + newAddresses = append(newAddresses, corev1.EndpointAddress{ + Hostname: hostname, + IP: endpointMirrorService.Spec.ClusterIP, + }) + } + } + + if len(newAddresses) > 0 { + newSubsets = append(newSubsets, corev1.EndpointSubset{ + Addresses: newAddresses, + Ports: ports, + }) + } + + headlessMirrorName := rcsw.mirrorServiceName(exportedService.Name) + matchLabels = map[string]string{ + consts.MirroredHeadlessSvcNameLabel: headlessMirrorName, + } + + // Fetch all Endpoint Mirror services that belong to the same Headless Mirror + endpointMirrorServices, err := rcsw.localAPIClient.Svc().Lister().List(labels.Set(matchLabels).AsSelector()) + if err != nil { + return err + } + + var errors []error + for _, service := range endpointMirrorServices { + // If the service's name does not show up in the up-to-date map of + // Endpoint Mirror names, then we should delete it. + if _, found := endpointMirrors[service.Name]; found { + continue + } + err := rcsw.localAPIClient.Client.CoreV1().Services(service.Namespace).Delete(ctx, service.Name, metav1.DeleteOptions{}) + if err != nil { + if !kerrors.IsNotFound(err) { + errors = append(errors, fmt.Errorf("error deleting Endpoint Mirror service %s/%s: %w", service.Namespace, service.Name, err)) + } + } + } + + if len(errors) > 0 { + return RetryableError{errors} + } + + // Update endpoints + mirrorEndpoints.Subsets = newSubsets + err = rcsw.updateMirrorEndpoints(ctx, mirrorEndpoints) + if err != nil { + return RetryableError{[]error{err}} + } + + return nil +} + +// createRemoteHeadlessServiceFromSlice creates a mirror service for an exported +// headless service using EndpointSlice data. Whether the mirror will be created +// as a headless or clusterIP service depends on the EndpointSlice. 
+func (rcsw *RemoteClusterServiceWatcher) createRemoteHeadlessServiceFromSlice( + ctx context.Context, + exportedService *corev1.Service, + exportedEndpointSlice *discoveryv1.EndpointSlice, +) (*corev1.Service, error) { + // If we don't have any endpoints to process then avoid creating the service. + if len(exportedEndpointSlice.Endpoints) == 0 { + return &corev1.Service{}, nil + } + + remoteService := exportedService.DeepCopy() + serviceInfo := fmt.Sprintf("%s/%s", remoteService.Namespace, remoteService.Name) + localServiceName := rcsw.mirrorServiceName(remoteService.Name) + + if rcsw.namespaceCreationEnabled { + if err := rcsw.mirrorNamespaceIfNecessary(ctx, remoteService.Namespace); err != nil { + return &corev1.Service{}, err + } + } else { + // Ensure the namespace exists, and skip mirroring if it doesn't + if _, err := rcsw.localAPIClient.NS().Lister().Get(remoteService.Namespace); err != nil { + if kerrors.IsNotFound(err) { + rcsw.log.Warnf("Skipping mirroring of service %s: namespace %s does not exist", serviceInfo, remoteService.Namespace) + return &corev1.Service{}, nil + } + // something else went wrong, so we can just retry + return nil, RetryableError{[]error{err}} + } + } + + serviceToCreate := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: localServiceName, + Namespace: remoteService.Namespace, + Annotations: rcsw.getMirrorServiceAnnotations(remoteService), + Labels: rcsw.getMirrorServiceLabels(remoteService), + }, + Spec: corev1.ServiceSpec{ + Ports: remapRemoteServicePorts(remoteService.Spec.Ports), + }, + } + + if shouldExportAsHeadlessServiceFromSlice(exportedEndpointSlice, rcsw.log) { + serviceToCreate.Spec.ClusterIP = corev1.ClusterIPNone + rcsw.log.Infof("Creating a new headless service mirror for %s", serviceInfo) + } else { + rcsw.log.Infof("Creating a new service mirror for %s", serviceInfo) + } + + svc, err := rcsw.localAPIClient.Client.CoreV1().Services(remoteService.Namespace).Create(ctx, serviceToCreate, 
metav1.CreateOptions{}) + if err != nil { + if !kerrors.IsAlreadyExists(err) { + // we might have created it during earlier attempt, if that is not the case, we retry + return &corev1.Service{}, RetryableError{[]error{err}} + } + } + + return svc, err +} + +// createHeadlessMirrorEndpointsFromSlice creates an endpoints object for a +// Headless Mirror service using EndpointSlice data. +func (rcsw *RemoteClusterServiceWatcher) createHeadlessMirrorEndpointsFromSlice( + ctx context.Context, + exportedService *corev1.Service, + exportedEndpointSlice *discoveryv1.EndpointSlice, +) error { + exportedServiceInfo := fmt.Sprintf("%s/%s", exportedService.Namespace, exportedService.Name) + + // Get all EndpointSlices for this service to aggregate + matchLabels := map[string]string{discoveryv1.LabelServiceName: exportedService.Name} + selector := labels.Set(matchLabels).AsSelector() + allSlices, err := rcsw.remoteAPIClient.ES().Lister().EndpointSlices(exportedService.Namespace).List(selector) + if err != nil { + return RetryableError{[]error{err}} + } + + endpointsHostnames := make(map[string]struct{}) + newAddresses := make([]corev1.EndpointAddress, 0) + + for _, slice := range allSlices { + for _, endpoint := range slice.Endpoints { + hostname := "" + if endpoint.Hostname != nil { + hostname = *endpoint.Hostname + } + if hostname == "" { + continue + } + + // Skip endpoints that are not ready + if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready { + continue + } + + endpointMirrorName := rcsw.mirrorServiceName(hostname) + createdService, err := rcsw.createEndpointMirrorService(ctx, hostname, slice.ResourceVersion, endpointMirrorName, exportedService) + if err != nil { + rcsw.log.Errorf("error creating endpoint mirror service %s/%s for exported headless service %s: %v", endpointMirrorName, exportedService.Namespace, exportedServiceInfo, err) + continue + } + + endpointsHostnames[hostname] = struct{}{} + // Use the hostname as the address hostname (for DNS) + 
targetRefName := hostname + if endpoint.TargetRef != nil { + targetRefName = endpoint.TargetRef.Name + } + newAddresses = append(newAddresses, corev1.EndpointAddress{ + Hostname: targetRefName, + IP: createdService.Spec.ClusterIP, + }) + } + } + + // Build ports from the service + ports := make([]corev1.EndpointPort, 0, len(exportedService.Spec.Ports)) + for _, port := range exportedService.Spec.Ports { + ports = append(ports, corev1.EndpointPort{ + Name: port.Name, + Protocol: port.Protocol, + Port: port.Port, + }) + } + + subsetsToCreate := make([]corev1.EndpointSubset, 0) + if len(newAddresses) > 0 { + subsetsToCreate = append(subsetsToCreate, corev1.EndpointSubset{ + Addresses: newAddresses, + Ports: ports, + }) + } + + headlessMirrorServiceName := rcsw.mirrorServiceName(exportedService.Name) + headlessMirrorEndpoints := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: headlessMirrorServiceName, + Namespace: exportedService.Namespace, + Labels: rcsw.getMirrorEndpointLabels(exportedService), + Annotations: map[string]string{ + consts.RemoteServiceFqName: fmt.Sprintf("%s.%s.svc.%s", exportedService.Name, exportedService.Namespace, rcsw.link.Spec.TargetClusterDomain), + }, + }, + Subsets: subsetsToCreate, + } + + if rcsw.link.Spec.GatewayIdentity != "" { + headlessMirrorEndpoints.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.Spec.GatewayIdentity + } + + rcsw.log.Infof("Creating a new headless mirror endpoints object for headless mirror %s/%s", headlessMirrorServiceName, exportedService.Namespace) + // The addresses for the headless mirror service point to the Cluster IPs + // of auxiliary services that are tied to gateway liveness. Therefore, + // these addresses should always be considered ready. 
+ _, err = rcsw.localAPIClient.Client.CoreV1().Endpoints(exportedService.Namespace).Create(ctx, headlessMirrorEndpoints, metav1.CreateOptions{}) + if err != nil { + if svcErr := rcsw.localAPIClient.Client.CoreV1().Services(exportedService.Namespace).Delete(ctx, headlessMirrorServiceName, metav1.DeleteOptions{}); svcErr != nil { + rcsw.log.Errorf("failed to delete Service %s after Endpoints creation failed: %s", headlessMirrorServiceName, svcErr) + } + return RetryableError{[]error{err}} + } + + return nil +} + +// shouldExportAsHeadlessServiceFromSlice checks if an exported service should be +// mirrored as a headless service or as a clusterIP service, based on its +// EndpointSlice. For an exported service to be a headless mirror, it needs +// to have at least one named address (hostname) in its EndpointSlice. +func shouldExportAsHeadlessServiceFromSlice(es *discoveryv1.EndpointSlice, log *logging.Entry) bool { + for _, endpoint := range es.Endpoints { + if endpoint.Hostname != nil && *endpoint.Hostname != "" { + return true + } + } + ns, svcName, _ := getEndpointSliceServiceID(es) + log.Infof("Service %s/%s should not be exported as headless: no named addresses in its EndpointSlice", ns, svcName) + return false +} diff --git a/pkg/healthcheck/healthcheck.go b/pkg/healthcheck/healthcheck.go index 08de4dc750ac3..5010f637f257e 100644 --- a/pkg/healthcheck/healthcheck.go +++ b/pkg/healthcheck/healthcheck.go @@ -27,6 +27,7 @@ import ( admissionRegistration "k8s.io/api/admissionregistration/v1" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -407,6 +408,7 @@ type Options struct { InstallManifest string CRDManifest string ChartValues *l5dcharts.Values + EnableEndpointSlices bool } // HealthChecker encapsulates all health check checkers, and clients required to @@ -2369,7 
+2371,12 @@ func (hc *HealthChecker) checkMisconfiguredOpaquePortAnnotations(ctx context.Con // This is used instead of `hc.kubeAPI` to limit multiple k8s API requests // and use the caching logic in the shared informers // TODO: move the shared informer code out of `controller/`, and into `pkg` to simplify the dependency tree. - kubeAPI := controllerK8s.NewClusterScopedAPI(hc.kubeAPI.Interface, nil, nil, "local", controllerK8s.Endpoint, controllerK8s.Pod, controllerK8s.Svc) + var kubeAPI *controllerK8s.API + if hc.EnableEndpointSlices { + kubeAPI = controllerK8s.NewClusterScopedAPI(hc.kubeAPI.Interface, nil, nil, "local", controllerK8s.ES, controllerK8s.Pod, controllerK8s.Svc) + } else { + kubeAPI = controllerK8s.NewClusterScopedAPI(hc.kubeAPI.Interface, nil, nil, "local", controllerK8s.Endpoint, controllerK8s.Pod, controllerK8s.Svc) + } kubeAPI.Sync(ctx.Done()) services, err := kubeAPI.Svc().Lister().Services(hc.DataPlaneNamespace).List(labels.Everything()) @@ -2384,12 +2391,16 @@ func (hc *HealthChecker) checkMisconfiguredOpaquePortAnnotations(ctx context.Con continue } - endpoints, err := kubeAPI.Endpoint().Lister().Endpoints(service.Namespace).Get(service.Name) - if err != nil { - return err + var pods map[*corev1.Pod]struct{} + if hc.EnableEndpointSlices { + pods, err = getEndpointSlicePods(service, kubeAPI) + } else { + endpoints, endpointsErr := kubeAPI.Endpoint().Lister().Endpoints(service.Namespace).Get(service.Name) + if endpointsErr != nil { + return endpointsErr + } + pods, err = getEndpointsPods(endpoints, kubeAPI, service.Namespace) } - - pods, err := getEndpointsPods(endpoints, kubeAPI, service.Namespace) if err != nil { return err } @@ -2429,6 +2440,39 @@ func getEndpointsPods(endpoints *corev1.Endpoints, kubeAPI *controllerK8s.API, n return pods, nil } +// getEndpointSlicePods takes a service and returns the set of all pods that +// are targeted by its EndpointSlices. 
+func getEndpointSlicePods(svc *corev1.Service, kubeAPI *controllerK8s.API) (map[*corev1.Pod]struct{}, error) { + pods := make(map[*corev1.Pod]struct{}) + + matchLabels := map[string]string{discoveryv1.LabelServiceName: svc.Name} + selector := labels.Set(matchLabels).AsSelector() + + slices, err := kubeAPI.ES().Lister().EndpointSlices(svc.Namespace).List(selector) + if err != nil { + return nil, err + } + + for _, slice := range slices { + for _, endpoint := range slice.Endpoints { + if endpoint.TargetRef == nil { + continue + } + if endpoint.TargetRef.Kind != "Pod" { + continue + } + pod, err := kubeAPI.Pod().Lister().Pods(svc.Namespace).Get(endpoint.TargetRef.Name) + if err != nil { + return nil, err + } + if _, ok := pods[pod]; !ok { + pods[pod] = struct{}{} + } + } + } + return pods, nil +} + func misconfiguredOpaqueAnnotation(service *corev1.Service, pod *corev1.Pod) error { var svcPorts, podPorts []string if v, ok := service.Annotations[k8s.ProxyOpaquePortsAnnotation]; ok { diff --git a/pkg/healthcheck/healthcheck_test.go b/pkg/healthcheck/healthcheck_test.go index 7d1c0a8c37928..3537665128cad 100644 --- a/pkg/healthcheck/healthcheck_test.go +++ b/pkg/healthcheck/healthcheck_test.go @@ -3054,6 +3054,228 @@ subsets: } } +func TestCheckOpaquePortAnnotationsWithEndpointSlices(t *testing.T) { + hc := NewHealthChecker( + []CategoryID{LinkerdOpaquePortsDefinitionChecks}, + &Options{ + DataPlaneNamespace: "test-ns", + EnableEndpointSlices: true, + }, + ) + + var err error + + var testCases = []struct { + description string + resources []string + expected error + }{ + { + description: "matching opaque port annotations", + resources: []string{` +apiVersion: v1 +kind: Service +metadata: + name: svc + namespace: test-ns + annotations: + config.linkerd.io/opaque-ports: "9200" +spec: + selector: + app: test + ports: + - name: test + port: 9200 + targetPort: 9200 +`, + ` +apiVersion: v1 +kind: Pod +metadata: + name: pod + namespace: test-ns + labels: + app: test + 
annotations: + config.linkerd.io/opaque-ports: "9200" +spec: + containers: + - name: test + image: test + ports: + - name: test + containerPort: 9200 +`, + ` +apiVersion: discovery.k8s.io/v1 +kind: EndpointSlice +metadata: + name: svc-abc + namespace: test-ns + labels: + kubernetes.io/service-name: svc +addressType: IPv4 +endpoints: +- addresses: + - "10.244.3.12" + conditions: + ready: true + targetRef: + kind: Pod + name: pod + namespace: test-ns +ports: +- name: test + port: 9200 + protocol: TCP +`, + }, + }, + { + description: "missing opaque port annotation on service", + resources: []string{` +apiVersion: v1 +kind: Service +metadata: + name: svc + namespace: test-ns +spec: + selector: + app: test + ports: + - name: http + port: 9200 + targetPort: 9200 +`, + ` +apiVersion: v1 +kind: Pod +metadata: + name: pod + namespace: test-ns + labels: + app: test + annotations: + config.linkerd.io/opaque-ports: "9200" +spec: + containers: + - name: test + image: test + ports: + - name: test + containerPort: 9200 +`, + ` +apiVersion: discovery.k8s.io/v1 +kind: EndpointSlice +metadata: + name: svc-abc + namespace: test-ns + labels: + kubernetes.io/service-name: svc +addressType: IPv4 +endpoints: +- addresses: + - "10.244.3.12" + conditions: + ready: true + targetRef: + kind: Pod + name: pod + namespace: test-ns +ports: +- name: test + port: 9200 + protocol: TCP +`, + }, + expected: fmt.Errorf("\t* service svc targets the opaque port 9200 through 9200; add 9200 to its config.linkerd.io/opaque-ports annotation"), + }, + { + description: "missing opaque port annotation on pod", + resources: []string{` +apiVersion: v1 +kind: Service +metadata: + name: svc + namespace: test-ns + annotations: + config.linkerd.io/opaque-ports: "9200" +spec: + selector: + app: test + ports: + - name: test + port: 9200 + targetPort: 9200 +`, + ` +apiVersion: v1 +kind: Pod +metadata: + name: pod + namespace: test-ns + labels: + app: test +spec: + containers: + - name: test + image: test + ports: + - 
name: test + containerPort: 9200 +`, + ` +apiVersion: discovery.k8s.io/v1 +kind: EndpointSlice +metadata: + name: svc-abc + namespace: test-ns + labels: + kubernetes.io/service-name: svc +addressType: IPv4 +endpoints: +- addresses: + - "10.244.3.12" + conditions: + ready: true + targetRef: + kind: Pod + name: pod + namespace: test-ns +ports: +- name: test + port: 9200 + protocol: TCP +`, + }, + expected: fmt.Errorf("\t* service svc expects target port 9200 to be opaque; add it to pod pod config.linkerd.io/opaque-ports annotation"), + }, + } + + for _, tc := range testCases { + tc := tc // pin + t.Run(tc.description, func(t *testing.T) { + hc.kubeAPI, err = k8s.NewFakeAPI(tc.resources...) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + err = hc.checkMisconfiguredOpaquePortAnnotations(context.Background()) + if err == nil && tc.expected != nil { + t.Fatalf("Expected check to fail with %s", tc.expected.Error()) + } + if err != nil && tc.expected != nil { + if err.Error() != tc.expected.Error() { + t.Fatalf("Expected error: %s, received: %s", tc.expected, err) + } + } + if err != nil && tc.expected == nil { + t.Fatalf("Did not expect error but got: %s", err.Error()) + } + }) + } +} + type controlPlaneReplicaOptions struct { destination int identity int