diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 249b3408..d303b6da 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -55,4 +55,15 @@ Please read through below conventions before contributions. - Go source files and directories use underscores, not dashes. - Package directories should generally avoid using separators as much as possible. When package names are multiple words, they usually should be in nested subdirectories. - Document directories and filenames should use dashes rather than underscores. -- All source files should add a license at the beginning. \ No newline at end of file +- All source files should add a license at the beginning. + + +### How to work locally + +1. Clones this repo +2. Create the cluster `minikube start --memory=8192 --cpus=4` +3. [Deploy Apache Pulsar Standalone](https://pulsar.apache.org/docs/4.0.x/getting-started-helm/#step-1-install-pulsar-helm-chart) +4. Open the minikube tunnel in another terminal `minikube tunnel -c` +5. Apply operator's crds `make install` +6. Executes `go run .` in order to run the operator locally rather than inside the cluster +7. Run tests `~/go/bin/ginkgo ./operator` \ No newline at end of file diff --git a/pkg/admin/dummy.go b/pkg/admin/dummy.go index 75a4444d..c231ae3a 100644 --- a/pkg/admin/dummy.go +++ b/pkg/admin/dummy.go @@ -111,6 +111,16 @@ func (d *DummyPulsarAdmin) RevokePermissions(Permissioner) error { return nil } +// GetTopicPermissions is a fake implements of GetTopicPermissions +func (d *DummyPulsarAdmin) GetTopicPermissions(string) (map[string][]utils.AuthAction, error) { + return map[string][]utils.AuthAction{}, nil +} + +// GetNamespacePermissions is a fake implements of GetNamespacePermissions +func (d *DummyPulsarAdmin) GetNamespacePermissions(string) (map[string][]utils.AuthAction, error) { + return map[string][]utils.AuthAction{}, nil +} + // GetSchema is a fake implements of GetSchema func (d *DummyPulsarAdmin) GetSchema(string) (*v1alpha1.SchemaInfo, error) { return nil, nil diff --git a/pkg/admin/impl.go b/pkg/admin/impl.go index 3a4fd580..24736485 100644 --- a/pkg/admin/impl.go +++ b/pkg/admin/impl.go @@ -504,6 +504,24 @@ func (p *PulsarAdminClient) RevokePermissions(permission Permissioner) error { return nil } +// GetTopicPermissions retrieve permission by name +func (p *PulsarAdminClient) GetTopicPermissions(topic string) (map[string][]utils.AuthAction, error) { + topicName, err := utils.GetTopicName(topic) + if err != nil { + return nil, err + } + return p.adminClient.Topics().GetPermissions(*topicName) +} + +// GetNamespacePermissions retrieve permission by name +func (p *PulsarAdminClient) GetNamespacePermissions(namespaceName string) (map[string][]utils.AuthAction, error) { + namespace, err := utils.GetNamespaceName(namespaceName) + if err != nil { + return nil, err + } + return p.adminClient.Namespaces().GetNamespacePermissions(*namespace) +} + // convertActions converts actions type from string to common.AuthAction func convertActions(actions []string) ([]utils.AuthAction, error) { r := make([]utils.AuthAction, 0) diff --git a/pkg/admin/interface.go b/pkg/admin/interface.go index dc72fb28..1569c961 100644 --- a/pkg/admin/interface.go +++ b/pkg/admin/interface.go @@ -133,6 +133,12 @@ type PulsarAdmin interface { // it will revoke all actions which granted to a role on a namespace or topic RevokePermissions(p Permissioner) error + // GetNamespacePermissions get permissions by namespace + GetNamespacePermissions(namespace string) (map[string][]utils2.AuthAction, error) + + // GetTopicPermissions get permissions by topic + GetTopicPermissions(topic string) (map[string][]utils2.AuthAction, error) + // Close releases the connection with pulsar admin Close() error diff --git a/pkg/connection/reconcile_permission.go b/pkg/connection/reconcile_permission.go index c99948a7..b3adfd92 100644 --- a/pkg/connection/reconcile_permission.go +++ b/pkg/connection/reconcile_permission.go @@ -17,15 +17,17 @@ package connection import ( "context" "fmt" + "slices" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" "github.com/go-logr/logr" - "github.com/streamnative/pulsar-resources-operator/pkg/feature" "k8s.io/apimachinery/pkg/api/meta" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" "github.com/streamnative/pulsar-resources-operator/pkg/admin" + "github.com/streamnative/pulsar-resources-operator/pkg/feature" "github.com/streamnative/pulsar-resources-operator/pkg/reconciler" ) @@ -122,18 +124,66 @@ func (r *PulsarPermissionReconciler) ReconcilePermission(ctx context.Context, pu return nil } - log.V(1).Info("Granting permission", "ResourceName", permission.Spec.ResourceName, + log.Info("Updating permission", "ResourceName", permission.Spec.ResourceName, "ResourceType", permission.Spec.ResoureType, "Roles", permission.Spec.Roles, "Actions", permission.Spec.Actions) - if err := pulsarAdmin.GrantPermissions(per); err != nil { - log.Error(err, "Grant permission failed") + + var currentPermissions map[string][]utils.AuthAction + var err error + + if permission.Spec.ResoureType == resourcev1alpha1.PulsarResourceTypeTopic { + currentPermissions, err = pulsarAdmin.GetTopicPermissions(permission.Spec.ResourceName) + } else { + currentPermissions, err = pulsarAdmin.GetNamespacePermissions(permission.Spec.ResourceName) + } + + if err != nil { + log.Error(err, "Failed to get current permissions") meta.SetStatusCondition(&permission.Status.Conditions, *NewErrorCondition(permission.Generation, err.Error())) if err := r.conn.client.Status().Update(ctx, permission); err != nil { log.Error(err, "Failed to update permission status") - return err } return err } + currentRoles := []string{} + incomingRoles := permission.Spec.Roles + + for role := range currentPermissions { + currentRoles = append(currentRoles, role) + } + + // revoking roles + for _, role := range currentRoles { + if !slices.Contains(incomingRoles, role) { + permission.Spec.Roles = []string{role} + per := GetPermissioner(permission) + if err := pulsarAdmin.RevokePermissions(per); err != nil { + log.Error(err, "Revoke permission failed") + meta.SetStatusCondition(&permission.Status.Conditions, *NewErrorCondition(permission.Generation, err.Error())) + if err := r.conn.client.Status().Update(ctx, permission); err != nil { + log.Error(err, "Failed to update permission status") + return err + } + return err + } + } + } + + // granting roles + for _, role := range incomingRoles { + permission.Spec.Roles = []string{role} + per := GetPermissioner(permission) + if err := pulsarAdmin.GrantPermissions(per); err != nil { + log.Error(err, "Grant permission failed") + meta.SetStatusCondition(&permission.Status.Conditions, *NewErrorCondition(permission.Generation, err.Error())) + if err := r.conn.client.Status().Update(ctx, permission); err != nil { + log.Error(err, "Failed to update permission status") + return err + } + return err + } + } + permission.Status.ObservedGeneration = permission.Generation meta.SetStatusCondition(&permission.Status.Conditions, *NewReadyCondition(permission.Generation)) if err := r.conn.client.Status().Update(ctx, permission); err != nil { diff --git a/tests/README.md b/tests/README.md index ed3d71b1..ccab3809 100644 --- a/tests/README.md +++ b/tests/README.md @@ -5,11 +5,35 @@ tests is an individul module beside pulsar resources operator `go mod tidy` to download modules for tests -# Requirements +## Requirements - Pulsar Operator installed - A pulsar cluster installed without authentication and authorization -# Run tests +## Run tests -`ginkgo --trace --progress ./operator` \ No newline at end of file +`ginkgo --trace --progress ./operator` + +Optionally, if you have an external pulsar cluster (e.g. deployed on minikube) and you want to test the operator without deploying it in kubernetes: + +1. Run the code in a terminal + +```bash +make install +go run . +``` + +2. In another terminal run + +```bash +# your admin service url +export ADMIN_SERVICE_URL=http://localhost:80 +# your pulsar namespace +export NAMESPACE=pulsar +# your pulsar broker name +export BROKER_NAME=pulsar-mini +# your pulsar proxy url +export PROXY_URL=http://localhost:80 + +ginkgo --trace --progress ./operator +``` diff --git a/tests/operator/operator_suite_test.go b/tests/operator/operator_suite_test.go index 4bad722b..bf443aec 100644 --- a/tests/operator/operator_suite_test.go +++ b/tests/operator/operator_suite_test.go @@ -34,11 +34,11 @@ import ( ) var ( - namespaceName string = "default" + namespaceName string = utils.GetEnv("NAMESPACE", "default") k8sClient client.Client k8sConfig *rest.Config - brokerName string = "test-pulsar" - proxyURL string = fmt.Sprintf("http://%s-broker.%s.svc.cluster.local:6650", brokerName, namespaceName) + brokerName string = utils.GetEnv("BROKER_NAME", "test-pulsar") + proxyURL string = utils.GetEnv("PROXY_URL", fmt.Sprintf("http://%s-broker.%s.svc.cluster.local:6650", brokerName, namespaceName)) pulsarClient pulsar.Client ) diff --git a/tests/operator/resources_test.go b/tests/operator/resources_test.go index d6a971dc..cd8b75fd 100644 --- a/tests/operator/resources_test.go +++ b/tests/operator/resources_test.go @@ -95,7 +95,7 @@ var _ = Describe("Resources", func() { Expect(feature.SetFeatureGates()).ShouldNot(HaveOccurred()) ctx = context.TODO() // use ClusterIP svc when run operator in k8s - adminServiceURL := fmt.Sprintf("http://%s-broker.%s.svc.cluster.local:8080", brokerName, namespaceName) + adminServiceURL := utils.GetEnv("ADMIN_SERVICE_URL", fmt.Sprintf("http://%s-broker.%s.svc.cluster.local:8080", brokerName, namespaceName)) // use NodePort svc when cluster is kind cluster and run operator locally, the nodePort need to be setup in kind // adminServiceURL := fmt.Sprintf("http://127.0.0.1:%d", nodePort) pconn = utils.MakePulsarConnection(namespaceName, pconnName, adminServiceURL) @@ -125,7 +125,7 @@ var _ = Describe("Resources", func() { }) Describe("Basic resource operations", Ordered, func() { - Context("Check pulsar broker", func() { + Context("Check pulsar broker", Label("Permissions"), func() { It("should create the pulsar broker successfully", func() { Eventually(func() bool { statefulset := &v1.StatefulSet{} @@ -138,14 +138,14 @@ var _ = Describe("Resources", func() { }) }) - Context("PulsarConnection operation", func() { + Context("PulsarConnection operation", Label("Permissions"), func() { It("should create the pulsarconnection successfully", func() { err := k8sClient.Create(ctx, pconn) Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue()) }) }) - Context("PulsarTenant operation", func() { + Context("PulsarTenant operation", Label("Permissions"), func() { It("should create the pulsartenant successfully", func() { err := k8sClient.Create(ctx, ptenant) Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue()) @@ -176,7 +176,7 @@ var _ = Describe("Resources", func() { }) }) - Context("PulsarNamespace operation", func() { + Context("PulsarNamespace operation", Label("Permissions"), func() { It("should create the pulsarnamespace successfully", func() { err := k8sClient.Create(ctx, pnamespace) Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue()) @@ -193,7 +193,7 @@ var _ = Describe("Resources", func() { }) Context("PulsarTopic operation", Ordered, func() { - It("should create the pulsartopic successfully", func() { + It("should create the pulsartopic successfully", Label("Permissions"), func() { err := k8sClient.Create(ctx, ptopic) Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue()) err = k8sClient.Create(ctx, ptopic2) @@ -202,7 +202,7 @@ var _ = Describe("Resources", func() { Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue()) }) - It("should be ready", func() { + It("should be ready", Label("Permissions"), func() { Eventually(func() bool { t := &v1alphav1.PulsarTopic{} tns := types.NamespacedName{Namespace: namespaceName, Name: ptopicName} @@ -291,7 +291,7 @@ var _ = Describe("Resources", func() { }) - Context("PulsarPermission operation", func() { + Context("PulsarPermission operation", Label("Permissions"), func() { It("should grant the pulsarpermission successfully", func() { err := k8sClient.Create(ctx, ppermission) Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue()) @@ -305,6 +305,69 @@ var _ = Describe("Resources", func() { return v1alphav1.IsPulsarResourceReady(t) }, "20s", "100ms").Should(BeTrue()) }) + + It("should add a new role", func() { + t := &v1alphav1.PulsarPermission{} + tns := types.NamespacedName{Namespace: namespaceName, Name: ppermissionName} + Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + t.Spec.Roles = append(t.Spec.Roles, "spiderman") + err := k8sClient.Update(ctx, t) + Expect(err).Should(Succeed()) + }) + + It("should be ready", func() { + Eventually(func() bool { + t := &v1alphav1.PulsarPermission{} + tns := types.NamespacedName{Namespace: namespaceName, Name: ppermissionName} + Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + return v1alphav1.IsPulsarResourceReady(t) + }, "20s", "100ms").Should(BeTrue()) + }) + + It("spiderman should exists along with ironman", func() { + Eventually(func(g Gomega) { + podName := fmt.Sprintf("%s-broker-0", brokerName) + containerName := fmt.Sprintf("%s-broker", brokerName) + stdout, _, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName, + "./bin/pulsar-admin topics permissions "+ppermission.Spec.ResourceName) + g.Expect(err).Should(Succeed()) + g.Expect(stdout).Should(Not(BeEmpty())) + g.Expect(stdout).Should(ContainSubstring("ironman")) + g.Expect(stdout).Should(ContainSubstring("spiderman")) + }, "20s", "100ms").Should(Succeed()) + }) + + It("should delete a role", func() { + t := &v1alphav1.PulsarPermission{} + tns := types.NamespacedName{Namespace: namespaceName, Name: ppermissionName} + Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + // remove spiderman and assign to roles + t.Spec.Roles = []string{"ironman"} + err := k8sClient.Update(ctx, t) + Expect(err).Should(Succeed()) + }) + + It("should be ready", func() { + Eventually(func() bool { + t := &v1alphav1.PulsarPermission{} + tns := types.NamespacedName{Namespace: namespaceName, Name: ppermissionName} + Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + return v1alphav1.IsPulsarResourceReady(t) + }, "20s", "100ms").Should(BeTrue()) + }) + + It("spiderman shouldn't exists anymore but ironman", func() { + Eventually(func(g Gomega) { + podName := fmt.Sprintf("%s-broker-0", brokerName) + containerName := fmt.Sprintf("%s-broker", brokerName) + stdout, _, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName, + "./bin/pulsar-admin topics permissions "+ptopic.Spec.Name) + g.Expect(err).Should(Succeed()) + g.Expect(stdout).Should(Not(BeEmpty())) + g.Expect(stdout).Should(ContainSubstring("ironman")) + g.Expect(stdout).Should(Not(ContainSubstring("spiderman"))) + }, "20s", "100ms").Should(Succeed()) + }) }) Context("PulsarFunction & PulsarPackage operation", func() { @@ -514,6 +577,20 @@ var _ = Describe("Resources", func() { g.Expect(k8sClient.Delete(ctx, t)).Should(Succeed()) }).Should(Succeed()) + Eventually(func(g Gomega) { + t := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: ptopicName2} + g.Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + g.Expect(k8sClient.Delete(ctx, t)).Should(Succeed()) + }).Should(Succeed()) + + Eventually(func(g Gomega) { + t := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: partitionedTopic.Name} + g.Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + g.Expect(k8sClient.Delete(ctx, t)).Should(Succeed()) + }).Should(Succeed()) + Eventually(func(g Gomega) { ns := &v1alphav1.PulsarNamespace{} tns := types.NamespacedName{Namespace: namespaceName, Name: pnamespaceName} diff --git a/tests/utils/spec.go b/tests/utils/spec.go index e3746046..b946e080 100644 --- a/tests/utils/spec.go +++ b/tests/utils/spec.go @@ -16,6 +16,7 @@ package utils import ( "encoding/json" + "os" corev1 "k8s.io/api/core/v1" v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" @@ -307,3 +308,11 @@ func MakeNSIsolationPolicy(namespace, name, clusterName, connectionName string, }, } } + +func GetEnv(key, fallback string) string { + value, exists := os.LookupEnv(key) + if !exists { + value = fallback + } + return value +}