Skip to content

Commit 1629fbc

Browse files
authored
feat: namespace config for topic auto creation (#303)
* feat: support namespace TopicAutoCreation * sync docs * fix ci
1 parent 2cf43d6 commit 1629fbc

File tree

8 files changed

+150
-1
lines changed

8 files changed

+150
-1
lines changed

api/v1alpha1/pulsarnamespace_types.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,20 @@ import (
2222
"github.com/streamnative/pulsar-resources-operator/pkg/utils"
2323
)
2424

25+
// TopicAutoCreationConfig defines the configuration for automatic topic creation
26+
type TopicAutoCreationConfig struct {
27+
// Allow specifies whether to allow automatic topic creation
28+
Allow bool `json:"allow,omitempty"`
29+
30+
// Type specifies the type of automatically created topics
31+
// +kubebuilder:validation:Enum=partitioned;non-partitioned
32+
Type string `json:"type,omitempty"`
33+
34+
// Partitions specifies the default number of partitions for automatically created topics
35+
// +optional
36+
Partitions *int32 `json:"partitions,omitempty"`
37+
}
38+
2539
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
2640
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
2741

@@ -130,6 +144,11 @@ type PulsarNamespaceSpec struct {
130144

131145
// BookieAffinityGroup is the name of the namespace isolation policy to apply to the namespace.
132146
BookieAffinityGroup *BookieAffinityGroupData `json:"bookieAffinityGroup,omitempty"`
147+
148+
// TopicAutoCreationConfig controls whether automatic topic creation is allowed in this namespace
149+
// and configures properties of automatically created topics
150+
// +optional
151+
TopicAutoCreationConfig *TopicAutoCreationConfig `json:"topicAutoCreationConfig,omitempty"`
133152
}
134153

135154
type BookieAffinityGroupData struct {

api/v1alpha1/zz_generated.deepcopy.go

Lines changed: 25 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

config/crd/bases/resource.streamnative.io_pulsarnamespaces.yaml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,28 @@ spec:
238238
Should be set in conjunction with RetentionSize for effective retention policy.
239239
Retention Quota must exceed configured backlog quota for namespace
240240
type: string
241+
topicAutoCreationConfig:
242+
description: |-
243+
TopicAutoCreationConfig controls whether automatic topic creation is allowed in this namespace
244+
and configures properties of automatically created topics
245+
properties:
246+
allow:
247+
description: Allow specifies whether to allow automatic topic
248+
creation
249+
type: boolean
250+
partitions:
251+
description: Partitions specifies the default number of partitions
252+
for automatically created topics
253+
format: int32
254+
type: integer
255+
type:
256+
description: Type specifies the type of automatically created
257+
topics
258+
enum:
259+
- partitioned
260+
- non-partitioned
261+
type: string
262+
type: object
241263
required:
242264
- connectionRef
243265
- name

docs/pulsar_namespace.md

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,48 @@ The `PulsarNamespace` resource defines a namespace in a Pulsar cluster. It allow
2828
| `replicationClusters` | List of clusters to which the namespace is replicated. Use only if replicating clusters within the same Pulsar instance. | No |
2929
| `deduplication` | Whether to enable message deduplication for the namespace. | No |
3030
| `bookieAffinityGroup` | Set the bookie-affinity group for the namespace, which has two sub fields: `bookkeeperAffinityGroupPrimary(String)` is required, and `bookkeeperAffinityGroupSecondary(String)` is optional. | No |
31+
| `topicAutoCreationConfig` | Configures automatic topic creation behavior within this namespace. Contains settings for whether auto-creation is allowed, the type of topics created, and default number of partitions. | No |
3132

3233
Note: Valid time units are "s" (seconds), "m" (minutes), "h" (hours), "d" (days), "w" (weeks).
3334

35+
## topicAutoCreationConfig
36+
37+
The `topicAutoCreationConfig` field allows you to control the automatic topic creation behavior at the namespace level:
38+
39+
| Field | Description | Required |
40+
|-------------|---------------------------------------------------------------------------------------|----------|
41+
| `allow` | Whether automatic topic creation is allowed in this namespace. | No |
42+
| `type` | The type of topics to create automatically. Options: "partitioned", "non-partitioned". | No |
43+
| `partitions`| The default number of partitions for automatically created topics when type is "partitioned". | No |
44+
45+
This configuration overrides the broker's default topic auto-creation settings for the specific namespace. When a client attempts to produce messages to or consume messages from a non-existent topic, the broker can automatically create that topic based on these settings.
46+
47+
### Configuration examples
48+
49+
1. **Enable auto-creation with partitioned topics**:
50+
```yaml
51+
topicAutoCreationConfig:
52+
allow: true
53+
type: "partitioned"
54+
partitions: 8
55+
```
56+
This will automatically create partitioned topics with 8 partitions when clients attempt to use non-existent topics.
57+
58+
2. **Enable auto-creation with non-partitioned topics**:
59+
```yaml
60+
topicAutoCreationConfig:
61+
allow: true
62+
type: "non-partitioned"
63+
```
64+
This will automatically create non-partitioned topics when clients attempt to use non-existent topics.
65+
66+
3. **Disable auto-creation**:
67+
```yaml
68+
topicAutoCreationConfig:
69+
allow: false
70+
```
71+
This explicitly disables topic auto-creation for the namespace, overriding any broker-level settings that might enable it.
72+
3473
## replicationClusters vs geoReplicationRefs
3574
3675
The `replicationClusters` and `geoReplicationRefs` fields serve different purposes in configuring replication for a Pulsar namespace:
@@ -74,6 +113,10 @@ spec:
74113
# retentionTime: 20h
75114
# retentionSize: 2Gi
76115
# lifecyclePolicy: CleanUpAfterDeletion
116+
# topicAutoCreationConfig:
117+
# allow: true
118+
# type: "partitioned"
119+
# partitions: 4
77120
```
78121

79122
2. Apply the YAML file to create the namespace.
@@ -101,7 +144,7 @@ Please note the following important points:
101144

102145
1. The fields `name` and `bundles` cannot be updated after the namespace is created. These are immutable properties of the namespace.
103146

104-
2. Other fields such as `backlogQuotaLimitSize`, `backlogQuotaLimitTime`, `messageTTL`, `maxProducersPerTopic`, `maxConsumersPerTopic`, `maxConsumersPerSubscription`, `retentionTime`, and `retentionSize` can be modified.
147+
2. Other fields such as `backlogQuotaLimitSize`, `backlogQuotaLimitTime`, `messageTTL`, `maxProducersPerTopic`, `maxConsumersPerTopic`, `maxConsumersPerSubscription`, `retentionTime`, `retentionSize`, and `topicAutoCreationConfig` can be modified.
105148

106149
3. If you want to change the `connectionRef`, ensure that the new PulsarConnection resource exists and is properly configured. Changing the `connectionRef` can have significant implications:
107150

pkg/admin/impl.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,38 @@ func (p *PulsarAdminClient) applyNamespacePolicies(completeNSName string, params
471471
}
472472
}
473473

474+
// Handle topic auto-creation configuration
475+
if params.TopicAutoCreationConfig != nil {
476+
topicTypeStr, err := utils.ParseTopicType(params.TopicAutoCreationConfig.Type)
477+
if err != nil {
478+
return err
479+
}
480+
481+
// Convert operator's TopicAutoCreationConfig to Pulsar client's TopicAutoCreationConfig
482+
config := &utils.TopicAutoCreationConfig{
483+
Allow: params.TopicAutoCreationConfig.Allow,
484+
Type: topicTypeStr,
485+
}
486+
487+
// Set default partitions
488+
if params.TopicAutoCreationConfig.Partitions != nil {
489+
partitions := int(*params.TopicAutoCreationConfig.Partitions)
490+
config.Partitions = &partitions
491+
}
492+
493+
// Call Pulsar client API to set topic auto-creation configuration
494+
err = p.adminClient.Namespaces().SetTopicAutoCreation(*naName, *config)
495+
if err != nil {
496+
return err
497+
}
498+
} else {
499+
// If no configuration is specified, try to remove topic auto-creation configuration (ignore errors if it doesn't exist)
500+
err = p.adminClient.Namespaces().RemoveTopicAutoCreation(*naName)
501+
if err != nil && !IsNotFound(err) {
502+
return err
503+
}
504+
}
505+
474506
return nil
475507
}
476508

pkg/admin/interface.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ type NamespaceParams struct {
5555
ReplicationClusters []string
5656
Deduplication *bool
5757
BookieAffinityGroup *v1alpha1.BookieAffinityGroupData
58+
TopicAutoCreationConfig *v1alpha1.TopicAutoCreationConfig
5859
}
5960

6061
// TopicParams indicates the parameters for creating a topic

pkg/connection/reconcile_namespace.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ func (r *PulsarNamespaceReconciler) ReconcileNamespace(ctx context.Context, puls
164164
OffloadThresholdSize: namespace.Spec.OffloadThresholdSize,
165165
Deduplication: namespace.Spec.Deduplication,
166166
BookieAffinityGroup: namespace.Spec.BookieAffinityGroup,
167+
TopicAutoCreationConfig: namespace.Spec.TopicAutoCreationConfig,
167168
}
168169

169170
if refs := namespace.Spec.GeoReplicationRefs; len(refs) != 0 || len(namespace.Spec.ReplicationClusters) > 0 {

tests/utils/spec.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"k8s.io/apimachinery/pkg/api/resource"
2424
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2525
"k8s.io/utils/pointer"
26+
"k8s.io/utils/ptr"
2627

2728
"github.com/streamnative/pulsar-resources-operator/api/v1alpha1"
2829
rsutils "github.com/streamnative/pulsar-resources-operator/pkg/utils"
@@ -81,6 +82,11 @@ func MakePulsarNamespace(namespace, name, namespaceName, connectionName string,
8182
BacklogQuotaLimitSize: &backlogSize,
8283
Bundles: &bundle,
8384
MessageTTL: ttl,
85+
TopicAutoCreationConfig: &v1alpha1.TopicAutoCreationConfig{
86+
Allow: true,
87+
Type: "partitioned",
88+
Partitions: ptr.To[int32](10),
89+
},
8490
},
8591
}
8692
}

0 commit comments

Comments
 (0)