Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,15 @@ Please read through below conventions before contributions.
- Go source files and directories use underscores, not dashes.
- Package directories should generally avoid using separators as much as possible. When package names are multiple words, they usually should be in nested subdirectories.
- Document directories and filenames should use dashes rather than underscores.
- All source files should add a license at the beginning.
- All source files should add a license at the beginning.


### How to work locally

1. Clones this repo
2. Create the cluster `minikube start --memory=8192 --cpus=4`
3. [Deploy Apache Pulsar Standalone](https://pulsar.apache.org/docs/4.0.x/getting-started-helm/#step-1-install-pulsar-helm-chart)
4. Open the minikube tunnel in another terminal `minikube tunnel -c`
5. Apply operator's crds `make install`
6. Executes `go run .` in order to run the operator locally rather than inside the cluster
7. Run tests `~/go/bin/ginkgo ./operator`
10 changes: 10 additions & 0 deletions pkg/admin/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,16 @@ func (d *DummyPulsarAdmin) RevokePermissions(Permissioner) error {
return nil
}

// GetTopicPermissions is a fake implements of GetTopicPermissions
func (d *DummyPulsarAdmin) GetTopicPermissions(string) (map[string][]utils.AuthAction, error) {
return map[string][]utils.AuthAction{}, nil
}

// GetNamespacePermissions is a fake implements of GetNamespacePermissions
func (d *DummyPulsarAdmin) GetNamespacePermissions(string) (map[string][]utils.AuthAction, error) {
return map[string][]utils.AuthAction{}, nil
}

// GetSchema is a fake implements of GetSchema
func (d *DummyPulsarAdmin) GetSchema(string) (*v1alpha1.SchemaInfo, error) {
return nil, nil
Expand Down
18 changes: 18 additions & 0 deletions pkg/admin/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,24 @@ func (p *PulsarAdminClient) RevokePermissions(permission Permissioner) error {
return nil
}

// GetTopicPermissions retrieve permission by name
func (p *PulsarAdminClient) GetTopicPermissions(topic string) (map[string][]utils.AuthAction, error) {
topicName, err := utils.GetTopicName(topic)
if err != nil {
return nil, err
}
return p.adminClient.Topics().GetPermissions(*topicName)
}

// GetNamespacePermissions retrieve permission by name
func (p *PulsarAdminClient) GetNamespacePermissions(namespaceName string) (map[string][]utils.AuthAction, error) {
namespace, err := utils.GetNamespaceName(namespaceName)
if err != nil {
return nil, err
}
return p.adminClient.Namespaces().GetNamespacePermissions(*namespace)
}

// convertActions converts actions type from string to common.AuthAction
func convertActions(actions []string) ([]utils.AuthAction, error) {
r := make([]utils.AuthAction, 0)
Expand Down
6 changes: 6 additions & 0 deletions pkg/admin/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ type PulsarAdmin interface {
// it will revoke all actions which granted to a role on a namespace or topic
RevokePermissions(p Permissioner) error

// GetNamespacePermissions get permissions by namespace
GetNamespacePermissions(namespace string) (map[string][]utils2.AuthAction, error)

// GetTopicPermissions get permissions by topic
GetTopicPermissions(topic string) (map[string][]utils2.AuthAction, error)

// Close releases the connection with pulsar admin
Close() error

Expand Down
60 changes: 55 additions & 5 deletions pkg/connection/reconcile_permission.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@ package connection
import (
"context"
"fmt"
"slices"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
"github.com/go-logr/logr"
"github.com/streamnative/pulsar-resources-operator/pkg/feature"
"k8s.io/apimachinery/pkg/api/meta"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1"
"github.com/streamnative/pulsar-resources-operator/pkg/admin"
"github.com/streamnative/pulsar-resources-operator/pkg/feature"
"github.com/streamnative/pulsar-resources-operator/pkg/reconciler"
)

Expand Down Expand Up @@ -122,18 +124,66 @@ func (r *PulsarPermissionReconciler) ReconcilePermission(ctx context.Context, pu
return nil
}

log.V(1).Info("Granting permission", "ResourceName", permission.Spec.ResourceName,
log.Info("Updating permission", "ResourceName", permission.Spec.ResourceName,
"ResourceType", permission.Spec.ResoureType, "Roles", permission.Spec.Roles, "Actions", permission.Spec.Actions)
if err := pulsarAdmin.GrantPermissions(per); err != nil {
log.Error(err, "Grant permission failed")

var currentPermissions map[string][]utils.AuthAction
var err error

if permission.Spec.ResoureType == resourcev1alpha1.PulsarResourceTypeTopic {
currentPermissions, err = pulsarAdmin.GetTopicPermissions(permission.Spec.ResourceName)
} else {
currentPermissions, err = pulsarAdmin.GetNamespacePermissions(permission.Spec.ResourceName)
}

if err != nil {
log.Error(err, "Failed to get current permissions")
meta.SetStatusCondition(&permission.Status.Conditions, *NewErrorCondition(permission.Generation, err.Error()))
if err := r.conn.client.Status().Update(ctx, permission); err != nil {
log.Error(err, "Failed to update permission status")
return err
}
return err
}

currentRoles := []string{}
incomingRoles := permission.Spec.Roles

for role := range currentPermissions {
currentRoles = append(currentRoles, role)
}

// revoking roles
for _, role := range currentRoles {
if !slices.Contains(incomingRoles, role) {
permission.Spec.Roles = []string{role}
per := GetPermissioner(permission)
if err := pulsarAdmin.RevokePermissions(per); err != nil {
log.Error(err, "Revoke permission failed")
meta.SetStatusCondition(&permission.Status.Conditions, *NewErrorCondition(permission.Generation, err.Error()))
if err := r.conn.client.Status().Update(ctx, permission); err != nil {
log.Error(err, "Failed to update permission status")
return err
}
return err
}
}
}

// granting roles
for _, role := range incomingRoles {
permission.Spec.Roles = []string{role}
per := GetPermissioner(permission)
if err := pulsarAdmin.GrantPermissions(per); err != nil {
log.Error(err, "Grant permission failed")
meta.SetStatusCondition(&permission.Status.Conditions, *NewErrorCondition(permission.Generation, err.Error()))
if err := r.conn.client.Status().Update(ctx, permission); err != nil {
log.Error(err, "Failed to update permission status")
return err
}
return err
}
}

permission.Status.ObservedGeneration = permission.Generation
meta.SetStatusCondition(&permission.Status.Conditions, *NewReadyCondition(permission.Generation))
if err := r.conn.client.Status().Update(ctx, permission); err != nil {
Expand Down
30 changes: 27 additions & 3 deletions tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,35 @@ tests is an individul module beside pulsar resources operator
`go mod tidy` to download modules for tests


# Requirements
## Requirements
- Pulsar Operator installed
- A pulsar cluster installed without authentication and authorization


# Run tests
## Run tests

`ginkgo --trace --progress ./operator`
`ginkgo --trace --progress ./operator`

Optionally, if you have an external pulsar cluster (e.g. deployed on minikube) and you want to test the operator without deploying it in kubernetes:

1. Run the code in a terminal

```bash
make install
go run .
```

2. In another terminal run

```bash
# your admin service url
export ADMIN_SERVICE_URL=http://localhost:80
# your pulsar namespace
export NAMESPACE=pulsar
# your pulsar broker name
export BROKER_NAME=pulsar-mini
# your pulsar proxy url
export PROXY_URL=http://localhost:80

ginkgo --trace --progress ./operator
```
6 changes: 3 additions & 3 deletions tests/operator/operator_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ import (
)

var (
namespaceName string = "default"
namespaceName string = utils.GetEnv("NAMESPACE", "default")
k8sClient client.Client
k8sConfig *rest.Config
brokerName string = "test-pulsar"
proxyURL string = fmt.Sprintf("http://%s-broker.%s.svc.cluster.local:6650", brokerName, namespaceName)
brokerName string = utils.GetEnv("BROKER_NAME", "test-pulsar")
proxyURL string = utils.GetEnv("PROXY_URL", fmt.Sprintf("http://%s-broker.%s.svc.cluster.local:6650", brokerName, namespaceName))
pulsarClient pulsar.Client
)

Expand Down
93 changes: 85 additions & 8 deletions tests/operator/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ var _ = Describe("Resources", func() {
Expect(feature.SetFeatureGates()).ShouldNot(HaveOccurred())
ctx = context.TODO()
// use ClusterIP svc when run operator in k8s
adminServiceURL := fmt.Sprintf("http://%s-broker.%s.svc.cluster.local:8080", brokerName, namespaceName)
adminServiceURL := utils.GetEnv("ADMIN_SERVICE_URL", fmt.Sprintf("http://%s-broker.%s.svc.cluster.local:8080", brokerName, namespaceName))
// use NodePort svc when cluster is kind cluster and run operator locally, the nodePort need to be setup in kind
// adminServiceURL := fmt.Sprintf("http://127.0.0.1:%d", nodePort)
pconn = utils.MakePulsarConnection(namespaceName, pconnName, adminServiceURL)
Expand Down Expand Up @@ -125,7 +125,7 @@ var _ = Describe("Resources", func() {
})

Describe("Basic resource operations", Ordered, func() {
Context("Check pulsar broker", func() {
Context("Check pulsar broker", Label("Permissions"), func() {
It("should create the pulsar broker successfully", func() {
Eventually(func() bool {
statefulset := &v1.StatefulSet{}
Expand All @@ -138,14 +138,14 @@ var _ = Describe("Resources", func() {
})
})

Context("PulsarConnection operation", func() {
Context("PulsarConnection operation", Label("Permissions"), func() {
It("should create the pulsarconnection successfully", func() {
err := k8sClient.Create(ctx, pconn)
Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue())
})
})

Context("PulsarTenant operation", func() {
Context("PulsarTenant operation", Label("Permissions"), func() {
It("should create the pulsartenant successfully", func() {
err := k8sClient.Create(ctx, ptenant)
Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue())
Expand Down Expand Up @@ -176,7 +176,7 @@ var _ = Describe("Resources", func() {
})
})

Context("PulsarNamespace operation", func() {
Context("PulsarNamespace operation", Label("Permissions"), func() {
It("should create the pulsarnamespace successfully", func() {
err := k8sClient.Create(ctx, pnamespace)
Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue())
Expand All @@ -193,7 +193,7 @@ var _ = Describe("Resources", func() {
})

Context("PulsarTopic operation", Ordered, func() {
It("should create the pulsartopic successfully", func() {
It("should create the pulsartopic successfully", Label("Permissions"), func() {
err := k8sClient.Create(ctx, ptopic)
Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue())
err = k8sClient.Create(ctx, ptopic2)
Expand All @@ -202,7 +202,7 @@ var _ = Describe("Resources", func() {
Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue())
})

It("should be ready", func() {
It("should be ready", Label("Permissions"), func() {
Eventually(func() bool {
t := &v1alphav1.PulsarTopic{}
tns := types.NamespacedName{Namespace: namespaceName, Name: ptopicName}
Expand Down Expand Up @@ -291,7 +291,7 @@ var _ = Describe("Resources", func() {

})

Context("PulsarPermission operation", func() {
Context("PulsarPermission operation", Label("Permissions"), func() {
It("should grant the pulsarpermission successfully", func() {
err := k8sClient.Create(ctx, ppermission)
Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue())
Expand All @@ -305,6 +305,69 @@ var _ = Describe("Resources", func() {
return v1alphav1.IsPulsarResourceReady(t)
}, "20s", "100ms").Should(BeTrue())
})

It("should add a new role", func() {
t := &v1alphav1.PulsarPermission{}
tns := types.NamespacedName{Namespace: namespaceName, Name: ppermissionName}
Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed())
t.Spec.Roles = append(t.Spec.Roles, "spiderman")
err := k8sClient.Update(ctx, t)
Expect(err).Should(Succeed())
})

It("should be ready", func() {
Eventually(func() bool {
t := &v1alphav1.PulsarPermission{}
tns := types.NamespacedName{Namespace: namespaceName, Name: ppermissionName}
Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed())
return v1alphav1.IsPulsarResourceReady(t)
}, "20s", "100ms").Should(BeTrue())
})

It("spiderman should exists along with ironman", func() {
Eventually(func(g Gomega) {
podName := fmt.Sprintf("%s-broker-0", brokerName)
containerName := fmt.Sprintf("%s-broker", brokerName)
stdout, _, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName,
"./bin/pulsar-admin topics permissions "+ppermission.Spec.ResourceName)
g.Expect(err).Should(Succeed())
g.Expect(stdout).Should(Not(BeEmpty()))
g.Expect(stdout).Should(ContainSubstring("ironman"))
g.Expect(stdout).Should(ContainSubstring("spiderman"))
}, "20s", "100ms").Should(Succeed())
})

It("should delete a role", func() {
t := &v1alphav1.PulsarPermission{}
tns := types.NamespacedName{Namespace: namespaceName, Name: ppermissionName}
Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed())
// remove spiderman and assign to roles
t.Spec.Roles = []string{"ironman"}
err := k8sClient.Update(ctx, t)
Expect(err).Should(Succeed())
})

It("should be ready", func() {
Eventually(func() bool {
t := &v1alphav1.PulsarPermission{}
tns := types.NamespacedName{Namespace: namespaceName, Name: ppermissionName}
Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed())
return v1alphav1.IsPulsarResourceReady(t)
}, "20s", "100ms").Should(BeTrue())
})

It("spiderman shouldn't exists anymore but ironman", func() {
Eventually(func(g Gomega) {
podName := fmt.Sprintf("%s-broker-0", brokerName)
containerName := fmt.Sprintf("%s-broker", brokerName)
stdout, _, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName,
"./bin/pulsar-admin topics permissions "+ptopic.Spec.Name)
g.Expect(err).Should(Succeed())
g.Expect(stdout).Should(Not(BeEmpty()))
g.Expect(stdout).Should(ContainSubstring("ironman"))
g.Expect(stdout).Should(Not(ContainSubstring("spiderman")))
}, "20s", "100ms").Should(Succeed())
})
})

Context("PulsarFunction & PulsarPackage operation", func() {
Expand Down Expand Up @@ -514,6 +577,20 @@ var _ = Describe("Resources", func() {
g.Expect(k8sClient.Delete(ctx, t)).Should(Succeed())
}).Should(Succeed())

Eventually(func(g Gomega) {
t := &v1alphav1.PulsarTopic{}
tns := types.NamespacedName{Namespace: namespaceName, Name: ptopicName2}
g.Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed())
g.Expect(k8sClient.Delete(ctx, t)).Should(Succeed())
}).Should(Succeed())

Eventually(func(g Gomega) {
t := &v1alphav1.PulsarTopic{}
tns := types.NamespacedName{Namespace: namespaceName, Name: partitionedTopic.Name}
g.Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed())
g.Expect(k8sClient.Delete(ctx, t)).Should(Succeed())
}).Should(Succeed())

Eventually(func(g Gomega) {
ns := &v1alphav1.PulsarNamespace{}
tns := types.NamespacedName{Namespace: namespaceName, Name: pnamespaceName}
Expand Down
Loading