diff --git a/.licenserc.yaml b/.licenserc.yaml index 07f03d00..46b56079 100644 --- a/.licenserc.yaml +++ b/.licenserc.yaml @@ -38,5 +38,6 @@ header: - '**/README.md.gotmpl' - '**/*.json' - '**/.helmignore' + - 'testbin/**' comment: on-failure diff --git a/config/crd/bases/resource.streamnative.io_apikeys.yaml b/config/crd/bases/resource.streamnative.io_apikeys.yaml index 90ec81fe..229c1597 100644 --- a/config/crd/bases/resource.streamnative.io_apikeys.yaml +++ b/config/crd/bases/resource.streamnative.io_apikeys.yaml @@ -97,6 +97,10 @@ spec: This can only be set on initial creation and not updated later format: date-time type: string + exportPlaintextToken: + description: ExportPlaintextToken indicates whether the token should + be exported in plaintext + type: boolean instanceName: description: InstanceName is the name of the instance this API key is for diff --git a/config/crd/bases/resource.streamnative.io_pulsarconnections.yaml b/config/crd/bases/resource.streamnative.io_pulsarconnections.yaml index e99ef70c..e9864203 100644 --- a/config/crd/bases/resource.streamnative.io_pulsarconnections.yaml +++ b/config/crd/bases/resource.streamnative.io_pulsarconnections.yaml @@ -224,15 +224,18 @@ spec: ClusterName specifies the name of the local Pulsar cluster. When setting up Geo-Replication between Pulsar instances, this should be enabled to identify the cluster. type: string + tlsAllowInsecureConnection: + description: TLSAllowInsecureConnection indicates whether to allow + insecure connection to the broker. + type: boolean tlsEnableHostnameVerification: - description: TLSEnableHostnameVerification indicates whether to verify the hostname of the broker. + description: |- + TLSEnableHostnameVerification indicates whether to verify the hostname of the broker. Only used when using secure urls. type: boolean - tlsAllowInsecureConnection: - description: TLSAllowInsecureConnection indicates whether to allow insecure connection to the broker. - type: boolean tlsTrustCertsFilePath: - description: TLSTrustCertsFilePath Path for the TLS certificate used to validate the broker endpoint when using TLS. + description: TLSTrustCertsFilePath Path for the TLS certificate used + to validate the broker endpoint when using TLS. type: string type: object status: diff --git a/controllers/apikey_controller.go b/controllers/apikey_controller.go index f7c18334..3e098f44 100644 --- a/controllers/apikey_controller.go +++ b/controllers/apikey_controller.go @@ -18,7 +18,6 @@ import ( "context" "crypto/rsa" "fmt" - "sync" "time" "github.com/lestrrat-go/jwx/v2/jwa" @@ -33,7 +32,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/watch" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" @@ -47,10 +45,6 @@ type APIKeyReconciler struct { client.Client Scheme *runtime.Scheme ConnectionManager *ConnectionManager - // watcherMap stores active watchers for APIKeys - watcherMap map[types.NamespacedName]watch.Interface - // watcherMutex protects watcherMap - watcherMutex sync.RWMutex } const APIKeyFinalizer = "apikey.resource.streamnative.io/finalizer" @@ -61,115 +55,6 @@ const APIKeyFinalizer = "apikey.resource.streamnative.io/finalizer" //+kubebuilder:rbac:groups=resource.streamnative.io,resources=streamnativecloudconnections,verbs=get;list;watch //+kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;update;patch;delete -// handleWatchEvents processes events from the watch interface -func (r *APIKeyReconciler) handleWatchEvents(ctx context.Context, namespacedName types.NamespacedName, watcher watch.Interface) { - logger := log.FromContext(ctx) - defer watcher.Stop() - - for { - select { - case <-ctx.Done(): - return - case event, ok := <-watcher.ResultChan(): - if !ok { - logger.Info("Watch channel closed", "namespace", namespacedName.Namespace, "name", namespacedName.Name) - // Remove the watcher from the map - r.watcherMutex.Lock() - delete(r.watcherMap, namespacedName) - r.watcherMutex.Unlock() - return - } - - logger.Info("Received watch event", - "apiKey", namespacedName.Name, - "eventType", event.Type) - - if event.Type == watch.Modified { - // Check if the object is an APIKey - cloudAPIKey, ok := event.Object.(*cloudapi.APIKey) - if !ok { - logger.Error(fmt.Errorf("unexpected object type"), "Failed to convert object to APIKey") - continue - } - - // Log token and encryption status - hasEncryptedToken := cloudAPIKey.Status.EncryptedToken != nil - hasJWE := hasEncryptedToken && cloudAPIKey.Status.EncryptedToken.JWE != nil - logger.Info("Received APIKey update", - "apiKey", namespacedName.Name, - "hasEncryptedToken", hasEncryptedToken, - "hasJWE", hasJWE) - - // Get the local APIKey - localAPIKey := &resourcev1alpha1.APIKey{} - if err := r.Get(ctx, namespacedName, localAPIKey); err != nil { - logger.Error(err, "Failed to get local APIKey") - continue - } - - // Check remote status and handle abnormal situations - if isRemoteStatusAbnormal(cloudAPIKey) { - logger.Info("Detected abnormal remote APIKey status", - "apiKey", namespacedName.Name, - "hasEncryptedToken", hasEncryptedToken, - "hasJWE", hasJWE) - r.updateAPIKeyStatus(ctx, localAPIKey, - fmt.Errorf("remote API server returned incomplete or invalid status"), - "RemoteStatusAbnormal", - "Remote API server returned incomplete or invalid status for the APIKey") - continue - } - - if localAPIKey.Spec.ExportPlaintextToken != nil && *localAPIKey.Spec.ExportPlaintextToken { - // Process encrypted token and update Secret if needed - if cloudAPIKey.Status.EncryptedToken != nil && cloudAPIKey.Status.EncryptedToken.JWE != nil { - logger.Info("Found encrypted token in watch event, processing", "apiKey", namespacedName.Name) - r.processEncryptedToken(ctx, localAPIKey, cloudAPIKey) - } else { - logger.Info("No encrypted token found in watch event", "apiKey", namespacedName.Name) - // Update status to reflect that we're waiting for token - r.updateAPIKeyStatus(ctx, localAPIKey, nil, "WaitingForToken", - "Waiting for encrypted token from remote API server") - } - } else { - r.updateAPIKeyStatus(ctx, localAPIKey, nil, "Ready", "APIKey created successfully") - } - } - } - } -} - -// isRemoteStatusAbnormal checks if the remote API key status is incomplete or invalid -func isRemoteStatusAbnormal(cloudAPIKey *cloudapi.APIKey) bool { - // Consider status abnormal if: - // 1. Status is completely empty/nil (very unlikely but possible) - if cloudAPIKey == nil { - return true - } - - // 2. We expect some basic fields to be present in a normal response - // KeyID should be present in a successful API key - if cloudAPIKey.Status.KeyID == nil { - return true - } - - // 3. For an APIKey that's been processed by the server, we should have either: - // - An encrypted token - // - Or explicit conditions indicating why token creation failed - if cloudAPIKey.Status.EncryptedToken == nil && len(cloudAPIKey.Status.Conditions) == 0 { - return true - } - - // 4. If there are conditions, check if they indicate failure - for _, condition := range cloudAPIKey.Status.Conditions { - if condition.Type == "Ready" && condition.Status == metav1.ConditionFalse { - return true - } - } - - return false -} - // processEncryptedToken handles decrypting the token and storing it in a Secret func (r *APIKeyReconciler) processEncryptedToken(ctx context.Context, localAPIKey *resourcev1alpha1.APIKey, cloudAPIKey *cloudapi.APIKey) { logger := log.FromContext(ctx) @@ -214,9 +99,8 @@ func (r *APIKeyReconciler) processEncryptedToken(ctx context.Context, localAPIKe logger.Info("Successfully imported private key", "apiKey", localAPIKey.Name) // Decrypt token - jweToken := *cloudAPIKey.Status.EncryptedToken.JWE - logger.Info("Attempting to decrypt JWE token", "apiKey", localAPIKey.Name, "jweLength", len(jweToken)) - token, err := crypto.DecryptJWEToken(jweToken, privateKey) + logger.Info("Attempting to decrypt JWE token", "apiKey", localAPIKey.Name) + token, err := DecryptToken(privateKey, *cloudAPIKey.Status.EncryptedToken) if err != nil { logger.Error(err, "Failed to decrypt token") r.updateAPIKeyStatus(ctx, localAPIKey, @@ -248,37 +132,6 @@ func (r *APIKeyReconciler) processEncryptedToken(ctx context.Context, localAPIKe r.updateAPIKeyStatus(ctx, localAPIKey, nil, "Ready", "APIKey token successfully decrypted and stored in secret") } -// setupWatch creates a new watcher for an APIKey -func (r *APIKeyReconciler) setupWatch(ctx context.Context, apiKey *resourcev1alpha1.APIKey, apiKeyClient *controllers2.APIKeyClient) error { - namespacedName := types.NamespacedName{ - Namespace: apiKey.Namespace, - Name: apiKey.Name, - } - - // Check if we already have a watcher - r.watcherMutex.RLock() - _, exists := r.watcherMap[namespacedName] - r.watcherMutex.RUnlock() - if exists { - return nil - } - - // Create new watcher - watcher, err := apiKeyClient.WatchAPIKey(ctx, apiKey.Name) - if err != nil { - return fmt.Errorf("failed to create watcher: %w", err) - } - - // Store watcher in map - r.watcherMutex.Lock() - r.watcherMap[namespacedName] = watcher - r.watcherMutex.Unlock() - - // Start watching in a new goroutine - go r.handleWatchEvents(ctx, namespacedName, watcher) - return nil -} - // Reconcile handles the reconciliation of APIKey objects func (r *APIKeyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) @@ -291,15 +144,10 @@ func (r *APIKeyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr apiKey := &resourcev1alpha1.APIKey{} if err := r.Get(ctx, req.NamespacedName, apiKey); err != nil { if apierrors.IsNotFound(err) { - // Stop and remove watcher if it exists - r.watcherMutex.Lock() - if watcher, exists := r.watcherMap[req.NamespacedName]; exists { - watcher.Stop() - delete(r.watcherMap, req.NamespacedName) - } - r.watcherMutex.Unlock() + logger.Info("APIKey not found. Reconciliation will stop.", "namespace", req.Namespace, "name", req.Name) return ctrl.Result{}, nil } + // Other error fetching APIKey return ctrl.Result{}, err } @@ -317,7 +165,6 @@ func (r *APIKeyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr // Get API connection apiConn, err := r.ConnectionManager.GetOrCreateConnection(connection, nil) if err != nil { - // If connection is not initialized, requeue the request if _, ok := err.(*NotInitializedError); ok { logger.Info("Connection not initialized, requeueing", "error", err.Error()) return ctrl.Result{Requeue: true}, nil @@ -346,19 +193,15 @@ func (r *APIKeyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr // Handle deletion if !apiKey.DeletionTimestamp.IsZero() { if controllers2.ContainsString(apiKey.Finalizers, APIKeyFinalizer) { - // Try to delete remote APIKey if err := apiKeyClient.DeleteAPIKey(ctx, apiKey); err != nil { if !apierrors.IsNotFound(err) { r.updateAPIKeyStatus(ctx, apiKey, err, "DeleteFailed", fmt.Sprintf("Failed to delete external resources: %v", err)) return ctrl.Result{}, err } - // If the resource is already gone, that's fine logger.Info("Remote APIKey already deleted or not found", "apiKey", apiKey.Name) } - - // Remove finalizer after successful deletion apiKey.Finalizers = controllers2.RemoveString(apiKey.Finalizers, APIKeyFinalizer) if err := r.Update(ctx, apiKey); err != nil { return ctrl.Result{}, err @@ -367,7 +210,6 @@ func (r *APIKeyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr return ctrl.Result{}, nil } - // Add finalizer if it doesn't exist if !controllers2.ContainsString(apiKey.Finalizers, APIKeyFinalizer) { apiKey.Finalizers = append(apiKey.Finalizers, APIKeyFinalizer) if err := r.Update(ctx, apiKey); err != nil { @@ -375,7 +217,6 @@ func (r *APIKeyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr } } - // Check if ApiKey exists existingAPIKey, err := apiKeyClient.GetAPIKey(ctx, apiKey.Name) if err != nil { logger.Info("Failed to get APIKey", "error", err, "existingAPIKey", existingAPIKey) @@ -388,105 +229,158 @@ func (r *APIKeyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr } if existingAPIKey == nil { - // Generate RSA key pair for token encryption - privateKey, err := crypto.GenerateRSAKeyPair() - if err != nil { - r.updateAPIKeyStatus(ctx, apiKey, err, "KeyGenerationFailed", - fmt.Sprintf("Failed to generate RSA key pair: %v", err)) - return ctrl.Result{}, err - } + if apiKey.Spec.EncryptionKey == nil || apiKey.Spec.EncryptionKey.PEM == "" { + // Generate RSA key pair for token encryption + privateKey, pKeyErr := crypto.GenerateRSAKeyPair() + if pKeyErr != nil { + r.updateAPIKeyStatus(ctx, apiKey, pKeyErr, "KeyGenerationFailed", + fmt.Sprintf("Failed to generate RSA key pair: %v", pKeyErr)) + return ctrl.Result{}, pKeyErr + } - // Export public key in PEM format - publicKeyPEM, err := crypto.ExportPublicKeyAsPEM(privateKey) - if err != nil { - r.updateAPIKeyStatus(ctx, apiKey, err, "KeyExportFailed", - fmt.Sprintf("Failed to export public key: %v", err)) - return ctrl.Result{}, err - } + publicKeyPEM, pubKeyErr := crypto.ExportPublicKeyAsPEM(privateKey) + if pubKeyErr != nil { + r.updateAPIKeyStatus(ctx, apiKey, pubKeyErr, "KeyExportFailed", + fmt.Sprintf("Failed to export public key: %v", pubKeyErr)) + return ctrl.Result{}, pubKeyErr + } - // Store private key in Secret - privateKeyPEM := crypto.ExportPrivateKeyAsPEM(privateKey) - if err := utils.CreateOrUpdatePrivateKeySecret(ctx, r.Client, apiKey, apiKey.Namespace, apiKey.Name, privateKeyPEM); err != nil { - r.updateAPIKeyStatus(ctx, apiKey, err, "SecretCreationFailed", - fmt.Sprintf("Failed to create private key secret: %v", err)) - return ctrl.Result{}, err - } + privateKeyPEM := crypto.ExportPrivateKeyAsPEM(privateKey) + if err := utils.CreateOrUpdatePrivateKeySecret(ctx, r.Client, apiKey, apiKey.Namespace, apiKey.Name, privateKeyPEM); err != nil { + r.updateAPIKeyStatus(ctx, apiKey, err, "SecretCreationFailed", + fmt.Sprintf("Failed to create private key secret: %v", err)) + return ctrl.Result{}, err + } - // Add encryption key to spec - if apiKey.Spec.EncryptionKey == nil { - apiKey.Spec.EncryptionKey = &resourcev1alpha1.EncryptionKey{} + if apiKey.Spec.EncryptionKey == nil { + apiKey.Spec.EncryptionKey = &resourcev1alpha1.EncryptionKey{} + } + apiKey.Spec.EncryptionKey.PEM = publicKeyPEM } - apiKey.Spec.EncryptionKey.PEM = publicKeyPEM - // Update APIKey with encryption key - if err := r.Update(ctx, apiKey); err != nil { + if err := r.Update(ctx, apiKey); err != nil { // Update spec with public key PEM + logger.Error(err, "Failed to update APIKey spec with public key PEM") return ctrl.Result{}, err } - // Create APIKey - resultAPIKey, err := apiKeyClient.CreateAPIKey(ctx, apiKey) - if err != nil { - r.updateAPIKeyStatus(ctx, apiKey, err, "CreateAPIKeyFailed", - fmt.Sprintf("Failed to create APIKey: %v", err)) - return ctrl.Result{}, err + createdAPIKey, createErr := apiKeyClient.CreateAPIKey(ctx, apiKey) + if createErr != nil { + r.updateAPIKeyStatus(ctx, apiKey, createErr, "CreateAPIKeyFailed", + fmt.Sprintf("Failed to create APIKey: %v", createErr)) + return ctrl.Result{}, createErr } + logger.Info("Successfully created remote APIKey", "apiKeyName", apiKey.Name) + + r.syncCloudStatusToLocal(apiKey, createdAPIKey) if apiKey.Spec.ExportPlaintextToken != nil && *apiKey.Spec.ExportPlaintextToken { - // Update status with token information - if resultAPIKey.Status.EncryptedToken != nil && resultAPIKey.Status.EncryptedToken.JWE != nil { - // Process the encrypted token - r.processEncryptedToken(ctx, apiKey, resultAPIKey) + if createdAPIKey.Status.EncryptedToken != nil && createdAPIKey.Status.EncryptedToken.JWE != nil { + r.processEncryptedToken(ctx, apiKey, createdAPIKey) } else { r.updateAPIKeyStatus(ctx, apiKey, nil, "WaitingForToken", "Waiting for encrypted token from remote API server") } } else { - // Update status with token information - r.updateAPIKeyStatus(ctx, apiKey, nil, "Ready", "APIKey created successfully") + r.updateAPIKeyStatus(ctx, apiKey, nil, "Ready", "APIKey created successfully (no token export requested)") } - - // Set up watch for APIKey - if err := r.setupWatch(ctx, apiKey, apiKeyClient); err != nil { - logger.Error(err, "Failed to set up watch", "apiKey", apiKey.Name) - } - - // Update status - r.updateAPIKeyStatus(ctx, apiKey, nil, "Ready", "APIKey created successfully") + // The status might have been updated by processEncryptedToken or the else block above. + // We requeue to ensure the latest status is reflected and to handle any eventual consistency. return ctrl.Result{RequeueAfter: requeueInterval}, nil } - // Update revocation status if needed - if apiKey.Spec.Revoke != existingAPIKey.Spec.Revoke { - // Create a new local copy with updated values - updatedAPIKey := apiKey.DeepCopy() - if err := r.Update(ctx, updatedAPIKey); err != nil { - r.updateAPIKeyStatus(ctx, apiKey, err, "UpdateFailed", - fmt.Sprintf("Failed to update APIKey: %v", err)) - return ctrl.Result{}, err + r.syncCloudStatusToLocal(apiKey, existingAPIKey) + + // If ExportPlaintextToken is true, ensure token is processed + if apiKey.Spec.ExportPlaintextToken != nil && *apiKey.Spec.ExportPlaintextToken { + if existingAPIKey.Status.EncryptedToken != nil && existingAPIKey.Status.EncryptedToken.JWE != nil { + r.processEncryptedToken(ctx, apiKey, existingAPIKey) + } else { + r.updateAPIKeyStatus(ctx, apiKey, nil, "WaitingForToken", "Existing APIKey waiting for encrypted token from remote API server or token not available") } } - // Update description if needed + // Sync spec fields like Revoke and Description if they differ + // We compare the spec of the local apiKey CR with the spec of the fetched existingAPIKey from the cloud. + // If they differ, we send the local apiKey (which represents the desired state) to UpdateAPIKey. + needsRemoteUpdate := false + if apiKey.Spec.Revoke != existingAPIKey.Spec.Revoke { + logger.Info("Revoke status differs", "apiKeyName", apiKey.Name, "local", apiKey.Spec.Revoke, "remote", existingAPIKey.Spec.Revoke) + needsRemoteUpdate = true + } if apiKey.Spec.Description != existingAPIKey.Spec.Description { - // Create a new local copy with updated values - updatedAPIKey := apiKey.DeepCopy() - if err := r.Update(ctx, updatedAPIKey); err != nil { - r.updateAPIKeyStatus(ctx, apiKey, err, "UpdateFailed", - fmt.Sprintf("Failed to update APIKey: %v", err)) - return ctrl.Result{}, err - } + logger.Info("Description differs", "apiKeyName", apiKey.Name, "local", apiKey.Spec.Description, "remote", existingAPIKey.Spec.Description) + needsRemoteUpdate = true } - // Set up watch for APIKey - if err := r.setupWatch(ctx, apiKey, apiKeyClient); err != nil { - logger.Error(err, "Failed to set up watch", "apiKey", apiKey.Name) + if needsRemoteUpdate { + logger.Info("Updating remote APIKey spec due to detected differences", "apiKeyName", apiKey.Name) + // Pass the local apiKey CR, as UpdateAPIKey expects *resourcev1alpha1.APIKey + // It will be converted to cloudapi.APIKey by the client. + _, err := apiKeyClient.UpdateAPIKey(ctx, apiKey) + if err != nil { + r.updateAPIKeyStatus(ctx, apiKey, err, "UpdateFailed", fmt.Sprintf("Failed to update remote APIKey spec: %v", err)) + return ctrl.Result{}, err + } + logger.Info("Successfully updated remote APIKey spec", "apiKeyName", apiKey.Name) } - // Update status + // Always update local status to reflect observed generation and potentially new conditions from processing token. r.updateAPIKeyStatus(ctx, apiKey, nil, "Ready", "APIKey synced successfully") return ctrl.Result{RequeueAfter: requeueInterval}, nil } +// syncCloudStatusToLocal copies relevant status fields from the cloud APIKey (cloudapi.APIKey) +// to the local APIKey resource's status (resourcev1alpha1.APIKey). +// This is used to ensure that the local representation reflects the state observed from the cloud. +func (r *APIKeyReconciler) syncCloudStatusToLocal(localAPIKey *resourcev1alpha1.APIKey, cloudAPIKey *cloudapi.APIKey) { + if cloudAPIKey == nil { + // If there's no cloud APIKey (e.g., not found), clear relevant local status fields. + localAPIKey.Status.KeyID = nil + localAPIKey.Status.IssuedAt = nil + localAPIKey.Status.ExpiresAt = nil + localAPIKey.Status.RevokedAt = nil + localAPIKey.Status.EncryptedToken = nil + return + } + + localAPIKey.Status.KeyID = cloudAPIKey.Status.KeyID // Direct assignment for *string + + if cloudAPIKey.Status.IssuedAt != nil { + localAPIKey.Status.IssuedAt = cloudAPIKey.Status.IssuedAt.DeepCopy() + } else { + localAPIKey.Status.IssuedAt = nil + } + + // Sync ExpiresAt + if cloudAPIKey.Status.ExpiresAt != nil { + localAPIKey.Status.ExpiresAt = cloudAPIKey.Status.ExpiresAt.DeepCopy() + } else { + localAPIKey.Status.ExpiresAt = nil + } + + // Sync RevokedAt + if cloudAPIKey.Status.RevokedAt != nil { + localAPIKey.Status.RevokedAt = cloudAPIKey.Status.RevokedAt.DeepCopy() + } else { + localAPIKey.Status.RevokedAt = nil + } + + if cloudAPIKey.Status.EncryptedToken != nil { + if localAPIKey.Status.EncryptedToken == nil { + localAPIKey.Status.EncryptedToken = &resourcev1alpha1.EncryptedToken{} + } + if cloudAPIKey.Status.EncryptedToken.JWE != nil { + // Create a new string pointer for JWE to ensure no aliasing issues. + jweCopy := *cloudAPIKey.Status.EncryptedToken.JWE + localAPIKey.Status.EncryptedToken.JWE = &jweCopy + } else { + localAPIKey.Status.EncryptedToken.JWE = nil + } + } else { + localAPIKey.Status.EncryptedToken = nil + } +} + // updateAPIKeyStatus updates the status of the ApiKey resource func (r *APIKeyReconciler) updateAPIKeyStatus( ctx context.Context, @@ -498,50 +392,48 @@ func (r *APIKeyReconciler) updateAPIKeyStatus( logger := log.FromContext(ctx) apiKey.Status.ObservedGeneration = apiKey.Generation - // Set ready condition based on error - meta := metav1.Now() - readyCondition := metav1.Condition{ + newCondition := metav1.Condition{ Type: "Ready", Status: metav1.ConditionTrue, Reason: reason, Message: message, - LastTransitionTime: meta, + LastTransitionTime: metav1.Now(), } if err != nil { - readyCondition.Status = metav1.ConditionFalse - logger.Error(err, "APIKey reconciliation failed", - "reason", reason, "message", message) + newCondition.Status = metav1.ConditionFalse + // Do not log error here again if err is not nil, as it's usually logged by the caller } // Update ready condition - meta = metav1.Now() - if len(apiKey.Status.Conditions) == 0 { - apiKey.Status.Conditions = []metav1.Condition{readyCondition} - } else { - for i, condition := range apiKey.Status.Conditions { - if condition.Type == "Ready" { - // Only update time if status changes - if condition.Status != readyCondition.Status { - readyCondition.LastTransitionTime = meta - } else { - readyCondition.LastTransitionTime = condition.LastTransitionTime - } - apiKey.Status.Conditions[i] = readyCondition - break + found := false + for i, condition := range apiKey.Status.Conditions { + if condition.Type == "Ready" { + // Only update LastTransitionTime if Status or Reason or Message changes + if condition.Status != newCondition.Status || condition.Reason != newCondition.Reason || condition.Message != newCondition.Message { + apiKey.Status.Conditions[i] = newCondition + } else { + // If nothing changed, keep the old LastTransitionTime + newCondition.LastTransitionTime = condition.LastTransitionTime + apiKey.Status.Conditions[i] = newCondition } + found = true + break } } + if !found { + apiKey.Status.Conditions = append(apiKey.Status.Conditions, newCondition) + } - // Update APIKey status - if err := r.Status().Update(ctx, apiKey); err != nil { - logger.Error(err, "Failed to update APIKey status") + // Persist status update + if statusUpdateErr := r.Status().Update(ctx, apiKey); statusUpdateErr != nil { + logger.Error(statusUpdateErr, "Failed to update APIKey status") + // Potentially requeue if status update fails, but be careful of loops } } // SetupWithManager sets up the controller with the Manager. func (r *APIKeyReconciler) SetupWithManager(mgr ctrl.Manager) error { - r.watcherMap = make(map[types.NamespacedName]watch.Interface) return ctrl.NewControllerManagedBy(mgr). For(&resourcev1alpha1.APIKey{}). WithEventFilter(predicate.GenerationChangedPredicate{}). @@ -549,6 +441,8 @@ func (r *APIKeyReconciler) SetupWithManager(mgr ctrl.Manager) error { } // DecryptToken decrypts an encrypted token using the provided private key +// This function is a utility and not part of the Reconciler methods. +// It's kept here for potential direct use or if it was previously used by other parts of the codebase. func DecryptToken(priv *rsa.PrivateKey, encryptedToken cloudapi.EncryptedToken) (string, error) { if encryptedToken.JWE == nil || *encryptedToken.JWE == "" { return "", errors.New("encrypted token is empty") diff --git a/controllers/flinkdeployment_controller.go b/controllers/flinkdeployment_controller.go index 21079e12..f8e173f1 100644 --- a/controllers/flinkdeployment_controller.go +++ b/controllers/flinkdeployment_controller.go @@ -18,7 +18,6 @@ import ( "context" "encoding/json" "fmt" - "sync" "time" resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" @@ -28,13 +27,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/watch" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" - - computeapi "github.com/streamnative/pulsar-resources-operator/pkg/streamnativecloud/apis/compute/v1alpha1" ) // FlinkDeploymentReconciler reconciles a FlinkDeployment object @@ -42,10 +39,6 @@ type FlinkDeploymentReconciler struct { client.Client Scheme *runtime.Scheme ConnectionManager *ConnectionManager - // watcherMap stores active watchers for FlinkDeployments - watcherMap map[types.NamespacedName]watch.Interface - // watcherMutex protects watcherMap - watcherMutex sync.RWMutex } //+kubebuilder:rbac:groups=resource.streamnative.io,resources=computeflinkdeployments,verbs=get;list;watch;create;update;patch;delete @@ -54,276 +47,165 @@ type FlinkDeploymentReconciler struct { //+kubebuilder:rbac:groups=resource.streamnative.io,resources=streamnativecloudconnections,verbs=get;list;watch //+kubebuilder:rbac:groups=resource.streamnative.io,resources=computeworkspaces,verbs=get;list;watch -// handleWatchEvents processes events from the watch interface -func (r *FlinkDeploymentReconciler) handleWatchEvents(ctx context.Context, namespacedName types.NamespacedName, watcher watch.Interface) { - logger := log.FromContext(ctx) - defer watcher.Stop() - - for { - select { - case <-ctx.Done(): - return - case event, ok := <-watcher.ResultChan(): - if !ok { - logger.Info("Watch channel closed", "namespace", namespacedName.Namespace, "name", namespacedName.Name) - // Remove the watcher from the map - r.watcherMutex.Lock() - delete(r.watcherMap, namespacedName) - r.watcherMutex.Unlock() - return - } - - if event.Type == watch.Modified { - remoteDeployment, ok := event.Object.(*computeapi.FlinkDeployment) - if !ok { - logger.Error(fmt.Errorf("unexpected object type"), "Failed to convert object to FlinkDeployment") - continue - } - - // Get the local deployment - localDeployment := &resourcev1alpha1.ComputeFlinkDeployment{} - if err := r.Get(ctx, namespacedName, localDeployment); err != nil { - logger.Error(err, "Failed to get local FlinkDeployment") - continue - } - - // Convert status to RawExtension - statusBytes, err := json.Marshal(remoteDeployment.Status) - if err != nil { - logger.Error(err, "Failed to marshal deployment status") - continue - } - - // Create new status - newStatus := localDeployment.Status.DeepCopy() - newStatus.DeploymentStatus = &runtime.RawExtension{Raw: statusBytes} - - // Check if status has changed - if !controllers2.FlinkDeploymentStatusHasChanged(&localDeployment.Status, newStatus) { - continue - } - - // Update status - localDeployment.Status = *newStatus - if err := r.Status().Update(ctx, localDeployment); err != nil { - logger.Error(err, "Failed to update FlinkDeployment status") - } - } - } - } -} - -// setupWatch creates a new watcher for a FlinkDeployment -func (r *FlinkDeploymentReconciler) setupWatch(ctx context.Context, deployment *resourcev1alpha1.ComputeFlinkDeployment, deploymentClient *controllers2.FlinkDeploymentClient) error { - namespacedName := types.NamespacedName{ - Namespace: deployment.Namespace, - Name: deployment.Name, - } - - // Check if we already have a watcher - r.watcherMutex.RLock() - _, exists := r.watcherMap[namespacedName] - r.watcherMutex.RUnlock() - if exists { - return nil - } - - // Create new watcher - watcher, err := deploymentClient.WatchFlinkDeployment(ctx, deployment.Name) - if err != nil { - return fmt.Errorf("failed to create watcher: %w", err) - } - - // Store watcher in map - r.watcherMutex.Lock() - r.watcherMap[namespacedName] = watcher - r.watcherMutex.Unlock() - - // Start watching in a new goroutine - go r.handleWatchEvents(ctx, namespacedName, watcher) - return nil -} - // Reconcile handles the reconciliation of FlinkDeployment objects func (r *FlinkDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) logger.Info("Reconciling FlinkDeployment", "namespace", req.Namespace, "name", req.Name) - // Add requeue interval for status sync requeueInterval := time.Minute - // Get the FlinkDeployment resource deployment := &resourcev1alpha1.ComputeFlinkDeployment{} if err := r.Get(ctx, req.NamespacedName, deployment); err != nil { if apierrors.IsNotFound(err) { - // Stop and remove watcher if it exists - r.watcherMutex.Lock() - if watcher, exists := r.watcherMap[req.NamespacedName]; exists { - watcher.Stop() - delete(r.watcherMap, req.NamespacedName) - } - r.watcherMutex.Unlock() + logger.Info("FlinkDeployment resource not found. Ignoring since object must be deleted.") return ctrl.Result{}, nil } + logger.Error(err, "Failed to get FlinkDeployment resource") return ctrl.Result{}, err } - // Get APIServerRef from ComputeWorkspace if not specified in FlinkDeployment apiServerRef := deployment.Spec.APIServerRef if apiServerRef.Name == "" { - // Get the ComputeWorkspace workspace := &resourcev1alpha1.ComputeWorkspace{} if err := r.Get(ctx, types.NamespacedName{ Namespace: req.Namespace, Name: deployment.Spec.WorkspaceName, }, workspace); err != nil { r.updateDeploymentStatus(ctx, deployment, err, "GetWorkspaceFailed", - fmt.Sprintf("Failed to get ComputeWorkspace: %v", err)) + fmt.Sprintf("Failed to get ComputeWorkspace %s: %v", deployment.Spec.WorkspaceName, err)) + return ctrl.Result{}, err + } + if workspace.Spec.APIServerRef.Name == "" { + err := fmt.Errorf("APIServerRef is empty in both FlinkDeployment spec and referenced ComputeWorkspace %s spec", workspace.Name) + r.updateDeploymentStatus(ctx, deployment, err, "ValidationFailed", err.Error()) return ctrl.Result{}, err } apiServerRef = workspace.Spec.APIServerRef } - // Get the APIServerConnection - apiConn := &resourcev1alpha1.StreamNativeCloudConnection{} + apiConnResource := &resourcev1alpha1.StreamNativeCloudConnection{} if err := r.Get(ctx, types.NamespacedName{ Namespace: req.Namespace, Name: apiServerRef.Name, - }, apiConn); err != nil { + }, apiConnResource); err != nil { r.updateDeploymentStatus(ctx, deployment, err, "GetAPIServerConnectionFailed", - fmt.Sprintf("Failed to get APIServerConnection: %v", err)) + fmt.Sprintf("Failed to get StreamNativeCloudConnection %s: %v", apiServerRef.Name, err)) return ctrl.Result{}, err } - // Get the connection - conn, err := r.ConnectionManager.GetOrCreateConnection(apiConn, nil) + conn, err := r.ConnectionManager.GetOrCreateConnection(apiConnResource, nil) if err != nil { - // If connection is not initialized, requeue the request if _, ok := err.(*NotInitializedError); ok { - logger.Info("Connection not initialized, requeueing", "error", err.Error()) + logger.Info("Connection not initialized, requeueing", "connectionName", apiConnResource.Name, "error", err.Error()) return ctrl.Result{Requeue: true}, nil } r.updateDeploymentStatus(ctx, deployment, err, "GetConnectionFailed", - fmt.Sprintf("Failed to get connection: %v", err)) + fmt.Sprintf("Failed to get active connection for %s: %v", apiConnResource.Name, err)) return ctrl.Result{}, err } - // Create deployment client - deploymentClient, err := controllers2.NewFlinkDeploymentClient(conn, apiConn.Spec.Organization) + if apiConnResource.Spec.Organization == "" { + err := fmt.Errorf("organization is required in StreamNativeCloudConnection %s but not specified", apiConnResource.Name) + r.updateDeploymentStatus(ctx, deployment, err, "ValidationFailed", err.Error()) + return ctrl.Result{}, err + } + deploymentClient, err := controllers2.NewFlinkDeploymentClient(conn, apiConnResource.Spec.Organization) if err != nil { r.updateDeploymentStatus(ctx, deployment, err, "CreateDeploymentClientFailed", - fmt.Sprintf("Failed to create deployment client: %v", err)) + fmt.Sprintf("Failed to create Flink deployment client: %v", err)) return ctrl.Result{}, err } - // Handle deletion + finalizerName := controllers2.FlinkDeploymentFinalizer if !deployment.DeletionTimestamp.IsZero() { - if controllers2.ContainsString(deployment.Finalizers, controllers2.FlinkDeploymentFinalizer) { - // Try to delete remote deployment + if controllerutil.ContainsFinalizer(deployment, finalizerName) { if err := deploymentClient.DeleteFlinkDeployment(ctx, deployment); err != nil { if !apierrors.IsNotFound(err) { r.updateDeploymentStatus(ctx, deployment, err, "DeleteFailed", - fmt.Sprintf("Failed to delete external resources: %v", err)) + fmt.Sprintf("Failed to delete remote FlinkDeployment: %v", err)) return ctrl.Result{}, err } - // If the resource is already gone, that's fine - logger.Info("Remote FlinkDeployment already deleted or not found", - "deployment", deployment.Name) + logger.Info("Remote FlinkDeployment already deleted or not found", "deploymentName", deployment.Name) } - // Remove finalizer after successful deletion - deployment.Finalizers = controllers2.RemoveString(deployment.Finalizers, controllers2.FlinkDeploymentFinalizer) + controllerutil.RemoveFinalizer(deployment, finalizerName) if err := r.Update(ctx, deployment); err != nil { + logger.Error(err, "Failed to remove finalizer from FlinkDeployment") return ctrl.Result{}, err } + logger.Info("Successfully removed finalizer from FlinkDeployment", "deploymentName", deployment.Name) } return ctrl.Result{}, nil } - // Add finalizer if it doesn't exist - if !controllers2.ContainsString(deployment.Finalizers, controllers2.FlinkDeploymentFinalizer) { - deployment.Finalizers = append(deployment.Finalizers, controllers2.FlinkDeploymentFinalizer) + if !controllerutil.ContainsFinalizer(deployment, finalizerName) { + controllerutil.AddFinalizer(deployment, finalizerName) if err := r.Update(ctx, deployment); err != nil { + r.updateDeploymentStatus(ctx, deployment, err, "UpdateFinalizerFailed", fmt.Sprintf("Failed to add finalizer: %v", err)) return ctrl.Result{}, err } + logger.Info("Successfully added finalizer to FlinkDeployment", "deploymentName", deployment.Name) + return ctrl.Result{Requeue: true}, nil } - // Check if deployment exists - existingDeployment, err := deploymentClient.GetFlinkDeployment(ctx, deployment.Name) + existingRemoteDeployment, err := deploymentClient.GetFlinkDeployment(ctx, deployment.Name) if err != nil { if !apierrors.IsNotFound(err) { - r.updateDeploymentStatus(ctx, deployment, err, "GetDeploymentFailed", - fmt.Sprintf("Failed to get deployment: %v", err)) + r.updateDeploymentStatus(ctx, deployment, err, "GetRemoteDeploymentFailed", fmt.Sprintf("Failed to get remote FlinkDeployment: %v", err)) return ctrl.Result{}, err } - existingDeployment = nil + existingRemoteDeployment = nil } - if existingDeployment == nil { - // Create deployment - resp, err := deploymentClient.CreateFlinkDeployment(ctx, deployment) + if existingRemoteDeployment == nil { + logger.Info("Remote FlinkDeployment not found, creating new one.", "deploymentName", deployment.Name) + createdRemoteDeployment, err := deploymentClient.CreateFlinkDeployment(ctx, deployment) if err != nil { - r.updateDeploymentStatus(ctx, deployment, err, "CreateDeploymentFailed", - fmt.Sprintf("Failed to create deployment: %v", err)) + r.updateDeploymentStatus(ctx, deployment, err, "CreateRemoteDeploymentFailed", fmt.Sprintf("Failed to create remote FlinkDeployment: %v", err)) return ctrl.Result{}, err } + logger.Info("Successfully created remote FlinkDeployment.", "deploymentName", deployment.Name) - // Convert status to RawExtension - statusBytes, err := json.Marshal(resp.Status) + statusBytes, err := json.Marshal(createdRemoteDeployment.Status) if err != nil { - r.updateDeploymentStatus(ctx, deployment, err, "StatusMarshalFailed", - fmt.Sprintf("Failed to marshal deployment status: %v", err)) + r.updateDeploymentStatus(ctx, deployment, err, "StatusMarshalFailed", fmt.Sprintf("Failed to marshal created FlinkDeployment status: %v", err)) return ctrl.Result{}, err } - - deployment.Status.DeploymentStatus = &runtime.RawExtension{ - Raw: statusBytes, - } - r.updateDeploymentStatus(ctx, deployment, nil, "Ready", "Deployment created successfully") + deployment.Status.DeploymentStatus = &runtime.RawExtension{Raw: statusBytes} + r.updateDeploymentStatus(ctx, deployment, nil, "Ready", "FlinkDeployment created and synced successfully") } else { - // Update deployment - resp, err := deploymentClient.UpdateFlinkDeployment(ctx, deployment) + logger.Info("Remote FlinkDeployment found, attempting to update.", "deploymentName", deployment.Name) + updatedRemoteDeployment, err := deploymentClient.UpdateFlinkDeployment(ctx, deployment) if err != nil { - r.updateDeploymentStatus(ctx, deployment, err, "UpdateDeploymentFailed", - fmt.Sprintf("Failed to update deployment: %v", err)) + r.updateDeploymentStatus(ctx, deployment, err, "UpdateRemoteDeploymentFailed", fmt.Sprintf("Failed to update remote FlinkDeployment: %v", err)) return ctrl.Result{}, err } + logger.Info("Successfully updated remote FlinkDeployment.", "deploymentName", deployment.Name) - // Convert status to RawExtension - statusBytes, err := json.Marshal(resp.Status) + statusBytes, err := json.Marshal(updatedRemoteDeployment.Status) if err != nil { - r.updateDeploymentStatus(ctx, deployment, err, "StatusMarshalFailed", - fmt.Sprintf("Failed to marshal deployment status: %v", err)) + r.updateDeploymentStatus(ctx, deployment, err, "StatusMarshalFailed", fmt.Sprintf("Failed to marshal updated FlinkDeployment status: %v", err)) return ctrl.Result{}, err } - - deployment.Status.DeploymentStatus = &runtime.RawExtension{ - Raw: statusBytes, - } - r.updateDeploymentStatus(ctx, deployment, nil, "Ready", "Deployment updated successfully") + deployment.Status.DeploymentStatus = &runtime.RawExtension{Raw: statusBytes} + r.updateDeploymentStatus(ctx, deployment, nil, "Ready", "FlinkDeployment updated and synced successfully") } - // Setup watch after deployment is created/updated - if err := r.setupWatch(ctx, deployment, deploymentClient); err != nil { - logger.Error(err, "Failed to setup watch") - // Don't return error, just log it - } - - // Return with requeue interval for status sync + logger.Info("Successfully reconciled FlinkDeployment", "deploymentName", deployment.Name) return ctrl.Result{RequeueAfter: requeueInterval}, nil } func (r *FlinkDeploymentReconciler) updateDeploymentStatus( ctx context.Context, deployment *resourcev1alpha1.ComputeFlinkDeployment, - err error, + errEncountered error, reason string, message string, ) { - // Create new status for comparison - newStatus := deployment.Status.DeepCopy() + logger := log.FromContext(ctx) + + currentStatus := deployment.Status.DeepCopy() - // Create new condition condition := metav1.Condition{ Type: "Ready", Status: metav1.ConditionTrue, @@ -333,48 +215,42 @@ func (r *FlinkDeploymentReconciler) updateDeploymentStatus( LastTransitionTime: metav1.Now(), } - if err != nil { + if errEncountered != nil { + logger.Error(errEncountered, message, "flinkDeploymentName", deployment.Name) condition.Status = metav1.ConditionFalse } - // Create new conditions slice - newStatus.Conditions = make([]metav1.Condition, 0) - for _, c := range deployment.Status.Conditions { - if c.Type != condition.Type { - newStatus.Conditions = append(newStatus.Conditions, c) + newConditions := []metav1.Condition{} + foundReady := false + for _, c := range currentStatus.Conditions { + if c.Type == condition.Type { + if c.ObservedGeneration <= condition.ObservedGeneration { + newConditions = append(newConditions, condition) + foundReady = true + } else { + newConditions = append(newConditions, c) + foundReady = true + } + } else { + newConditions = append(newConditions, c) } } - newStatus.Conditions = append(newStatus.Conditions, condition) - - // Check if status has actually changed - if !controllers2.FlinkDeploymentStatusHasChanged(&deployment.Status, newStatus) { - return + if !foundReady { + newConditions = append(newConditions, condition) } + currentStatus.Conditions = newConditions + + deployment.Status = *currentStatus - // Update status - deployment.Status = *newStatus if err := r.Status().Update(ctx, deployment); err != nil { - log.FromContext(ctx).Error(err, "Failed to update FlinkDeployment status") + logger.Error(err, "Failed to update FlinkDeployment status", "flinkDeploymentName", deployment.Name) } } // SetupWithManager sets up the controller with the Manager. func (r *FlinkDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error { - // Initialize the watcher map - r.watcherMap = make(map[types.NamespacedName]watch.Interface) - return ctrl.NewControllerManagedBy(mgr). For(&resourcev1alpha1.ComputeFlinkDeployment{}). - // Remove GenerationChangedPredicate to allow status updates - // Add periodic reconciliation to sync status - WithEventFilter(predicate.Or( - // Trigger on spec changes - predicate.GenerationChangedPredicate{}, - // Trigger periodically to sync status - predicate.NewPredicateFuncs(func(object client.Object) bool { - // Trigger every minute to sync status - return true - }), - )). + WithEventFilter(predicate.GenerationChangedPredicate{}). Complete(r) } diff --git a/controllers/secret_controller.go b/controllers/secret_controller.go index 908e4335..3f6b6410 100644 --- a/controllers/secret_controller.go +++ b/controllers/secret_controller.go @@ -17,24 +17,21 @@ package controllers import ( "context" "fmt" - "sync" "time" resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" - controllers2 "github.com/streamnative/pulsar-resources-operator/pkg/streamnativecloud" + cloudapi "github.com/streamnative/pulsar-resources-operator/pkg/streamnativecloud" // Standard alias for cloud client package corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/watch" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" - - cloudapi "github.com/streamnative/pulsar-resources-operator/pkg/streamnativecloud/apis/cloud/v1alpha1" ) // SecretReconciler reconciles a StreamNative Cloud Secret object @@ -42,10 +39,6 @@ type SecretReconciler struct { client.Client Scheme *runtime.Scheme ConnectionManager *ConnectionManager - // watcherMap stores active watchers for Secrets - watcherMap map[types.NamespacedName]watch.Interface - // watcherMutex protects watcherMap - watcherMutex sync.RWMutex } //+kubebuilder:rbac:groups=resource.streamnative.io,resources=secrets,verbs=get;list;watch;create;update;patch;delete @@ -54,95 +47,21 @@ type SecretReconciler struct { //+kubebuilder:rbac:groups=resource.streamnative.io,resources=streamnativecloudconnections,verbs=get;list;watch //+kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch -// handleWatchEvents processes events from the watch interface -func (r *SecretReconciler) handleWatchEvents(ctx context.Context, namespacedName types.NamespacedName, watcher watch.Interface) { - logger := log.FromContext(ctx) - defer watcher.Stop() - - for { - select { - case <-ctx.Done(): - return - case event, ok := <-watcher.ResultChan(): - if !ok { - logger.Info("Watch channel closed", "namespace", namespacedName.Namespace, "name", namespacedName.Name) - // Remove the watcher from the map - r.watcherMutex.Lock() - delete(r.watcherMap, namespacedName) - r.watcherMutex.Unlock() - return - } - - if event.Type == watch.Modified { - // Check if the object is a Secret - _, ok := event.Object.(*cloudapi.Secret) - if !ok { - logger.Error(fmt.Errorf("unexpected object type"), "Failed to convert object to Secret") - continue - } - - // Get the local secret - localSecret := &resourcev1alpha1.Secret{} - if err := r.Get(ctx, namespacedName, localSecret); err != nil { - logger.Error(err, "Failed to get local Secret") - continue - } - - // Update status - r.updateSecretStatus(ctx, localSecret, nil, "Ready", "Secret synced successfully") - } - } - } -} - -// setupWatch creates a new watcher for a Secret -func (r *SecretReconciler) setupWatch(ctx context.Context, secret *resourcev1alpha1.Secret, secretClient *controllers2.SecretClient) error { - namespacedName := types.NamespacedName{ - Namespace: secret.Namespace, - Name: secret.Name, - } - - // Check if we already have a watcher - r.watcherMutex.RLock() - _, exists := r.watcherMap[namespacedName] - r.watcherMutex.RUnlock() - if exists { - return nil - } - - // Create new watcher - watcher, err := secretClient.WatchSecret(ctx, secret.Name) - if err != nil { - return fmt.Errorf("failed to create watcher: %w", err) - } - - // Store watcher in map - r.watcherMutex.Lock() - r.watcherMap[namespacedName] = watcher - r.watcherMutex.Unlock() - - // Start watching in a new goroutine - go r.handleWatchEvents(ctx, namespacedName, watcher) - return nil -} - // getSecretData obtains the Secret data either from direct Data field or from SecretRef -func (r *SecretReconciler) getSecretData(ctx context.Context, secret *resourcev1alpha1.Secret) (map[string]string, *corev1.SecretType, error) { +func (r *SecretReconciler) getSecretData(ctx context.Context, secretCR *resourcev1alpha1.Secret) (map[string]string, *corev1.SecretType, error) { // If direct data is provided, use it - if len(secret.Spec.Data) > 0 { - return secret.Spec.Data, secret.Spec.Type, nil + if len(secretCR.Spec.Data) > 0 { + return secretCR.Spec.Data, secretCR.Spec.Type, nil } // If SecretRef is provided, fetch from the referenced Kubernetes Secret - if secret.Spec.SecretRef != nil { - // Get the referenced Kubernetes Secret - nsName := secret.Spec.SecretRef.ToNamespacedName() + if secretCR.Spec.SecretRef != nil { + nsName := secretCR.Spec.SecretRef.ToNamespacedName() k8sSecret := &corev1.Secret{} if err := r.Get(ctx, nsName, k8sSecret); err != nil { return nil, nil, fmt.Errorf("failed to get referenced Secret %s/%s: %w", nsName.Namespace, nsName.Name, err) } - // Convert the binary data to string data stringData := make(map[string]string) for k, v := range k8sSecret.Data { stringData[k] = string(v) @@ -150,7 +69,6 @@ func (r *SecretReconciler) getSecretData(ctx context.Context, secret *resourcev1 return stringData, &k8sSecret.Type, nil } - // Neither Data nor SecretRef is provided return nil, nil, fmt.Errorf("neither Data nor SecretRef is specified in the Secret spec") } @@ -159,224 +77,232 @@ func (r *SecretReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr logger := log.FromContext(ctx) logger.Info("Reconciling Secret", "namespace", req.Namespace, "name", req.Name) - // Add requeue interval for status sync - requeueInterval := time.Minute + requeueInterval := 1 * time.Minute // Default requeue interval - // Get the Secret resource - secret := &resourcev1alpha1.Secret{} - if err := r.Get(ctx, req.NamespacedName, secret); err != nil { + secretCR := &resourcev1alpha1.Secret{} + if err := r.Get(ctx, req.NamespacedName, secretCR); err != nil { if apierrors.IsNotFound(err) { - // Stop and remove watcher if it exists - r.watcherMutex.Lock() - if watcher, exists := r.watcherMap[req.NamespacedName]; exists { - watcher.Stop() - delete(r.watcherMap, req.NamespacedName) - } - r.watcherMutex.Unlock() + logger.Info("Secret resource not found. Ignoring since object must be deleted.") return ctrl.Result{}, nil } + logger.Error(err, "Failed to get Secret resource") return ctrl.Result{}, err } - // Get the APIServerConnection + // Validate APIServerRef + if secretCR.Spec.APIServerRef.Name == "" { + err := fmt.Errorf("APIServerRef.Name is required in Secret spec but not specified") + r.updateSecretStatus(ctx, secretCR, err, "ValidationFailed", err.Error()) + return ctrl.Result{}, err // No requeue for permanent misconfiguration + } + + // Get StreamNativeCloudConnection connection := &resourcev1alpha1.StreamNativeCloudConnection{} if err := r.Get(ctx, types.NamespacedName{ - Namespace: req.Namespace, - Name: secret.Spec.APIServerRef.Name, + Namespace: req.Namespace, // Assuming connection is in the same namespace + Name: secretCR.Spec.APIServerRef.Name, }, connection); err != nil { - r.updateSecretStatus(ctx, secret, err, "ConnectionNotFound", - fmt.Sprintf("Failed to get APIServerConnection: %v", err)) + r.updateSecretStatus(ctx, secretCR, err, "ConnectionNotFound", + fmt.Sprintf("Failed to get StreamNativeCloudConnection %s: %v", secretCR.Spec.APIServerRef.Name, err)) return ctrl.Result{}, err } - // Get API connection + // Get or create API connection apiConn, err := r.ConnectionManager.GetOrCreateConnection(connection, nil) if err != nil { - // If connection is not initialized, requeue the request if _, ok := err.(*NotInitializedError); ok { - logger.Info("Connection not initialized, requeueing", "error", err.Error()) + logger.Info("Connection not initialized, requeueing", "connectionName", connection.Name, "error", err.Error()) return ctrl.Result{Requeue: true}, nil } - r.updateSecretStatus(ctx, secret, err, "GetConnectionFailed", - fmt.Sprintf("Failed to get connection: %v", err)) + r.updateSecretStatus(ctx, secretCR, err, "GetConnectionFailed", + fmt.Sprintf("Failed to get active connection for %s: %v", connection.Name, err)) return ctrl.Result{}, err } - // Get organization from connection - organization := connection.Spec.Organization - if organization == "" { - err := fmt.Errorf("organization is required but not specified") - r.updateSecretStatus(ctx, secret, err, "ValidationFailed", err.Error()) + // Validate Organization in connection spec + if connection.Spec.Organization == "" { + err := fmt.Errorf("organization is required in StreamNativeCloudConnection %s but not specified", connection.Name) + r.updateSecretStatus(ctx, secretCR, err, "ValidationFailed", err.Error()) return ctrl.Result{}, err } - // Create secret client - secretClient, err := controllers2.NewSecretClient(apiConn, organization) + // Create Secret client + secretClient, err := cloudapi.NewSecretClient(apiConn, connection.Spec.Organization) if err != nil { - r.updateSecretStatus(ctx, secret, err, "ClientCreationFailed", - fmt.Sprintf("Failed to create secret client: %v", err)) + r.updateSecretStatus(ctx, secretCR, err, "ClientCreationFailed", + fmt.Sprintf("Failed to create Secret client: %v", err)) return ctrl.Result{}, err } + finalizerName := cloudapi.SecretFinalizer // Handle deletion - if !secret.DeletionTimestamp.IsZero() { - if controllers2.ContainsString(secret.Finalizers, controllers2.SecretFinalizer) { - // Try to delete remote secret - if err := secretClient.DeleteSecret(ctx, secret); err != nil { + if !secretCR.ObjectMeta.DeletionTimestamp.IsZero() { + if controllerutil.ContainsFinalizer(secretCR, finalizerName) { + if err := secretClient.DeleteSecret(ctx, secretCR); err != nil { + // If the remote secret is already gone, that's okay. if !apierrors.IsNotFound(err) { - r.updateSecretStatus(ctx, secret, err, "DeleteFailed", - fmt.Sprintf("Failed to delete external resources: %v", err)) + r.updateSecretStatus(ctx, secretCR, err, "DeleteRemoteSecretFailed", fmt.Sprintf("Failed to delete remote Secret: %v", err)) return ctrl.Result{}, err } - // If the resource is already gone, that's fine - logger.Info("Remote Secret already deleted or not found", - "secret", secret.Name) } - // Remove finalizer after successful deletion - secret.Finalizers = controllers2.RemoveString(secret.Finalizers, controllers2.SecretFinalizer) - if err := r.Update(ctx, secret); err != nil { + controllerutil.RemoveFinalizer(secretCR, finalizerName) + if err := r.Update(ctx, secretCR); err != nil { + logger.Error(err, "Failed to remove finalizer from Secret") return ctrl.Result{}, err } } return ctrl.Result{}, nil } - // Add finalizer if it doesn't exist - if !controllers2.ContainsString(secret.Finalizers, controllers2.SecretFinalizer) { - secret.Finalizers = append(secret.Finalizers, controllers2.SecretFinalizer) - if err := r.Update(ctx, secret); err != nil { + // Add finalizer if not present + if !controllerutil.ContainsFinalizer(secretCR, finalizerName) { + controllerutil.AddFinalizer(secretCR, finalizerName) + if err := r.Update(ctx, secretCR); err != nil { + r.updateSecretStatus(ctx, secretCR, err, "AddFinalizerFailed", fmt.Sprintf("Failed to add finalizer: %v", err)) return ctrl.Result{}, err } + // Requeue after adding finalizer to ensure the update is processed before proceeding + return ctrl.Result{Requeue: true}, nil } - // Get secret data (either from direct Data field or from SecretRef) - secretData, secretType, err := r.getSecretData(ctx, secret) - if err != nil { - r.updateSecretStatus(ctx, secret, err, "GetSecretDataFailed", - fmt.Sprintf("Failed to get secret data: %v", err)) - return ctrl.Result{}, err + // Resolve secret data from Spec.Data or Spec.SecretRef + // The secretCR passed to cloud client methods should have its Spec.Data and Spec.Type populated. + currentSpecData := make(map[string]string) + for k, v := range secretCR.Spec.Data { + currentSpecData[k] = v } + currentSpecType := secretCR.Spec.Type - // Update the secret's Data field with the fetched data if SecretRef was used - if secret.Spec.SecretRef != nil && len(secret.Spec.Data) == 0 { - secret.Spec.Data = secretData - if secret.Spec.Type == nil || *secret.Spec.Type == "" { - secret.Spec.Type = secretType - } - if err := r.Update(ctx, secret); err != nil { - r.updateSecretStatus(ctx, secret, err, "UpdateSecretFailed", - fmt.Sprintf("Failed to update secret with data from referenced Secret: %v", err)) + // If SecretRef is used, we resolve it and update the CR if necessary. + // This ensures that the CR in etcd reflects the data being sent to the cloud API. + if secretCR.Spec.SecretRef != nil { + resolvedData, resolvedType, err := r.getSecretData(ctx, secretCR) + if err != nil { + r.updateSecretStatus(ctx, secretCR, err, "GetSecretDataFailed", fmt.Sprintf("Failed to get secret data from SecretRef: %v", err)) return ctrl.Result{}, err } + + // Check if an update to the local CR is needed + updateLocalCR := false + if len(secretCR.Spec.Data) == 0 { // Only populate from SecretRef if direct data is not set + secretCR.Spec.Data = resolvedData + updateLocalCR = true + } + // Only update type if original spec type was nil or empty, and resolvedType is not nil + if (secretCR.Spec.Type == nil || *secretCR.Spec.Type == "") && resolvedType != nil { + secretCR.Spec.Type = resolvedType + updateLocalCR = true + } + + if updateLocalCR { + if err := r.Update(ctx, secretCR); err != nil { + // Restore original spec data before status update to avoid inconsistent state reporting + secretCR.Spec.Data = currentSpecData + secretCR.Spec.Type = currentSpecType + r.updateSecretStatus(ctx, secretCR, err, "UpdateLocalSecretFailed", + fmt.Sprintf("Failed to update local Secret CR with resolved data: %v", err)) + return ctrl.Result{}, err + } + return ctrl.Result{Requeue: true}, nil // Requeue to use the updated CR + } } - // Check if secret exists - existingSecret, err := secretClient.GetSecret(ctx, secret.Name) + existingRemoteSecret, err := secretClient.GetSecret(ctx, secretCR.Name) if err != nil { - logger.Info("Failed to get secret", "error", err, "existingSecret", existingSecret) if !apierrors.IsNotFound(err) { - r.updateSecretStatus(ctx, secret, err, "GetSecretFailed", - fmt.Sprintf("Failed to get secret: %v", err)) - return ctrl.Result{}, client.IgnoreNotFound(err) + r.updateSecretStatus(ctx, secretCR, err, "GetRemoteSecretFailed", fmt.Sprintf("Failed to get remote Secret: %v", err)) + return ctrl.Result{}, err } - existingSecret = nil + existingRemoteSecret = nil } - if existingSecret == nil { - // Create secret - _, err := secretClient.CreateSecret(ctx, secret) - if err != nil { - r.updateSecretStatus(ctx, secret, err, "CreateSecretFailed", - fmt.Sprintf("Failed to create secret: %v", err)) + if existingRemoteSecret == nil { + if _, err := secretClient.CreateSecret(ctx, secretCR); err != nil { + r.updateSecretStatus(ctx, secretCR, err, "CreateRemoteSecretFailed", fmt.Sprintf("Failed to create remote Secret: %v", err)) return ctrl.Result{}, err } - - // Update status - r.updateSecretStatus(ctx, secret, nil, "Ready", "Secret created successfully") + r.updateSecretStatus(ctx, secretCR, nil, "Ready", "Secret created and synced successfully") } else { - // Update secret - _, err := secretClient.UpdateSecret(ctx, secret) - if err != nil { - r.updateSecretStatus(ctx, secret, err, "UpdateSecretFailed", - fmt.Sprintf("Failed to update secret: %v", err)) + if _, err := secretClient.UpdateSecret(ctx, secretCR); err != nil { // Pass the K8s CR + r.updateSecretStatus(ctx, secretCR, err, "UpdateRemoteSecretFailed", fmt.Sprintf("Failed to update remote Secret: %v", err)) return ctrl.Result{}, err } - // Update status - r.updateSecretStatus(ctx, secret, nil, "Ready", "Secret updated successfully") - } - - // Setup watch after secret is created/updated - if err := r.setupWatch(ctx, secret, secretClient); err != nil { - logger.Error(err, "Failed to setup watch") - // Don't return error, just log it + r.updateSecretStatus(ctx, secretCR, nil, "Ready", "Secret updated and synced successfully") } + logger.Info("Successfully reconciled Secret", "namespace", req.Namespace, "name", req.Name) return ctrl.Result{RequeueAfter: requeueInterval}, nil } func (r *SecretReconciler) updateSecretStatus( ctx context.Context, - secret *resourcev1alpha1.Secret, - err error, + secretCR *resourcev1alpha1.Secret, // Changed to secretCR for clarity + errEncountered error, reason string, message string, ) { - // Create new condition - condition := metav1.Condition{ - Type: "Ready", - Status: metav1.ConditionTrue, + logger := log.FromContext(ctx) + statusChanged := false + + // Prepare the new condition + newCondition := metav1.Condition{ + Type: resourcev1alpha1.ConditionReady, + Status: metav1.ConditionFalse, Reason: reason, Message: message, - ObservedGeneration: secret.Generation, + ObservedGeneration: secretCR.Generation, LastTransitionTime: metav1.Now(), } - - if err != nil { - condition.Status = metav1.ConditionFalse + if errEncountered == nil { + newCondition.Status = metav1.ConditionTrue + } else { + logger.Error(errEncountered, message, "secretName", secretCR.Name) } - // Create new conditions slice for comparison - newConditions := make([]metav1.Condition, 0) - for _, c := range secret.Status.Conditions { - if c.Type != condition.Type { - newConditions = append(newConditions, c) - } + // Check existing conditions + existingCondition := findCondition(secretCR.Status.Conditions, resourcev1alpha1.ConditionReady) + if existingCondition == nil { + secretCR.Status.Conditions = append(secretCR.Status.Conditions, newCondition) + statusChanged = true + } else if existingCondition.Status != newCondition.Status || + existingCondition.Reason != newCondition.Reason || + existingCondition.Message != newCondition.Message || + existingCondition.ObservedGeneration != newCondition.ObservedGeneration { + existingCondition.Status = newCondition.Status + existingCondition.Reason = newCondition.Reason + existingCondition.Message = newCondition.Message + existingCondition.ObservedGeneration = newCondition.ObservedGeneration + existingCondition.LastTransitionTime = newCondition.LastTransitionTime + statusChanged = true } - newConditions = append(newConditions, condition) - // Check if status has actually changed - if !controllers2.StatusHasChanged(secret.Status.Conditions, newConditions) { - return + if statusChanged { + err := r.Status().Update(ctx, secretCR) + if err != nil { + logger.Error(err, "Failed to update Secret status", "secretName", secretCR.Name) + } else { + logger.Info("Successfully updated Secret status", "secretName", secretCR.Name, "reason", reason) + } } +} - // Update conditions - secret.Status.Conditions = newConditions - - // Update observed generation - secret.Status.ObservedGeneration = secret.Generation - - // Update status - if err := r.Status().Update(ctx, secret); err != nil { - log.FromContext(ctx).Error(err, "Failed to update Secret status") +// findCondition finds a condition of a specific type in a list of conditions. +// Returns nil if not found. +func findCondition(conditions []metav1.Condition, conditionType string) *metav1.Condition { + for i := range conditions { + if conditions[i].Type == conditionType { + return &conditions[i] + } } + return nil } // SetupWithManager sets up the controller with the Manager. func (r *SecretReconciler) SetupWithManager(mgr ctrl.Manager) error { - // Initialize the watcher map - r.watcherMap = make(map[types.NamespacedName]watch.Interface) - return ctrl.NewControllerManagedBy(mgr). For(&resourcev1alpha1.Secret{}). - WithEventFilter(predicate.Or( - // Trigger on spec changes - predicate.GenerationChangedPredicate{}, - // Trigger periodically to sync status - predicate.NewPredicateFuncs(func(object client.Object) bool { - // Trigger every minute to sync status - return true - }), - )). + Owns(&corev1.Secret{}). // If we want to react to changes in owned k8s secrets + WithEventFilter(predicate.GenerationChangedPredicate{}). // Only reconcile on spec changes or finalizer changes Complete(r) } diff --git a/controllers/serviceaccount_controller.go b/controllers/serviceaccount_controller.go index 83fe0ad7..ac32f3ee 100644 --- a/controllers/serviceaccount_controller.go +++ b/controllers/serviceaccount_controller.go @@ -18,7 +18,6 @@ import ( "context" "encoding/base64" "fmt" - "sync" "time" resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" @@ -29,13 +28,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/watch" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" - - cloudapi "github.com/streamnative/pulsar-resources-operator/pkg/streamnativecloud/apis/cloud/v1alpha1" ) // ServiceAccountReconciler reconciles a StreamNative Cloud ServiceAccount object @@ -43,10 +40,6 @@ type ServiceAccountReconciler struct { client.Client Scheme *runtime.Scheme ConnectionManager *ConnectionManager - // watcherMap stores active watchers for ServiceAccounts - watcherMap map[types.NamespacedName]watch.Interface - // watcherMutex protects watcherMap - watcherMutex sync.RWMutex } const ServiceAccountFinalizer = "serviceaccount.resource.streamnative.io/finalizer" @@ -57,103 +50,6 @@ const ServiceAccountFinalizer = "serviceaccount.resource.streamnative.io/finaliz //+kubebuilder:rbac:groups=resource.streamnative.io,resources=streamnativecloudconnections,verbs=get;list;watch //+kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;update;patch;delete -// handleWatchEvents processes events from the watch interface -func (r *ServiceAccountReconciler) handleWatchEvents(ctx context.Context, namespacedName types.NamespacedName, watcher watch.Interface) { - logger := log.FromContext(ctx) - defer watcher.Stop() - - for { - select { - case <-ctx.Done(): - return - case event, ok := <-watcher.ResultChan(): - if !ok { - logger.Info("Watch channel closed", "namespace", namespacedName.Namespace, "name", namespacedName.Name) - // Remove the watcher from the map - r.watcherMutex.Lock() - delete(r.watcherMap, namespacedName) - r.watcherMutex.Unlock() - return - } - - if event.Type == watch.Modified { - // Check if the object is a ServiceAccount - cloudSA, ok := event.Object.(*cloudapi.ServiceAccount) - if !ok { - logger.Error(fmt.Errorf("unexpected object type"), "Failed to convert object to ServiceAccount") - continue - } - - // Get the local ServiceAccount - localSA := &resourcev1alpha1.ServiceAccount{} - if err := r.Get(ctx, namespacedName, localSA); err != nil { - logger.Error(err, "Failed to get local ServiceAccount") - continue - } - - // Update status - r.updateServiceAccountStatus(ctx, localSA, nil, "Ready", "ServiceAccount synced successfully") - - // Process credentials and create Secret if needed - if cloudSA.Status.PrivateKeyType == utils.ServiceAccountCredentialsType && cloudSA.Status.PrivateKeyData != "" { - r.processServiceAccountCredentials(ctx, localSA, cloudSA) - } - } - } - } -} - -// processServiceAccountCredentials handles credentials data and creates a Secret -func (r *ServiceAccountReconciler) processServiceAccountCredentials(ctx context.Context, localSA *resourcev1alpha1.ServiceAccount, cloudSA *cloudapi.ServiceAccount) { - logger := log.FromContext(ctx) - - // Base64 decode the private key data - credentialsData, err := base64.StdEncoding.DecodeString(cloudSA.Status.PrivateKeyData) - if err != nil { - logger.Error(err, "Failed to decode private key data") - return - } - - // Create or update Secret with credentials - if err := utils.CreateOrUpdateServiceAccountCredentialsSecret(ctx, r.Client, localSA, localSA.Namespace, localSA.Name, string(credentialsData)); err != nil { - logger.Error(err, "Failed to create or update service account credentials secret") - return - } - - logger.Info("Successfully created credentials secret for service account") -} - -// setupWatch creates a new watcher for a ServiceAccount -func (r *ServiceAccountReconciler) setupWatch(ctx context.Context, serviceAccount *resourcev1alpha1.ServiceAccount, saClient *controllers2.ServiceAccountClient) error { - namespacedName := types.NamespacedName{ - Namespace: serviceAccount.Namespace, - Name: serviceAccount.Name, - } - - // Check if we already have a watcher - r.watcherMutex.RLock() - _, exists := r.watcherMap[namespacedName] - r.watcherMutex.RUnlock() - if exists { - return nil - } - - // Create new watcher - watcher, err := saClient.WatchServiceAccount(ctx, serviceAccount.Name) - if err != nil { - return fmt.Errorf("failed to create watcher: %w", err) - } - - // Store watcher in map - r.watcherMutex.Lock() - r.watcherMap[namespacedName] = watcher - r.watcherMutex.Unlock() - - // Start watching in a new goroutine - go r.handleWatchEvents(ctx, namespacedName, watcher) - return nil -} - // Reconcile handles the reconciliation of ServiceAccount objects func (r *ServiceAccountReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) @@ -166,13 +62,7 @@ func (r *ServiceAccountReconciler) Reconcile(ctx context.Context, req ctrl.Reque serviceAccount := &resourcev1alpha1.ServiceAccount{} if err := r.Get(ctx, req.NamespacedName, serviceAccount); err != nil { if apierrors.IsNotFound(err) { - // Stop and remove watcher if it exists - r.watcherMutex.Lock() - if watcher, exists := r.watcherMap[req.NamespacedName]; exists { - watcher.Stop() - delete(r.watcherMap, req.NamespacedName) - } - r.watcherMutex.Unlock() + logger.Info("ServiceAccount not found. Reconciliation will stop.", "namespace", req.Namespace, "name", req.Name) return ctrl.Result{}, nil } return ctrl.Result{}, err @@ -220,7 +110,7 @@ func (r *ServiceAccountReconciler) Reconcile(ctx context.Context, req ctrl.Reque // Handle deletion if !serviceAccount.DeletionTimestamp.IsZero() { - if controllers2.ContainsString(serviceAccount.Finalizers, ServiceAccountFinalizer) { + if controllerutil.ContainsFinalizer(serviceAccount, ServiceAccountFinalizer) { // Try to delete remote ServiceAccount if err := saClient.DeleteServiceAccount(ctx, serviceAccount); err != nil { if !apierrors.IsNotFound(err) { @@ -234,7 +124,7 @@ func (r *ServiceAccountReconciler) Reconcile(ctx context.Context, req ctrl.Reque } // Remove finalizer after successful deletion - serviceAccount.Finalizers = controllers2.RemoveString(serviceAccount.Finalizers, ServiceAccountFinalizer) + controllerutil.RemoveFinalizer(serviceAccount, ServiceAccountFinalizer) if err := r.Update(ctx, serviceAccount); err != nil { return ctrl.Result{}, err } @@ -243,18 +133,20 @@ func (r *ServiceAccountReconciler) Reconcile(ctx context.Context, req ctrl.Reque } // Add finalizer if it doesn't exist - if !controllers2.ContainsString(serviceAccount.Finalizers, ServiceAccountFinalizer) { - serviceAccount.Finalizers = append(serviceAccount.Finalizers, ServiceAccountFinalizer) + if !controllerutil.ContainsFinalizer(serviceAccount, ServiceAccountFinalizer) { + controllerutil.AddFinalizer(serviceAccount, ServiceAccountFinalizer) if err := r.Update(ctx, serviceAccount); err != nil { return ctrl.Result{}, err } + // Requeue after adding finalizer to ensure the update is processed before proceeding + return ctrl.Result{Requeue: true}, nil } // Check if ServiceAccount exists existingSA, err := saClient.GetServiceAccount(ctx, serviceAccount.Name) if err != nil { - logger.Info("Failed to get ServiceAccount", "error", err, "existingSA", existingSA) if !apierrors.IsNotFound(err) { + logger.Info("Failed to get ServiceAccount", "error", err, "existingSA", existingSA) r.updateServiceAccountStatus(ctx, serviceAccount, err, "GetServiceAccountFailed", fmt.Sprintf("Failed to get ServiceAccount: %v", err)) return ctrl.Result{}, client.IgnoreNotFound(err) @@ -286,18 +178,11 @@ func (r *ServiceAccountReconciler) Reconcile(ctx context.Context, req ctrl.Reque } else { if err := utils.CreateOrUpdateServiceAccountCredentialsSecret(ctx, r.Client, serviceAccount, serviceAccount.Namespace, serviceAccount.Name, string(credentialsData)); err != nil { logger.Error(err, "Failed to create or update service account credentials secret") - } else { - logger.Info("Successfully created credentials secret for service account") } } } } - // Set up watch for ServiceAccount - if err := r.setupWatch(ctx, serviceAccount, saClient); err != nil { - logger.Error(err, "Failed to set up watch", "serviceAccount", serviceAccount.Name) - } - // Update status r.updateServiceAccountStatus(ctx, serviceAccount, nil, "Ready", "ServiceAccount created successfully") } else { @@ -316,20 +201,14 @@ func (r *ServiceAccountReconciler) Reconcile(ctx context.Context, req ctrl.Reque } else { if err := utils.CreateOrUpdateServiceAccountCredentialsSecret(ctx, r.Client, serviceAccount, serviceAccount.Namespace, serviceAccount.Name, string(credentialsData)); err != nil { logger.Error(err, "Failed to create or update service account credentials secret") - } else { - logger.Info("Successfully created credentials secret for service account") } } } } - // Set up watch for ServiceAccount - if err := r.setupWatch(ctx, serviceAccount, saClient); err != nil { - logger.Error(err, "Failed to set up watch", "serviceAccount", serviceAccount.Name) - } - // Update status r.updateServiceAccountStatus(ctx, serviceAccount, nil, "Ready", "ServiceAccount synced successfully") + logger.Info("ServiceAccount reconciled", "namespace", serviceAccount.Namespace, "name", serviceAccount.Name) } return ctrl.Result{RequeueAfter: requeueInterval}, nil @@ -388,7 +267,6 @@ func (r *ServiceAccountReconciler) updateServiceAccountStatus( // SetupWithManager sets up the controller with the Manager. func (r *ServiceAccountReconciler) SetupWithManager(mgr ctrl.Manager) error { - r.watcherMap = make(map[types.NamespacedName]watch.Interface) return ctrl.NewControllerManagedBy(mgr). For(&resourcev1alpha1.ServiceAccount{}). WithEventFilter(predicate.GenerationChangedPredicate{}). diff --git a/controllers/serviceaccountbinding_controller.go b/controllers/serviceaccountbinding_controller.go index 7a4e068c..fbc5c360 100644 --- a/controllers/serviceaccountbinding_controller.go +++ b/controllers/serviceaccountbinding_controller.go @@ -17,7 +17,7 @@ package controllers import ( "context" "fmt" - "sync" + "time" "github.com/pkg/errors" @@ -28,13 +28,12 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/watch" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" - - cloudapi "github.com/streamnative/pulsar-resources-operator/pkg/streamnativecloud/apis/cloud/v1alpha1" ) // ServiceAccountBindingReconciler reconciles a StreamNative Cloud ServiceAccountBinding object @@ -42,10 +41,6 @@ type ServiceAccountBindingReconciler struct { client.Client Scheme *runtime.Scheme ConnectionManager *ConnectionManager - // watcherMap stores active watchers for ServiceAccountBindings - watcherMap map[types.NamespacedName]watch.Interface - // watcherMutex protects watcherMap - watcherMutex sync.RWMutex } const ServiceAccountBindingFinalizer = "serviceaccountbinding.resource.streamnative.io/finalizer" @@ -56,78 +51,6 @@ const ServiceAccountBindingFinalizer = "serviceaccountbinding.resource.streamnat //+kubebuilder:rbac:groups=resource.streamnative.io,resources=serviceaccounts,verbs=get;list;watch //+kubebuilder:rbac:groups=resource.streamnative.io,resources=streamnativecloudconnections,verbs=get;list;watch -// handleWatchEvents processes events from the watch interface -func (r *ServiceAccountBindingReconciler) handleWatchEvents(ctx context.Context, namespacedName types.NamespacedName, watcher watch.Interface) { - logger := log.FromContext(ctx) - defer watcher.Stop() - - for { - select { - case <-ctx.Done(): - return - case event, ok := <-watcher.ResultChan(): - if !ok { - logger.Info("Watch channel closed", "namespace", namespacedName.Namespace, "name", namespacedName.Name) - // Remove the watcher from the map - r.watcherMutex.Lock() - delete(r.watcherMap, namespacedName) - r.watcherMutex.Unlock() - return - } - - if event.Type == watch.Modified { - // Check if the object is a ServiceAccountBinding - _, ok := event.Object.(*cloudapi.ServiceAccountBinding) - if !ok { - logger.Error(fmt.Errorf("unexpected object type"), "Failed to convert object to ServiceAccountBinding") - continue - } - - // Get the local ServiceAccountBinding - localBinding := &resourcev1alpha1.ServiceAccountBinding{} - if err := r.Get(ctx, namespacedName, localBinding); err != nil { - logger.Error(err, "Failed to get local ServiceAccountBinding") - continue - } - - // Update status - r.updateServiceAccountBindingStatus(ctx, localBinding, nil, "Ready", "ServiceAccountBinding synced successfully") - } - } - } -} - -// setupWatch creates a new watcher for a ServiceAccountBinding -func (r *ServiceAccountBindingReconciler) setupWatch(ctx context.Context, binding *resourcev1alpha1.ServiceAccountBinding, bindingClient *controllers2.ServiceAccountBindingClient) error { - namespacedName := types.NamespacedName{ - Namespace: binding.Namespace, - Name: binding.Name, - } - - // Check if we already have a watcher - r.watcherMutex.RLock() - _, exists := r.watcherMap[namespacedName] - r.watcherMutex.RUnlock() - if exists { - return nil - } - - // Create new watcher - watcher, err := bindingClient.WatchServiceAccountBinding(ctx, binding.Name) - if err != nil { - return fmt.Errorf("failed to create watcher: %w", err) - } - - // Store watcher in map - r.watcherMutex.Lock() - r.watcherMap[namespacedName] = watcher - r.watcherMutex.Unlock() - - // Start watching in a new goroutine - go r.handleWatchEvents(ctx, namespacedName, watcher) - return nil -} - // Reconcile handles the reconciliation of ServiceAccountBinding objects func (r *ServiceAccountBindingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) @@ -140,13 +63,6 @@ func (r *ServiceAccountBindingReconciler) Reconcile(ctx context.Context, req ctr binding := &resourcev1alpha1.ServiceAccountBinding{} if err := r.Get(ctx, req.NamespacedName, binding); err != nil { if apierrors.IsNotFound(err) { - // Stop and remove watcher if it exists - r.watcherMutex.Lock() - if watcher, exists := r.watcherMap[req.NamespacedName]; exists { - watcher.Stop() - delete(r.watcherMap, req.NamespacedName) - } - r.watcherMutex.Unlock() return ctrl.Result{}, nil } return ctrl.Result{}, err @@ -155,38 +71,56 @@ func (r *ServiceAccountBindingReconciler) Reconcile(ctx context.Context, req ctr // Get the ServiceAccount serviceAccount := &resourcev1alpha1.ServiceAccount{} if err := r.Get(ctx, types.NamespacedName{ - Namespace: req.Namespace, + Namespace: req.Namespace, // Assuming ServiceAccount is in the same namespace as the binding Name: binding.Spec.ServiceAccountName, }, serviceAccount); err != nil { r.updateServiceAccountBindingStatus(ctx, binding, err, "ServiceAccountNotFound", - fmt.Sprintf("Failed to get ServiceAccount: %v", err)) - return ctrl.Result{}, err + fmt.Sprintf("Failed to get ServiceAccount %s: %v", binding.Spec.ServiceAccountName, err)) + return ctrl.Result{}, err // Return error to requeue immediately + } + + // Check if the referenced ServiceAccount is ready. + // This is a simplified check. A more robust check would iterate through conditions. + saReady := false + for _, cond := range serviceAccount.Status.Conditions { + if cond.Type == "Ready" && cond.Status == metav1.ConditionTrue { + saReady = true + break + } + } + if !saReady { + errMsg := fmt.Sprintf("ServiceAccount %s is not yet Ready. Will requeue.", serviceAccount.Name) + logger.Info(errMsg) + r.updateServiceAccountBindingStatus(ctx, binding, errors.New(errMsg), "ServiceAccountNotReady", errMsg) + return ctrl.Result{RequeueAfter: requeueInterval}, nil // Requeue, as SA might become ready. } // Determine which APIServerRef to use - // If APIServerRef is specified in binding, use that; otherwise use the one from ServiceAccount apiServerRefName := "" if binding.Spec.APIServerRef != nil && binding.Spec.APIServerRef.Name != "" { apiServerRefName = binding.Spec.APIServerRef.Name - } else { + } else if serviceAccount.Spec.APIServerRef.Name != "" { // Corrected: Check Name for non-pointer struct apiServerRefName = serviceAccount.Spec.APIServerRef.Name + } else { + err := fmt.Errorf("APIServerRef not found in ServiceAccountBinding spec or its referenced ServiceAccount spec") + r.updateServiceAccountBindingStatus(ctx, binding, err, "ValidationFailed", err.Error()) + return ctrl.Result{}, err // No need to requeue if this is a permanent misconfiguration } // Get the APIServerConnection connection := &resourcev1alpha1.StreamNativeCloudConnection{} if err := r.Get(ctx, types.NamespacedName{ - Namespace: req.Namespace, + Namespace: req.Namespace, // Assuming StreamNativeCloudConnection is in the same namespace Name: apiServerRefName, }, connection); err != nil { r.updateServiceAccountBindingStatus(ctx, binding, err, "ConnectionNotFound", - fmt.Sprintf("Failed to get APIServerConnection: %v", err)) - return ctrl.Result{}, err + fmt.Sprintf("Failed to get StreamNativeCloudConnection %s: %v", apiServerRefName, err)) + return ctrl.Result{}, err // Return error to requeue immediately } // Get API connection apiConn, err := r.ConnectionManager.GetOrCreateConnection(connection, nil) if err != nil { - // If connection is not initialized, requeue the request if _, ok := err.(*NotInitializedError); ok { logger.Info("Connection not initialized, requeueing", "error", err.Error()) return ctrl.Result{Requeue: true}, nil @@ -199,9 +133,9 @@ func (r *ServiceAccountBindingReconciler) Reconcile(ctx context.Context, req ctr // Get organization from connection organization := connection.Spec.Organization if organization == "" { - err := fmt.Errorf("organization is required but not specified") + err := fmt.Errorf("organization is required in StreamNativeCloudConnection %s but not specified", connection.Name) r.updateServiceAccountBindingStatus(ctx, binding, err, "ValidationFailed", err.Error()) - return ctrl.Result{}, err + return ctrl.Result{}, err // No need to requeue if this is a permanent misconfiguration } // Create ServiceAccountBinding client @@ -214,38 +148,32 @@ func (r *ServiceAccountBindingReconciler) Reconcile(ctx context.Context, req ctr // Validate PoolMemberRefs if len(binding.Spec.PoolMemberRefs) == 0 { - err := fmt.Errorf("at least one poolMemberRef is required") + err := fmt.Errorf("at least one poolMemberRef is required in spec.poolMemberRefs") r.updateServiceAccountBindingStatus(ctx, binding, err, "ValidationFailed", err.Error()) - return ctrl.Result{}, err + return ctrl.Result{}, err // No need to requeue if this is a permanent misconfiguration } // Handle deletion if !binding.DeletionTimestamp.IsZero() { - if controllers2.ContainsString(binding.Finalizers, ServiceAccountBindingFinalizer) { - // Try to delete remote ServiceAccountBinding for each PoolMemberRef + if controllerutil.ContainsFinalizer(binding, ServiceAccountBindingFinalizer) { for i, poolMemberRef := range binding.Spec.PoolMemberRefs { - // Create a unique name for each remote ServiceAccountBinding based on the local binding name and index remoteName := fmt.Sprintf("%s.%s.%s", binding.Spec.ServiceAccountName, poolMemberRef.Namespace, poolMemberRef.Name) - - // Delete the remote ServiceAccountBinding if err := bindingClient.DeleteServiceAccountBinding(ctx, &resourcev1alpha1.ServiceAccountBinding{ ObjectMeta: metav1.ObjectMeta{ - Name: remoteName, + Name: remoteName, // This should ideally match the name used when creating/getting }, }); err != nil { if !apierrors.IsNotFound(err) { r.updateServiceAccountBindingStatus(ctx, binding, err, "DeleteFailed", - fmt.Sprintf("Failed to delete remote ServiceAccountBinding for PoolMemberRef %d: %v", i, err)) + fmt.Sprintf("Failed to delete remote ServiceAccountBinding for PoolMemberRef %d (%s): %v", i, remoteName, err)) return ctrl.Result{}, err } - // If the resource is already gone, that's fine logger.Info("Remote ServiceAccountBinding already deleted or not found", "binding", remoteName, "poolMemberRef", poolMemberRef) } } - // Remove finalizer after successful deletion of all remote ServiceAccountBindings - binding.Finalizers = controllers2.RemoveString(binding.Finalizers, ServiceAccountBindingFinalizer) + controllerutil.RemoveFinalizer(binding, ServiceAccountBindingFinalizer) if err := r.Update(ctx, binding); err != nil { return ctrl.Result{}, err } @@ -253,81 +181,79 @@ func (r *ServiceAccountBindingReconciler) Reconcile(ctx context.Context, req ctr return ctrl.Result{}, nil } - // Add finalizer if it doesn't exist - if !controllers2.ContainsString(binding.Finalizers, ServiceAccountBindingFinalizer) { - binding.Finalizers = append(binding.Finalizers, ServiceAccountBindingFinalizer) + // Add finalizer + if !controllerutil.ContainsFinalizer(binding, ServiceAccountBindingFinalizer) { + controllerutil.AddFinalizer(binding, ServiceAccountBindingFinalizer) if err := r.Update(ctx, binding); err != nil { + r.updateServiceAccountBindingStatus(ctx, binding, err, "UpdateFinalizerFailed", + fmt.Sprintf("Failed to add finalizer: %v", err)) return ctrl.Result{}, err } + // Requeue after adding finalizer to ensure the update is processed before proceeding + return ctrl.Result{Requeue: true}, nil } - // Process each PoolMemberRef - var errs []error + // Reconcile each PoolMemberRef + allBindingsReady := true + var lastError error for i, poolMemberRef := range binding.Spec.PoolMemberRefs { - // Create a unique name for each remote ServiceAccountBinding based on the local binding name and index remoteName := fmt.Sprintf("%s.%s.%s", binding.Spec.ServiceAccountName, poolMemberRef.Namespace, poolMemberRef.Name) - // Create a ServiceAccountBinding for this PoolMemberRef - remoteBinding := &resourcev1alpha1.ServiceAccountBinding{ + // This is the payload for the client's CreateServiceAccountBinding method + // It should be of type *resourcev1alpha1.ServiceAccountBinding + payloadForClient := &resourcev1alpha1.ServiceAccountBinding{ ObjectMeta: metav1.ObjectMeta{ - Name: remoteName, + Name: remoteName, // This name will be used by the client's convertToCloudServiceAccountBinding + Namespace: binding.Namespace, // Preserve original namespace, client might use it or organization }, Spec: resourcev1alpha1.ServiceAccountBindingSpec{ ServiceAccountName: binding.Spec.ServiceAccountName, - PoolMemberRefs: []resourcev1alpha1.PoolMemberReference{poolMemberRef}, + PoolMemberRefs: []resourcev1alpha1.PoolMemberReference{poolMemberRef}, // Single ref for this specific call + // APIServerRef is not directly used by convertToCloudServiceAccountBinding for spec, but keep for completeness if other logic depends on it + APIServerRef: binding.Spec.APIServerRef, }, } - // Check if ServiceAccountBinding exists - existingBinding, err := bindingClient.GetServiceAccountBinding(ctx, remoteName) + // Check if the remote binding already exists + existingRemoteBinding, err := bindingClient.GetServiceAccountBinding(ctx, remoteName) if err != nil { - logger.Info("Failed to get ServiceAccountBinding", "error", err, "existingBinding", existingBinding) - if !apierrors.IsNotFound(err) { - errs = append(errs, fmt.Errorf("failed to get ServiceAccountBinding for PoolMemberRef %d: %w", i, err)) - continue - } - existingBinding = nil - } - - if existingBinding == nil { - // Create ServiceAccountBinding - _, err := bindingClient.CreateServiceAccountBinding(ctx, remoteBinding) - if err != nil { - errs = append(errs, fmt.Errorf("failed to create ServiceAccountBinding for PoolMemberRef %d: %w", i, err)) + if apierrors.IsNotFound(err) { + // Remote binding does not exist, so create it. + if _, err := bindingClient.CreateServiceAccountBinding(ctx, payloadForClient); err != nil { + errMsg := fmt.Sprintf("Failed to create remote ServiceAccountBinding for PoolMemberRef %d (%s): %v", i, remoteName, err) + r.updateServiceAccountBindingStatus(ctx, binding, err, "CreateFailed", errMsg) + allBindingsReady = false + lastError = errors.Wrapf(err, errMsg) + continue + } + logger.Info("Successfully created remote ServiceAccountBinding", "bindingName", remoteName, "poolMemberRef", poolMemberRef) + } else { + // Another error occurred while trying to get the remote binding. + errMsg := fmt.Sprintf("Failed to get remote ServiceAccountBinding for PoolMemberRef %d (%s): %v", i, remoteName, err) + r.updateServiceAccountBindingStatus(ctx, binding, err, "GetFailed", errMsg) + allBindingsReady = false + lastError = errors.Wrapf(err, errMsg) continue } - logger.Info("Created ServiceAccountBinding", "name", remoteName, "poolMemberRef", poolMemberRef) } else { - // Update ServiceAccountBinding - _, err := bindingClient.UpdateServiceAccountBinding(ctx, remoteBinding) - if err != nil { - errs = append(errs, fmt.Errorf("failed to update ServiceAccountBinding for PoolMemberRef %d: %w", i, err)) - continue - } - logger.Info("Updated ServiceAccountBinding", "name", remoteName, "poolMemberRef", poolMemberRef) - } - - // Setup watch for each remote binding - if err := r.setupWatch(ctx, remoteBinding, bindingClient); err != nil { - logger.Error(err, "Failed to setup watch", "name", remoteName) - // Don't return error, just log it + // Remote binding exists. + logger.Info("Remote ServiceAccountBinding already exists", "bindingName", remoteName, "poolMemberRef", poolMemberRef, "existingRemoteName", existingRemoteBinding.ObjectMeta.Name) + // TODO: Implement update logic if necessary. + // Compare existingRemoteBinding.Spec with what payloadForClient would generate via conversion. + // For now, we assume if it exists, it's correctly configured or updates are not handled here. } } - // Update status based on errors - if len(errs) > 0 { - // Combine error messages - errorMsg := "Failed to process some PoolMemberRefs: " - for _, err := range errs { - errorMsg += err.Error() + "; " - } - r.updateServiceAccountBindingStatus(ctx, binding, errors.New(errorMsg), "ProcessingFailed", errorMsg) - return ctrl.Result{}, nil + if !allBindingsReady { + // If any binding failed, the overall status is not Ready. + // The status message will reflect the last error encountered during the loop for simplicity. + // A more sophisticated approach might collect all errors. + r.updateServiceAccountBindingStatus(ctx, binding, lastError, "Reconciling", "Some ServiceAccountBindings are still being processed or encountered errors. See last error for details.") + return ctrl.Result{RequeueAfter: requeueInterval}, nil } - // Update status - r.updateServiceAccountBindingStatus(ctx, binding, nil, "Ready", "All ServiceAccountBindings processed successfully") - + r.updateServiceAccountBindingStatus(ctx, binding, nil, "Ready", "All ServiceAccountBindings synced successfully") + logger.Info("Successfully reconciled ServiceAccountBinding") return ctrl.Result{RequeueAfter: requeueInterval}, nil } @@ -338,61 +264,57 @@ func (r *ServiceAccountBindingReconciler) updateServiceAccountBindingStatus( reason string, message string, ) { - // Create new condition + logger := log.FromContext(ctx) + binding.Status.ObservedGeneration = binding.Generation condition := metav1.Condition{ Type: "Ready", - Status: metav1.ConditionTrue, - Reason: reason, - Message: message, ObservedGeneration: binding.Generation, LastTransitionTime: metav1.Now(), + Reason: reason, + Message: message, } - if err != nil { + logger.Error(err, message, "serviceAccountBindingName", binding.Name) condition.Status = metav1.ConditionFalse + } else { + condition.Status = metav1.ConditionTrue } - // Create new conditions slice for comparison - newConditions := make([]metav1.Condition, 0) + newConditions := []metav1.Condition{} + foundReady := false for _, c := range binding.Status.Conditions { - if c.Type != condition.Type { + if c.Type == "Ready" { + // If we're providing an update for the current generation, replace the old Ready condition. + if c.ObservedGeneration == condition.ObservedGeneration { + newConditions = append(newConditions, condition) + foundReady = true + } else if c.ObservedGeneration < condition.ObservedGeneration { + // New condition is for a newer generation, replace old one + newConditions = append(newConditions, condition) + foundReady = true + } else { + // Old condition is for a newer generation (should not happen if logic is correct, but keep it) + newConditions = append(newConditions, c) + foundReady = true // Mark as found, so we don't add the new one if old is newer + } + } else { newConditions = append(newConditions, c) } } - newConditions = append(newConditions, condition) - - // Check if status has actually changed - if !controllers2.StatusHasChanged(binding.Status.Conditions, newConditions) { - return + if !foundReady { + newConditions = append(newConditions, condition) } - - // Update conditions binding.Status.Conditions = newConditions - // Update observed generation - binding.Status.ObservedGeneration = binding.Generation - - // Update status - if err := r.Status().Update(ctx, binding); err != nil { - log.FromContext(ctx).Error(err, "Failed to update ServiceAccountBinding status") + if statusUpdateErr := r.Status().Update(ctx, binding); statusUpdateErr != nil { + logger.Error(statusUpdateErr, "Failed to update ServiceAccountBinding status", "serviceAccountBindingName", binding.Name) } } // SetupWithManager sets up the controller with the Manager. func (r *ServiceAccountBindingReconciler) SetupWithManager(mgr ctrl.Manager) error { - // Initialize the watcher map - r.watcherMap = make(map[types.NamespacedName]watch.Interface) - return ctrl.NewControllerManagedBy(mgr). For(&resourcev1alpha1.ServiceAccountBinding{}). - WithEventFilter(predicate.Or( - // Trigger on spec changes - predicate.GenerationChangedPredicate{}, - // Trigger periodically to sync status - predicate.NewPredicateFuncs(func(object client.Object) bool { - // Trigger every minute to sync status - return true - }), - )). + WithEventFilter(predicate.GenerationChangedPredicate{}). Complete(r) } diff --git a/controllers/workspace_controller.go b/controllers/workspace_controller.go index 4a3d9b67..2f4d6cf4 100644 --- a/controllers/workspace_controller.go +++ b/controllers/workspace_controller.go @@ -17,7 +17,6 @@ package controllers import ( "context" "fmt" - "sync" "time" resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" @@ -27,13 +26,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/watch" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" - - computeapi "github.com/streamnative/pulsar-resources-operator/pkg/streamnativecloud/apis/compute/v1alpha1" ) // WorkspaceReconciler reconciles a Workspace object @@ -41,10 +38,6 @@ type WorkspaceReconciler struct { client.Client Scheme *runtime.Scheme ConnectionManager *ConnectionManager - // watcherMap stores active watchers for Workspaces - watcherMap map[types.NamespacedName]watch.Interface - // watcherMutex protects watcherMap - watcherMutex sync.RWMutex } //+kubebuilder:rbac:groups=resource.streamnative.io,resources=computeworkspaces,verbs=get;list;watch;create;update;patch;delete @@ -52,80 +45,6 @@ type WorkspaceReconciler struct { //+kubebuilder:rbac:groups=resource.streamnative.io,resources=computeworkspaces/finalizers,verbs=update //+kubebuilder:rbac:groups=resource.streamnative.io,resources=streamnativecloudconnections,verbs=get;list;watch -// handleWatchEvents processes events from the watch interface -func (r *WorkspaceReconciler) handleWatchEvents(ctx context.Context, namespacedName types.NamespacedName, watcher watch.Interface) { - logger := log.FromContext(ctx) - defer watcher.Stop() - - for { - select { - case <-ctx.Done(): - return - case event, ok := <-watcher.ResultChan(): - if !ok { - logger.Info("Watch channel closed", "namespace", namespacedName.Namespace, "name", namespacedName.Name) - // Remove the watcher from the map - r.watcherMutex.Lock() - delete(r.watcherMap, namespacedName) - r.watcherMutex.Unlock() - return - } - - if event.Type == watch.Modified { - remoteWorkspace, ok := event.Object.(*computeapi.Workspace) - if !ok { - logger.Error(fmt.Errorf("unexpected object type"), "Failed to convert object to Workspace") - continue - } - - // Get the local workspace - localWorkspace := &resourcev1alpha1.ComputeWorkspace{} - if err := r.Get(ctx, namespacedName, localWorkspace); err != nil { - logger.Error(err, "Failed to get local Workspace") - continue - } - - // Update workspace ID and status - localWorkspace.Status.WorkspaceID = remoteWorkspace.Name - - // Update status - r.updateWorkspaceStatus(ctx, localWorkspace, nil, "Ready", "Workspace synced successfully") - } - } - } -} - -// setupWatch creates a new watcher for a Workspace -func (r *WorkspaceReconciler) setupWatch(ctx context.Context, workspace *resourcev1alpha1.ComputeWorkspace, workspaceClient *controllers2.WorkspaceClient) error { - namespacedName := types.NamespacedName{ - Namespace: workspace.Namespace, - Name: workspace.Name, - } - - // Check if we already have a watcher - r.watcherMutex.RLock() - _, exists := r.watcherMap[namespacedName] - r.watcherMutex.RUnlock() - if exists { - return nil - } - - // Create new watcher - watcher, err := workspaceClient.WatchWorkspace(ctx, workspace.Name) - if err != nil { - return fmt.Errorf("failed to create watcher: %w", err) - } - - // Store watcher in map - r.watcherMutex.Lock() - r.watcherMap[namespacedName] = watcher - r.watcherMutex.Unlock() - - // Start watching in a new goroutine - go r.handleWatchEvents(ctx, namespacedName, watcher) - return nil -} - // Reconcile handles the reconciliation of Workspace objects func (r *WorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) @@ -138,13 +57,7 @@ func (r *WorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( workspace := &resourcev1alpha1.ComputeWorkspace{} if err := r.Get(ctx, req.NamespacedName, workspace); err != nil { if apierrors.IsNotFound(err) { - // Stop and remove watcher if it exists - r.watcherMutex.Lock() - if watcher, exists := r.watcherMap[req.NamespacedName]; exists { - watcher.Stop() - delete(r.watcherMap, req.NamespacedName) - } - r.watcherMutex.Unlock() + logger.Info("Workspace not found. Reconciliation will stop.", "namespace", req.Namespace, "name", req.Name) return ctrl.Result{}, nil } return ctrl.Result{}, err @@ -192,7 +105,7 @@ func (r *WorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( // Handle deletion if !workspace.DeletionTimestamp.IsZero() { - if controllers2.ContainsString(workspace.Finalizers, controllers2.WorkspaceFinalizer) { + if controllerutil.ContainsFinalizer(workspace, controllers2.WorkspaceFinalizer) { // Try to delete remote workspace if err := workspaceClient.DeleteWorkspace(ctx, workspace); err != nil { if !apierrors.IsNotFound(err) { @@ -206,7 +119,7 @@ func (r *WorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( } // Remove finalizer after successful deletion - workspace.Finalizers = controllers2.RemoveString(workspace.Finalizers, controllers2.WorkspaceFinalizer) + controllerutil.RemoveFinalizer(workspace, controllers2.WorkspaceFinalizer) if err := r.Update(ctx, workspace); err != nil { return ctrl.Result{}, err } @@ -215,11 +128,13 @@ func (r *WorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( } // Add finalizer if it doesn't exist - if !controllers2.ContainsString(workspace.Finalizers, controllers2.WorkspaceFinalizer) { - workspace.Finalizers = append(workspace.Finalizers, controllers2.WorkspaceFinalizer) + if !controllerutil.ContainsFinalizer(workspace, controllers2.WorkspaceFinalizer) { + controllerutil.AddFinalizer(workspace, controllers2.WorkspaceFinalizer) if err := r.Update(ctx, workspace); err != nil { return ctrl.Result{}, err } + // Requeue after adding finalizer to ensure the update is processed before proceeding + return ctrl.Result{Requeue: true}, nil } // Check if workspace exists @@ -248,23 +163,16 @@ func (r *WorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( r.updateWorkspaceStatus(ctx, workspace, nil, "Ready", "Workspace created successfully") } else { // Update workspace - resp, err := workspaceClient.UpdateWorkspace(ctx, workspace) + updatedWorkspace, err := workspaceClient.UpdateWorkspace(ctx, workspace) if err != nil { r.updateWorkspaceStatus(ctx, workspace, err, "UpdateWorkspaceFailed", fmt.Sprintf("Failed to update workspace: %v", err)) return ctrl.Result{}, err } - // Update status - workspace.Status.WorkspaceID = resp.Name + workspace.Status.WorkspaceID = updatedWorkspace.Name r.updateWorkspaceStatus(ctx, workspace, nil, "Ready", "Workspace updated successfully") } - // Setup watch after workspace is created/updated - if err := r.setupWatch(ctx, workspace, workspaceClient); err != nil { - logger.Error(err, "Failed to setup watch") - // Don't return error, just log it - } - return ctrl.Result{RequeueAfter: requeueInterval}, nil } @@ -275,61 +183,50 @@ func (r *WorkspaceReconciler) updateWorkspaceStatus( reason string, message string, ) { - // Create new condition - condition := metav1.Condition{ + logger := log.FromContext(ctx) + workspace.Status.ObservedGeneration = workspace.Generation + + newCondition := metav1.Condition{ Type: "Ready", Status: metav1.ConditionTrue, Reason: reason, Message: message, - ObservedGeneration: workspace.Generation, LastTransitionTime: metav1.Now(), } if err != nil { - condition.Status = metav1.ConditionFalse - } - - // Create new conditions slice for comparison - newConditions := make([]metav1.Condition, 0) - for _, c := range workspace.Status.Conditions { - if c.Type != condition.Type { - newConditions = append(newConditions, c) + newCondition.Status = metav1.ConditionFalse + } + + // Update ready condition + found := false + for i, condition := range workspace.Status.Conditions { + if condition.Type == "Ready" { + if condition.Status != newCondition.Status || condition.Reason != newCondition.Reason || condition.Message != newCondition.Message { + workspace.Status.Conditions[i] = newCondition + } else { + newCondition.LastTransitionTime = condition.LastTransitionTime + workspace.Status.Conditions[i] = newCondition + } + found = true + break } } - newConditions = append(newConditions, condition) - // Check if status has actually changed - if !controllers2.StatusHasChanged(workspace.Status.Conditions, newConditions) { - return + if !found { + workspace.Status.Conditions = append(workspace.Status.Conditions, newCondition) } - // Update conditions - workspace.Status.Conditions = newConditions - - // Update observed generation - workspace.Status.ObservedGeneration = workspace.Generation - - // Update status - if err := r.Status().Update(ctx, workspace); err != nil { - log.FromContext(ctx).Error(err, "Failed to update Workspace status") + // Persist status update + if statusUpdateErr := r.Status().Update(ctx, workspace); statusUpdateErr != nil { + logger.Error(statusUpdateErr, "Failed to update Workspace status") } } // SetupWithManager sets up the controller with the Manager. func (r *WorkspaceReconciler) SetupWithManager(mgr ctrl.Manager) error { - // Initialize the watcher map - r.watcherMap = make(map[types.NamespacedName]watch.Interface) - return ctrl.NewControllerManagedBy(mgr). For(&resourcev1alpha1.ComputeWorkspace{}). - WithEventFilter(predicate.Or( - // Trigger on spec changes - predicate.GenerationChangedPredicate{}, - // Trigger periodically to sync status - predicate.NewPredicateFuncs(func(object client.Object) bool { - // Trigger every minute to sync status - return true - }), - )). + WithEventFilter(predicate.GenerationChangedPredicate{}). Complete(r) } diff --git a/pkg/crypto/rsa.go b/pkg/crypto/rsa.go index a8d5469a..a944f77a 100644 --- a/pkg/crypto/rsa.go +++ b/pkg/crypto/rsa.go @@ -43,7 +43,7 @@ func ExportPublicKeyAsPEM(privateKey *rsa.PrivateKey) (string, error) { } publicKeyBlock := &pem.Block{ - Type: "RSA PUBLIC KEY", + Type: "PUBLIC KEY", Bytes: publicKeyBytes, } diff --git a/pkg/streamnativecloud/apikey_client.go b/pkg/streamnativecloud/apikey_client.go index 7ff81dc5..bb4fbbd3 100644 --- a/pkg/streamnativecloud/apikey_client.go +++ b/pkg/streamnativecloud/apikey_client.go @@ -16,10 +16,6 @@ package streamnativecloud import ( "context" - "crypto/rand" - "crypto/rsa" - "crypto/x509" - "encoding/pem" "fmt" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -102,9 +98,11 @@ func convertToCloudAPIKey(apiKey *resourcev1alpha1.APIKey) *cloudapi.APIKey { if apiKey.Spec.ExpirationTime != nil { cloudAPIKey.Spec.ExpirationTime = apiKey.Spec.ExpirationTime } - - privateKey := GenerateEncryptionKey() - cloudAPIKey.Spec.EncryptionKey = ExportPublicKey(privateKey) + if apiKey.Spec.EncryptionKey != nil { + cloudAPIKey.Spec.EncryptionKey = &cloudapi.EncryptionKey{ + PEM: apiKey.Spec.EncryptionKey.PEM, + } + } return cloudAPIKey } @@ -138,25 +136,3 @@ func (c *APIKeyClient) UpdateAPIKey(ctx context.Context, apiKey *resourcev1alpha func (c *APIKeyClient) DeleteAPIKey(ctx context.Context, apiKey *resourcev1alpha1.APIKey) error { return c.client.CloudV1alpha1().APIKeys(c.organization).Delete(ctx, apiKey.Name, metav1.DeleteOptions{}) } - -func GenerateEncryptionKey() *rsa.PrivateKey { - privateKey, err := rsa.GenerateKey(rand.Reader, 2048) - if err != nil { - panic(err) - } - return privateKey -} - -func ExportPublicKey(key *rsa.PrivateKey) *cloudapi.EncryptionKey { - der, err := x509.MarshalPKIXPublicKey(&key.PublicKey) - if err != nil { - panic(err) - } - pemKey := pem.EncodeToMemory(&pem.Block{ - Type: "PUBLIC KEY", - Bytes: der, - }) - return &cloudapi.EncryptionKey{ - PEM: string(pemKey), - } -}