Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 200 additions & 17 deletions pkg/connection/reconcile_permission.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package connection

import (
"context"
"encoding/json"
"fmt"
"slices"

Expand Down Expand Up @@ -145,36 +146,75 @@ func (r *PulsarPermissionReconciler) ReconcilePermission(ctx context.Context, pu
return err
}

currentRoles := []string{}
// Extract current desired state
currentState := r.extractCurrentState(permission)

// Get previous state from annotation
previousState := r.getPreviousState(permission)

// Check for context changes (ResourceType or ResourceName)
contextChanged := false
if previousState != nil {
contextChanged = previousState.ResourceType != currentState.ResourceType ||
previousState.ResourceName != currentState.ResourceName
}

if contextChanged {
log.Info("Context change detected, cleaning up previous permissions",
"previousResourceType", previousState.ResourceType,
"currentResourceType", currentState.ResourceType,
"previousResourceName", previousState.ResourceName,
"currentResourceName", currentState.ResourceName)

// Clean up previous context
if err := r.cleanupPreviousContext(permission, *previousState); err != nil {
log.Error(err, "Failed to cleanup previous context, continuing with current operations")
}
}

// Determine roles to manage
var previouslyManagedRoles []string
if previousState != nil && !contextChanged {
previouslyManagedRoles = previousState.Roles
}

currentRoles := make([]string, 0, len(currentPermissions))
incomingRoles := permission.Spec.Roles

for role := range currentPermissions {
currentRoles = append(currentRoles, role)
}

// revoking roles
for _, role := range currentRoles {
if !slices.Contains(incomingRoles, role) {
permission.Spec.Roles = []string{role}
per := GetPermissioner(permission)
if err := pulsarAdmin.RevokePermissions(per); err != nil {
log.Error(err, "Revoke permission failed")
meta.SetStatusCondition(&permission.Status.Conditions, *NewErrorCondition(permission.Generation, err.Error()))
if err := r.conn.client.Status().Update(ctx, permission); err != nil {
log.Error(err, "Failed to update permission status")
return err
}
// Only revoke roles that were previously managed by this PulsarPermission resource
// This prevents conflicts with other PulsarPermission resources managing the same target
for _, role := range previouslyManagedRoles {
// If this role is still in incoming roles OR doesn't exist currently, skip it
if slices.Contains(incomingRoles, role) || !slices.Contains(currentRoles, role) {
continue
}

log.Info("Revoking previously managed role", "role", role)
tempPermission := permission.DeepCopy()
tempPermission.Spec.Roles = []string{role}
per := GetPermissioner(tempPermission)
if err := pulsarAdmin.RevokePermissions(per); err != nil {
log.Error(err, "Revoke permission failed", "role", role)
meta.SetStatusCondition(&permission.Status.Conditions, *NewErrorCondition(permission.Generation, err.Error()))
if err := r.conn.client.Status().Update(ctx, permission); err != nil {
log.Error(err, "Failed to update permission status")
return err
}
return err
}
}

// granting roles
// Grant permissions for all incoming roles
for _, role := range incomingRoles {
permission.Spec.Roles = []string{role}
per := GetPermissioner(permission)
tempPermission := permission.DeepCopy()
tempPermission.Spec.Roles = []string{role}
per := GetPermissioner(tempPermission)
if err := pulsarAdmin.GrantPermissions(per); err != nil {
log.Error(err, "Grant permission failed")
log.Error(err, "Grant permission failed", "role", role)
meta.SetStatusCondition(&permission.Status.Conditions, *NewErrorCondition(permission.Generation, err.Error()))
if err := r.conn.client.Status().Update(ctx, permission); err != nil {
log.Error(err, "Failed to update permission status")
Expand All @@ -184,6 +224,18 @@ func (r *PulsarPermissionReconciler) ReconcilePermission(ctx context.Context, pu
}
}

// Update the state annotation
if err := r.updateStateAnnotation(permission, currentState); err != nil {
log.Error(err, "Failed to update state annotation")
return err
}

// Update the resource with new annotations
if err := r.conn.client.Update(ctx, permission); err != nil {
log.Error(err, "Failed to update permission annotations")
return err
}

permission.Status.ObservedGeneration = permission.Generation
meta.SetStatusCondition(&permission.Status.Conditions, *NewReadyCondition(permission.Generation))
if err := r.conn.client.Status().Update(ctx, permission); err != nil {
Expand Down Expand Up @@ -216,3 +268,134 @@ func GetPermissioner(p *resourcev1alpha1.PulsarPermission) admin.Permissioner {
}
return nil
}

const (
// PulsarPermissionStateAnnotation is the annotation key used to store the previous state
// of PulsarPermission resources for stateful reconciliation
PulsarPermissionStateAnnotation = "pulsarpermissions.resource.streamnative.io/managed-state"
)

// PulsarPermissionState represents the state that needs to be tracked for PulsarPermission resources
type PulsarPermissionState struct {
ResourceType string `json:"resourceType"`
ResourceName string `json:"resourceName"`
Roles []string `json:"roles"`
Actions []string `json:"actions"`
}

// extractCurrentState extracts the current desired state from the PulsarPermission spec
func (r *PulsarPermissionReconciler) extractCurrentState(permission *resourcev1alpha1.PulsarPermission) PulsarPermissionState {
// Sort roles and actions for consistent comparison
roles := make([]string, len(permission.Spec.Roles))
copy(roles, permission.Spec.Roles)
slices.Sort(roles)

actions := make([]string, len(permission.Spec.Actions))
copy(actions, permission.Spec.Actions)
slices.Sort(actions)

return PulsarPermissionState{
ResourceType: string(permission.Spec.ResoureType),
ResourceName: permission.Spec.ResourceName,
Roles: roles,
Actions: actions,
}
}

// getPreviousState retrieves the previous state from the resource annotation
func (r *PulsarPermissionReconciler) getPreviousState(permission *resourcev1alpha1.PulsarPermission) *PulsarPermissionState {
annotations := permission.GetAnnotations()
if annotations == nil {
r.log.V(1).Info("No annotations found, treating as first reconciliation")
return nil
}

stateJSON, exists := annotations[PulsarPermissionStateAnnotation]
if !exists {
r.log.V(1).Info("No previous state annotation found, treating as first reconciliation")
return nil
}

// Try to unmarshal as PulsarPermissionState
var previousState PulsarPermissionState
if err := json.Unmarshal([]byte(stateJSON), &previousState); err != nil {
r.log.Error(err, "Failed to unmarshal previous state annotation, treating as first reconciliation",
"annotation", stateJSON)
return nil
}

return &previousState
}

// updateStateAnnotation updates the annotation with the current state after successful reconciliation
func (r *PulsarPermissionReconciler) updateStateAnnotation(permission *resourcev1alpha1.PulsarPermission, currentState PulsarPermissionState) error {
stateJSON, err := json.Marshal(currentState)
if err != nil {
return err
}

annotations := permission.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
permission.SetAnnotations(annotations)
}

// Only update if the value has changed
currentValue := annotations[PulsarPermissionStateAnnotation]
newValue := string(stateJSON)

if currentValue != newValue {
annotations[PulsarPermissionStateAnnotation] = newValue
r.log.V(1).Info("Updated state annotation", "state", currentState)
}

return nil
}

// cleanupPreviousContext cleans up permissions from the previous resource context
func (r *PulsarPermissionReconciler) cleanupPreviousContext(permission *resourcev1alpha1.PulsarPermission, prevState PulsarPermissionState) error {
if len(prevState.Roles) == 0 {
return nil
}

r.log.Info("Cleaning up permissions from previous context",
"previousResourceType", prevState.ResourceType,
"previousResourceName", prevState.ResourceName,
"rolesToCleanup", prevState.Roles)

// Create a temporary permission resource for the previous context
tempPermission := permission.DeepCopy()
tempPermission.Spec.ResourceName = prevState.ResourceName
tempPermission.Spec.ResoureType = resourcev1alpha1.PulsarResourceType(prevState.ResourceType)
tempPermission.Spec.Roles = prevState.Roles
tempPermission.Spec.Actions = prevState.Actions

// Get permissioner for the previous context
permissioner := GetPermissioner(tempPermission)
if permissioner == nil {
return fmt.Errorf("failed to get permissioner for previous context")
}

// Get the pulsar admin instance
pulsarAdmin := r.conn.pulsarAdmin

// Revoke all roles from the previous context
for _, role := range prevState.Roles {
r.log.Info("Revoking permission from previous context", "role", role)

// Create a temporary permission for this specific role
rolePermission := tempPermission.DeepCopy()
rolePermission.Spec.Roles = []string{role}
rolePermissioner := GetPermissioner(rolePermission)

if err := pulsarAdmin.RevokePermissions(rolePermissioner); err != nil {
r.log.Error(err, "Failed to revoke permission from previous context, continuing",
"role", role,
"previousResourceType", prevState.ResourceType,
"previousResourceName", prevState.ResourceName)
// Continue with other roles even if one fails
}
}

return nil
}
Loading