diff --git a/api/v1alpha1/pulsargeoreplication_types.go b/api/v1alpha1/pulsargeoreplication_types.go index 1a487c4e..73e6d74d 100644 --- a/api/v1alpha1/pulsargeoreplication_types.go +++ b/api/v1alpha1/pulsargeoreplication_types.go @@ -36,6 +36,12 @@ type PulsarGeoReplicationSpec struct { // +kubebuilder:validation:Enum=CleanUpAfterDeletion;KeepAfterDeletion // +optional LifecyclePolicy PulsarResourceLifeCyclePolicy `json:"lifecyclePolicy,omitempty"` + + // ClusterParamsOverride allows overriding specific cluster parameters when setting up + // geo-replication. This is useful when the destination cluster requires different + // configuration than what's defined in the DestinationConnectionRef. + // +optional + ClusterParamsOverride *ClusterParamsOverride `json:"clusterParamsOverride,omitempty"` } // PulsarGeoReplicationStatus defines the observed state of PulsarGeoReplication @@ -89,3 +95,53 @@ type ClusterInfo struct { // ConnectionRef is the connection reference that can connect to the pulsar cluster ConnectionRef corev1.LocalObjectReference `json:"connectionRef"` } + +// ClusterParamsOverride allows overriding specific parameters when creating/updating cluster info +// for geo-replication. This provides flexibility to customize cluster configuration without +// modifying the underlying PulsarConnection. +type ClusterParamsOverride struct { + // ServiceURL overrides the HTTP(S) URL for the Pulsar cluster's admin service + // +optional + ServiceURL *string `json:"serviceURL,omitempty"` + + // ServiceSecureURL overrides the HTTPS URL for secure connections to the Pulsar admin service + // +optional + ServiceSecureURL *string `json:"serviceSecureURL,omitempty"` + + // BrokerServiceURL overrides the non-TLS URL for connecting to Pulsar brokers + // +optional + BrokerServiceURL *string `json:"brokerServiceURL,omitempty"` + + // BrokerServiceSecureURL overrides the TLS-enabled URL for secure connections to Pulsar brokers + // +optional + BrokerServiceSecureURL *string `json:"brokerServiceSecureURL,omitempty"` + + // BrokerClientTrustCertsFilePath overrides the file path to the trusted TLS certificate + // for outgoing connections to Pulsar brokers + // +optional + BrokerClientTrustCertsFilePath *string `json:"brokerClientTrustCertsFilePath,omitempty"` + + // Authentication overrides the authentication configuration for the cluster. + // When this field is set, the secret update check will be skipped for this geo-replication. + // +optional + Authentication *ClusterAuthOverride `json:"authentication,omitempty"` +} + +// ClusterAuthOverride allows overriding authentication parameters for cluster configuration. +// This is useful when the geo-replication target requires different authentication than +// the source connection. +type ClusterAuthOverride struct { + // AuthPlugin specifies the authentication plugin class name + // Common values: "org.apache.pulsar.client.impl.auth.AuthenticationToken", + // "org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2" + // +optional + AuthPlugin *string `json:"authPlugin,omitempty"` + + // AuthParameters contains the authentication parameters as a string. + // Format depends on the AuthPlugin: + // - For Token: "token:your-token-here" + // - For Token: "file://your-token-file-path-on-brokers" + // - For OAuth2: JSON string with client credentials + // +optional + AuthParameters *string `json:"authParameters,omitempty"` +} diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index f3b6ccac..30647881 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -245,6 +245,31 @@ func (in *BookieAffinityGroupData) DeepCopy() *BookieAffinityGroupData { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClusterAuthOverride) DeepCopyInto(out *ClusterAuthOverride) { + *out = *in + if in.AuthPlugin != nil { + in, out := &in.AuthPlugin, &out.AuthPlugin + *out = new(string) + **out = **in + } + if in.AuthParameters != nil { + in, out := &in.AuthParameters, &out.AuthParameters + *out = new(string) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterAuthOverride. +func (in *ClusterAuthOverride) DeepCopy() *ClusterAuthOverride { + if in == nil { + return nil + } + out := new(ClusterAuthOverride) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ClusterInfo) DeepCopyInto(out *ClusterInfo) { *out = *in @@ -261,6 +286,51 @@ func (in *ClusterInfo) DeepCopy() *ClusterInfo { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClusterParamsOverride) DeepCopyInto(out *ClusterParamsOverride) { + *out = *in + if in.ServiceURL != nil { + in, out := &in.ServiceURL, &out.ServiceURL + *out = new(string) + **out = **in + } + if in.ServiceSecureURL != nil { + in, out := &in.ServiceSecureURL, &out.ServiceSecureURL + *out = new(string) + **out = **in + } + if in.BrokerServiceURL != nil { + in, out := &in.BrokerServiceURL, &out.BrokerServiceURL + *out = new(string) + **out = **in + } + if in.BrokerServiceSecureURL != nil { + in, out := &in.BrokerServiceSecureURL, &out.BrokerServiceSecureURL + *out = new(string) + **out = **in + } + if in.BrokerClientTrustCertsFilePath != nil { + in, out := &in.BrokerClientTrustCertsFilePath, &out.BrokerClientTrustCertsFilePath + *out = new(string) + **out = **in + } + if in.Authentication != nil { + in, out := &in.Authentication, &out.Authentication + *out = new(ClusterAuthOverride) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterParamsOverride. +func (in *ClusterParamsOverride) DeepCopy() *ClusterParamsOverride { + if in == nil { + return nil + } + out := new(ClusterParamsOverride) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CommunityDeploymentMetadata) DeepCopyInto(out *CommunityDeploymentMetadata) { *out = *in @@ -1445,7 +1515,7 @@ func (in *PulsarGeoReplication) DeepCopyInto(out *PulsarGeoReplication) { *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - out.Spec = in.Spec + in.Spec.DeepCopyInto(&out.Spec) in.Status.DeepCopyInto(&out.Status) } @@ -1504,6 +1574,11 @@ func (in *PulsarGeoReplicationSpec) DeepCopyInto(out *PulsarGeoReplicationSpec) *out = *in out.ConnectionRef = in.ConnectionRef out.DestinationConnectionRef = in.DestinationConnectionRef + if in.ClusterParamsOverride != nil { + in, out := &in.ClusterParamsOverride, &out.ClusterParamsOverride + *out = new(ClusterParamsOverride) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PulsarGeoReplicationSpec. diff --git a/config/crd/bases/resource.streamnative.io_pulsargeoreplications.yaml b/config/crd/bases/resource.streamnative.io_pulsargeoreplications.yaml index 9caa28a5..ee30cfc5 100644 --- a/config/crd/bases/resource.streamnative.io_pulsargeoreplications.yaml +++ b/config/crd/bases/resource.streamnative.io_pulsargeoreplications.yaml @@ -54,6 +54,54 @@ spec: spec: description: PulsarGeoReplicationSpec defines the desired state of PulsarGeoReplication properties: + clusterParamsOverride: + description: |- + ClusterParamsOverride allows overriding specific cluster parameters when setting up + geo-replication. This is useful when the destination cluster requires different + configuration than what's defined in the DestinationConnectionRef. + properties: + authentication: + description: |- + Authentication overrides the authentication configuration for the cluster. + When this field is set, the secret update check will be skipped for this geo-replication. + properties: + authParameters: + description: |- + AuthParameters contains the authentication parameters as a string. + Format depends on the AuthPlugin: + - For Token: "token:your-token-here" + - For Token: "file://your-token-file-path-on-brokers" + - For OAuth2: JSON string with client credentials + type: string + authPlugin: + description: |- + AuthPlugin specifies the authentication plugin class name + Common values: "org.apache.pulsar.client.impl.auth.AuthenticationToken", + "org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2" + type: string + type: object + brokerClientTrustCertsFilePath: + description: |- + BrokerClientTrustCertsFilePath overrides the file path to the trusted TLS certificate + for outgoing connections to Pulsar brokers + type: string + brokerServiceSecureURL: + description: BrokerServiceSecureURL overrides the TLS-enabled + URL for secure connections to Pulsar brokers + type: string + brokerServiceURL: + description: BrokerServiceURL overrides the non-TLS URL for connecting + to Pulsar brokers + type: string + serviceSecureURL: + description: ServiceSecureURL overrides the HTTPS URL for secure + connections to the Pulsar admin service + type: string + serviceURL: + description: ServiceURL overrides the HTTP(S) URL for the Pulsar + cluster's admin service + type: string + type: object connectionRef: description: ConnectionRef is the reference to the source PulsarConnection properties: diff --git a/docs/pulsar_geo_replication.md b/docs/pulsar_geo_replication.md index 8f1d3fa5..8bf4d2a2 100644 --- a/docs/pulsar_geo_replication.md +++ b/docs/pulsar_geo_replication.md @@ -28,6 +28,7 @@ The `PulsarGeoReplication` resource has the following specifications: | `connectionRef` | Reference to the PulsarConnection resource used to connect to the source Pulsar cluster. | Yes | | `destinationConnectionRef` | Reference to the PulsarConnection resource used to connect to the destination Pulsar cluster. | Yes | | `lifecyclePolicy` | Determines whether to keep or delete the geo-replication configuration when the Kubernetes resource is deleted. Options: `CleanUpAfterDeletion`, `KeepAfterDeletion`. Default is `CleanUpAfterDeletion`. | No | +| `clusterParamsOverride` | Allows overriding specific cluster parameters when setting up geo-replication. This is useful when the destination cluster requires different configuration than what's defined in the `destinationConnectionRef`. See [Cluster Parameters Override](#cluster-parameters-override) for details. | No | The `PulsarGeoReplication` resource is designed to configure geo-replication between separate Pulsar instances. It creates a new "Cluster" in the destination Pulsar cluster identified by `destinationConnectionRef`. This setup allows configuring the replication of data from the source cluster (identified by `connectionRef`) to the destination cluster. By establishing this connection, the brokers in the source cluster can communicate with and replicate data to the brokers in the destination cluster, enabling geo-replication between the two separate Pulsar instances. @@ -49,6 +50,67 @@ Note: When configuring geo-replication between `connectionRef` and `destinationC 1. The brokers in the `connectionRef` cluster are able to communicate with the `destinationConnectionRef` cluster, and the `destinationConnectionRef` cluster is able to authenticate the connections from the `connectionRef` cluster. +### Cluster Parameters Override + +The `clusterParamsOverride` field provides a powerful way to customize cluster configuration for geo-replication without modifying the underlying `PulsarConnection` resource. This is particularly useful when: + +1. **Different authentication is required** for geo-replication compared to regular cluster operations +2. **Alternative URLs need to be used** for inter-cluster communication +3. **Specific TLS configurations** are needed for cross-cluster connections + +#### Supported Override Parameters + +The `clusterParamsOverride` supports the following fields: + +- **URL Configuration**: + - `serviceURL`: Override the HTTP(S) URL for the Pulsar cluster's admin service + - `serviceSecureURL`: Override the HTTPS URL for secure admin connections + - `brokerServiceURL`: Override the non-TLS URL for broker connections + - `brokerServiceSecureURL`: Override the TLS-enabled URL for secure broker connections + +- **TLS Configuration**: + - `brokerClientTrustCertsFilePath`: Override the path to trusted TLS certificates + +- **Authentication Configuration**: + - `authentication.authPlugin`: Override the authentication plugin class name + - `authentication.authParameters`: Override the authentication parameters + +#### Authentication Override Benefits + +When `authentication` is specified in the override, the system automatically: +- **Skips secret validation checks** for the destination connection +- **Avoids unnecessary Secret API calls** for improved performance +- **Uses the override authentication directly** without processing the destinationConnectionRef authentication + +#### Example Usage + +```yaml +apiVersion: resource.streamnative.io/v1alpha1 +kind: PulsarGeoReplication +metadata: + name: us-east-to-west-geo-replication + namespace: us-east +spec: + connectionRef: + name: us-east-local-connection + destinationConnectionRef: + name: us-east-to-west-connection + clusterParamsOverride: + # Override URLs for cross-cluster communication + serviceURL: "https://geo-replication-admin.us-west.example.com:8443" + brokerServiceURL: "pulsar://geo-replication-broker.us-west.example.com:6650" + # Override authentication for geo-replication + authentication: + authPlugin: "org.apache.pulsar.client.impl.auth.AuthenticationToken" + authParameters: "token:geo-replication-specific-token" +``` + +**Important Notes**: +- Override parameters take precedence over the corresponding fields in `destinationConnectionRef` +- Only non-null override values will replace the destination connection values +- Authentication override is particularly useful for scenarios requiring different credentials for geo-replication +- The override does not affect how the operator connects to manage other resources in the destination cluster + ### Lifecycle Policy The `lifecyclePolicy` field determines what happens to the geo-replication configuration when the Kubernetes PulsarGeoReplication resource is deleted: diff --git a/pkg/connection/reconcile_geo_replication.go b/pkg/connection/reconcile_geo_replication.go index ca8192e6..095ba1e6 100644 --- a/pkg/connection/reconcile_geo_replication.go +++ b/pkg/connection/reconcile_geo_replication.go @@ -181,7 +181,7 @@ func (r *PulsarGeoReplicationReconciler) ReconcileGeoReplication(ctx context.Con return err } - secretUpdated, err := r.checkSecretRefUpdate(*destConnection) + secretUpdated, err := r.checkSecretRefUpdate(*destConnection, geoReplication.Spec.ClusterParamsOverride) if err != nil { return err } @@ -198,7 +198,7 @@ func (r *PulsarGeoReplicationReconciler) ReconcileGeoReplication(ctx context.Con return nil } - clusterParam, err2 := createParams(ctx, destConnection, r.conn.client) + clusterParam, err2 := createParams(ctx, destConnection, r.conn.client, geoReplication.Spec.ClusterParamsOverride) if err2 != nil { return err2 } @@ -252,7 +252,12 @@ func (r *PulsarGeoReplicationReconciler) ReconcileGeoReplication(ctx context.Con return nil } -func (r *PulsarGeoReplicationReconciler) checkSecretRefUpdate(connection resourcev1alpha1.PulsarConnection) (bool, error) { +func (r *PulsarGeoReplicationReconciler) checkSecretRefUpdate(connection resourcev1alpha1.PulsarConnection, override *resourcev1alpha1.ClusterParamsOverride) (bool, error) { + // If authentication is overridden, skip secret update check + if override != nil && override.Authentication != nil { + return false, nil + } + auth := connection.Spec.Authentication if auth == nil || (auth.Token != nil && auth.Token.SecretRef == nil) || (auth.OAuth2 != nil && auth.OAuth2.Key == nil) || @@ -291,7 +296,7 @@ func (r *PulsarGeoReplicationReconciler) checkSecretRefUpdate(connection resourc return false, nil } -func createParams(ctx context.Context, destConnection *resourcev1alpha1.PulsarConnection, client client.Client) (*admin.ClusterParams, error) { +func createParams(ctx context.Context, destConnection *resourcev1alpha1.PulsarConnection, client client.Client, override *resourcev1alpha1.ClusterParamsOverride) (*admin.ClusterParams, error) { clusterParam := &admin.ClusterParams{ ServiceURL: destConnection.Spec.AdminServiceURL, BrokerServiceURL: destConnection.Spec.BrokerServiceURL, @@ -300,44 +305,77 @@ func createParams(ctx context.Context, destConnection *resourcev1alpha1.PulsarCo BrokerClientTrustCertsFilePath: destConnection.Spec.BrokerClientTrustCertsFilePath, } - hasAuth := false - if auth := destConnection.Spec.Authentication; auth != nil { - if auth.Token != nil { - value, err := GetValue(ctx, client, destConnection.Namespace, auth.Token) - if err != nil { - return nil, err - } - if value != nil { - clusterParam.AuthPlugin = resourcev1alpha1.AuthPluginToken - clusterParam.AuthParameters = "token:" + *value - hasAuth = true - } + // Process authentication: use override if available, otherwise use destConnection auth + if override != nil && override.Authentication != nil { + // Use override authentication directly + if override.Authentication.AuthPlugin != nil { + clusterParam.AuthPlugin = *override.Authentication.AuthPlugin } - if oauth2 := auth.OAuth2; !hasAuth && oauth2 != nil { - var paramsJSON = utils.ClientCredentials{ - IssuerURL: oauth2.IssuerEndpoint, - Audience: oauth2.Audience, - Scope: oauth2.Scope, - ClientID: oauth2.ClientID, - } - if oauth2.Key != nil { - value, err := GetValue(ctx, client, destConnection.Namespace, oauth2.Key) + if override.Authentication.AuthParameters != nil { + clusterParam.AuthParameters = *override.Authentication.AuthParameters + } + } else { + // Process destConnection authentication only if no override is provided + hasAuth := false + if auth := destConnection.Spec.Authentication; auth != nil { + if auth.Token != nil { + value, err := GetValue(ctx, client, destConnection.Namespace, auth.Token) if err != nil { return nil, err } if value != nil { - paramsJSON.PrivateKey = "data:application/json;base64," + base64.StdEncoding.EncodeToString([]byte(*value)) - clusterParam.AuthPlugin = resourcev1alpha1.AuthPluginOAuth2 - paramsJSONString, err := json.Marshal(paramsJSON) + clusterParam.AuthPlugin = resourcev1alpha1.AuthPluginToken + clusterParam.AuthParameters = "token:" + *value + hasAuth = true + } + } + if oauth2 := auth.OAuth2; !hasAuth && oauth2 != nil { + var paramsJSON = utils.ClientCredentials{ + IssuerURL: oauth2.IssuerEndpoint, + Audience: oauth2.Audience, + Scope: oauth2.Scope, + ClientID: oauth2.ClientID, + } + if oauth2.Key != nil { + value, err := GetValue(ctx, client, destConnection.Namespace, oauth2.Key) if err != nil { return nil, err } - clusterParam.AuthParameters = string(paramsJSONString) + if value != nil { + paramsJSON.PrivateKey = "data:application/json;base64," + base64.StdEncoding.EncodeToString([]byte(*value)) + clusterParam.AuthPlugin = resourcev1alpha1.AuthPluginOAuth2 + paramsJSONString, err := json.Marshal(paramsJSON) + if err != nil { + return nil, err + } + clusterParam.AuthParameters = string(paramsJSONString) + } + } else { + return nil, fmt.Errorf("OAuth2 key is empty") } - } else { - return nil, fmt.Errorf("OAuth2 key is empty") } } } + + // Apply other override parameters if provided + if override != nil { + // Override URL parameters + if override.ServiceURL != nil { + clusterParam.ServiceURL = *override.ServiceURL + } + if override.ServiceSecureURL != nil { + clusterParam.ServiceSecureURL = *override.ServiceSecureURL + } + if override.BrokerServiceURL != nil { + clusterParam.BrokerServiceURL = *override.BrokerServiceURL + } + if override.BrokerServiceSecureURL != nil { + clusterParam.BrokerServiceSecureURL = *override.BrokerServiceSecureURL + } + if override.BrokerClientTrustCertsFilePath != nil { + clusterParam.BrokerClientTrustCertsFilePath = *override.BrokerClientTrustCertsFilePath + } + } + return clusterParam, nil }