Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions api/v1alpha1/pulsarnamespace_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,20 @@ import (
"github.com/streamnative/pulsar-resources-operator/pkg/utils"
)

// TopicAutoCreationConfig defines the configuration for automatic topic creation
type TopicAutoCreationConfig struct {
// Allow specifies whether to allow automatic topic creation
Allow bool `json:"allow,omitempty"`

// Type specifies the type of automatically created topics
// +kubebuilder:validation:Enum=partitioned;non-partitioned
Type string `json:"type,omitempty"`

// Partitions specifies the default number of partitions for automatically created topics
// +optional
Partitions *int32 `json:"partitions,omitempty"`
}

// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.

Expand Down Expand Up @@ -130,6 +144,11 @@ type PulsarNamespaceSpec struct {

// BookieAffinityGroup is the name of the namespace isolation policy to apply to the namespace.
BookieAffinityGroup *BookieAffinityGroupData `json:"bookieAffinityGroup,omitempty"`

// TopicAutoCreationConfig controls whether automatic topic creation is allowed in this namespace
// and configures properties of automatically created topics
// +optional
TopicAutoCreationConfig *TopicAutoCreationConfig `json:"topicAutoCreationConfig,omitempty"`
}

type BookieAffinityGroupData struct {
Expand Down
25 changes: 25 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions config/crd/bases/resource.streamnative.io_pulsarnamespaces.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,28 @@ spec:
Should be set in conjunction with RetentionSize for effective retention policy.
Retention Quota must exceed configured backlog quota for namespace
type: string
topicAutoCreationConfig:
description: |-
TopicAutoCreationConfig controls whether automatic topic creation is allowed in this namespace
and configures properties of automatically created topics
properties:
allow:
description: Allow specifies whether to allow automatic topic
creation
type: boolean
partitions:
description: Partitions specifies the default number of partitions
for automatically created topics
format: int32
type: integer
type:
description: Type specifies the type of automatically created
topics
enum:
- partitioned
- non-partitioned
type: string
type: object
required:
- connectionRef
- name
Expand Down
45 changes: 44 additions & 1 deletion docs/pulsar_namespace.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,48 @@ The `PulsarNamespace` resource defines a namespace in a Pulsar cluster. It allow
| `replicationClusters` | List of clusters to which the namespace is replicated. Use only if replicating clusters within the same Pulsar instance. | No |
| `deduplication` | Whether to enable message deduplication for the namespace. | No |
| `bookieAffinityGroup` | Set the bookie-affinity group for the namespace, which has two sub fields: `bookkeeperAffinityGroupPrimary(String)` is required, and `bookkeeperAffinityGroupSecondary(String)` is optional. | No |
| `topicAutoCreationConfig` | Configures automatic topic creation behavior within this namespace. Contains settings for whether auto-creation is allowed, the type of topics created, and default number of partitions. | No |

Note: Valid time units are "s" (seconds), "m" (minutes), "h" (hours), "d" (days), "w" (weeks).

## topicAutoCreationConfig

The `topicAutoCreationConfig` field allows you to control the automatic topic creation behavior at the namespace level:

| Field | Description | Required |
|-------------|---------------------------------------------------------------------------------------|----------|
| `allow` | Whether automatic topic creation is allowed in this namespace. | No |
| `type` | The type of topics to create automatically. Options: "partitioned", "non-partitioned". | No |
| `partitions`| The default number of partitions for automatically created topics when type is "partitioned". | No |

This configuration overrides the broker's default topic auto-creation settings for the specific namespace. When a client attempts to produce messages to or consume messages from a non-existent topic, the broker can automatically create that topic based on these settings.

### Configuration examples

1. **Enable auto-creation with partitioned topics**:
```yaml
topicAutoCreationConfig:
allow: true
type: "partitioned"
partitions: 8
```
This will automatically create partitioned topics with 8 partitions when clients attempt to use non-existent topics.

2. **Enable auto-creation with non-partitioned topics**:
```yaml
topicAutoCreationConfig:
allow: true
type: "non-partitioned"
```
This will automatically create non-partitioned topics when clients attempt to use non-existent topics.

3. **Disable auto-creation**:
```yaml
topicAutoCreationConfig:
allow: false
```
This explicitly disables topic auto-creation for the namespace, overriding any broker-level settings that might enable it.

## replicationClusters vs geoReplicationRefs

The `replicationClusters` and `geoReplicationRefs` fields serve different purposes in configuring replication for a Pulsar namespace:
Expand Down Expand Up @@ -74,6 +113,10 @@ spec:
# retentionTime: 20h
# retentionSize: 2Gi
# lifecyclePolicy: CleanUpAfterDeletion
# topicAutoCreationConfig:
# allow: true
# type: "partitioned"
# partitions: 4
```

2. Apply the YAML file to create the namespace.
Expand Down Expand Up @@ -101,7 +144,7 @@ Please note the following important points:

1. The fields `name` and `bundles` cannot be updated after the namespace is created. These are immutable properties of the namespace.

2. Other fields such as `backlogQuotaLimitSize`, `backlogQuotaLimitTime`, `messageTTL`, `maxProducersPerTopic`, `maxConsumersPerTopic`, `maxConsumersPerSubscription`, `retentionTime`, and `retentionSize` can be modified.
2. Other fields such as `backlogQuotaLimitSize`, `backlogQuotaLimitTime`, `messageTTL`, `maxProducersPerTopic`, `maxConsumersPerTopic`, `maxConsumersPerSubscription`, `retentionTime`, `retentionSize`, and `topicAutoCreationConfig` can be modified.

3. If you want to change the `connectionRef`, ensure that the new PulsarConnection resource exists and is properly configured. Changing the `connectionRef` can have significant implications:

Expand Down
32 changes: 32 additions & 0 deletions pkg/admin/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,38 @@ func (p *PulsarAdminClient) applyNamespacePolicies(completeNSName string, params
}
}

// Handle topic auto-creation configuration
if params.TopicAutoCreationConfig != nil {
topicTypeStr, err := utils.ParseTopicType(params.TopicAutoCreationConfig.Type)
if err != nil {
return err
}

// Convert operator's TopicAutoCreationConfig to Pulsar client's TopicAutoCreationConfig
config := &utils.TopicAutoCreationConfig{
Allow: params.TopicAutoCreationConfig.Allow,
Type: topicTypeStr,
}

// Set default partitions
if params.TopicAutoCreationConfig.Partitions != nil {
partitions := int(*params.TopicAutoCreationConfig.Partitions)
config.Partitions = &partitions
}

// Call Pulsar client API to set topic auto-creation configuration
err = p.adminClient.Namespaces().SetTopicAutoCreation(*naName, *config)
if err != nil {
return err
}
} else {
// If no configuration is specified, try to remove topic auto-creation configuration (ignore errors if it doesn't exist)
err = p.adminClient.Namespaces().RemoveTopicAutoCreation(*naName)
if err != nil && !IsNotFound(err) {
return err
}
}

return nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/admin/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type NamespaceParams struct {
ReplicationClusters []string
Deduplication *bool
BookieAffinityGroup *v1alpha1.BookieAffinityGroupData
TopicAutoCreationConfig *v1alpha1.TopicAutoCreationConfig
}

// TopicParams indicates the parameters for creating a topic
Expand Down
1 change: 1 addition & 0 deletions pkg/connection/reconcile_namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func (r *PulsarNamespaceReconciler) ReconcileNamespace(ctx context.Context, puls
OffloadThresholdSize: namespace.Spec.OffloadThresholdSize,
Deduplication: namespace.Spec.Deduplication,
BookieAffinityGroup: namespace.Spec.BookieAffinityGroup,
TopicAutoCreationConfig: namespace.Spec.TopicAutoCreationConfig,
}

if refs := namespace.Spec.GeoReplicationRefs; len(refs) != 0 || len(namespace.Spec.ReplicationClusters) > 0 {
Expand Down
6 changes: 6 additions & 0 deletions tests/utils/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"

"github.com/streamnative/pulsar-resources-operator/api/v1alpha1"
rsutils "github.com/streamnative/pulsar-resources-operator/pkg/utils"
Expand Down Expand Up @@ -81,6 +82,11 @@ func MakePulsarNamespace(namespace, name, namespaceName, connectionName string,
BacklogQuotaLimitSize: &backlogSize,
Bundles: &bundle,
MessageTTL: ttl,
TopicAutoCreationConfig: &v1alpha1.TopicAutoCreationConfig{
Allow: true,
Type: "partitioned",
Partitions: ptr.To[int32](10),
},
},
}
}
Expand Down