diff --git a/Makefile b/Makefile index 22269704..d9a41b83 100644 --- a/Makefile +++ b/Makefile @@ -67,8 +67,8 @@ define local_tag $(TAG)$(shell [ "$(USE_LOCAL_IMAGE)" = "true" ] && echo "-local") endef -.PHONY: e2e TEST_SUITES ?= remote_ip remote_dns_name spire +.PHONY: e2e e2e: kind-clusters ## Runs end-to-end tests against KinD clusters @local_tag=$(call local_tag); \ $(foreach suite, $(TEST_SUITES), \ diff --git a/Makefile.func.mk b/Makefile.func.mk index c1d54a8d..b0c5da47 100644 --- a/Makefile.func.mk +++ b/Makefile.func.mk @@ -2,7 +2,7 @@ define go-mod-version $(shell go mod graph | grep $(1) | head -n 1 | cut -d'@' -f 2) endef -# Using controller-gen to fetch external CRDs and put them in defined folder folder +# Using controller-gen to fetch external CRDs and put them in defined folder. # They can be used e.g. in testing using EnvTest where controller under test # requires additional resources to manage. # diff --git a/api/v1alpha1/meshfederation_types.go b/api/v1alpha1/meshfederation_types.go index 4868bfaa..6d6017a7 100644 --- a/api/v1alpha1/meshfederation_types.go +++ b/api/v1alpha1/meshfederation_types.go @@ -74,6 +74,9 @@ type MeshFederationStatus struct { // Conditions describes the state of the MeshFederation resource. // +optional Conditions []metav1.Condition `json:"conditions,omitempty"` + + // +optional + ExportedServices []string `json:"exportedServices,omitempty"` } type PortConfig struct { diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 4ff98582..121c5595 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -261,6 +261,11 @@ func (in *MeshFederationStatus) DeepCopyInto(out *MeshFederationStatus) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.ExportedServices != nil { + in, out := &in.ExportedServices, &out.ExportedServices + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MeshFederationStatus. diff --git a/chart/crds/federation.openshift-service-mesh.io_meshfederations.yaml b/chart/crds/federation.openshift-service-mesh.io_meshfederations.yaml index 811660e4..28923334 100644 --- a/chart/crds/federation.openshift-service-mesh.io_meshfederations.yaml +++ b/chart/crds/federation.openshift-service-mesh.io_meshfederations.yaml @@ -227,6 +227,10 @@ spec: - type type: object type: array + exportedServices: + items: + type: string + type: array type: object type: object served: true diff --git a/chart/templates/clusterrole.yaml b/chart/templates/clusterrole.yaml index 66812402..b8b14e6a 100644 --- a/chart/templates/clusterrole.yaml +++ b/chart/templates/clusterrole.yaml @@ -11,23 +11,23 @@ rules: verbs: ["get", "list", "create", "update", "patch", "delete"] - apiGroups: ["security.istio.io"] resources: ["peerauthentications"] - verbs: ["get", "list", "create", "update", "patch", "delete"] + verbs: ["get", "list", "create", "update", "patch", "delete", "watch"] {{- if (include "remotes.hasOpenshiftRouterPeer" .) }} - apiGroups: ["networking.istio.io"] resources: ["destinationrules"] - verbs: ["get", "list", "create", "update", "patch", "delete"] + verbs: ["get", "list", "create", "update", "patch", "delete", "watch"] {{- end }} {{- if eq .Values.federation.meshPeers.local.ingressType "openshift-router" }} - apiGroups: ["networking.istio.io"] resources: ["envoyfilters"] - verbs: ["get", "list", "create", "update", "patch", "delete"] + verbs: ["get", "list", "create", "update", "patch", "delete", "watch"] - apiGroups: ["route.openshift.io"] resources: ["routes", "routes/custom-host"] - verbs: ["get", "list", "create", "update", "patch", "delete"] + verbs: ["get", "list", "create", "update", "patch", "delete", "watch"] {{- end }} - apiGroups: ["federation.openshift-service-mesh.io"] resources: ["meshfederations", "federatedservices"] verbs: ["create", "delete", "get", "list", "patch", "update", "watch"] - apiGroups: ["federation.openshift-service-mesh.io"] resources: ["meshfederations/status", "federatedservices/status"] - verbs: ["get"] + verbs: ["get", "list", "create", "update", "patch", "delete", "watch"] diff --git a/cmd/federation-controller/main.go b/cmd/federation-controller/main.go index 553a5350..06e1706f 100644 --- a/cmd/federation-controller/main.go +++ b/cmd/federation-controller/main.go @@ -324,7 +324,7 @@ func startFDSClient(ctx context.Context, remote config.Remote, meshConfigPushReq DiscoveryAddr: discoveryAddr, Authority: remote.ServiceFQDN(), Handlers: map[string]adsc.ResponseHandler{ - xds.ExportedServiceTypeUrl: fds.NewImportedServiceHandler(importedServiceStore, meshConfigPushRequests), + xds.FederatedServiceTypeUrl: fds.NewImportedServiceHandler(importedServiceStore, meshConfigPushRequests), }, ReconnectDelay: reconnectDelay, }) diff --git a/docs/arch/diagrams/ctrl-overview.drawio b/docs/arch/diagrams/ctrl-overview.drawio new file mode 100644 index 00000000..7c9253ec --- /dev/null +++ b/docs/arch/diagrams/ctrl-overview.drawio @@ -0,0 +1,706 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/internal/controller/meshfederation/export.go b/internal/controller/meshfederation/export.go new file mode 100644 index 00000000..566c11f7 --- /dev/null +++ b/internal/controller/meshfederation/export.go @@ -0,0 +1,137 @@ +// Copyright Red Hat, Inc. +// +// Licensed under the Apache License, Version 2.0 (the License); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an AS IS BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package meshfederation + +import ( + "context" + "fmt" + "strings" + "sync" + + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "sigs.k8s.io/controller-runtime/pkg/client" + + protov1alpha1 "github.com/openshift-service-mesh/federation/internal/api/federation/v1alpha1" + "github.com/openshift-service-mesh/federation/internal/pkg/discovery" +) + +// TODO(design): should we have one server per MF or single server broadcasting to all -> that would imply recognizing subscribers somehow +// TODO(design): currently we won't be able to run two meshfederations at once due to port conflict +type serverExporter struct { + server *discovery.Server + handler *exportedServicesBroadcaster +} + +type serviceExporterRegistry struct { + exporters sync.Map +} + +func (r *serviceExporterRegistry) LoadOrStore(name string, serviceExporter *exportedServicesBroadcaster) *discovery.Server { + actual, exists := r.exporters.LoadOrStore(name, serverExporter{ + server: discovery.NewServer(serviceExporter), + handler: serviceExporter, + }) + + exporter := actual.(serverExporter) + if exists { + // update settings + exporter.handler.selector = serviceExporter.selector + } + + return exporter.server +} + +var _ discovery.RequestHandler = (*exportedServicesBroadcaster)(nil) + +type exportedServicesBroadcaster struct { + client client.Client + typeUrl string + selector labels.Selector +} + +func (e exportedServicesBroadcaster) GetTypeUrl() string { + return e.typeUrl +} + +func (e exportedServicesBroadcaster) GenerateResponse() ([]*anypb.Any, error) { + services := &corev1.ServiceList{} + // TODO: rework ads(s|c) to get ctx? + // We cannot latch into ctx from owning Reconcile call, as this piece of code can be called from outside reconcile loop on client push request. + if errSvcList := e.client.List(context.TODO(), services, client.MatchingLabelsSelector{Selector: e.selector}); errSvcList != nil { + return []*anypb.Any{}, errSvcList + } + + return convert(services.Items) +} + +func convert(services []corev1.Service) ([]*anypb.Any, error) { + var federatedServices []*protov1alpha1.FederatedService + + for _, svc := range services { + var ports []*protov1alpha1.ServicePort + for _, port := range svc.Spec.Ports { + servicePort := &protov1alpha1.ServicePort{ + Name: port.Name, + Number: uint32(port.Port), + } + if port.TargetPort.IntVal != 0 { + servicePort.TargetPort = uint32(port.TargetPort.IntVal) + } + servicePort.Protocol = detectProtocol(port.Name) + ports = append(ports, servicePort) + } + federatedSvc := &protov1alpha1.FederatedService{ + Hostname: fmt.Sprintf("%s.%s.svc.cluster.local", svc.Name, svc.Namespace), + Ports: ports, + Labels: svc.Labels, + } + federatedServices = append(federatedServices, federatedSvc) + } + + return serialize(federatedServices) +} + +// TODO: check appProtocol and reject UDP +func detectProtocol(portName string) string { + if portName == "https" || strings.HasPrefix(portName, "https-") { + return "HTTPS" + } else if portName == "http" || strings.HasPrefix(portName, "http-") { + return "HTTP" + } else if portName == "http2" || strings.HasPrefix(portName, "http2-") { + return "HTTP2" + } else if portName == "grpc" || strings.HasPrefix(portName, "grpc-") { + return "GRPC" + } else if portName == "tls" || strings.HasPrefix(portName, "tls-") { + return "TLS" + } else if portName == "mongo" || strings.HasPrefix(portName, "mongo-") { + return "MONGO" + } + return "TCP" +} + +func serialize(exportedServices []*protov1alpha1.FederatedService) ([]*anypb.Any, error) { + var serializedServices []*anypb.Any + for _, exportedService := range exportedServices { + serializedExportedService := &anypb.Any{} + if err := anypb.MarshalFrom(serializedExportedService, exportedService, proto.MarshalOptions{}); err != nil { + return []*anypb.Any{}, fmt.Errorf("failed to serialize ExportedService %s to protobuf message: %w", exportedService.Hostname, err) + } + serializedServices = append(serializedServices, serializedExportedService) + } + return serializedServices, nil +} diff --git a/internal/controller/meshfederation/meshfederation_controller.go b/internal/controller/meshfederation/meshfederation_controller.go index 7a163e88..02f4176f 100644 --- a/internal/controller/meshfederation/meshfederation_controller.go +++ b/internal/controller/meshfederation/meshfederation_controller.go @@ -16,79 +16,238 @@ package meshfederation import ( "context" + "errors" "fmt" - "slices" + "reflect" + routev1 "github.com/openshift/api/route/v1" + "istio.io/client-go/pkg/apis/networking/v1alpha3" + "istio.io/client-go/pkg/apis/security/v1beta1" + "istio.io/istio/pkg/slices" + corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" machinerymeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - ctrl "sigs.k8s.io/controller-runtime" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/openshift-service-mesh/federation/api/v1alpha1" "github.com/openshift-service-mesh/federation/internal/controller" + "github.com/openshift-service-mesh/federation/internal/controller/finalizer" + "github.com/openshift-service-mesh/federation/internal/pkg/discovery" + "github.com/openshift-service-mesh/federation/internal/pkg/legacy/xds" ) -// +kubebuilder:rbac:groups=federation.openshift-service-mesh.io,resources=meshfederations,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups=federation.openshift-service-mesh.io,resources=meshfederations/status,verbs=get;update;patch -// +kubebuilder:rbac:groups=federation.openshift-service-mesh.io,resources=meshfederations/finalizers,verbs=update +// +kubebuilder:rbac:groups=federation.openshift-service-mesh.io,resources=meshfederations;federatedservices,verbs=create;delete;get;list;patch;update;watch +// +kubebuilder:rbac:groups="",resources=services,verbs=get;watch;list +// +kubebuilder:rbac:groups=networking.istio.io,resources=gateways;serviceentries;workloadentries,verbs=get;list;create;update;patch;delete +// +kubebuilder:rbac:groups=security.istio.io,resources=peerauthentications,verbs=get;list;create;update;patch;delete;watch +// +kubebuilder:rbac:groups=networking.istio.io,resources=envoyfilters,verbs=get;list;create;update;patch;delete;watch +// +kubebuilder:rbac:groups=route.openshift.io,resources=routes;routes/custom-host,verbs=get;list;create;update;patch;delete;watch // Reconciler ensure that cluster is configured according to the spec defined in MeshFederation object. type Reconciler struct { client.Client + exporterRegistry *serviceExporterRegistry + exporter *exportedServicesBroadcaster + finalizerHandler *finalizer.Handler } var _ controller.Reconciler = (*Reconciler)(nil) func NewReconciler(c client.Client) *Reconciler { - return &Reconciler{Client: c} + return &Reconciler{ + Client: c, + exporterRegistry: &serviceExporterRegistry{}, + finalizerHandler: finalizer.NewHandler(c, "federation.openshift-service-mesh.io/mesh-federation"), + } } -func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - log.FromContext(ctx).Info("Reconciling object", "namespace", req.Namespace) +func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + logger := log.FromContext(ctx) + logger.Info("Reconciling object", "namespace", req.Namespace) meshFederation := &v1alpha1.MeshFederation{} if err := r.Client.Get(ctx, req.NamespacedName, meshFederation); err != nil { if apierrors.IsNotFound(err) { - return ctrl.Result{}, nil + return reconcile.Result{}, nil } else { - return ctrl.Result{}, fmt.Errorf("failed fetching MeshFederation %s, reason: %w", req.NamespacedName, err) + return reconcile.Result{}, fmt.Errorf("failed fetching MeshFederation %s, reason: %w", req.NamespacedName, err) } } - // TODO(meshfederation-ctrl): main logic goes here + original := meshFederation.DeepCopy() + + exportSelector, errSelector := metav1.LabelSelectorAsSelector(meshFederation.Spec.ExportRules.ServiceSelectors) + if errSelector != nil { + logger.Error(errSelector, "failed while creating service export selector") + return reconcile.Result{}, nil + } + + server := r.exporterRegistry.LoadOrStore(req.NamespacedName.String(), &exportedServicesBroadcaster{ + client: r.Client, + typeUrl: discovery.FederatedServiceTypeUrl, + selector: exportSelector, + }) + + if finalized, errFinalize := r.finalizerHandler.Finalize(ctx, meshFederation, func() error { + server.Stop() + + return nil + }); finalized { + return reconcile.Result{}, errFinalize + } + + if finalizerAlreadyExists, errAdd := r.finalizerHandler.Add(ctx, meshFederation); !finalizerAlreadyExists { + return reconcile.Result{}, errAdd + } - // Dummy success // TODO: figure out preferred approach to deal with conditions (metav1 vs conditionsv1 from Openshift) // TODO: wrap conditions handling in a pkg/funcs representing domain-oriented conditions - conditionsChanged := machinerymeta.SetStatusCondition(&meshFederation.Status.Conditions, metav1.Condition{ + server.StartOnce(ctx) + + exportedServices := &corev1.ServiceList{} + // TODO paginate? options? + // TODO handle multiple matching rules (as one is AND-ed, not OR-ed) + if errSvcList := r.Client.List(ctx, exportedServices, client.MatchingLabelsSelector{Selector: exportSelector}); errSvcList != nil { + return reconcile.Result{}, errSvcList + } + + federatedServices, errConvert := convert(exportedServices.Items) + if errConvert != nil { + // TODO: report status + logger.Error(errConvert, "failed while creating service export selector") + } + if errPush := server.PushAll(xds.PushRequest{TypeUrl: discovery.FederatedServiceTypeUrl, Resources: federatedServices}); errPush != nil { + // TODO: report status + logger.Error(errPush, "failed pushing SotW to subscribed remotes") + } + + meshFederation.Status.ExportedServices = []string{} + for _, item := range exportedServices.Items { + meshFederation.Status.ExportedServices = append(meshFederation.Status.ExportedServices, item.Namespace+"/"+item.Name) + } + + if result, errReconcile := r.subReconcile(ctx, meshFederation, exportedServices); errReconcile != nil { + return result, errReconcile + } + + // TODO capture status on all errors + _ = machinerymeta.SetStatusCondition(&meshFederation.Status.Conditions, metav1.Condition{ Type: "Available", Status: "True", Reason: "MeshFederationReconciled", Message: "Reconcile completed successfully", }) - if conditionsChanged { + if !reflect.DeepEqual(original.Status, meshFederation.Status) { conditions := slices.Clone(meshFederation.Status.Conditions) - // TODO: patch and call only when actually conditionsChanged + // TODO: patch and call only when conditions were changed _, errStatusUpdate := controller.RetryStatusUpdate(ctx, r.Client, meshFederation, func(saved *v1alpha1.MeshFederation) { + // TODO(design): do we want to keep all the services? seems convenient, but shouldn't we be concerned about the size? + saved.Status.ExportedServices = meshFederation.Status.ExportedServices for _, condition := range conditions { machinerymeta.SetStatusCondition(&saved.Status.Conditions, condition) } }) - return ctrl.Result{}, errStatusUpdate + + return reconcile.Result{}, errStatusUpdate } - return ctrl.Result{}, nil + return reconcile.Result{}, nil } // SetupWithManager sets up the controller with the Manager. -func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr). +func (r *Reconciler) SetupWithManager(mgr manager.Manager) error { + return builder.ControllerManagedBy(mgr). Named("mesh-federation-ctrl"). For(&v1alpha1.MeshFederation{}). + Owns(&v1alpha3.EnvoyFilter{}). + Owns(&v1beta1.PeerAuthentication{}). + Owns(&v1alpha3.Gateway{}). + Owns(&routev1.Route{}). + Watches( + // TODO(design): initial reconcile will trigger a lot of requests - one for each service. This can become expensive. + &corev1.Service{}, + handler.EnqueueRequestsFromMapFunc(r.handleServiceToExport), + builder.WithPredicates(predicate.Or(predicate.GenerationChangedPredicate{}, predicate.LabelChangedPredicate{})), + ). WithEventFilter(predicate.Or(predicate.GenerationChangedPredicate{}, controller.FinalizerChanged())). Complete(r) } + +func (r *Reconciler) subReconcile(ctx context.Context, meshFederation *v1alpha1.MeshFederation, exportedServices *corev1.ServiceList) (reconcile.Result, error) { + reconcilers := []controller.SubReconciler[*v1alpha1.MeshFederation]{ + IngressGatewayReconciler{exportedServices: exportedServices}.Reconcile, + PeerAuth, + } + + if meshFederation.Spec.IngressConfig.Type == "openshift-router" { + reconcilers = append( + reconcilers, + EnvoyFilter{exportedServices: exportedServices}.Reconcile, + RouteReconciler{exportedServices: exportedServices}.Reconcile, + ) + } + + var errs []error + var accResult reconcile.Result + + for _, subreconciler := range reconcilers { + result, errSub := subreconciler(ctx, r.Client, meshFederation) + if errSub != nil { + errs = append(errs, errSub) + } + + if result.Requeue { + accResult.Requeue = true + } + + if result.RequeueAfter > accResult.RequeueAfter { + accResult.RequeueAfter = result.RequeueAfter + } + } + + return accResult, errors.Join(errs...) +} + +func (r *Reconciler) handleServiceToExport(ctx context.Context, object client.Object) []reconcile.Request { + logger := log.FromContext(ctx) + meshFederations := &v1alpha1.MeshFederationList{} + // TODO paginate? options? + if errList := r.Client.List(ctx, meshFederations); errList != nil { + logger.Error(errList, "failed mapping Service to MeshFederations", "service", object.GetName()+"/"+object.GetNamespace()) + return nil + } + + slices.Filter(meshFederations.Items, func(federation v1alpha1.MeshFederation) bool { + return isExported(ctx, federation, object) + }) + + return slices.Map(meshFederations.Items, func(item v1alpha1.MeshFederation) reconcile.Request { + return reconcile.Request{NamespacedName: types.NamespacedName{ + Namespace: item.Namespace, + Name: item.Name, + }} + }) +} + +func isExported(ctx context.Context, federation v1alpha1.MeshFederation, object client.Object) bool { + logger := log.FromContext(ctx) + + labelSelector, errLabel := metav1.LabelSelectorAsSelector(federation.Spec.ExportRules.ServiceSelectors) + if errLabel != nil { + logger.Error(errLabel, "failed evaluating selectors", "MeshFederation", federation.GetName()+"/"+federation.GetNamespace()) + + return false + } + + return labelSelector.Matches(labels.Set(object.GetLabels())) +} diff --git a/internal/controller/meshfederation/meshfederation_controller_test.go b/internal/controller/meshfederation/meshfederation_controller_test.go index f72466a1..ea93f5be 100644 --- a/internal/controller/meshfederation/meshfederation_controller_test.go +++ b/internal/controller/meshfederation/meshfederation_controller_test.go @@ -17,8 +17,11 @@ package meshfederation_test import ( "context" "fmt" + "strconv" "time" + routev1 "github.com/openshift/api/route/v1" + "istio.io/client-go/pkg/apis/networking/v1alpha3" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilrand "k8s.io/apimachinery/pkg/util/rand" @@ -26,6 +29,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "github.com/openshift-service-mesh/federation/api/v1alpha1" + "github.com/openshift-service-mesh/federation/internal/pkg/meta" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -57,16 +61,26 @@ var _ = Describe("Configuring Mesh Federation in the cluster", func() { Expect(err).ToNot(HaveOccurred()) }) - AfterEach(func() { - envTest.DeleteAll(testNs) + AfterEach(func(ctx context.Context) { + envTest.DeleteAll(ctx, testNs) }) - When("new MeshFederation is created", func() { + When("Using Istio Ingress Config", func() { - It("should update condition on successful reconciliation", func(ctx context.Context) { + It("should export services using defined label matcher", func(ctx context.Context) { // given - federationName := "test-west" - meshFederation := createMeshFederation(federationName, testNsName) + services, _ := generateServices(testNsName, "hello", 5) + + for _, service := range services { + _, errCreate := controllerutil.CreateOrUpdate(ctx, envTest.Client, service, func() error { + // noop + return nil + }) + Expect(errCreate).ToNot(HaveOccurred()) + } + + // when + meshFederation := createMeshFederation("local", testNsName) // Defaulting is not working correctly yet, therefore explicit settings for the .spec // see: https://github.com/openshift-service-mesh/federation/pull/155 meshFederation.Spec.Network = "west" @@ -93,10 +107,18 @@ var _ = Describe("Configuring Mesh Federation in the cluster", func() { }) Expect(err).ToNot(HaveOccurred()) + defer func() { + cleanup := append([]k8sclient.Object(nil), meshFederation) + for _, service := range services { + cleanup = append(cleanup, service) + } + envTest.DeleteAll(ctx, cleanup...) + }() + // then Eventually(func(g Gomega, ctx context.Context) error { - currentMeshFederation := createMeshFederation(federationName, testNsName) - if errGet := envTest.Get(ctx, k8sclient.ObjectKeyFromObject(currentMeshFederation), currentMeshFederation); errGet != nil { + currentMeshFederation := &v1alpha1.MeshFederation{} + if errGet := envTest.Get(ctx, k8sclient.ObjectKeyFromObject(meshFederation), currentMeshFederation); errGet != nil { return errGet } @@ -105,15 +127,120 @@ var _ = Describe("Configuring Mesh Federation in the cluster", func() { "Expects MeshFederationReconciled condition to have status True", ) + g.Expect(currentMeshFederation.Status.ExportedServices).To(ConsistOf(testNsName + "/hello-2")) + + envoyFilters := &v1alpha3.EnvoyFilterList{} + if errList := envTest.List(ctx, envoyFilters); errList != nil { + return errList + } + g.Expect(envoyFilters.Items).To(BeEmpty()) + + routes := &routev1.RouteList{} + if errList := envTest.List(ctx, routes); errList != nil { + return errList + } + g.Expect(routes.Items).To(BeEmpty()) + return nil }).WithContext(ctx). Within(4 * time.Second). ProbeEvery(250 * time.Millisecond). Should(Succeed()) + }) + }) + + When("Using Openshift Router as Ingress", func() { + + It("should export services using defined match expression matcher", func(ctx context.Context) { + // given + + meshFederation := createMeshFederation("local", testNsName) + // Defaulting is not working correctly yet, therefore explicit settings for the .spec + // see: https://github.com/openshift-service-mesh/federation/pull/155 + meshFederation.Spec.Network = "west" + meshFederation.Spec.ControlPlaneNamespace = testNsName + meshFederation.Spec.IngressConfig.Type = "openshift-router" + meshFederation.Spec.IngressConfig.GatewayConfig.Selector = map[string]string{ + "security.istio.io/tlsMode": "istio", + } + meshFederation.Spec.IngressConfig.GatewayConfig.PortConfig = v1alpha1.PortConfig{ + Name: "tls-passthrough", + Number: 15443, + } + meshFederation.Spec.ExportRules = &v1alpha1.ExportRules{ + ServiceSelectors: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "app", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"hello-1", "hello-4"}, + }, + }, + }, + } + + _, err := controllerutil.CreateOrUpdate(ctx, envTest.Client, meshFederation, func() error { + return nil + }) + Expect(err).ToNot(HaveOccurred()) + + // when + var services []*corev1.Service + defer func() { + cleanup := append([]k8sclient.Object(nil), meshFederation) + for _, service := range services { + cleanup = append(cleanup, service) + } + envTest.DeleteAll(ctx, cleanup...) + }() + services, _ = generateServices(testNsName, "hello", 5) + + for _, service := range services { + _, errCreate := controllerutil.CreateOrUpdate(ctx, envTest.Client, service, func() error { + // noop + return nil + }) + Expect(errCreate).ToNot(HaveOccurred()) + } + + // then + Eventually(func(g Gomega, ctx context.Context) error { + currentMeshFederation := &v1alpha1.MeshFederation{} + if errGet := envTest.Get(ctx, k8sclient.ObjectKeyFromObject(meshFederation), currentMeshFederation); errGet != nil { + return errGet + } + + g.Expect(currentMeshFederation.Status.Conditions).To( + ContainElement(WithTransform(extractStatusOf("MeshFederationReconciled"), Equal(metav1.ConditionTrue))), + "Expects MeshFederationReconciled condition to have status True", + ) + + g.Expect(currentMeshFederation.Status.ExportedServices).To(ConsistOf(testNsName+"/hello-1", testNsName+"/hello-4")) + + envoyFilters := &v1alpha3.EnvoyFilterList{} + if errList := envTest.List(ctx, envoyFilters); errList != nil { + return errList + } + g.Expect(envoyFilters.Items).To(Not(BeEmpty())) + + routes := &routev1.RouteList{} + if errList := envTest.List(ctx, routes); errList != nil { + return errList + } + g.Expect(routes.Items).To(Not(BeEmpty())) + + return nil + }).WithContext(ctx). + Within(4 * time.Second). + ProbeEvery(250 * time.Millisecond). + Should(Succeed()) + + }) }) + // TODO: follow-up with more tests }) // createMeshFederation initializes MeshFederation struct with basic metadata. @@ -130,6 +257,47 @@ func createMeshFederation(name, nsName string) *v1alpha1.MeshFederation { } } +func generateServices(ns, svcPrefix string, count int) ([]*corev1.Service, []string) { + var services []*corev1.Service + var names []string + + for i := 0; i < count; i++ { + name := fmt.Sprintf("%s-%s", svcPrefix, strconv.Itoa(i)) + names = append(names, ns+"/"+name) + svc := createSvc(name, ns) + meta.AddLabel(svc, "app", "hello-"+strconv.Itoa(i)) + services = append(services, svc) + } + + return services, names +} + +func createSvc(name, ns string) *corev1.Service { + return &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: ns, + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{ + "app": "hello", + }, + Ports: []corev1.ServicePort{ + { + Name: "tcp", + Port: 42, + Protocol: corev1.ProtocolTCP, + }, + { + Name: "udp", + Port: 42, + Protocol: corev1.ProtocolUDP, + }, + }, + }, + } +} + func extractStatusOf(reason string) func(c metav1.Condition) metav1.ConditionStatus { return func(c metav1.Condition) metav1.ConditionStatus { if c.Reason == reason { diff --git a/internal/controller/meshfederation/reconcilers.go b/internal/controller/meshfederation/reconcilers.go new file mode 100644 index 00000000..50fa8c00 --- /dev/null +++ b/internal/controller/meshfederation/reconcilers.go @@ -0,0 +1,293 @@ +// Copyright Red Hat, Inc. +// +// Licensed under the Apache License, Version 2.0 (the License); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an AS IS BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package meshfederation + +import ( + "context" + "errors" + "fmt" + + routev1 "github.com/openshift/api/route/v1" + "google.golang.org/protobuf/types/known/structpb" + istionetv1alpha3 "istio.io/api/networking/v1alpha3" + securityv1beta1 "istio.io/api/security/v1beta1" + typev1beta1 "istio.io/api/type/v1beta1" + "istio.io/client-go/pkg/apis/networking/v1alpha3" + "istio.io/client-go/pkg/apis/security/v1beta1" + "istio.io/istio/pkg/util/protomarshal" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + "github.com/openshift-service-mesh/federation/api/v1alpha1" +) + +type objReconciler[T client.Object, Spec any] struct { + Obj T + DesiredSpec func() Spec +} + +func PeerAuth(ctx context.Context, cl client.Client, meshFederation *v1alpha1.MeshFederation) (ctrl.Result, error) { + desiredSpec := func() securityv1beta1.PeerAuthentication { + return securityv1beta1.PeerAuthentication{ + Selector: &typev1beta1.WorkloadSelector{ + MatchLabels: map[string]string{ + "app.kubernetes.io/name": "federation-controller", + }, + }, + Mtls: &securityv1beta1.PeerAuthentication_MutualTLS{ + Mode: securityv1beta1.PeerAuthentication_MutualTLS_STRICT, + }, + } + } + + peerAuth := &v1beta1.PeerAuthentication{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fds-strict-mtls", + Namespace: meshFederation.Spec.ControlPlaneNamespace, + Labels: map[string]string{"federation.openshift-service-mesh.io/peer": "todo"}, + }, + Spec: desiredSpec(), + } + + peerAuthReconciler := objReconciler[*v1beta1.PeerAuthentication, securityv1beta1.PeerAuthentication]{ + Obj: peerAuth, + DesiredSpec: func() securityv1beta1.PeerAuthentication { + return desiredSpec() + }, + } + + pa := peerAuthReconciler.Obj + _, err := ctrl.CreateOrUpdate(ctx, cl, peerAuthReconciler.Obj, func() error { + pa.Spec = peerAuthReconciler.DesiredSpec() + return controllerutil.SetControllerReference(meshFederation, pa, cl.Scheme()) + }) + + return ctrl.Result{}, err +} + +type EnvoyFilter struct { + exportedServices *corev1.ServiceList +} + +func (e EnvoyFilter) Reconcile(ctx context.Context, cl client.Client, meshFederation *v1alpha1.MeshFederation) (ctrl.Result, error) { + + desiredSpec := func(svcName, svcNamespace string, port int32) istionetv1alpha3.EnvoyFilter { + buildPatchStruct := func(config string) *structpb.Struct { + val := &structpb.Struct{} + if err := protomarshal.UnmarshalString(config, val); err != nil { + fmt.Printf("error unmarshalling envoyfilter config %q: %v", config, err) + } + return val + } + + routerCompatibleSNI := func(svcName, svcNs string, port uint32) string { + return fmt.Sprintf("%s-%d.%s.svc.cluster.local", svcName, port, svcNs) + } + + return istionetv1alpha3.EnvoyFilter{ + WorkloadSelector: &istionetv1alpha3.WorkloadSelector{ + Labels: meshFederation.Spec.IngressConfig.GatewayConfig.Selector, + }, + ConfigPatches: []*istionetv1alpha3.EnvoyFilter_EnvoyConfigObjectPatch{{ + ApplyTo: istionetv1alpha3.EnvoyFilter_FILTER_CHAIN, + Match: &istionetv1alpha3.EnvoyFilter_EnvoyConfigObjectMatch{ + ObjectTypes: &istionetv1alpha3.EnvoyFilter_EnvoyConfigObjectMatch_Listener{ + Listener: &istionetv1alpha3.EnvoyFilter_ListenerMatch{ + Name: fmt.Sprintf("0.0.0.0_%d", meshFederation.Spec.IngressConfig.GatewayConfig.PortConfig.Number), + FilterChain: &istionetv1alpha3.EnvoyFilter_ListenerMatch_FilterChainMatch{ + Sni: fmt.Sprintf("outbound_.%d_._.%s.%s.svc.cluster.local", port, svcName, svcNamespace), + }, + }, + }, + }, + Patch: &istionetv1alpha3.EnvoyFilter_Patch{ + Operation: istionetv1alpha3.EnvoyFilter_Patch_MERGE, + Value: buildPatchStruct(fmt.Sprintf(`{"filter_chain_match":{"server_names":["%s"]}}`, routerCompatibleSNI(svcName, svcNamespace, uint32(port)))), + }, + }}, + } + } + + envoyFilter := func(svcName, svcNamespace string, port int32) objReconciler[*v1alpha3.EnvoyFilter, istionetv1alpha3.EnvoyFilter] { + result := &v1alpha3.EnvoyFilter{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("sni-%s-%s-%d", svcName, svcNamespace, port), + Namespace: meshFederation.Spec.ControlPlaneNamespace, + Labels: map[string]string{ + "federation.openshift-service-mesh.io/peer": "todo", + }, + }, + } + + result.Spec = desiredSpec(svcName, svcNamespace, port) + + return objReconciler[*v1alpha3.EnvoyFilter, istionetv1alpha3.EnvoyFilter]{ + Obj: result, + DesiredSpec: func() istionetv1alpha3.EnvoyFilter { + return desiredSpec(svcName, svcNamespace, port) + }, + } + } + + envoyFilters := []objReconciler[*v1alpha3.EnvoyFilter, istionetv1alpha3.EnvoyFilter]{envoyFilter(fmt.Sprintf("federation-discovery-service-%s", meshFederation.Name), "istio-system", 15080)} + + for _, svc := range e.exportedServices.Items { + for _, port := range svc.Spec.Ports { + envoyFilters = append(envoyFilters, envoyFilter(svc.Name, svc.Namespace, port.Port)) + } + } + + var errs []error + for _, desiredState := range envoyFilters { + ef := desiredState.Obj + _, err := ctrl.CreateOrUpdate(ctx, cl, ef, func() error { + ef.Spec = desiredState.DesiredSpec() + return controllerutil.SetControllerReference(meshFederation, ef, cl.Scheme()) + }) + + if err != nil { + errs = append(errs, fmt.Errorf("failed to create or update envoy filter %s: %w", ef.Name, err)) + } + } + + return ctrl.Result{}, errors.Join(errs...) +} + +type RouteReconciler struct { + exportedServices *corev1.ServiceList +} + +func (r RouteReconciler) Reconcile(ctx context.Context, cl client.Client, meshFederation *v1alpha1.MeshFederation) (ctrl.Result, error) { + desiredSpec := func(svcName, svcNamespace string, port int32) routev1.RouteSpec { + return routev1.RouteSpec{ + Host: fmt.Sprintf("%s-%d.%s.svc.cluster.local", svcName, port, svcNamespace), + To: routev1.RouteTargetReference{ + Kind: "Service", + Name: "federation-ingress-gateway", + }, + Port: &routev1.RoutePort{ + TargetPort: intstr.FromString(meshFederation.Spec.IngressConfig.GatewayConfig.PortConfig.Name), + }, + TLS: &routev1.TLSConfig{ + Termination: routev1.TLSTerminationPassthrough, + }, + } + } + + createRoute := func(svcName, svcNamespace string, port int32) objReconciler[*routev1.Route, routev1.RouteSpec] { + result := &routev1.Route{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s-%d-to-federation-ingress-gateway", svcName, svcNamespace, port), + Namespace: meshFederation.Spec.ControlPlaneNamespace, + Labels: map[string]string{"federation.openshift-service-mesh.io/peer": "todo"}, + }, + } + + result.Spec = desiredSpec(svcName, svcNamespace, port) + + return objReconciler[*routev1.Route, routev1.RouteSpec]{ + Obj: result, + DesiredSpec: func() routev1.RouteSpec { + return desiredSpec(svcName, svcNamespace, port) + }, + } + } + + routes := []objReconciler[*routev1.Route, routev1.RouteSpec]{ + createRoute(fmt.Sprintf("federation-discovery-service-%s", meshFederation.Name), "istio-system", 15080), + } + + for _, svc := range r.exportedServices.Items { + for _, port := range svc.Spec.Ports { + routes = append(routes, createRoute(svc.Name, svc.Namespace, port.Port)) + } + } + + var errs []error + for _, route := range routes { + rt := route.Obj + _, err := ctrl.CreateOrUpdate(ctx, cl, rt, func() error { + rt.Spec = route.DesiredSpec() + return controllerutil.SetControllerReference(meshFederation, rt, cl.Scheme()) + }) + + if err != nil { + errs = append(errs, fmt.Errorf("failed to create or update route %s: %w", rt.Name, err)) + } + } + + return ctrl.Result{}, errors.Join(errs...) +} + +type IngressGatewayReconciler struct { + exportedServices *corev1.ServiceList +} + +func (i IngressGatewayReconciler) Reconcile(ctx context.Context, cl client.Client, meshFederation *v1alpha1.MeshFederation) (ctrl.Result, error) { + hosts := []string{fmt.Sprintf("federation-discovery-service-%s.%s.svc.cluster.local", meshFederation.Name, meshFederation.Namespace)} + for _, svc := range i.exportedServices.Items { + hosts = append(hosts, fmt.Sprintf("%s.%s.svc.cluster.local", svc.Name, svc.Namespace)) + } + + desiredSpec := func() istionetv1alpha3.Gateway { + return istionetv1alpha3.Gateway{ + Selector: meshFederation.Spec.IngressConfig.GatewayConfig.Selector, + Servers: []*istionetv1alpha3.Server{{ + Hosts: hosts, + Port: &istionetv1alpha3.Port{ + Number: meshFederation.Spec.IngressConfig.GatewayConfig.PortConfig.Number, + Name: meshFederation.Spec.IngressConfig.GatewayConfig.PortConfig.Name, + Protocol: "TLS", + }, + Tls: &istionetv1alpha3.ServerTLSSettings{ + Mode: istionetv1alpha3.ServerTLSSettings_AUTO_PASSTHROUGH, + }, + }}, + } + } + + createGateway := func() objReconciler[*v1alpha3.Gateway, istionetv1alpha3.Gateway] { + result := &v1alpha3.Gateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: "federation-ingress-gateway", + Namespace: meshFederation.Spec.ControlPlaneNamespace, + Labels: map[string]string{"federation.openshift-service-mesh.io/peer": "todo"}, + }, + } + + result.Spec = desiredSpec() + + return objReconciler[*v1alpha3.Gateway, istionetv1alpha3.Gateway]{ + Obj: result, + DesiredSpec: func() istionetv1alpha3.Gateway { + return desiredSpec() + }, + } + } + + desiredGateway := createGateway() + + gateway := desiredGateway.Obj + _, err := ctrl.CreateOrUpdate(ctx, cl, gateway, func() error { + gateway.Spec = desiredGateway.DesiredSpec() + return controllerutil.SetControllerReference(meshFederation, gateway, cl.Scheme()) + }) + + return ctrl.Result{}, err +} diff --git a/internal/controller/meshfederation/suite_test.go b/internal/controller/meshfederation/suite_test.go index 03f4ddbf..1234f31f 100644 --- a/internal/controller/meshfederation/suite_test.go +++ b/internal/controller/meshfederation/suite_test.go @@ -50,6 +50,7 @@ var _ = SynchronizedBeforeSuite(func(ctx context.Context) { newMeshFederationCtrl := func(cl client.Client) controller.Reconciler { return meshfederation.NewReconciler(cl) } + envTest, cancelFunc = k8senvtest.StartWithControllers(GinkgoT(), newMeshFederationCtrl) }, func() {}) diff --git a/internal/controller/retry.go b/internal/controller/retry.go index cd09e757..725d9934 100644 --- a/internal/controller/retry.go +++ b/internal/controller/retry.go @@ -26,7 +26,7 @@ import ( // logic for updating resource object. type MutateFn[T client.Object] func(saved T) -// ClientCallFn defines what client.Client operation on a given object should be performed. +// ClientCallFn defines what client.client operation on a given object should be performed. type ClientCallFn[T client.Object] func(ctx context.Context, cli client.Client, obj T) error // RetryUpdate attempts to update a specified Kubernetes resource and retries on conflict. diff --git a/internal/controller/scheme.go b/internal/controller/scheme.go index b69f2334..45a04765 100644 --- a/internal/controller/scheme.go +++ b/internal/controller/scheme.go @@ -15,6 +15,9 @@ package controller import ( + routev1 "github.com/openshift/api/route/v1" + networkingv1alpha3 "istio.io/client-go/pkg/apis/networking/v1alpha3" + securityv1beta1 "istio.io/client-go/pkg/apis/security/v1beta1" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" @@ -25,5 +28,8 @@ import ( func MustAddToScheme(scheme *runtime.Scheme) { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(v1alpha1.AddToScheme(scheme)) + utilruntime.Must(networkingv1alpha3.AddToScheme(scheme)) + utilruntime.Must(securityv1beta1.AddToScheme(scheme)) + utilruntime.Must(routev1.AddToScheme(scheme)) // +kubebuilder:scaffold:scheme } diff --git a/internal/controller/types.go b/internal/controller/types.go index e1f371ac..8dd319d9 100644 --- a/internal/controller/types.go +++ b/internal/controller/types.go @@ -18,6 +18,7 @@ import ( "context" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" ) // Reconciler defines required functions for each @@ -25,3 +26,5 @@ type Reconciler interface { Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) SetupWithManager(mgr ctrl.Manager) error } + +type SubReconciler[T client.Object] func(ctx context.Context, cl client.Client, parent T) (ctrl.Result, error) diff --git a/internal/pkg/discovery/adss_handler.go b/internal/pkg/discovery/adss_handler.go new file mode 100644 index 00000000..591d74e7 --- /dev/null +++ b/internal/pkg/discovery/adss_handler.go @@ -0,0 +1,198 @@ +// Copyright Red Hat, Inc. +// +// Licensed under the Apache License, Version 2.0 (the License); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an AS IS BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package discovery + +import ( + "context" + "fmt" + "math" + "os" + "strconv" + "sync" + "sync/atomic" + "time" + + envoycfgcorev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/anypb" + istiolog "istio.io/istio/pkg/log" + + "github.com/openshift-service-mesh/federation/internal/pkg/legacy/xds" +) + +var log = istiolog.RegisterScope("adss", "Aggregated Discovery Service Server") + +// DeltaDiscoveryStream is a server interface for XDS. +// DeltaDiscoveryStream is a server interface for Delta XDS. +type ( + DiscoveryStream = discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer + DeltaDiscoveryStream = discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer +) + +// adsServer implements Envoy's AggregatedDiscoveryService. +type adsServer struct { + handlers map[string]RequestHandler + subscribers sync.Map + nextSubscriberID atomic.Uint64 +} + +// subscriber represents a client that is subscribed to XDS resources. +type subscriber struct { + id uint64 + stream DiscoveryStream + closeStream func() +} + +var _ discovery.AggregatedDiscoveryServiceServer = (*adsServer)(nil) + +func (adss *adsServer) StreamAggregatedResources(downstream DiscoveryStream) error { + log.Info("New subscriber connected") + ctx, closeStream := context.WithCancel(downstream.Context()) + + sub := &subscriber{ + id: adss.nextSubscriberID.Add(1), + stream: downstream, + closeStream: closeStream, + } + + adss.subscribers.Store(sub.id, sub) + + go adss.recvFromStream(int64(sub.id), downstream) + + <-ctx.Done() + return nil +} + +// DeltaAggregatedResources is not implemented. +func (adss *adsServer) DeltaAggregatedResources(downstream DeltaDiscoveryStream) error { + return status.Errorf(codes.Unimplemented, "Not Implemented") +} + +var ( + maxUintDigits = len(strconv.FormatUint(uint64(math.MaxUint64), 10)) + subIDFmtStr = `%0` + strconv.Itoa(maxUintDigits) + `d` +) + +// recvFromStream receives discovery requests from the subscriber. +func (adss *adsServer) recvFromStream(id int64, downstream DiscoveryStream) { + log.Infof("Received from stream %d", id) + for { + discoveryRequest, err := downstream.Recv() + if err != nil { + log.Errorf("error while recv discovery request from subscriber %s: %v", fmt.Sprintf(subIDFmtStr, id), err) + break + } + log.Infof("Got discovery request from subscriber %s: %v", fmt.Sprintf(subIDFmtStr, id), discoveryRequest) + if discoveryRequest.GetVersionInfo() == "" { + resources, err := adss.generateResources(discoveryRequest.GetTypeUrl()) + if err != nil { + // TODO: Do not push empty resources if there was an error during resource generation, + // because that may cause unintentional removal of the subscribed resources. + log.Errorf("failed to generate resources of type %s: %v", discoveryRequest.GetTypeUrl(), err) + } + log.Infof("Sending initial config snapshot for type %s: %s", discoveryRequest.GetTypeUrl(), resources) + if err := sendToStream(downstream, discoveryRequest.GetTypeUrl(), resources, strconv.FormatInt(time.Now().Unix(), 10)); err != nil { + log.Errorf("failed to send initial config snapshot for type %s: %v", discoveryRequest.GetTypeUrl(), err) + } + } + } +} + +func (adss *adsServer) generateResources(typeUrl string) ([]*anypb.Any, error) { + handler, found := adss.handlers[typeUrl] + if !found { + return []*anypb.Any{}, nil + } + + log.Infof("Generating config snapshot for type %s", typeUrl) + resources, err := handler.GenerateResponse() + if err != nil { + log.Errorf("failed generating resources for type %s: %v", typeUrl, err) + return []*anypb.Any{}, fmt.Errorf("failed generating resources for type %s: %w", typeUrl, err) + } + return resources, nil +} + +// sendToStream sends XDS resources to the subscriber. +func sendToStream(downstream DiscoveryStream, typeUrl string, xdsResources []*anypb.Any, version string) error { + if err := downstream.Send(&discovery.DiscoveryResponse{ + TypeUrl: typeUrl, + VersionInfo: version, + Resources: xdsResources, + ControlPlane: &envoycfgcorev3.ControlPlane{ + Identifier: os.Getenv("POD_NAME"), + }, + Nonce: version, + }); err != nil { + return err + } + return nil +} + +func (adss *adsServer) subscribersLen() int { + length := 0 + adss.subscribers.Range(func(_, _ interface{}) bool { + length++ + return true + }) + return length +} + +func (adss *adsServer) Push(pushRequest xds.PushRequest) error { + if adss.subscribersLen() == 0 { + log.Infof("Skip pushing XDS resources for request [type=%s,resources=%v] as there are no subscribers", pushRequest.TypeUrl, pushRequest.Resources) + return nil + } + + resources := pushRequest.Resources + if resources == nil { + var err error + resources, err = adss.generateResources(pushRequest.TypeUrl) + if err != nil { + return err + } + } + + log.Infof("Pushing discovery response to subscribers: [type=%s,resources=%v]", pushRequest.TypeUrl, resources) + adss.subscribers.Range(func(key, value any) bool { + log.Infof("Sending to subscriber %s", fmt.Sprintf(subIDFmtStr, key.(uint64))) + if err := value.(*subscriber).stream.Send(&discovery.DiscoveryResponse{ + TypeUrl: pushRequest.TypeUrl, + VersionInfo: strconv.FormatInt(time.Now().Unix(), 10), // TODO improve version computation + Resources: resources, + ControlPlane: &envoycfgcorev3.ControlPlane{ + Identifier: os.Getenv("POD_NAME"), + }, + }); err != nil { + log.Errorf("error sending XDS resources: %v", err) + value.(*subscriber).closeStream() + adss.subscribers.Delete(key) + } + return true + }) + return nil +} + +// closeSubscribers closes all active subscriber streams. +func (adss *adsServer) closeSubscribers() { + adss.subscribers.Range(func(key, value any) bool { + log.Infof("Closing stream of subscriber %s", fmt.Sprintf(subIDFmtStr, key.(uint64))) + value.(*subscriber).closeStream() + adss.subscribers.Delete(key) + return true + }) +} diff --git a/internal/pkg/discovery/grpc_server.go b/internal/pkg/discovery/grpc_server.go new file mode 100644 index 00000000..bc9c821b --- /dev/null +++ b/internal/pkg/discovery/grpc_server.go @@ -0,0 +1,118 @@ +// Copyright Red Hat, Inc. +// +// Licensed under the Apache License, Version 2.0 (the License); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an AS IS BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package discovery + +import ( + "context" + "fmt" + "net" + "time" + + discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "google.golang.org/grpc" + + "github.com/openshift-service-mesh/federation/internal/pkg/legacy/xds" +) + +const restartDelay = 2 * time.Second + +type Server struct { + grpc *grpc.Server + ads *adsServer + running bool +} + +func NewServer(handlers ...RequestHandler) *Server { + grpcServer := grpc.NewServer() + handlerMap := make(map[string]RequestHandler) + for _, g := range handlers { + handlerMap[g.GetTypeUrl()] = g + } + ads := &adsServer{ + handlers: handlerMap, + } + + discovery.RegisterAggregatedDiscoveryServiceServer(grpcServer, ads) + + return &Server{ + grpc: grpcServer, + ads: ads, + } +} + +func (s *Server) PushAll(pushRequest xds.PushRequest) error { + return s.ads.Push(pushRequest) +} + +// Run starts the gRPC server and awaits for push requests to broadcast configuration. +func (s *Server) Run(ctx context.Context) error { + listener, err := net.Listen("tcp", ":15080") + if err != nil { + return fmt.Errorf("failed creating TCP listener: %w", err) + } + + // TODO(fdsserver): rethink how to handle graceful stop + defer s.Stop() + + ctx, cancel := context.WithCancel(ctx) + + go func() { + s.running = true // TODO(fdsserver): is that safe enough to assume? + if err := s.grpc.Serve(listener); err != nil { + cancel() + } + }() + + <-ctx.Done() + + return nil +} + +func (s *Server) IsRunning() bool { + return s.running +} + +// StartOnce will start the server if it's not already running. +// Returns true if it has been started or false if it's been already running. +func (s *Server) StartOnce(ctx context.Context) bool { + if !s.IsRunning() { + go func() { + for { + if err := s.Run(ctx); err != nil { + log.Errorf("server encountered an error: %v, restarting in %s...", err, restartDelay) + + select { + case <-time.After(restartDelay): + case <-ctx.Done(): + log.Info("context canceled, stopping restart attempts") + return + } + } else { + break // running, break out of the retry loop + } + } + }() + + return true + } + + return false +} + +func (s *Server) Stop() { + s.ads.closeSubscribers() + s.grpc.GracefulStop() + s.running = false +} diff --git a/internal/pkg/discovery/request_handler.go b/internal/pkg/discovery/request_handler.go new file mode 100644 index 00000000..76c152b7 --- /dev/null +++ b/internal/pkg/discovery/request_handler.go @@ -0,0 +1,26 @@ +// Copyright Red Hat, Inc. +// +// Licensed under the Apache License, Version 2.0 (the License); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an AS IS BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package discovery + +import "google.golang.org/protobuf/types/known/anypb" + +// RequestHandler generates XDS response for requests from subscribers or push requests triggered by other events. +type RequestHandler interface { + // GetTypeUrl returns supported XDS type. + // An implementation can support only one XDS type. + GetTypeUrl() string + // GenerateResponse returns generated resources for requested XDS type. + GenerateResponse() ([]*anypb.Any, error) +} diff --git a/internal/pkg/discovery/types.go b/internal/pkg/discovery/types.go new file mode 100644 index 00000000..e0875ba1 --- /dev/null +++ b/internal/pkg/discovery/types.go @@ -0,0 +1,28 @@ +// Copyright Red Hat, Inc. +// +// Licensed under the Apache License, Version 2.0 (the License); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an AS IS BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package discovery + +import "google.golang.org/protobuf/types/known/anypb" + +const FederatedServiceTypeUrl = "federation.openshift-service-mesh.io/v1alpha1/FederatedService" + +// PushRequest notifies ADS server that it should send DiscoveryResponse to subscribers. +type PushRequest struct { + // TypeUrl specifies DiscoveryResponse type and must always be set. + TypeUrl string + // Resources contains data to be sent to subscribers. + // If it is not set, ADS server will trigger proper request handler to generate resources of given type. + Resources []*anypb.Any +} diff --git a/internal/pkg/legacy/fds/exported_service_generator.go b/internal/pkg/legacy/fds/exported_service_generator.go index 736a28bc..e097d8a7 100644 --- a/internal/pkg/legacy/fds/exported_service_generator.go +++ b/internal/pkg/legacy/fds/exported_service_generator.go @@ -44,7 +44,7 @@ func NewExportedServicesGenerator(cfg config.Federation, serviceLister v1.Servic } func (g *ExportedServicesGenerator) GetTypeUrl() string { - return xds.ExportedServiceTypeUrl + return xds.FederatedServiceTypeUrl } func (g *ExportedServicesGenerator) GenerateResponse() ([]*anypb.Any, error) { diff --git a/internal/pkg/legacy/informer/service_export_event_handler.go b/internal/pkg/legacy/informer/service_export_event_handler.go index 2800c486..394ba60c 100644 --- a/internal/pkg/legacy/informer/service_export_event_handler.go +++ b/internal/pkg/legacy/informer/service_export_event_handler.go @@ -84,5 +84,5 @@ func (w *ServiceExportEventHandler) triggerXDSPush() { w.mcpPushRequests <- xds.PushRequest{TypeUrl: xds.GatewayTypeUrl} w.mcpPushRequests <- xds.PushRequest{TypeUrl: xds.EnvoyFilterTypeUrl} w.mcpPushRequests <- xds.PushRequest{TypeUrl: xds.RouteTypeUrl} - w.fdsPushRequests <- xds.PushRequest{TypeUrl: xds.ExportedServiceTypeUrl} + w.fdsPushRequests <- xds.PushRequest{TypeUrl: xds.FederatedServiceTypeUrl} } diff --git a/internal/pkg/legacy/informer/service_export_event_handler_test.go b/internal/pkg/legacy/informer/service_export_event_handler_test.go index 5fe677f5..06ac9433 100644 --- a/internal/pkg/legacy/informer/service_export_event_handler_test.go +++ b/internal/pkg/legacy/informer/service_export_event_handler_test.go @@ -164,7 +164,7 @@ func TestXDSTriggers(t *testing.T) { checkChannel(t, mcpPushRequests, xds.GatewayTypeUrl, tc.isTimeoutExpected) checkChannel(t, mcpPushRequests, xds.EnvoyFilterTypeUrl, tc.isTimeoutExpected) checkChannel(t, mcpPushRequests, xds.RouteTypeUrl, tc.isTimeoutExpected) - checkChannel(t, fdsPushRequests, xds.ExportedServiceTypeUrl, tc.isTimeoutExpected) + checkChannel(t, fdsPushRequests, xds.FederatedServiceTypeUrl, tc.isTimeoutExpected) }) } } diff --git a/internal/pkg/legacy/xds/types.go b/internal/pkg/legacy/xds/types.go index a1b4423c..a2b4843d 100644 --- a/internal/pkg/legacy/xds/types.go +++ b/internal/pkg/legacy/xds/types.go @@ -15,7 +15,7 @@ package xds const ( - ExportedServiceTypeUrl = "federation.openshift-service-mesh.io/v1alpha1/ExportedService" + FederatedServiceTypeUrl = "federation.openshift-service-mesh.io/v1alpha1/FederatedService" DestinationRuleTypeUrl = "networking.istio.io/v1alpha3/DestinationRule" GatewayTypeUrl = "networking.istio.io/v1alpha3/Gateway" ServiceEntryTypeUrl = "networking.istio.io/v1alpha3/ServiceEntry" diff --git a/internal/pkg/meta/labels.go b/internal/pkg/meta/labels.go new file mode 100644 index 00000000..4705cb33 --- /dev/null +++ b/internal/pkg/meta/labels.go @@ -0,0 +1,27 @@ +// Copyright Red Hat, Inc. +// +// Licensed under the Apache License, Version 2.0 (the License); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an AS IS BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package meta + +import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + +func AddLabel(obj metav1.Object, key, value string) { + existingLabels := obj.GetLabels() + if existingLabels == nil { + existingLabels = make(map[string]string) + } + + existingLabels[key] = value + obj.SetLabels(existingLabels) +} diff --git a/test/k8senvtest/cleaner.go b/test/k8senvtest/cleaner.go index eba2afd6..40bc38b3 100644 --- a/test/k8senvtest/cleaner.go +++ b/test/k8senvtest/cleaner.go @@ -59,10 +59,10 @@ func CreateCleaner(k8sClient client.Client, config *rest.Config, timeout, interv } } -func (c *Cleaner) DeleteAll(objects ...client.Object) { //nolint:gocognit //reason it is what is ;) +func (c *Cleaner) DeleteAll(ctx context.Context, objects ...client.Object) { //nolint:gocognit //reason it is what is ;) for _, o := range objects { obj := o - Expect(client.IgnoreNotFound(c.client.Delete(context.Background(), obj))).Should(Succeed()) + Expect(client.IgnoreNotFound(c.client.Delete(ctx, obj))).Should(Succeed()) if namespace, ok := obj.(*corev1.Namespace); ok { // Normally the kube-controller-manager would handle finalization @@ -79,14 +79,14 @@ func (c *Cleaner) DeleteAll(objects ...client.Object) { //nolint:gocognit //reas u := unstructured.Unstructured{} u.SetGroupVersionKind(gvk) - errDelete := c.client.DeleteAllOf(context.Background(), &u, client.InNamespace(namespace.Name)) + errDelete := c.client.DeleteAllOf(ctx, &u, client.InNamespace(namespace.Name)) Expect(client.IgnoreNotFound(ignoreMethodNotAllowed(errDelete))).ShouldNot(HaveOccurred()) } Eventually(func() error { key := client.ObjectKeyFromObject(namespace) - if errGet := c.client.Get(context.Background(), key, namespace); errGet != nil { + if errGet := c.client.Get(ctx, key, namespace); errGet != nil { return client.IgnoreNotFound(errGet) } // remove `kubernetes` finalizer @@ -101,7 +101,7 @@ func (c *Cleaner) DeleteAll(objects ...client.Object) { //nolint:gocognit //reas // We have to use the k8s.io/client-go library here to expose // ability to patch the /finalize subresource on the namespace - _, err := c.clientset.CoreV1().Namespaces().Finalize(context.Background(), namespace, metav1.UpdateOptions{}) + _, err := c.clientset.CoreV1().Namespaces().Finalize(ctx, namespace, metav1.UpdateOptions{}) return err }, c.timeout, c.interval).Should(Succeed()) @@ -109,7 +109,7 @@ func (c *Cleaner) DeleteAll(objects ...client.Object) { //nolint:gocognit //reas Eventually(func() metav1.StatusReason { key := client.ObjectKeyFromObject(obj) - if err := c.client.Get(context.Background(), key, obj); err != nil { + if err := c.client.Get(ctx, key, obj); err != nil { return k8serr.ReasonForError(err) } diff --git a/test/k8senvtest/config.go b/test/k8senvtest/config.go index 5f3915c3..4e8bae2d 100644 --- a/test/k8senvtest/config.go +++ b/test/k8senvtest/config.go @@ -54,12 +54,12 @@ type Client struct { *Cleaner } -func (c *Client) DeleteAll(objects ...client.Object) { +func (c *Client) DeleteAll(ctx context.Context, objects ...client.Object) { if c.Cleaner == nil { c.Cleaner = CreateCleaner(c.Client, c.Config, 10*time.Second, 250*time.Millisecond) } - c.Cleaner.DeleteAll(objects...) + c.Cleaner.DeleteAll(ctx, objects...) } // Configure creates a new configuration for the Kubernetes EnvTest.