diff --git a/pkg/connection/reconcile_permission.go b/pkg/connection/reconcile_permission.go index b3adfd92..e2e9a74b 100644 --- a/pkg/connection/reconcile_permission.go +++ b/pkg/connection/reconcile_permission.go @@ -16,6 +16,7 @@ package connection import ( "context" + "encoding/json" "fmt" "slices" @@ -145,36 +146,75 @@ func (r *PulsarPermissionReconciler) ReconcilePermission(ctx context.Context, pu return err } - currentRoles := []string{} + // Extract current desired state + currentState := r.extractCurrentState(permission) + + // Get previous state from annotation + previousState := r.getPreviousState(permission) + + // Check for context changes (ResourceType or ResourceName) + contextChanged := false + if previousState != nil { + contextChanged = previousState.ResourceType != currentState.ResourceType || + previousState.ResourceName != currentState.ResourceName + } + + if contextChanged { + log.Info("Context change detected, cleaning up previous permissions", + "previousResourceType", previousState.ResourceType, + "currentResourceType", currentState.ResourceType, + "previousResourceName", previousState.ResourceName, + "currentResourceName", currentState.ResourceName) + + // Clean up previous context + if err := r.cleanupPreviousContext(permission, *previousState); err != nil { + log.Error(err, "Failed to cleanup previous context, continuing with current operations") + } + } + + // Determine roles to manage + var previouslyManagedRoles []string + if previousState != nil && !contextChanged { + previouslyManagedRoles = previousState.Roles + } + + currentRoles := make([]string, 0, len(currentPermissions)) incomingRoles := permission.Spec.Roles for role := range currentPermissions { currentRoles = append(currentRoles, role) } - // revoking roles - for _, role := range currentRoles { - if !slices.Contains(incomingRoles, role) { - permission.Spec.Roles = []string{role} - per := GetPermissioner(permission) - if err := pulsarAdmin.RevokePermissions(per); err != nil { - log.Error(err, "Revoke permission failed") - meta.SetStatusCondition(&permission.Status.Conditions, *NewErrorCondition(permission.Generation, err.Error())) - if err := r.conn.client.Status().Update(ctx, permission); err != nil { - log.Error(err, "Failed to update permission status") - return err - } + // Only revoke roles that were previously managed by this PulsarPermission resource + // This prevents conflicts with other PulsarPermission resources managing the same target + for _, role := range previouslyManagedRoles { + // If this role is still in incoming roles OR doesn't exist currently, skip it + if slices.Contains(incomingRoles, role) || !slices.Contains(currentRoles, role) { + continue + } + + log.Info("Revoking previously managed role", "role", role) + tempPermission := permission.DeepCopy() + tempPermission.Spec.Roles = []string{role} + per := GetPermissioner(tempPermission) + if err := pulsarAdmin.RevokePermissions(per); err != nil { + log.Error(err, "Revoke permission failed", "role", role) + meta.SetStatusCondition(&permission.Status.Conditions, *NewErrorCondition(permission.Generation, err.Error())) + if err := r.conn.client.Status().Update(ctx, permission); err != nil { + log.Error(err, "Failed to update permission status") return err } + return err } } - // granting roles + // Grant permissions for all incoming roles for _, role := range incomingRoles { - permission.Spec.Roles = []string{role} - per := GetPermissioner(permission) + tempPermission := permission.DeepCopy() + tempPermission.Spec.Roles = []string{role} + per := GetPermissioner(tempPermission) if err := pulsarAdmin.GrantPermissions(per); err != nil { - log.Error(err, "Grant permission failed") + log.Error(err, "Grant permission failed", "role", role) meta.SetStatusCondition(&permission.Status.Conditions, *NewErrorCondition(permission.Generation, err.Error())) if err := r.conn.client.Status().Update(ctx, permission); err != nil { log.Error(err, "Failed to update permission status") @@ -184,6 +224,18 @@ func (r *PulsarPermissionReconciler) ReconcilePermission(ctx context.Context, pu } } + // Update the state annotation + if err := r.updateStateAnnotation(permission, currentState); err != nil { + log.Error(err, "Failed to update state annotation") + return err + } + + // Update the resource with new annotations + if err := r.conn.client.Update(ctx, permission); err != nil { + log.Error(err, "Failed to update permission annotations") + return err + } + permission.Status.ObservedGeneration = permission.Generation meta.SetStatusCondition(&permission.Status.Conditions, *NewReadyCondition(permission.Generation)) if err := r.conn.client.Status().Update(ctx, permission); err != nil { @@ -216,3 +268,134 @@ func GetPermissioner(p *resourcev1alpha1.PulsarPermission) admin.Permissioner { } return nil } + +const ( + // PulsarPermissionStateAnnotation is the annotation key used to store the previous state + // of PulsarPermission resources for stateful reconciliation + PulsarPermissionStateAnnotation = "pulsarpermissions.resource.streamnative.io/managed-state" +) + +// PulsarPermissionState represents the state that needs to be tracked for PulsarPermission resources +type PulsarPermissionState struct { + ResourceType string `json:"resourceType"` + ResourceName string `json:"resourceName"` + Roles []string `json:"roles"` + Actions []string `json:"actions"` +} + +// extractCurrentState extracts the current desired state from the PulsarPermission spec +func (r *PulsarPermissionReconciler) extractCurrentState(permission *resourcev1alpha1.PulsarPermission) PulsarPermissionState { + // Sort roles and actions for consistent comparison + roles := make([]string, len(permission.Spec.Roles)) + copy(roles, permission.Spec.Roles) + slices.Sort(roles) + + actions := make([]string, len(permission.Spec.Actions)) + copy(actions, permission.Spec.Actions) + slices.Sort(actions) + + return PulsarPermissionState{ + ResourceType: string(permission.Spec.ResoureType), + ResourceName: permission.Spec.ResourceName, + Roles: roles, + Actions: actions, + } +} + +// getPreviousState retrieves the previous state from the resource annotation +func (r *PulsarPermissionReconciler) getPreviousState(permission *resourcev1alpha1.PulsarPermission) *PulsarPermissionState { + annotations := permission.GetAnnotations() + if annotations == nil { + r.log.V(1).Info("No annotations found, treating as first reconciliation") + return nil + } + + stateJSON, exists := annotations[PulsarPermissionStateAnnotation] + if !exists { + r.log.V(1).Info("No previous state annotation found, treating as first reconciliation") + return nil + } + + // Try to unmarshal as PulsarPermissionState + var previousState PulsarPermissionState + if err := json.Unmarshal([]byte(stateJSON), &previousState); err != nil { + r.log.Error(err, "Failed to unmarshal previous state annotation, treating as first reconciliation", + "annotation", stateJSON) + return nil + } + + return &previousState +} + +// updateStateAnnotation updates the annotation with the current state after successful reconciliation +func (r *PulsarPermissionReconciler) updateStateAnnotation(permission *resourcev1alpha1.PulsarPermission, currentState PulsarPermissionState) error { + stateJSON, err := json.Marshal(currentState) + if err != nil { + return err + } + + annotations := permission.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + permission.SetAnnotations(annotations) + } + + // Only update if the value has changed + currentValue := annotations[PulsarPermissionStateAnnotation] + newValue := string(stateJSON) + + if currentValue != newValue { + annotations[PulsarPermissionStateAnnotation] = newValue + r.log.V(1).Info("Updated state annotation", "state", currentState) + } + + return nil +} + +// cleanupPreviousContext cleans up permissions from the previous resource context +func (r *PulsarPermissionReconciler) cleanupPreviousContext(permission *resourcev1alpha1.PulsarPermission, prevState PulsarPermissionState) error { + if len(prevState.Roles) == 0 { + return nil + } + + r.log.Info("Cleaning up permissions from previous context", + "previousResourceType", prevState.ResourceType, + "previousResourceName", prevState.ResourceName, + "rolesToCleanup", prevState.Roles) + + // Create a temporary permission resource for the previous context + tempPermission := permission.DeepCopy() + tempPermission.Spec.ResourceName = prevState.ResourceName + tempPermission.Spec.ResoureType = resourcev1alpha1.PulsarResourceType(prevState.ResourceType) + tempPermission.Spec.Roles = prevState.Roles + tempPermission.Spec.Actions = prevState.Actions + + // Get permissioner for the previous context + permissioner := GetPermissioner(tempPermission) + if permissioner == nil { + return fmt.Errorf("failed to get permissioner for previous context") + } + + // Get the pulsar admin instance + pulsarAdmin := r.conn.pulsarAdmin + + // Revoke all roles from the previous context + for _, role := range prevState.Roles { + r.log.Info("Revoking permission from previous context", "role", role) + + // Create a temporary permission for this specific role + rolePermission := tempPermission.DeepCopy() + rolePermission.Spec.Roles = []string{role} + rolePermissioner := GetPermissioner(rolePermission) + + if err := pulsarAdmin.RevokePermissions(rolePermissioner); err != nil { + r.log.Error(err, "Failed to revoke permission from previous context, continuing", + "role", role, + "previousResourceType", prevState.ResourceType, + "previousResourceName", prevState.ResourceName) + // Continue with other roles even if one fails + } + } + + return nil +} diff --git a/pkg/reconciler/stateful_reconciler.go b/pkg/reconciler/stateful_reconciler.go new file mode 100644 index 00000000..f9605ce4 --- /dev/null +++ b/pkg/reconciler/stateful_reconciler.go @@ -0,0 +1,200 @@ +// Copyright 2025 StreamNative +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package reconciler + +import ( + "context" + "encoding/json" + "reflect" + + "github.com/go-logr/logr" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +// StatefulResource represents a Kubernetes resource that needs stateful reconciliation +type StatefulResource interface { + runtime.Object + metav1.Object +} + +// StatefulReconciler provides a framework for managing resources that need to track +// their previous state to perform proper cleanup operations during reconciliation. +// It uses annotations to store the previous state and compares it with the current +// desired state to determine what changes need to be made. +type StatefulReconciler[T StatefulResource] interface { + // GetStateAnnotationKey returns the annotation key used to store the previous state + GetStateAnnotationKey() string + + // ExtractCurrentState extracts the current desired state from the resource spec + // that needs to be compared with the previous state for change detection + ExtractCurrentState(resource T) (interface{}, error) + + // CompareStates compares the previous state with the current state and returns + // the operations that need to be performed to reconcile the differences + CompareStates(previous, current interface{}) (StateChangeOperations, error) + + // ApplyOperations applies the state change operations to the external system + ApplyOperations(ctx context.Context, resource T, ops StateChangeOperations) error + + // UpdateStateAnnotation updates the annotation with the current state after + // successful reconciliation + UpdateStateAnnotation(resource T, currentState interface{}) error +} + +// StateChangeOperations represents the operations needed to reconcile state changes +type StateChangeOperations struct { + // ItemsToAdd represents items that need to be added to the external system + ItemsToAdd []interface{} `json:"itemsToAdd,omitempty"` + + // ItemsToRemove represents items that need to be removed from the external system + ItemsToRemove []interface{} `json:"itemsToRemove,omitempty"` + + // ItemsToUpdate represents items that need to be updated in the external system + ItemsToUpdate []interface{} `json:"itemsToUpdate,omitempty"` + + // ContextChanged indicates that the context (like resource type or name) has changed + // and requires special handling like cleanup of old context + ContextChanged bool `json:"contextChanged,omitempty"` + + // PreviousContext contains the previous context information for cleanup + PreviousContext interface{} `json:"previousContext,omitempty"` +} + +// BaseStatefulReconciler provides a base implementation of StatefulReconciler +// that handles the common annotation-based state tracking logic +type BaseStatefulReconciler[T StatefulResource] struct { + Logger logr.Logger +} + +// NewBaseStatefulReconciler creates a new BaseStatefulReconciler instance +func NewBaseStatefulReconciler[T StatefulResource](logger logr.Logger) *BaseStatefulReconciler[T] { + return &BaseStatefulReconciler[T]{ + Logger: logger, + } +} + +// Reconcile performs the complete stateful reconciliation workflow: +// 1. Extract current desired state from resource +// 2. Retrieve previous state from annotation +// 3. Compare states to determine required operations +// 4. Apply operations to external system +// 5. Update state annotation on success +func (r *BaseStatefulReconciler[T]) Reconcile(ctx context.Context, resource T, impl StatefulReconciler[T]) error { + // Extract current desired state + currentState, err := impl.ExtractCurrentState(resource) + if err != nil { + r.Logger.Error(err, "Failed to extract current state from resource") + return err + } + + // Get previous state from annotation + previousState, err := r.getPreviousState(resource, impl.GetStateAnnotationKey()) + if err != nil { + r.Logger.Error(err, "Failed to get previous state from annotation") + return err + } + + // Compare states to determine operations + ops, err := impl.CompareStates(previousState, currentState) + if err != nil { + r.Logger.Error(err, "Failed to compare states") + return err + } + + // Log the operations that will be performed + if len(ops.ItemsToAdd) > 0 || len(ops.ItemsToRemove) > 0 || len(ops.ItemsToUpdate) > 0 || ops.ContextChanged { + r.Logger.Info("State changes detected, applying operations", + "itemsToAdd", len(ops.ItemsToAdd), + "itemsToRemove", len(ops.ItemsToRemove), + "itemsToUpdate", len(ops.ItemsToUpdate), + "contextChanged", ops.ContextChanged) + } else { + r.Logger.V(1).Info("No state changes detected, skipping operations") + return nil + } + + // Apply operations to external system + if err := impl.ApplyOperations(ctx, resource, ops); err != nil { + r.Logger.Error(err, "Failed to apply state change operations") + return err + } + + // Update state annotation after successful operations + if err := impl.UpdateStateAnnotation(resource, currentState); err != nil { + r.Logger.Error(err, "Failed to update state annotation") + return err + } + + r.Logger.Info("Successfully completed stateful reconciliation") + return nil +} + +// getPreviousState retrieves the previous state from the resource annotation +func (r *BaseStatefulReconciler[T]) getPreviousState(resource T, annotationKey string) (interface{}, error) { + annotations := resource.GetAnnotations() + if annotations == nil { + r.Logger.V(1).Info("No annotations found, treating as first reconciliation") + return nil, nil + } + + stateJSON, exists := annotations[annotationKey] + if !exists { + r.Logger.V(1).Info("No previous state annotation found, treating as first reconciliation") + return nil, nil + } + + // Try to unmarshal as generic interface{} + var previousState interface{} + if err := json.Unmarshal([]byte(stateJSON), &previousState); err != nil { + r.Logger.Error(err, "Failed to unmarshal previous state annotation, treating as first reconciliation", + "annotation", stateJSON) + return nil, nil + } + + return previousState, nil +} + +// UpdateAnnotation is a helper method to update the state annotation +func (r *BaseStatefulReconciler[T]) UpdateAnnotation(resource T, annotationKey string, state interface{}) error { + stateJSON, err := json.Marshal(state) + if err != nil { + return err + } + + annotations := resource.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + resource.SetAnnotations(annotations) + } + + // Only update if the value has changed + currentValue := annotations[annotationKey] + newValue := string(stateJSON) + + if currentValue != newValue { + annotations[annotationKey] = newValue + r.Logger.V(1).Info("Updated state annotation", "key", annotationKey) + return nil + } + + r.Logger.V(2).Info("State annotation unchanged, skipping update", "key", annotationKey) + return nil +} + +// HasStateChanged is a utility method to check if two states are different +func (r *BaseStatefulReconciler[T]) HasStateChanged(previous, current interface{}) bool { + return !reflect.DeepEqual(previous, current) +} diff --git a/tests/operator/resources_test.go b/tests/operator/resources_test.go index cd8b75fd..08c0b2db 100644 --- a/tests/operator/resources_test.go +++ b/tests/operator/resources_test.go @@ -16,11 +16,14 @@ package operator_test import ( "context" + "encoding/json" "fmt" + "slices" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/onsi/gomega/format" + "github.com/streamnative/pulsar-resources-operator/pkg/connection" "github.com/streamnative/pulsar-resources-operator/pkg/feature" v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -38,6 +41,69 @@ type testJSON struct { Name string `json:"name"` } +// validatePermissionStateAnnotation validates the PulsarPermission state annotation +func validatePermissionStateAnnotation(ctx context.Context, permissionName, namespaceName string, expected connection.PulsarPermissionState) { + permission := &v1alphav1.PulsarPermission{} + tns := types.NamespacedName{Namespace: namespaceName, Name: permissionName} + Expect(k8sClient.Get(ctx, tns, permission)).Should(Succeed()) + + annotations := permission.GetAnnotations() + Expect(annotations).ShouldNot(BeNil(), "Permission should have annotations") + + stateJSON, exists := annotations[connection.PulsarPermissionStateAnnotation] + Expect(exists).Should(BeTrue(), "Permission should have state annotation") + Expect(stateJSON).ShouldNot(BeEmpty(), "State annotation should not be empty") + + var actualState connection.PulsarPermissionState + err := json.Unmarshal([]byte(stateJSON), &actualState) + Expect(err).ShouldNot(HaveOccurred(), "State annotation should be valid JSON") + + // Sort roles for consistent comparison + expectedRoles := make([]string, len(expected.Roles)) + copy(expectedRoles, expected.Roles) + slices.Sort(expectedRoles) + + actualRoles := make([]string, len(actualState.Roles)) + copy(actualRoles, actualState.Roles) + slices.Sort(actualRoles) + + // Sort actions for consistent comparison + expectedActions := make([]string, len(expected.Actions)) + copy(expectedActions, expected.Actions) + slices.Sort(expectedActions) + + actualActions := make([]string, len(actualState.Actions)) + copy(actualActions, actualState.Actions) + slices.Sort(actualActions) + + Expect(actualState.ResourceType).Should(Equal(expected.ResourceType), + "ResourceType should match expected value") + Expect(actualState.ResourceName).Should(Equal(expected.ResourceName), + "ResourceName should match expected value") + Expect(actualRoles).Should(Equal(expectedRoles), + "Roles should match expected values") + Expect(actualActions).Should(Equal(expectedActions), + "Actions should match expected values") +} + +// validateTopicPermissionsContain validates that the topic contains the specified roles +// but allows for additional roles to exist (for multi-permission testing) +func validateTopicPermissionsContain(topicName string, expectedRoles []string) { + Eventually(func(g Gomega) { + podName := fmt.Sprintf("%s-broker-0", brokerName) + containerName := fmt.Sprintf("%s-broker", brokerName) + stdout, _, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName, + "./bin/pulsar-admin topics permissions "+topicName) + g.Expect(err).Should(Succeed()) + g.Expect(stdout).Should(Not(BeEmpty())) + + // Verify all expected roles exist + for _, role := range expectedRoles { + g.Expect(stdout).Should(ContainSubstring(role), "Topic should contain role: "+role) + } + }, "20s", "100ms").Should(Succeed()) +} + var _ = Describe("Resources", func() { var ( @@ -59,6 +125,10 @@ var _ = Describe("Resources", func() { topicName2 string = "persistent://cloud/stage/user2" ppermission *v1alphav1.PulsarPermission ppermissionName string = "test-permission" + ppermission2 *v1alphav1.PulsarPermission + ppermission2Name string = "test-permission-2" + ppermission3 *v1alphav1.PulsarPermission + ppermission3Name string = "test-permission-3" exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," + "\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}" partitionedTopic = &v1alphav1.PulsarTopic{ @@ -109,6 +179,15 @@ var _ = Describe("Resources", func() { roles := []string{"ironman"} actions := []string{"produce", "consume", "functions"} ppermission = utils.MakePulsarPermission(namespaceName, ppermissionName, topicName, pconnName, v1alphav1.PulsarResourceTypeTopic, roles, actions, v1alphav1.CleanUpAfterDeletion) + + // Additional permissions for multi-permission testing + roles2 := []string{"batman"} + actions2 := []string{"produce", "consume"} + ppermission2 = utils.MakePulsarPermission(namespaceName, ppermission2Name, topicName, pconnName, v1alphav1.PulsarResourceTypeTopic, roles2, actions2, v1alphav1.CleanUpAfterDeletion) + + roles3 := []string{"superman"} + actions3 := []string{"functions"} + ppermission3 = utils.MakePulsarPermission(namespaceName, ppermission3Name, topicName, pconnName, v1alphav1.PulsarResourceTypeTopic, roles3, actions3, v1alphav1.CleanUpAfterDeletion) ppackage = utils.MakePulsarPackage(namespaceName, pfuncName, ppackageurl, pconnName, lifecyclePolicy) pfunc = utils.MakePulsarFunction(namespaceName, pfuncName, ppackageurl, pconnName, lifecyclePolicy) pfuncfailure = utils.MakePulsarFunction(namespaceName, pfuncFailureName, "function://not/exists/package@latest", pconnName, lifecyclePolicy) @@ -306,6 +385,20 @@ var _ = Describe("Resources", func() { }, "20s", "100ms").Should(BeTrue()) }) + It("should have correct initial state annotation", func() { + Eventually(func() error { + expectedState := connection.PulsarPermissionState{ + ResourceType: "topic", + ResourceName: topicName, + Roles: []string{"ironman"}, + Actions: []string{"produce", "consume", "functions"}, + } + + validatePermissionStateAnnotation(ctx, ppermissionName, namespaceName, expectedState) + return nil + }, "20s", "100ms").Should(Succeed()) + }) + It("should add a new role", func() { t := &v1alphav1.PulsarPermission{} tns := types.NamespacedName{Namespace: namespaceName, Name: ppermissionName} @@ -337,6 +430,20 @@ var _ = Describe("Resources", func() { }, "20s", "100ms").Should(Succeed()) }) + It("should have updated state annotation with both roles", func() { + Eventually(func() error { + expectedState := connection.PulsarPermissionState{ + ResourceType: "topic", + ResourceName: topicName, + Roles: []string{"ironman", "spiderman"}, + Actions: []string{"produce", "consume", "functions"}, + } + + validatePermissionStateAnnotation(ctx, ppermissionName, namespaceName, expectedState) + return nil + }, "20s", "100ms").Should(Succeed()) + }) + It("should delete a role", func() { t := &v1alphav1.PulsarPermission{} tns := types.NamespacedName{Namespace: namespaceName, Name: ppermissionName} @@ -368,6 +475,169 @@ var _ = Describe("Resources", func() { g.Expect(stdout).Should(Not(ContainSubstring("spiderman"))) }, "20s", "100ms").Should(Succeed()) }) + + It("should have updated state annotation with only ironman role", func() { + Eventually(func() error { + expectedState := connection.PulsarPermissionState{ + ResourceType: "topic", + ResourceName: topicName, + Roles: []string{"ironman"}, + Actions: []string{"produce", "consume", "functions"}, + } + + validatePermissionStateAnnotation(ctx, ppermissionName, namespaceName, expectedState) + return nil + }, "20s", "100ms").Should(Succeed()) + }) + }) + + Context("Multiple PulsarPermissions on same Topic", Label("Permissions"), func() { + It("should create multiple pulsarpermissions successfully", func() { + err := k8sClient.Create(ctx, ppermission2) + Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue()) + err = k8sClient.Create(ctx, ppermission3) + Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue()) + }) + + It("both permissions should be ready", func() { + Eventually(func() bool { + perm2 := &v1alphav1.PulsarPermission{} + tns2 := types.NamespacedName{Namespace: namespaceName, Name: ppermission2Name} + Expect(k8sClient.Get(ctx, tns2, perm2)).Should(Succeed()) + return v1alphav1.IsPulsarResourceReady(perm2) + }, "20s", "100ms").Should(BeTrue()) + + Eventually(func() bool { + perm3 := &v1alphav1.PulsarPermission{} + tns3 := types.NamespacedName{Namespace: namespaceName, Name: ppermission3Name} + Expect(k8sClient.Get(ctx, tns3, perm3)).Should(Succeed()) + return v1alphav1.IsPulsarResourceReady(perm3) + }, "20s", "100ms").Should(BeTrue()) + }) + + It("should have correct initial state annotations", func() { + Eventually(func() error { + expectedState2 := connection.PulsarPermissionState{ + ResourceType: "topic", + ResourceName: topicName, + Roles: []string{"batman"}, + Actions: []string{"produce", "consume"}, + } + validatePermissionStateAnnotation(ctx, ppermission2Name, namespaceName, expectedState2) + return nil + }, "20s", "100ms").Should(Succeed()) + + Eventually(func() error { + expectedState3 := connection.PulsarPermissionState{ + ResourceType: "topic", + ResourceName: topicName, + Roles: []string{"superman"}, + Actions: []string{"functions"}, + } + validatePermissionStateAnnotation(ctx, ppermission3Name, namespaceName, expectedState3) + return nil + }, "20s", "100ms").Should(Succeed()) + }) + + It("topic should contain all roles from both permissions", func() { + validateTopicPermissionsContain(topicName, []string{"batman", "superman"}) + }) + + It("should modify first permission by adding wonderwoman", func() { + perm2 := &v1alphav1.PulsarPermission{} + tns2 := types.NamespacedName{Namespace: namespaceName, Name: ppermission2Name} + Expect(k8sClient.Get(ctx, tns2, perm2)).Should(Succeed()) + perm2.Spec.Roles = append(perm2.Spec.Roles, "wonderwoman") + err := k8sClient.Update(ctx, perm2) + Expect(err).Should(Succeed()) + }) + + It("first permission should be ready with updated roles", func() { + Eventually(func() bool { + perm2 := &v1alphav1.PulsarPermission{} + tns2 := types.NamespacedName{Namespace: namespaceName, Name: ppermission2Name} + Expect(k8sClient.Get(ctx, tns2, perm2)).Should(Succeed()) + return v1alphav1.IsPulsarResourceReady(perm2) + }, "20s", "100ms").Should(BeTrue()) + }) + + It("first permission should have updated state annotation", func() { + Eventually(func() error { + expectedState2 := connection.PulsarPermissionState{ + ResourceType: "topic", + ResourceName: topicName, + Roles: []string{"batman", "wonderwoman"}, + Actions: []string{"produce", "consume"}, + } + validatePermissionStateAnnotation(ctx, ppermission2Name, namespaceName, expectedState2) + return nil + }, "20s", "100ms").Should(Succeed()) + }) + + It("second permission should remain unaffected", func() { + Eventually(func() error { + expectedState3 := connection.PulsarPermissionState{ + ResourceType: "topic", + ResourceName: topicName, + Roles: []string{"superman"}, + Actions: []string{"functions"}, + } + validatePermissionStateAnnotation(ctx, ppermission3Name, namespaceName, expectedState3) + return nil + }, "20s", "100ms").Should(Succeed()) + }) + + It("topic should contain all roles after first permission modification", func() { + validateTopicPermissionsContain(topicName, []string{"batman", "wonderwoman", "superman"}) + }) + + It("should modify second permission actions", func() { + perm3 := &v1alphav1.PulsarPermission{} + tns3 := types.NamespacedName{Namespace: namespaceName, Name: ppermission3Name} + Expect(k8sClient.Get(ctx, tns3, perm3)).Should(Succeed()) + perm3.Spec.Actions = []string{"produce"} + err := k8sClient.Update(ctx, perm3) + Expect(err).Should(Succeed()) + }) + + It("second permission should be ready with updated actions", func() { + Eventually(func() bool { + perm3 := &v1alphav1.PulsarPermission{} + tns3 := types.NamespacedName{Namespace: namespaceName, Name: ppermission3Name} + Expect(k8sClient.Get(ctx, tns3, perm3)).Should(Succeed()) + return v1alphav1.IsPulsarResourceReady(perm3) + }, "20s", "100ms").Should(BeTrue()) + }) + + It("second permission should have updated state annotation", func() { + Eventually(func() error { + expectedState3 := connection.PulsarPermissionState{ + ResourceType: "topic", + ResourceName: topicName, + Roles: []string{"superman"}, + Actions: []string{"produce"}, + } + validatePermissionStateAnnotation(ctx, ppermission3Name, namespaceName, expectedState3) + return nil + }, "20s", "100ms").Should(Succeed()) + }) + + It("first permission should remain unaffected by second permission changes", func() { + Eventually(func() error { + expectedState2 := connection.PulsarPermissionState{ + ResourceType: "topic", + ResourceName: topicName, + Roles: []string{"batman", "wonderwoman"}, + Actions: []string{"produce", "consume"}, + } + validatePermissionStateAnnotation(ctx, ppermission2Name, namespaceName, expectedState2) + return nil + }, "20s", "100ms").Should(Succeed()) + }) + + It("topic should still contain all roles after all modifications", func() { + validateTopicPermissionsContain(topicName, []string{"batman", "wonderwoman", "superman"}) + }) }) Context("PulsarFunction & PulsarPackage operation", func() { @@ -618,6 +888,20 @@ var _ = Describe("Resources", func() { g.Expect(k8sClient.Get(ctx, tns, perm)).Should(Succeed()) g.Expect(k8sClient.Delete(ctx, perm)).Should(Succeed()) }).Should(Succeed()) + + Eventually(func(g Gomega) { + perm2 := &v1alphav1.PulsarPermission{} + tns2 := types.NamespacedName{Namespace: namespaceName, Name: ppermission2Name} + g.Expect(k8sClient.Get(ctx, tns2, perm2)).Should(Succeed()) + g.Expect(k8sClient.Delete(ctx, perm2)).Should(Succeed()) + }).Should(Succeed()) + + Eventually(func(g Gomega) { + perm3 := &v1alphav1.PulsarPermission{} + tns3 := types.NamespacedName{Namespace: namespaceName, Name: ppermission3Name} + g.Expect(k8sClient.Get(ctx, tns3, perm3)).Should(Succeed()) + g.Expect(k8sClient.Delete(ctx, perm3)).Should(Succeed()) + }).Should(Succeed()) }) })