diff --git a/api/bases/heat.openstack.org_heats.yaml b/api/bases/heat.openstack.org_heats.yaml index b81109eb..66e6508e 100644 --- a/api/bases/heat.openstack.org_heats.yaml +++ b/api/bases/heat.openstack.org_heats.yaml @@ -1959,12 +1959,45 @@ spec: default: memcached description: Memcached instance name. type: string + messagingBus: + description: MessagingBus - Messaging Bus configuration + properties: + cluster: + description: Name of the cluster + minLength: 1 + type: string + user: + description: User - RabbitMQ username + type: string + vhost: + description: Vhost - RabbitMQ vhost name + type: string + required: + - cluster + type: object nodeSelector: additionalProperties: type: string description: NodeSelector to target subset of worker nodes for running the Heat services type: object + notificationsBus: + description: NotificationsBus - Notifications Bus configuration (optional, + separate from MessagingBus) + properties: + cluster: + description: Name of the cluster + minLength: 1 + type: string + user: + description: User - RabbitMQ username + type: string + vhost: + description: Vhost - RabbitMQ vhost name + type: string + required: + - cluster + type: object passwordSelectors: default: authEncryptionKey: HeatAuthEncryptionKey @@ -1999,6 +2032,24 @@ spec: RabbitMQ instance name Needed to request a transportURL that is created and used in Heat type: string + rabbitmq: + description: |- + RabbitMQ - RabbitMQ configuration overrides + Deprecated: Use MessagingBus instead + properties: + cluster: + description: Name of the cluster + minLength: 1 + type: string + user: + description: User - RabbitMQ username + type: string + vhost: + description: Vhost - RabbitMQ vhost name + type: string + required: + - cluster + type: object secret: description: |- Secret containing OpenStack password information for heat HeatDatabasePassword, HeatPassword @@ -2100,6 +2151,10 @@ spec: description: ReadyCount of Heat Engine instance format: int32 type: integer + notificationsTransportURLSecret: + description: NotificationsTransportURLSecret - Secret containing Notifications + RabbitMQ transportURL + type: string observedGeneration: description: |- ObservedGeneration - the most recent generation observed for this diff --git a/api/go.mod b/api/go.mod index 6d669ec1..7e167af7 100644 --- a/api/go.mod +++ b/api/go.mod @@ -16,7 +16,6 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/emicklei/go-restful/v3 v3.12.2 // indirect - github.com/evanphx/json-patch v5.9.11+incompatible // indirect github.com/evanphx/json-patch/v5 v5.9.11 // indirect github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/fxamacker/cbor/v2 v2.9.0 // indirect @@ -45,6 +44,7 @@ require ( github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.65.0 // indirect github.com/prometheus/procfs v0.16.1 // indirect + github.com/rabbitmq/cluster-operator/v2 v2.16.0 // indirect github.com/spf13/pflag v1.0.7 // indirect github.com/x448/float16 v0.8.4 // indirect go.yaml.in/yaml/v2 v2.4.2 // indirect diff --git a/api/go.sum b/api/go.sum index 4a8ff286..6dc1a2a8 100644 --- a/api/go.sum +++ b/api/go.sum @@ -1,3 +1,4 @@ +github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww= github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0= github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -84,6 +85,8 @@ github.com/openstack-k8s-operators/lib-common/modules/common v0.6.1-0.2025123021 github.com/openstack-k8s-operators/lib-common/modules/common v0.6.1-0.20251230215914-6ba873b49a35/go.mod h1:kycZyoe7OZdW1HUghr2nI3N7wSJtNahXf6b/ypD14f4= github.com/openstack-k8s-operators/lib-common/modules/storage v0.6.1-0.20251230215914-6ba873b49a35 h1:8WZYfCt1VJHa5sJRX0UhpmoXud/fn8LHQhXsakdYXuQ= github.com/openstack-k8s-operators/lib-common/modules/storage v0.6.1-0.20251230215914-6ba873b49a35/go.mod h1:H0aQANk8iJPRhS2Bg9n6cYb/IHF0Cks9g7+uZG04Rhk= +github.com/openstack-k8s-operators/rabbitmq-cluster-operator/v2 v2.6.1-0.20250929174222-a0d328fa4dec h1:saovr368HPAKHN0aRPh8h8n9s9dn3d8Frmfua0UYRlc= +github.com/openstack-k8s-operators/rabbitmq-cluster-operator/v2 v2.6.1-0.20250929174222-a0d328fa4dec/go.mod h1:Nh2NEePLjovUQof2krTAg4JaAoLacqtPTZQXK6izNfg= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= diff --git a/api/v1beta1/conditions.go b/api/v1beta1/conditions.go index 7fd7fe50..5be19c30 100644 --- a/api/v1beta1/conditions.go +++ b/api/v1beta1/conditions.go @@ -31,6 +31,9 @@ const ( // HeatStackDomainReadyCondition ... HeatStackDomainReadyCondition condition.Type = "HeatStackDomainReady" + + // HeatNotificationBusReadyCondition ... + HeatNotificationBusReadyCondition condition.Type = "HeatNotificationBusReady" ) // Common Messages used by API objects. @@ -76,4 +79,19 @@ const ( // HeatStackDomainReadyErrorMessage HeatStackDomainReadyErrorMessage = "HeatStackDomain error occured %s" + + // + // HeatNotificationBusReady condition messages + // + // HeatNotificationBusReadyInitMessage + HeatNotificationBusReadyInitMessage = "HeatNotificationBus not started" + + // HeatNotificationBusReadyRunningMessage + HeatNotificationBusReadyRunningMessage = "HeatNotificationBus creation in progress" + + // HeatNotificationBusReadyMessage + HeatNotificationBusReadyMessage = "HeatNotificationBus successfully created" + + // HeatNotificationBusReadyErrorMessage + HeatNotificationBusReadyErrorMessage = "HeatNotificationBus error occured %s" ) diff --git a/api/v1beta1/heat_types.go b/api/v1beta1/heat_types.go index 59320b8d..5d7bb799 100644 --- a/api/v1beta1/heat_types.go +++ b/api/v1beta1/heat_types.go @@ -17,6 +17,7 @@ limitations under the License. package v1beta1 import ( + rabbitmqv1 "github.com/openstack-k8s-operators/infra-operator/apis/rabbitmq/v1beta1" topologyv1 "github.com/openstack-k8s-operators/infra-operator/apis/topology/v1beta1" "github.com/openstack-k8s-operators/lib-common/modules/common/condition" "github.com/openstack-k8s-operators/lib-common/modules/common/util" @@ -41,6 +42,7 @@ const ( HeatDatabaseMigrationAnnotation = "heat.openstack.org/database-migration" ) + // HeatSpec defines the desired state of Heat type HeatSpec struct { HeatSpecBase `json:",inline"` @@ -95,6 +97,19 @@ type HeatSpecBase struct { // Needed to request a transportURL that is created and used in Heat RabbitMqClusterName string `json:"rabbitMqClusterName"` + // +kubebuilder:validation:Optional + // MessagingBus - Messaging Bus configuration + MessagingBus rabbitmqv1.RabbitMqConfig `json:"messagingBus,omitempty"` + + // +kubebuilder:validation:Optional + // NotificationsBus - Notifications Bus configuration (optional, separate from MessagingBus) + NotificationsBus *rabbitmqv1.RabbitMqConfig `json:"notificationsBus,omitempty"` + + // +kubebuilder:validation:Optional + // RabbitMQ - RabbitMQ configuration overrides + // Deprecated: Use MessagingBus instead + RabbitMQ *rabbitmqv1.RabbitMqConfig `json:"rabbitmq,omitempty"` + // +kubebuilder:validation:Optional // CustomServiceConfig - customize the service config using this parameter to change service defaults, // or overwrite rendered information using raw OpenStack config format. The content gets added to @@ -151,6 +166,9 @@ type HeatStatus struct { // TransportURLSecret - Secret containing RabbitMQ transportURL TransportURLSecret string `json:"transportURLSecret,omitempty"` + // NotificationsTransportURLSecret - Secret containing Notifications RabbitMQ transportURL + NotificationsTransportURLSecret string `json:"notificationsTransportURLSecret,omitempty"` + // ReadyCount of Heat API instance HeatAPIReadyCount int32 `json:"heatApiReadyCount,omitempty"` @@ -222,6 +240,7 @@ func (instance Heat) StatusConditionsList() condition.Conditions { condition.UnknownCondition(condition.DBSyncReadyCondition, condition.InitReason, condition.DBSyncReadyInitMessage), condition.UnknownCondition(condition.MemcachedReadyCondition, condition.InitReason, condition.MemcachedReadyInitMessage), condition.UnknownCondition(condition.RabbitMqTransportURLReadyCondition, condition.InitReason, condition.RabbitMqTransportURLReadyInitMessage), + condition.UnknownCondition(HeatNotificationBusReadyCondition, condition.InitReason, HeatNotificationBusReadyInitMessage), condition.UnknownCondition(condition.InputReadyCondition, condition.InitReason, condition.InputReadyInitMessage), condition.UnknownCondition(condition.ServiceConfigReadyCondition, condition.InitReason, condition.ServiceConfigReadyInitMessage), condition.UnknownCondition(HeatStackDomainReadyCondition, condition.InitReason, HeatStackDomainReadyInitMessage), diff --git a/api/v1beta1/heat_webhook.go b/api/v1beta1/heat_webhook.go index 7f769497..24823868 100644 --- a/api/v1beta1/heat_webhook.go +++ b/api/v1beta1/heat_webhook.go @@ -25,6 +25,7 @@ package v1beta1 import ( "fmt" + rabbitmqv1 "github.com/openstack-k8s-operators/infra-operator/apis/rabbitmq/v1beta1" topologyv1 "github.com/openstack-k8s-operators/infra-operator/apis/topology/v1beta1" "github.com/openstack-k8s-operators/lib-common/modules/common/service" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -65,6 +66,7 @@ func (r *Heat) Default() { // Default - set defaults for this Heat spec func (spec *HeatSpec) Default() { + spec.HeatSpecBase.Default() if spec.HeatAPI.ContainerImage == "" { spec.HeatAPI.ContainerImage = heatDefaults.APIContainerImageURL } @@ -76,10 +78,34 @@ func (spec *HeatSpec) Default() { } } +// Default - set defaults for this HeatSpecBase +func (spec *HeatSpecBase) Default() { + // Handle legacy RabbitMQ field and populate MessagingBus + if spec.RabbitMQ != nil { + // Copy legacy RabbitMQ config to MessagingBus if it's not set + if spec.MessagingBus.Cluster == "" { + spec.MessagingBus = *spec.RabbitMQ + } + } + + // Default MessagingBus with RabbitMqClusterName + rabbitmqv1.DefaultRabbitMqConfig(&spec.MessagingBus, spec.RabbitMqClusterName) + + // Default NotificationsBus if it's specified + // Note: user/vhost are NOT inherited from MessagingBus to ensure separation + // (RPC and notifications should never share credentials) + if spec.NotificationsBus != nil { + // Ensure cluster name is set if not already + if spec.NotificationsBus.Cluster != "" { + rabbitmqv1.DefaultRabbitMqConfig(spec.NotificationsBus, spec.NotificationsBus.Cluster) + } + } +} + // Default - set defaults for this Heat spec core. This version is called // by the OpenStackControlplane func (spec *HeatSpecCore) Default() { - // nothing here yet + spec.HeatSpecBase.Default() } var _ webhook.Validator = &Heat{} @@ -173,6 +199,26 @@ func (r *Heat) ValidateUpdate(old runtime.Object) (admission.Warnings, error) { func (r *HeatSpec) ValidateUpdate(old HeatSpec, basePath *field.Path, annotations map[string]string, namespace string) field.ErrorList { var allErrs field.ErrorList + // Validate deprecated fields and their new equivalents + // Don't allow setting both old and new fields with different values + // Users can either set both to the same value, or null out the old field and set the new one + if r.RabbitMqClusterName != "" && r.MessagingBus.Cluster != "" && + r.RabbitMqClusterName != r.MessagingBus.Cluster { + allErrs = append(allErrs, field.Invalid( + basePath.Child("messagingBus").Child("cluster"), + r.MessagingBus.Cluster, + fmt.Sprintf("messagingBus.cluster cannot differ from deprecated rabbitMqClusterName (%s). "+ + "Either use the new messagingBus.cluster field or the deprecated rabbitMqClusterName, but not both with different values", + r.RabbitMqClusterName))) + } + + // Reject changes to deprecated field unless nulling it out + if r.RabbitMqClusterName != old.RabbitMqClusterName && r.RabbitMqClusterName != "" { + allErrs = append(allErrs, field.Forbidden( + basePath.Child("rabbitMqClusterName"), + "rabbitMqClusterName is deprecated and cannot be changed. Please use messagingBus.cluster instead")) + } + // Allow users to bypass this validation in cases where they have independently verified // the validity of their new database to ensure consistency with the current one. if _, ok := annotations[HeatDatabaseMigrationAnnotation]; !ok { @@ -203,6 +249,26 @@ func (r *HeatSpec) ValidateUpdate(old HeatSpec, basePath *field.Path, annotation func (r *HeatSpecCore) ValidateUpdate(old HeatSpecCore, basePath *field.Path, namespace string) field.ErrorList { var allErrs field.ErrorList + // Validate deprecated fields and their new equivalents + // Don't allow setting both old and new fields with different values + // Users can either set both to the same value, or null out the old field and set the new one + if r.RabbitMqClusterName != "" && r.MessagingBus.Cluster != "" && + r.RabbitMqClusterName != r.MessagingBus.Cluster { + allErrs = append(allErrs, field.Invalid( + basePath.Child("messagingBus").Child("cluster"), + r.MessagingBus.Cluster, + fmt.Sprintf("messagingBus.cluster cannot differ from deprecated rabbitMqClusterName (%s). "+ + "Either use the new messagingBus.cluster field or the deprecated rabbitMqClusterName, but not both with different values", + r.RabbitMqClusterName))) + } + + // Reject changes to deprecated field unless nulling it out + if r.RabbitMqClusterName != old.RabbitMqClusterName && r.RabbitMqClusterName != "" { + allErrs = append(allErrs, field.Forbidden( + basePath.Child("rabbitMqClusterName"), + "rabbitMqClusterName is deprecated and cannot be changed. Please use messagingBus.cluster instead")) + } + // We currently have no logic in place to perform database migrations. Changing databases // would render all of the existing stacks unmanageable. We should block changes to the // databaseInstance to protect existing workloads. diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go index 287f2d59..03715dac 100644 --- a/api/v1beta1/zz_generated.deepcopy.go +++ b/api/v1beta1/zz_generated.deepcopy.go @@ -21,6 +21,7 @@ limitations under the License. package v1beta1 import ( + rabbitmqv1beta1 "github.com/openstack-k8s-operators/infra-operator/apis/rabbitmq/v1beta1" topologyv1beta1 "github.com/openstack-k8s-operators/infra-operator/apis/topology/v1beta1" "github.com/openstack-k8s-operators/lib-common/modules/common/condition" "github.com/openstack-k8s-operators/lib-common/modules/common/service" @@ -663,6 +664,17 @@ func (in *HeatSpec) DeepCopy() *HeatSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *HeatSpecBase) DeepCopyInto(out *HeatSpecBase) { *out = *in + out.MessagingBus = in.MessagingBus + if in.NotificationsBus != nil { + in, out := &in.NotificationsBus, &out.NotificationsBus + *out = new(rabbitmqv1beta1.RabbitMqConfig) + **out = **in + } + if in.RabbitMQ != nil { + in, out := &in.RabbitMQ, &out.RabbitMQ + *out = new(rabbitmqv1beta1.RabbitMqConfig) + **out = **in + } if in.CustomServiceConfigSecrets != nil { in, out := &in.CustomServiceConfigSecrets, &out.CustomServiceConfigSecrets *out = make([]string, len(*in)) diff --git a/config/crd/bases/heat.openstack.org_heats.yaml b/config/crd/bases/heat.openstack.org_heats.yaml index b81109eb..66e6508e 100644 --- a/config/crd/bases/heat.openstack.org_heats.yaml +++ b/config/crd/bases/heat.openstack.org_heats.yaml @@ -1959,12 +1959,45 @@ spec: default: memcached description: Memcached instance name. type: string + messagingBus: + description: MessagingBus - Messaging Bus configuration + properties: + cluster: + description: Name of the cluster + minLength: 1 + type: string + user: + description: User - RabbitMQ username + type: string + vhost: + description: Vhost - RabbitMQ vhost name + type: string + required: + - cluster + type: object nodeSelector: additionalProperties: type: string description: NodeSelector to target subset of worker nodes for running the Heat services type: object + notificationsBus: + description: NotificationsBus - Notifications Bus configuration (optional, + separate from MessagingBus) + properties: + cluster: + description: Name of the cluster + minLength: 1 + type: string + user: + description: User - RabbitMQ username + type: string + vhost: + description: Vhost - RabbitMQ vhost name + type: string + required: + - cluster + type: object passwordSelectors: default: authEncryptionKey: HeatAuthEncryptionKey @@ -1999,6 +2032,24 @@ spec: RabbitMQ instance name Needed to request a transportURL that is created and used in Heat type: string + rabbitmq: + description: |- + RabbitMQ - RabbitMQ configuration overrides + Deprecated: Use MessagingBus instead + properties: + cluster: + description: Name of the cluster + minLength: 1 + type: string + user: + description: User - RabbitMQ username + type: string + vhost: + description: Vhost - RabbitMQ vhost name + type: string + required: + - cluster + type: object secret: description: |- Secret containing OpenStack password information for heat HeatDatabasePassword, HeatPassword @@ -2100,6 +2151,10 @@ spec: description: ReadyCount of Heat Engine instance format: int32 type: integer + notificationsTransportURLSecret: + description: NotificationsTransportURLSecret - Secret containing Notifications + RabbitMQ transportURL + type: string observedGeneration: description: |- ObservedGeneration - the most recent generation observed for this diff --git a/internal/controller/heat_controller.go b/internal/controller/heat_controller.go index 8eb575d8..7049749a 100644 --- a/internal/controller/heat_controller.go +++ b/internal/controller/heat_controller.go @@ -555,7 +555,16 @@ func (r *HeatReconciler) reconcileNormal(ctx context.Context, instance *heatv1be // // create RabbitMQ transportURL CR and get the actual URL from the associated secret that is created // - transportURL, op, err := r.transportURLCreateOrUpdate(instance) + serviceLabels := map[string]string{ + common.AppSelector: heat.ServiceName, + } + + transportURL, op, err := r.transportURLCreateOrUpdate( + instance, + serviceLabels, + instance.Spec.RabbitMqClusterName, + instance.Spec.MessagingBus, + ) if err != nil { instance.Status.Conditions.Set(condition.FalseCondition( condition.RabbitMqTransportURLReadyCondition, @@ -613,6 +622,75 @@ func (r *HeatReconciler) reconcileNormal(ctx context.Context, instance *heatv1be instance.Status.Conditions.MarkTrue(condition.RabbitMqTransportURLReadyCondition, condition.RabbitMqTransportURLReadyMessage) + // + // create notifications RabbitMQ transportURL if NotificationsBus is configured + // + if instance.Spec.NotificationsBus != nil && instance.Spec.NotificationsBus.Cluster != "" { + notificationsTransportURL, notifOp, err := r.notificationsTransportURLCreateOrUpdate( + instance, + serviceLabels, + instance.Spec.NotificationsBus.Cluster, + *instance.Spec.NotificationsBus, + ) + if err != nil { + instance.Status.Conditions.Set(condition.FalseCondition( + heatv1beta1.HeatNotificationBusReadyCondition, + condition.ErrorReason, + condition.SeverityWarning, + heatv1beta1.HeatNotificationBusReadyErrorMessage, + err.Error())) + return ctrl.Result{}, err + } + + if notifOp != controllerutil.OperationResultNone { + Log.Info(fmt.Sprintf("Notifications TransportURL %s successfully reconciled - operation: %s", notificationsTransportURL.Name, string(notifOp))) + } + + instance.Status.NotificationsTransportURLSecret = notificationsTransportURL.Status.SecretName + + if instance.Status.NotificationsTransportURLSecret == "" { + Log.Info(fmt.Sprintf("Waiting for Notifications TransportURL %s secret to be created", notificationsTransportURL.Name)) + + instance.Status.Conditions.Set(condition.FalseCondition( + heatv1beta1.HeatNotificationBusReadyCondition, + condition.RequestedReason, + condition.SeverityInfo, + heatv1beta1.HeatNotificationBusReadyRunningMessage)) + + return ctrl.Result{RequeueAfter: time.Second * 10}, nil + } + + // + // check for required Notifications TransportURL secret + // + notificationsTransportURLSecret, notifHash, err := oko_secret.GetSecret(ctx, helper, instance.Status.NotificationsTransportURLSecret, instance.Namespace) + if err != nil { + if k8s_errors.IsNotFound(err) { + Log.Info(fmt.Sprintf("Notifications TransportURL secret %s not found", instance.Status.NotificationsTransportURLSecret)) + instance.Status.Conditions.Set(condition.FalseCondition( + heatv1beta1.HeatNotificationBusReadyCondition, + condition.RequestedReason, + condition.SeverityInfo, + heatv1beta1.HeatNotificationBusReadyRunningMessage)) + return ctrl.Result{RequeueAfter: time.Second * 10}, nil + } + instance.Status.Conditions.Set(condition.FalseCondition( + heatv1beta1.HeatNotificationBusReadyCondition, + condition.ErrorReason, + condition.SeverityWarning, + heatv1beta1.HeatNotificationBusReadyErrorMessage, + err.Error())) + return ctrl.Result{}, err + } + secretVars[notificationsTransportURLSecret.Name] = env.SetValue(notifHash) + + instance.Status.Conditions.MarkTrue(heatv1beta1.HeatNotificationBusReadyCondition, heatv1beta1.HeatNotificationBusReadyMessage) + } else { + // No notifications bus configured, mark as not required and clear status + instance.Status.NotificationsTransportURLSecret = "" + instance.Status.Conditions.MarkTrue(heatv1beta1.HeatNotificationBusReadyCondition, "NotificationsBus not configured") + } + db, result, err := r.ensureDB(ctx, helper, instance) if err != nil { return ctrl.Result{}, err @@ -659,10 +737,6 @@ func (r *HeatReconciler) reconcileNormal(ctx context.Context, instance *heatv1be // TODO check when/if Init, Update, or Upgrade should/could be skipped // - serviceLabels := map[string]string{ - common.AppSelector: heat.ServiceName, - } - // Handle service init ctrlResult, err := r.reconcileInit(ctx, instance, helper, serviceLabels) if err != nil || (ctrlResult != ctrl.Result{}) { @@ -1146,10 +1220,20 @@ func (r *HeatReconciler) generateServiceSecrets( transportURL := strings.TrimSuffix(string(transportURLSecret.Data["transport_url"]), "\n") quorumQueues := strings.TrimSuffix(string(transportURLSecret.Data["quorumqueues"]), "\n") == "true" + // Get notifications transport URL if configured + var notificationsTransportURL string + if instance.Status.NotificationsTransportURLSecret != "" { + notificationsTransportURLSecret, _, err := oko_secret.GetSecret(ctx, h, instance.Status.NotificationsTransportURLSecret, instance.Namespace) + if err != nil { + return err + } + notificationsTransportURL = strings.TrimSuffix(string(notificationsTransportURLSecret.Data["transport_url"]), "\n") + } + databaseAccount := db.GetAccount() dbSecret := db.GetSecret() - templateParameters := initTemplateParameters(instance, authURL, password, domainAdminPassword, authEncryptionKey, transportURL, mc, databaseAccount, dbSecret, quorumQueues) + templateParameters := initTemplateParameters(instance, authURL, password, domainAdminPassword, authEncryptionKey, transportURL, notificationsTransportURL, mc, databaseAccount, dbSecret, quorumQueues) // Render vhost configuration for API and CFN httpdAPIVhostConfig := map[string]any{} @@ -1209,16 +1293,59 @@ func (r *HeatReconciler) createHashOfInputHashes( return hash, nil } -func (r *HeatReconciler) transportURLCreateOrUpdate(instance *heatv1beta1.Heat) (*rabbitmqv1.TransportURL, controllerutil.OperationResult, error) { +func (r *HeatReconciler) transportURLCreateOrUpdate( + instance *heatv1beta1.Heat, + serviceLabels map[string]string, + rabbitMqClusterName string, + rabbitMqConfig rabbitmqv1.RabbitMqConfig, +) (*rabbitmqv1.TransportURL, controllerutil.OperationResult, error) { transportURL := &rabbitmqv1.TransportURL{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-heat-transport", instance.Name), Namespace: instance.Namespace, + Labels: serviceLabels, }, } op, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, transportURL, func() error { - transportURL.Spec.RabbitmqClusterName = instance.Spec.RabbitMqClusterName + transportURL.Spec.RabbitmqClusterName = rabbitMqClusterName + + // Set custom username and vhost if configured + if rabbitMqConfig.User != "" { + transportURL.Spec.Username = rabbitMqConfig.User + } + // Always set Vhost - empty string means use default "/" vhost + transportURL.Spec.Vhost = rabbitMqConfig.Vhost + + return controllerutil.SetControllerReference(instance, transportURL, r.Scheme) + }) + + return transportURL, op, err +} + +func (r *HeatReconciler) notificationsTransportURLCreateOrUpdate( + instance *heatv1beta1.Heat, + serviceLabels map[string]string, + rabbitMqClusterName string, + rabbitMqConfig rabbitmqv1.RabbitMqConfig, +) (*rabbitmqv1.TransportURL, controllerutil.OperationResult, error) { + transportURL := &rabbitmqv1.TransportURL{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-heat-notifications-transport", instance.Name), + Namespace: instance.Namespace, + Labels: serviceLabels, + }, + } + + op, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, transportURL, func() error { + transportURL.Spec.RabbitmqClusterName = rabbitMqClusterName + + // Set custom username and vhost if configured + if rabbitMqConfig.User != "" { + transportURL.Spec.Username = rabbitMqConfig.User + } + // Always set Vhost - empty string means use default "/" vhost + transportURL.Spec.Vhost = rabbitMqConfig.Vhost return controllerutil.SetControllerReference(instance, transportURL, r.Scheme) }) @@ -1508,6 +1635,7 @@ func initTemplateParameters( domainAdminPassword string, authEncryptionKey string, transportURL string, + notificationsTransportURL string, mc *memcachedv1.Memcached, databaseAccount *mariadbv1.MariaDBAccount, dbSecret *corev1.Secret, @@ -1521,7 +1649,7 @@ func initTemplateParameters( heat.DatabaseName, ) - return map[string]any{ + params := map[string]any{ "KeystoneInternalURL": authURL, "ServiceUser": instance.Spec.ServiceUser, "ServicePassword": password, @@ -1537,6 +1665,13 @@ func initTemplateParameters( "Timeout": instance.Spec.APITimeout, "QuorumQueues": quorumQueues, } + + // Add notifications transport URL if configured + if notificationsTransportURL != "" { + params["NotificationsTransportURL"] = notificationsTransportURL + } + + return params } func renderVhost(httpdVhostConfig map[string]any, instance *heatv1beta1.Heat, endpt service.Endpoint, serviceName string, tlsEnabled bool) { diff --git a/templates/heat/config/00-default.conf b/templates/heat/config/00-default.conf index 89b4bf69..b6a8485d 100644 --- a/templates/heat/config/00-default.conf +++ b/templates/heat/config/00-default.conf @@ -43,7 +43,12 @@ amqp_durable_queues=true auth_uri={{ .KeystoneInternalURL }}/v3/ec2tokens [oslo_messaging_notifications] +{{if (index . "NotificationsTransportURL")}} +driver=messagingv2 +transport_url={{ .NotificationsTransportURL }} +{{else}} driver=noop +{{end}} [oslo_middleware] enable_proxy_headers_parsing=True diff --git a/test/functional/base_test.go b/test/functional/base_test.go index aeb2061a..67ef72b8 100644 --- a/test/functional/base_test.go +++ b/test/functional/base_test.go @@ -24,6 +24,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" heatv1 "github.com/openstack-k8s-operators/heat-operator/api/v1beta1" + rabbitmqv1 "github.com/openstack-k8s-operators/infra-operator/apis/rabbitmq/v1beta1" condition "github.com/openstack-k8s-operators/lib-common/modules/common/condition" ) @@ -100,3 +101,44 @@ func HeatConditionGetter(name types.NamespacedName) condition.Conditions { instance := GetHeat(name) return instance.Status.Conditions } + +func GetHeatSpecWithRabbitMQ(rabbitmqUser *string, rabbitmqVHost *string) map[string]any { + spec := GetDefaultHeatSpec() + if rabbitmqUser != nil || rabbitmqVHost != nil { + messagingBus := map[string]any{} + if rabbitmqUser != nil { + messagingBus["user"] = *rabbitmqUser + } + if rabbitmqVHost != nil { + messagingBus["vhost"] = *rabbitmqVHost + } + spec["messagingBus"] = messagingBus + } + return spec +} + +func GetHeatSpecWithNotificationsBus(notificationsCluster *string, notificationsUser *string, notificationsVHost *string) map[string]any { + spec := GetDefaultHeatSpec() + if notificationsCluster != nil || notificationsUser != nil || notificationsVHost != nil { + notificationsBus := map[string]any{} + if notificationsCluster != nil { + notificationsBus["cluster"] = *notificationsCluster + } + if notificationsUser != nil { + notificationsBus["user"] = *notificationsUser + } + if notificationsVHost != nil { + notificationsBus["vhost"] = *notificationsVHost + } + spec["notificationsBus"] = notificationsBus + } + return spec +} + +func GetTransportURL(name types.NamespacedName) *rabbitmqv1.TransportURL { + instance := &rabbitmqv1.TransportURL{} + Eventually(func(g Gomega) { + g.Expect(k8sClient.Get(ctx, name, instance)).Should(Succeed()) + }, timeout*5, interval).Should(Succeed()) + return instance +} diff --git a/test/functional/heat_controller_test.go b/test/functional/heat_controller_test.go index 87e51285..36f52095 100644 --- a/test/functional/heat_controller_test.go +++ b/test/functional/heat_controller_test.go @@ -36,6 +36,7 @@ import ( heatv1 "github.com/openstack-k8s-operators/heat-operator/api/v1beta1" "github.com/openstack-k8s-operators/heat-operator/internal/heat" memcachedv1 "github.com/openstack-k8s-operators/infra-operator/apis/memcached/v1beta1" + rabbitmqv1 "github.com/openstack-k8s-operators/infra-operator/apis/rabbitmq/v1beta1" keystonev1 "github.com/openstack-k8s-operators/keystone-operator/api/v1beta1" condition "github.com/openstack-k8s-operators/lib-common/modules/common/condition" ) @@ -952,4 +953,172 @@ var _ = Describe("Heat controller", func() { }, timeout, interval).Should(Succeed()) }) }) + + When("A Heat instance is created with default RabbitMQ settings", func() { + BeforeEach(func() { + DeferCleanup(th.DeleteInstance, CreateHeat(heatName, GetDefaultHeatSpec())) + DeferCleanup( + k8sClient.Delete, ctx, CreateHeatSecret(namespace, SecretName)) + DeferCleanup(infra.DeleteMemcached, infra.CreateMemcached(namespace, "memcached", memcachedSpec)) + infra.SimulateMemcachedReady(memcachedName) + }) + + It("should create a TransportURL without custom user/vhost", func() { + transportURL := GetTransportURL(heatTransportURLName) + Expect(transportURL.Spec.RabbitmqClusterName).Should(Equal("rabbitmq")) + Expect(transportURL.Spec.Username).Should(Equal("")) + Expect(transportURL.Spec.Vhost).Should(Equal("")) + }) + }) + + When("A Heat instance is created with custom RabbitMQ settings", func() { + BeforeEach(func() { + customUser := "heat" + customVHost := "/heat" + spec := GetHeatSpecWithRabbitMQ(&customUser, &customVHost) + DeferCleanup(th.DeleteInstance, CreateHeat(heatName, spec)) + DeferCleanup( + k8sClient.Delete, ctx, CreateHeatSecret(namespace, SecretName)) + DeferCleanup(infra.DeleteMemcached, infra.CreateMemcached(namespace, "memcached", memcachedSpec)) + infra.SimulateMemcachedReady(memcachedName) + }) + + It("should create a TransportURL with custom user and vhost", func() { + transportURL := GetTransportURL(heatTransportURLName) + Expect(transportURL.Spec.RabbitmqClusterName).Should(Equal("rabbitmq")) + Expect(transportURL.Spec.Username).Should(Equal("heat")) + Expect(transportURL.Spec.Vhost).Should(Equal("/heat")) + }) + }) + + When("A Heat instance is created with only custom RabbitMQ user", func() { + BeforeEach(func() { + customUser := "heat" + spec := GetHeatSpecWithRabbitMQ(&customUser, nil) + DeferCleanup(th.DeleteInstance, CreateHeat(heatName, spec)) + DeferCleanup( + k8sClient.Delete, ctx, CreateHeatSecret(namespace, SecretName)) + DeferCleanup(infra.DeleteMemcached, infra.CreateMemcached(namespace, "memcached", memcachedSpec)) + infra.SimulateMemcachedReady(memcachedName) + }) + + It("should create a TransportURL with custom user only", func() { + transportURL := GetTransportURL(heatTransportURLName) + Expect(transportURL.Spec.RabbitmqClusterName).Should(Equal("rabbitmq")) + Expect(transportURL.Spec.Username).Should(Equal("heat")) + Expect(transportURL.Spec.Vhost).Should(Equal("")) + }) + }) + + When("A Heat instance is created with only custom RabbitMQ vhost", func() { + BeforeEach(func() { + customVHost := "/heat" + spec := GetHeatSpecWithRabbitMQ(nil, &customVHost) + DeferCleanup(th.DeleteInstance, CreateHeat(heatName, spec)) + DeferCleanup( + k8sClient.Delete, ctx, CreateHeatSecret(namespace, SecretName)) + DeferCleanup(infra.DeleteMemcached, infra.CreateMemcached(namespace, "memcached", memcachedSpec)) + infra.SimulateMemcachedReady(memcachedName) + }) + + It("should create a TransportURL with custom vhost only", func() { + transportURL := GetTransportURL(heatTransportURLName) + Expect(transportURL.Spec.RabbitmqClusterName).Should(Equal("rabbitmq")) + Expect(transportURL.Spec.Username).Should(Equal("")) + Expect(transportURL.Spec.Vhost).Should(Equal("/heat")) + }) + }) + + When("A Heat instance is created with notificationsBus configured", func() { + BeforeEach(func() { + notificationsCluster := "notifications-rabbitmq" + notificationsUser := "heat-notifications" + notificationsVHost := "/notifications" + spec := GetHeatSpecWithNotificationsBus(¬ificationsCluster, ¬ificationsUser, ¬ificationsVHost) + DeferCleanup(th.DeleteInstance, CreateHeat(heatName, spec)) + }) + + It("should have notificationsBus spec configured correctly", func() { + heat := GetHeat(heatName) + Expect(heat.Spec.NotificationsBus).ShouldNot(BeNil()) + Expect(heat.Spec.NotificationsBus.Cluster).Should(Equal("notifications-rabbitmq")) + Expect(heat.Spec.NotificationsBus.User).Should(Equal("heat-notifications")) + Expect(heat.Spec.NotificationsBus.Vhost).Should(Equal("/notifications")) + }) + }) + + When("Heat starts with notifications enabled and then disables them", func() { + BeforeEach(func() { + notificationsCluster := "notifications-rabbitmq" + notificationsUser := "heat-notifications" + notificationsVHost := "/notifications" + spec := GetHeatSpecWithNotificationsBus(¬ificationsCluster, ¬ificationsUser, ¬ificationsVHost) + + DeferCleanup( + k8sClient.Delete, ctx, CreateHeatSecret(namespace, SecretName)) + DeferCleanup(infra.DeleteMemcached, infra.CreateMemcached(namespace, "memcached", memcachedSpec)) + infra.SimulateMemcachedReady(memcachedName) + + DeferCleanup( + k8sClient.Delete, ctx, CreateHeatMessageBusSecret(namespace, HeatMessageBusSecretName)) + DeferCleanup( + k8sClient.Delete, ctx, CreateHeatMessageBusSecret(namespace, "notifications-rabbitmq-secret")) + + DeferCleanup( + mariadb.DeleteDBService, + mariadb.CreateDBService( + namespace, + "openstack", + corev1.ServiceSpec{ + Ports: []corev1.ServicePort{{Port: 3306}}, + }, + ), + ) + + DeferCleanup(th.DeleteInstance, CreateHeat(heatName, spec)) + + // Simulate the main transport URL + infra.SimulateTransportURLReady(heatTransportURLName) + + // Wait for the notifications transport URL to be created by the controller + notificationsTransportURLName := types.NamespacedName{ + Namespace: namespace, + Name: heatName.Name + "-heat-notifications-transport", + } + Eventually(func() error { + return k8sClient.Get(ctx, notificationsTransportURLName, &rabbitmqv1.TransportURL{}) + }, timeout, interval).Should(Succeed()) + + // Simulate the notifications transport URL as ready + infra.SimulateTransportURLReady(notificationsTransportURLName) + }) + + It("should initially have notifications enabled", func() { + Eventually(func(g Gomega) { + heat := GetHeat(heatName) + g.Expect(heat.Status.NotificationsTransportURLSecret).ToNot(BeEmpty()) + }, timeout, interval).Should(Succeed()) + }) + + It("should disable notifications when notificationsBus is removed", func() { + // Verify notifications are initially enabled + Eventually(func(g Gomega) { + heat := GetHeat(heatName) + g.Expect(heat.Status.NotificationsTransportURLSecret).ToNot(BeEmpty()) + }, timeout, interval).Should(Succeed()) + + // Update the Heat spec to remove notificationsBus + Eventually(func(g Gomega) { + heat := GetHeat(heatName) + heat.Spec.NotificationsBus = nil + g.Expect(k8sClient.Update(ctx, heat)).To(Succeed()) + }, timeout, interval).Should(Succeed()) + + // Wait for notifications to be disabled + Eventually(func(g Gomega) { + heat := GetHeat(heatName) + g.Expect(heat.Status.NotificationsTransportURLSecret).To(BeEmpty()) + }, timeout, interval).Should(Succeed()) + }) + }) }) diff --git a/test/functional/heat_webhook_test.go b/test/functional/heat_webhook_test.go index 7824c9ef..bcf99a89 100644 --- a/test/functional/heat_webhook_test.go +++ b/test/functional/heat_webhook_test.go @@ -207,6 +207,30 @@ var _ = Describe("Heat Webhook", func() { }) }) + It("rejects update to deprecated rabbitMqClusterName field", func() { + spec := GetDefaultHeatSpec() + spec["rabbitMqClusterName"] = "rabbitmq" + + heatName := types.NamespacedName{ + Namespace: namespace, + Name: "heat-rabbitmq-test", + } + + DeferCleanup(th.DeleteInstance, CreateHeat(heatName, spec)) + + // Try to update rabbitMqClusterName + Eventually(func(g Gomega) { + instance := GetHeat(heatName) + instance.Spec.RabbitMqClusterName = "rabbitmq2" + err := th.K8sClient.Update(th.Ctx, instance) + g.Expect(err).Should(HaveOccurred()) + g.Expect(err.Error()).To( + ContainSubstring("rabbitMqClusterName is deprecated and cannot be changed")) + g.Expect(err.Error()).To( + ContainSubstring("Please use messagingBus.cluster instead")) + }).Should(Succeed()) + }) + DescribeTable("rejects wrong topology for", func(serviceNameFunc func() (string, string)) {