diff --git a/api/v1alpha1/common.go b/api/v1alpha1/common.go index 78f44829..59394ed8 100644 --- a/api/v1alpha1/common.go +++ b/api/v1alpha1/common.go @@ -55,6 +55,9 @@ type PulsarAuthentication struct { // For detailed information on the OAuth2 fields, refer to the PulsarAuthenticationOAuth2 struct. // +optional OAuth2 *PulsarAuthenticationOAuth2 `json:"oauth2,omitempty"` + + // +optional + TLS *PulsarAuthenticationTLS `json:"tls,omitempty"` } // PulsarResourceLifeCyclePolicy defines the behavior for managing Pulsar resources @@ -107,6 +110,12 @@ type PulsarAuthenticationOAuth2 struct { Scope string `json:"scope,omitempty"` } +// PulsarAuthenticationTLS indicates the parameters which are need by pulsar TLS Authentication +type PulsarAuthenticationTLS struct { + ClientCertificatePath string `json:"clientCertificatePath"` + ClientCertificateKeyPath string `json:"clientCertificateKeyPath"` +} + // IsPulsarResourceReady returns true if resource satisfies with these condition // 1. The instance is not deleted // 2. Status ObservedGeneration is equal with meta.ObservedGeneration diff --git a/api/v1alpha1/pulsarconnection_types.go b/api/v1alpha1/pulsarconnection_types.go index ae12c9b3..b58e4653 100644 --- a/api/v1alpha1/pulsarconnection_types.go +++ b/api/v1alpha1/pulsarconnection_types.go @@ -74,6 +74,19 @@ type PulsarConnectionSpec struct { // When setting up Geo-Replication between Pulsar instances, this should be enabled to identify the cluster. // +optional ClusterName string `json:"clusterName,omitempty"` + + // TLSEnableHostnameVerification indicates whether to verify the hostname of the broker. + // Only used when using secure urls. + // +optional + TLSEnableHostnameVerification bool `json:"tlsEnableHostnameVerification,omitempty"` + + // TLSAllowInsecureConnection indicates whether to allow insecure connection to the broker. + // +optional + TLSAllowInsecureConnection bool `json:"tlsAllowInsecureConnection,omitempty"` + + // TLSTrustCertsFilePath Path for the TLS certificate used to validate the broker endpoint when using TLS. + // +optional + TLSTrustCertsFilePath string `json:"tlsTrustCertsFilePath,omitempty"` } // PulsarConnectionStatus defines the observed state of PulsarConnection. diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 55f4d4c3..0eb0c0fb 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -917,6 +917,11 @@ func (in *PulsarAuthentication) DeepCopyInto(out *PulsarAuthentication) { *out = new(PulsarAuthenticationOAuth2) (*in).DeepCopyInto(*out) } + if in.TLS != nil { + in, out := &in.TLS, &out.TLS + *out = new(PulsarAuthenticationTLS) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PulsarAuthentication. @@ -949,6 +954,21 @@ func (in *PulsarAuthenticationOAuth2) DeepCopy() *PulsarAuthenticationOAuth2 { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PulsarAuthenticationTLS) DeepCopyInto(out *PulsarAuthenticationTLS) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PulsarAuthenticationTLS. +func (in *PulsarAuthenticationTLS) DeepCopy() *PulsarAuthenticationTLS { + if in == nil { + return nil + } + out := new(PulsarAuthenticationTLS) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PulsarConnection) DeepCopyInto(out *PulsarConnection) { *out = *in diff --git a/config/crd/bases/resource.streamnative.io_pulsarconnections.yaml b/config/crd/bases/resource.streamnative.io_pulsarconnections.yaml index 465fd6e4..e99ef70c 100644 --- a/config/crd/bases/resource.streamnative.io_pulsarconnections.yaml +++ b/config/crd/bases/resource.streamnative.io_pulsarconnections.yaml @@ -169,6 +169,18 @@ spec: - issuerEndpoint - key type: object + tls: + description: PulsarAuthenticationTLS indicates the parameters + which are need by pulsar TLS Authentication + properties: + clientCertificateKeyPath: + type: string + clientCertificatePath: + type: string + required: + - clientCertificateKeyPath + - clientCertificatePath + type: object token: description: |- Token specifies the configuration for token-based authentication. @@ -212,6 +224,16 @@ spec: ClusterName specifies the name of the local Pulsar cluster. When setting up Geo-Replication between Pulsar instances, this should be enabled to identify the cluster. type: string + tlsEnableHostnameVerification: + description: TLSEnableHostnameVerification indicates whether to verify the hostname of the broker. + Only used when using secure urls. + type: boolean + tlsAllowInsecureConnection: + description: TLSAllowInsecureConnection indicates whether to allow insecure connection to the broker. + type: boolean + tlsTrustCertsFilePath: + description: TLSTrustCertsFilePath Path for the TLS certificate used to validate the broker endpoint when using TLS. + type: string type: object status: description: |- diff --git a/docs/pulsar_connection.md b/docs/pulsar_connection.md index 8b72ead2..7e74bf68 100644 --- a/docs/pulsar_connection.md +++ b/docs/pulsar_connection.md @@ -176,6 +176,183 @@ spec: ### TLS Connection +To create a TLS connection, you need to set the `adminServiceSecureURL`, `brokerServiceSecureURL`, and `clusterName` fields. +## Overview + +The `PulsarConnection` resource defines the connection details for a Pulsar cluster. It can be used to configure various connection parameters including service URLs, authentication methods, and cluster information. + +## Specifications + +| Field | Description | Required | Version | +|-------|-------------|----------|---------| +| `adminServiceURL` | The admin service URL of the Pulsar cluster (e.g., `http://cluster-broker.test.svc.cluster.local:8080`). | No | All | +| `adminServiceSecureURL` | The admin service URL for secure connection (HTTPS) to the Pulsar cluster (e.g., `https://cluster-broker.test.svc.cluster.local:443`). | No | ≥ 0.3.0 | +| `brokerServiceURL` | The broker service URL of the Pulsar cluster (e.g., `pulsar://cluster-broker.test.svc.cluster.local:6650`). | No | ≥ 0.3.0 | +| `brokerServiceSecureURL` | The broker service URL for secure connection (TLS) to the Pulsar cluster (e.g., `pulsar+ssl://cluster-broker.test.svc.cluster.local:6651`). | No | ≥ 0.3.0 | +| `clusterName` | The Pulsar cluster name. Use `pulsar-admin clusters list` to retrieve. Required for configuring Geo-Replication. | No | ≥ 0.3.0 | +| `authentication` | Authentication configuration. Required when authentication is enabled for the Pulsar cluster. Supports JWT Token and OAuth2 methods. | No | All | +| `brokerClientTrustCertsFilePath` | The file path to the trusted TLS certificate for outgoing connections to Pulsar brokers. Used for TLS verification. | No | ≥ 0.3.0 | + +Note: Fields marked with version ≥ 0.3.0 are only available in that version and above. + +## Authentication Methods + +The `authentication` field supports two methods: JWT Token and OAuth2. Each method can use either a Kubernetes Secret reference or a direct value. + +### Specification + +| Field | Description | Type | Required | +|-------|-------------|------|----------| +| `token` | JWT Token authentication configuration | `ValueOrSecretRef` | No | +| `oauth2` | OAuth2 authentication configuration | `PulsarAuthenticationOAuth2` | No | + +#### ValueOrSecretRef + +| Field | Description | Type | Required | +|-------|-------------|------|----------| +| `value` | Direct string value | `*string` | No | +| `secretRef` | Reference to a Kubernetes Secret | `*SecretKeyRef` | No | + +#### SecretKeyRef + +| Field | Description | Type | Required | +|-------|-------------|------|----------| +| `name` | Name of the Kubernetes Secret | `string` | Yes | +| `key` | Key in the Kubernetes Secret | `string` | Yes | + +#### PulsarAuthenticationOAuth2 + +| Field | Description | Type | Required | +|-------|-------------|------|----------| +| `issuerEndpoint` | URL of the OAuth2 authorization server | `string` | Yes | +| `clientID` | OAuth2 client identifier | `string` | Yes | +| `audience` | Intended recipient of the token | `string` | Yes | +| `key` | Client secret or path to JSON credentials file | `ValueOrSecretRef` | Yes | +| `scope` | Requested permissions from the OAuth2 server | `string` | No | + +Note: Only one authentication method (either `token` or `oauth2`) should be specified at a time. + +### JWT Token Authentication + +JWT Token authentication can be configured using either a direct value or a Kubernetes Secret reference. + +#### Using a direct value + +To use JWT Token authentication with a direct value, you can set the `token` field to the base64-encoded JWT token. + +```yaml +authentication: + token: + value: +``` + +#### Using a Kubernetes Secret reference + +To use JWT Token authentication with a Kubernetes Secret reference, you need to create a Kubernetes Secret containing the JWT token. The secret should have the following structure: + +```yaml +apiVersion: v1 +kind: Secret +metadata: + name: +type: Opaque +stringData: + : +``` + +The `` can be any name. It will be referenced in the `PulsarConnection` resource with the `token.secretRef` field. + +```yaml +authentication: + token: + secretRef: + name: + key: +``` + +### OAuth2 Authentication + +OAuth2 authentication can be configured using either a direct value or a Kubernetes Secret reference. + +#### Using a direct value + +To use OAuth2 authentication with a direct value, you can set the `issuerEndpoint`, `clientID`, `audience`, and `key` fields to the OAuth2 configuration. + +```yaml +authentication: + oauth2: + issuerEndpoint: https://auth.streamnative.cloud + clientID: + audience: urn:sn:pulsar:sndev:us-west + key: + value: | + { + "type":"sn_service_account", + "client_id":"", + "grant_type":"client_credentials", + "client_secret":"", + "issuer_url":"https://auth.streamnative.cloud" + } + scope: +``` + +#### Using a Kubernetes Secret reference + +To use OAuth2 authentication with a Kubernetes Secret reference, you need to create a Kubernetes Secret containing the OAuth2 configuration. The secret should have the following structure: + +```yaml +apiVersion: v1 +kind: Secret +metadata: + name: +type: Opaque +stringData: + : | + { + "type":"sn_service_account", + "client_id":"", + "grant_type":"client_credentials", + "client_secret":"", + "issuer_url":"https://auth.streamnative.cloud" + } +``` + +The `` should contain the OAuth2 configuration. + +```yaml +authentication: + oauth2: + issuerEndpoint: https://auth.streamnative.cloud + clientID: + audience: urn:sn:pulsar:sndev:us-west + key: + secretRef: + name: + key: + scope: +``` + +## PlainText Connection vs TLS Connection + +The `PulsarConnection` resource supports both plaintext and TLS connections. Plaintext connections are used for non-secure connections to the Pulsar cluster, while TLS connections are used for secure connections with TLS enabled. + +### Plaintext Connection + +To create a plaintext connection, you need to set the `adminServiceURL`, `brokerServiceURL`, and `clusterName` fields. + +```yaml +apiVersion: resource.streamnative.io/v1alpha1 +kind: PulsarConnection +metadata: + name: pulsar-connection +spec: + adminServiceURL: http://pulsar-sn-platform-broker.test.svc.cluster.local:8080 + brokerServiceURL: pulsar://pulsar-sn-platform-broker.test.svc.cluster.local:6650 + clusterName: pulsar-cluster +``` + +### TLS Connection + To create a TLS connection, you need to set the `adminServiceSecureURL`, `brokerServiceSecureURL`, and `clusterName` fields. ```yaml @@ -346,28 +523,95 @@ spec: ### PlainText Connection with OAuth2 Authentication with value + ```yaml + apiVersion: resource.streamnative.io/v1alpha1 + kind: PulsarConnection + metadata: + name: test-tls-pulsar-connection + namespace: test + spec: + adminServiceURL: http://test-pulsar-sn-platform-broker.test.svc.cluster.local:8080 + brokerServiceURL: pulsar://test-pulsar-sn-platform-broker.test.svc.cluster.local:6650 + clusterName: pulsar-cluster + authentication: + oauth2: + issuerEndpoint: https://auth.streamnative.cloud + clientID: pvqx76oGvWQMIGGP2ozMfOus2s4tDQAJ + audience: urn:sn:pulsar:sndev:us-west + key: + # Use the keyFile contents as the oauth2 key value + value: {"type":"sn_service_account","client_id":"zvex72oGvFQMBQGZ2ozMxOus2s4tQASJ","client_secret":"60J6fo81j-h69_vVvYvqFOHs2NfOyy6pqGqwIhTgnxpQ7O3UH8PdCbVtdm_SJjIf","client_email":"contoso@sndev.auth.streamnative.cloud","issuer_url":"https://auth.streamnative.cloud"} + +* TLS authentication + + ```yaml + apiVersion: resource.streamnative.io/v1alpha1 + kind: PulsarConnection + metadata: + name: test-tls-auth-pulsar-connection + namespace: test + spec: + adminServiceURL: http://test-pulsar-sn-platform-broker.test.svc.cluster.local:8080 + brokerServiceURL: pulsar://test-pulsar-sn-platform-broker.test.svc.cluster.local:6650 + clusterName: pulsar-cluster + authentication: + tls: + clientCertificateKeyPath: /certs/tls.key + clientCertificatePath: /certs/tls.crt + ``` + +This table lists specifications available for the `PulsarConnection` resource. + +| Option | Description | Required or not | +| ---| --- |--- | +| `adminServiceURL` | The admin service URL of the Pulsar cluster, such as `http://cluster-broker.test.svc.cluster.local:8080`. | No | +| `authentication` | A secret that stores authentication configurations. This option is required when you enable authentication for your Pulsar cluster. Support JWT Token and OAuth2 authentication methods. | No | +| `brokerServiceURL` | The broker service URL of the Pulsar cluster, such as `pulsar://cluster-broker.test.svc.cluster.local:6650`. This option is required for configuring Geo-replication. This option is available for version `0.3.0` or above. | No | +| `brokerServiceSecureURL` | The broker service URL for secure connection to the Pulsar cluster, such as `pulsar+ssl://cluster-broker.test.svc.cluster.local:6651`. This option is required for configuring Geo-replication when TLS is enabled. This option is available for version `0.3.0` or above. | No | +| `adminServiceSecureURL` | The admin service URL for secure connection to the Pulsar cluster, such as `https://cluster-broker.test.svc.cluster.local:443`. This option is available for version `0.3.0` or above. | No | +| `clusterName` | The Pulsar cluster name. You can use the `pulsar-admin clusters list` command to get the Pulsar cluster name. This option is required for configuring Geo-replication. Provided from `0.3.0` | No | +| `tlsAllowInsecureConnection` | A flag that indicates whether to allow insecure connection to the broker. Provided from `0.5.0` | No | +| `tlsEnableHostnameVerification` | A flag that indicates wether hostname verification is enabled. Provided from `0.5.0` | No | +| `tlsTrustCertsFilePath` | The path to the certificate used during hostname verfification. Provided from `0.5.0` | No | + +1. Apply the YAML file to create the Pulsar Connection. + +```shell +kubectl apply -f connection.yaml +``` + +3. Check the resource status. + +```shell +kubectl -n test get pulsarconnection.resource.streamnative.io +``` +```shell +NAME ADMIN_SERVICE_URL GENERATION OBSERVED_GENERATION READY +test-pulsar-connection http://ok-sn-platform-broker.test.svc.cluster.local:8080 1 1 True +``` + +## Update PulsarConnection + +You can update the connection by editing the connection.yaml, then apply it again. For example, if pulsar cluster doesn’t setup the authentication, then you don’t need the authentication part in the spec + ```yaml apiVersion: resource.streamnative.io/v1alpha1 kind: PulsarConnection metadata: - name: pulsar-connection-oauth2-values + name: test-pulsar-connection + namespace: test spec: - adminServiceURL: http://pulsar-sn-platform-broker.test.svc.cluster.local:8080 - brokerServiceURL: pulsar://pulsar-sn-platform-broker.test.svc.cluster.local:6650 - clusterName: pulsar-cluster - authentication: - oauth2: - issuerEndpoint: https://auth.streamnative.cloud - clientID: pvqx76oGvWQMIGGP2ozMfOus2s4tDQAJ - audience: urn:sn:pulsar:sndev:us-west - # Use the keyFile contents as the oauth2 key value - key: - value: | - { - "type":"sn_service_account", - "client_id":"pvqx76oGvWQMIGGP2ozMfOus2s4tDQAJ", - "grant_type":"client_credentials", - "client_secret":"zZr_adLu4LuPrN5FwYWH7was07-23nlzBgK50l_Rfsl2hjzUXKHsbKt", - "issuer_url":"https://auth.streamnative.cloud" - } -``` \ No newline at end of file + adminServiceURL: http://test-pulsar-sn-platform-broker.test.svc.cluster.local:8080 +``` + +```shell +kubectl apply -f connection.yaml +``` + +## Delete PulsarConnection + +```shell +kubectl -n test delete pulsarconnection.resource.streamnative.io test-pulsar-connection +``` + +Please be noticed, because the Pulsar Resources Operator are using the connection to manage pulsar resources, If you delete the pulsar connection, it will only be deleted after the resources CRs are deleted \ No newline at end of file diff --git a/pkg/admin/interface.go b/pkg/admin/interface.go index dc72fb28..5d15a59f 100644 --- a/pkg/admin/interface.go +++ b/pkg/admin/interface.go @@ -15,6 +15,7 @@ package admin import ( + "fmt" "os" "strings" @@ -236,6 +237,10 @@ type PulsarAdminConfig struct { Key string Scope string + // TLS Authentication related configuration + ClientCertificatePath string + ClientCertificateKeyPath string + PulsarAPIVersion *config.APIVersion } @@ -247,8 +252,10 @@ func NewPulsarAdmin(conf PulsarAdminConfig) (PulsarAdmin, error) { var adminClient admin.Client config := &config.Config{ - WebServiceURL: conf.WebServiceURL, - TLSAllowInsecureConnection: true, + WebServiceURL: conf.WebServiceURL, + TLSAllowInsecureConnection: conf.TLSAllowInsecureConnection, + TLSEnableHostnameVerification: conf.TLSEnableHostnameVerification, + TLSTrustCertsFilePath: conf.TLSTrustCertsFilePath, // V2 admin endpoint contains operations for tenant, namespace and topic. PulsarAPIVersion: config.V2, } @@ -290,9 +297,22 @@ func NewPulsarAdmin(conf PulsarAdminConfig) (PulsarAdmin, error) { if err != nil { return nil, err } - } else { + } else if conf.Token != "" { config.Token = conf.Token + adminClient, err = admin.New(config) + if err != nil { + return nil, err + } + } else if conf.ClientCertificatePath != "" { + config.AuthPlugin = auth.TLSPluginName + config.AuthParams = fmt.Sprintf("{\"tlsCertFile\": %q, \"tlsKeyFile\": %q}", conf.ClientCertificatePath, conf.ClientCertificateKeyPath) + + adminClient, err = admin.New(config) + if err != nil { + return nil, err + } + } else { adminClient, err = admin.New(config) if err != nil { return nil, err diff --git a/pkg/connection/reconciler.go b/pkg/connection/reconciler.go index f84c9a33..169cf937 100644 --- a/pkg/connection/reconciler.go +++ b/pkg/connection/reconciler.go @@ -308,8 +308,23 @@ func MakePulsarAdminConfig(ctx context.Context, connection *resourcev1alpha1.Pul if connection.Spec.AdminServiceURL == "" && connection.Spec.AdminServiceSecureURL == "" { return nil, fmt.Errorf("adminServiceURL or adminServiceSecureURL must not be empty") } + + webserviceURL := connection.Spec.AdminServiceSecureURL + tlsEnableHostnameVerification := connection.Spec.TLSEnableHostnameVerification + tlsAllowInsecureConnection := connection.Spec.TLSAllowInsecureConnection + tlsTrustCertsFilePath := connection.Spec.TLSTrustCertsFilePath + + if connection.Spec.AdminServiceSecureURL == "" { + webserviceURL = connection.Spec.AdminServiceURL + tlsEnableHostnameVerification = false + tlsAllowInsecureConnection = true + tlsTrustCertsFilePath = "" + } cfg := admin.PulsarAdminConfig{ - WebServiceURL: connection.Spec.AdminServiceURL, + WebServiceURL: webserviceURL, + TLSAllowInsecureConnection: tlsAllowInsecureConnection, + TLSEnableHostnameVerification: tlsEnableHostnameVerification, + TLSTrustCertsFilePath: tlsTrustCertsFilePath, } hasAuth := false if authn := connection.Spec.Authentication; authn != nil { @@ -339,6 +354,10 @@ func MakePulsarAdminConfig(ctx context.Context, connection *resourcev1alpha1.Pul cfg.Key = *value } } + if tls := authn.TLS; tls != nil { + cfg.ClientCertificatePath = tls.ClientCertificatePath + cfg.ClientCertificateKeyPath = tls.ClientCertificateKeyPath + } } return &cfg, nil }