diff --git a/api/v1alpha1/pulsarnamespace_types.go b/api/v1alpha1/pulsarnamespace_types.go index 688146a2..5fdf1039 100644 --- a/api/v1alpha1/pulsarnamespace_types.go +++ b/api/v1alpha1/pulsarnamespace_types.go @@ -22,6 +22,20 @@ import ( "github.com/streamnative/pulsar-resources-operator/pkg/utils" ) +// TopicAutoCreationConfig defines the configuration for automatic topic creation +type TopicAutoCreationConfig struct { + // Allow specifies whether to allow automatic topic creation + Allow bool `json:"allow,omitempty"` + + // Type specifies the type of automatically created topics + // +kubebuilder:validation:Enum=partitioned;non-partitioned + Type string `json:"type,omitempty"` + + // Partitions specifies the default number of partitions for automatically created topics + // +optional + Partitions *int32 `json:"partitions,omitempty"` +} + // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. @@ -130,6 +144,11 @@ type PulsarNamespaceSpec struct { // BookieAffinityGroup is the name of the namespace isolation policy to apply to the namespace. BookieAffinityGroup *BookieAffinityGroupData `json:"bookieAffinityGroup,omitempty"` + + // TopicAutoCreationConfig controls whether automatic topic creation is allowed in this namespace + // and configures properties of automatically created topics + // +optional + TopicAutoCreationConfig *TopicAutoCreationConfig `json:"topicAutoCreationConfig,omitempty"` } type BookieAffinityGroupData struct { diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 0eb0c0fb..c84bfa2d 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -1638,6 +1638,11 @@ func (in *PulsarNamespaceSpec) DeepCopyInto(out *PulsarNamespaceSpec) { *out = new(BookieAffinityGroupData) **out = **in } + if in.TopicAutoCreationConfig != nil { + in, out := &in.TopicAutoCreationConfig, &out.TopicAutoCreationConfig + *out = new(TopicAutoCreationConfig) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PulsarNamespaceSpec. @@ -2828,6 +2833,26 @@ func (in *Toleration) DeepCopy() *Toleration { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TopicAutoCreationConfig) DeepCopyInto(out *TopicAutoCreationConfig) { + *out = *in + if in.Partitions != nil { + in, out := &in.Partitions, &out.Partitions + *out = new(int32) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TopicAutoCreationConfig. +func (in *TopicAutoCreationConfig) DeepCopy() *TopicAutoCreationConfig { + if in == nil { + return nil + } + out := new(TopicAutoCreationConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *UserMetadata) DeepCopyInto(out *UserMetadata) { *out = *in diff --git a/config/crd/bases/resource.streamnative.io_pulsarnamespaces.yaml b/config/crd/bases/resource.streamnative.io_pulsarnamespaces.yaml index 7fc2ea5f..c5c14219 100644 --- a/config/crd/bases/resource.streamnative.io_pulsarnamespaces.yaml +++ b/config/crd/bases/resource.streamnative.io_pulsarnamespaces.yaml @@ -238,6 +238,28 @@ spec: Should be set in conjunction with RetentionSize for effective retention policy. Retention Quota must exceed configured backlog quota for namespace type: string + topicAutoCreationConfig: + description: |- + TopicAutoCreationConfig controls whether automatic topic creation is allowed in this namespace + and configures properties of automatically created topics + properties: + allow: + description: Allow specifies whether to allow automatic topic + creation + type: boolean + partitions: + description: Partitions specifies the default number of partitions + for automatically created topics + format: int32 + type: integer + type: + description: Type specifies the type of automatically created + topics + enum: + - partitioned + - non-partitioned + type: string + type: object required: - connectionRef - name diff --git a/docs/pulsar_namespace.md b/docs/pulsar_namespace.md index 7e00197a..21204b08 100644 --- a/docs/pulsar_namespace.md +++ b/docs/pulsar_namespace.md @@ -28,9 +28,48 @@ The `PulsarNamespace` resource defines a namespace in a Pulsar cluster. It allow | `replicationClusters` | List of clusters to which the namespace is replicated. Use only if replicating clusters within the same Pulsar instance. | No | | `deduplication` | Whether to enable message deduplication for the namespace. | No | | `bookieAffinityGroup` | Set the bookie-affinity group for the namespace, which has two sub fields: `bookkeeperAffinityGroupPrimary(String)` is required, and `bookkeeperAffinityGroupSecondary(String)` is optional. | No | +| `topicAutoCreationConfig` | Configures automatic topic creation behavior within this namespace. Contains settings for whether auto-creation is allowed, the type of topics created, and default number of partitions. | No | Note: Valid time units are "s" (seconds), "m" (minutes), "h" (hours), "d" (days), "w" (weeks). +## topicAutoCreationConfig + +The `topicAutoCreationConfig` field allows you to control the automatic topic creation behavior at the namespace level: + +| Field | Description | Required | +|-------------|---------------------------------------------------------------------------------------|----------| +| `allow` | Whether automatic topic creation is allowed in this namespace. | No | +| `type` | The type of topics to create automatically. Options: "partitioned", "non-partitioned". | No | +| `partitions`| The default number of partitions for automatically created topics when type is "partitioned". | No | + +This configuration overrides the broker's default topic auto-creation settings for the specific namespace. When a client attempts to produce messages to or consume messages from a non-existent topic, the broker can automatically create that topic based on these settings. + +### Configuration examples + +1. **Enable auto-creation with partitioned topics**: + ```yaml + topicAutoCreationConfig: + allow: true + type: "partitioned" + partitions: 8 + ``` + This will automatically create partitioned topics with 8 partitions when clients attempt to use non-existent topics. + +2. **Enable auto-creation with non-partitioned topics**: + ```yaml + topicAutoCreationConfig: + allow: true + type: "non-partitioned" + ``` + This will automatically create non-partitioned topics when clients attempt to use non-existent topics. + +3. **Disable auto-creation**: + ```yaml + topicAutoCreationConfig: + allow: false + ``` + This explicitly disables topic auto-creation for the namespace, overriding any broker-level settings that might enable it. + ## replicationClusters vs geoReplicationRefs The `replicationClusters` and `geoReplicationRefs` fields serve different purposes in configuring replication for a Pulsar namespace: @@ -74,6 +113,10 @@ spec: # retentionTime: 20h # retentionSize: 2Gi # lifecyclePolicy: CleanUpAfterDeletion + # topicAutoCreationConfig: + # allow: true + # type: "partitioned" + # partitions: 4 ``` 2. Apply the YAML file to create the namespace. @@ -101,7 +144,7 @@ Please note the following important points: 1. The fields `name` and `bundles` cannot be updated after the namespace is created. These are immutable properties of the namespace. -2. Other fields such as `backlogQuotaLimitSize`, `backlogQuotaLimitTime`, `messageTTL`, `maxProducersPerTopic`, `maxConsumersPerTopic`, `maxConsumersPerSubscription`, `retentionTime`, and `retentionSize` can be modified. +2. Other fields such as `backlogQuotaLimitSize`, `backlogQuotaLimitTime`, `messageTTL`, `maxProducersPerTopic`, `maxConsumersPerTopic`, `maxConsumersPerSubscription`, `retentionTime`, `retentionSize`, and `topicAutoCreationConfig` can be modified. 3. If you want to change the `connectionRef`, ensure that the new PulsarConnection resource exists and is properly configured. Changing the `connectionRef` can have significant implications: diff --git a/pkg/admin/impl.go b/pkg/admin/impl.go index 24736485..9f3e75ad 100644 --- a/pkg/admin/impl.go +++ b/pkg/admin/impl.go @@ -471,6 +471,38 @@ func (p *PulsarAdminClient) applyNamespacePolicies(completeNSName string, params } } + // Handle topic auto-creation configuration + if params.TopicAutoCreationConfig != nil { + topicTypeStr, err := utils.ParseTopicType(params.TopicAutoCreationConfig.Type) + if err != nil { + return err + } + + // Convert operator's TopicAutoCreationConfig to Pulsar client's TopicAutoCreationConfig + config := &utils.TopicAutoCreationConfig{ + Allow: params.TopicAutoCreationConfig.Allow, + Type: topicTypeStr, + } + + // Set default partitions + if params.TopicAutoCreationConfig.Partitions != nil { + partitions := int(*params.TopicAutoCreationConfig.Partitions) + config.Partitions = &partitions + } + + // Call Pulsar client API to set topic auto-creation configuration + err = p.adminClient.Namespaces().SetTopicAutoCreation(*naName, *config) + if err != nil { + return err + } + } else { + // If no configuration is specified, try to remove topic auto-creation configuration (ignore errors if it doesn't exist) + err = p.adminClient.Namespaces().RemoveTopicAutoCreation(*naName) + if err != nil && !IsNotFound(err) { + return err + } + } + return nil } diff --git a/pkg/admin/interface.go b/pkg/admin/interface.go index f510511e..1cb42ed4 100644 --- a/pkg/admin/interface.go +++ b/pkg/admin/interface.go @@ -55,6 +55,7 @@ type NamespaceParams struct { ReplicationClusters []string Deduplication *bool BookieAffinityGroup *v1alpha1.BookieAffinityGroupData + TopicAutoCreationConfig *v1alpha1.TopicAutoCreationConfig } // TopicParams indicates the parameters for creating a topic diff --git a/pkg/connection/reconcile_namespace.go b/pkg/connection/reconcile_namespace.go index 705f549e..43fd2875 100644 --- a/pkg/connection/reconcile_namespace.go +++ b/pkg/connection/reconcile_namespace.go @@ -164,6 +164,7 @@ func (r *PulsarNamespaceReconciler) ReconcileNamespace(ctx context.Context, puls OffloadThresholdSize: namespace.Spec.OffloadThresholdSize, Deduplication: namespace.Spec.Deduplication, BookieAffinityGroup: namespace.Spec.BookieAffinityGroup, + TopicAutoCreationConfig: namespace.Spec.TopicAutoCreationConfig, } if refs := namespace.Spec.GeoReplicationRefs; len(refs) != 0 || len(namespace.Spec.ReplicationClusters) > 0 { diff --git a/tests/utils/spec.go b/tests/utils/spec.go index b946e080..ce0c418f 100644 --- a/tests/utils/spec.go +++ b/tests/utils/spec.go @@ -23,6 +23,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/pointer" + "k8s.io/utils/ptr" "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" rsutils "github.com/streamnative/pulsar-resources-operator/pkg/utils" @@ -81,6 +82,11 @@ func MakePulsarNamespace(namespace, name, namespaceName, connectionName string, BacklogQuotaLimitSize: &backlogSize, Bundles: &bundle, MessageTTL: ttl, + TopicAutoCreationConfig: &v1alpha1.TopicAutoCreationConfig{ + Allow: true, + Type: "partitioned", + Partitions: ptr.To[int32](10), + }, }, } }