Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions api/v1alpha1/pulsarnamespace_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

adminutils "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
"github.com/streamnative/pulsar-resources-operator/pkg/utils"
)

Expand Down Expand Up @@ -64,6 +65,19 @@ type PulsarNamespaceSpec struct {
// +optional
LifecyclePolicy PulsarResourceLifeCyclePolicy `json:"lifecyclePolicy,omitempty"`

// SchemaCompatibilityStrategy defines the schema compatibility strategy for this namespace.
// If not specified, the cluster's default schema compatibility strategy will be used.
// This setting controls how schema evolution is handled for topics within this namespace.
// +optional
// +kubebuilder:validation:Enum=AutoUpdateDisabled;Backward;Forward;Full;AlwaysCompatible;BackwardTransitive;ForwardTransitive;FullTransitive
SchemaCompatibilityStrategy *adminutils.SchemaCompatibilityStrategy `json:"schemaCompatibilityStrategy,omitempty"`

// SchemaValidationEnforced controls whether schema validation is enforced for this namespace.
// When enabled, producers must provide a schema when publishing messages.
// If not specified, the cluster's default schema validation enforcement setting will be used.
// +optional
SchemaValidationEnforced *bool `json:"schemaValidationEnforced,omitempty"`

// MaxProducersPerTopic sets the maximum number of producers allowed on a single topic in the namespace.
// +optional
MaxProducersPerTopic *int32 `json:"maxProducersPerTopic,omitempty"`
Expand Down
27 changes: 19 additions & 8 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions config/crd/bases/resource.streamnative.io_pulsarnamespaces.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,27 @@ spec:
Should be set in conjunction with RetentionSize for effective retention policy.
Retention Quota must exceed configured backlog quota for namespace
type: string
schemaCompatibilityStrategy:
description: |-
SchemaCompatibilityStrategy defines the schema compatibility strategy for this namespace.
If not specified, the cluster's default schema compatibility strategy will be used.
This setting controls how schema evolution is handled for topics within this namespace.
enum:
- AutoUpdateDisabled
- Backward
- Forward
- Full
- AlwaysCompatible
- BackwardTransitive
- ForwardTransitive
- FullTransitive
type: string
schemaValidationEnforced:
description: |-
SchemaValidationEnforced controls whether schema validation is enforced for this namespace.
When enabled, producers must provide a schema when publishing messages.
If not specified, the cluster's default schema validation enforcement setting will be used.
type: boolean
topicAutoCreationConfig:
description: |-
TopicAutoCreationConfig controls whether automatic topic creation is allowed in this namespace
Expand Down
90 changes: 89 additions & 1 deletion docs/pulsar_namespace.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ The `PulsarNamespace` resource defines a namespace in a Pulsar cluster. It allow
| `deduplication` | Whether to enable message deduplication for the namespace. | No |
| `bookieAffinityGroup` | Set the bookie-affinity group for the namespace, which has two sub fields: `bookkeeperAffinityGroupPrimary(String)` is required, and `bookkeeperAffinityGroupSecondary(String)` is optional. | No |
| `topicAutoCreationConfig` | Configures automatic topic creation behavior within this namespace. Contains settings for whether auto-creation is allowed, the type of topics created, and default number of partitions. | No |
| `schemaCompatibilityStrategy` | Schema compatibility strategy for this namespace. Controls how schema evolution is handled for topics within this namespace. Options: `AutoUpdateDisabled`, `Backward`, `Forward`, `Full`, `AlwaysCompatible`, `BackwardTransitive`, `ForwardTransitive`, `FullTransitive`. | No |
| `schemaValidationEnforced` | Controls whether schema validation is enforced for this namespace. When enabled, producers must provide a schema when publishing messages. If not specified, the cluster's default schema validation enforcement setting will be used. | No |

Note: Valid time units are "s" (seconds), "m" (minutes), "h" (hours), "d" (days), "w" (weeks).

Expand Down Expand Up @@ -70,6 +72,89 @@ This configuration overrides the broker's default topic auto-creation settings f
```
This explicitly disables topic auto-creation for the namespace, overriding any broker-level settings that might enable it.

## Schema Management

Pulsar provides powerful schema management capabilities at the namespace level, allowing you to control how schema evolution is handled and whether schema validation is enforced. This feature consists of two complementary settings: schema compatibility strategy and schema validation enforcement.

### Schema Compatibility Strategy

The `schemaCompatibilityStrategy` field controls how Pulsar handles schema evolution for topics within the namespace. This allows you to configure different compatibility requirements for different namespaces based on your use case.

#### Available Strategies

1. **AutoUpdateDisabled**: Disables automatic schema updates and requires manual schema management. This is the most restrictive strategy, suitable for ultra-stable environments where strict schema control is required and no automatic schema evolution is desired.

2. **AlwaysCompatible**: Allows any schema changes without validation. This is the most permissive strategy but may lead to compatibility issues. Suitable for development/testing environments.

3. **Backward**: New schema can read data written with the previous schema. This strategy supports consumer-driven schema evolution, such as adding optional fields or removing fields.

4. **BackwardTransitive**: New schema can read data written with any previous schema in the chain. This provides long-term backward compatibility across multiple schema versions.

5. **Forward**: Previous schema can read data written with the new schema. This strategy supports producer-driven schema evolution, such as adding fields that older consumers can ignore.

6. **ForwardTransitive**: Any previous schema can read data written with the new schema. This ensures new data is readable by any older schema version.

7. **Full**: Schema changes are both forward and backward compatible. Both new and previous schemas can read data written by either schema. This provides strict compatibility requirements in both directions.

8. **FullTransitive**: Schema changes are forward and backward compatible with all schemas. Any schema in the chain can read data written by any other schema in the chain. This provides maximum compatibility guarantees.

#### Usage Examples

**Development Environment**:
```yaml
schemaCompatibilityStrategy: AlwaysCompatible
```

**Production Environment**:
```yaml
schemaCompatibilityStrategy: Backward
```

**Critical Systems**:
```yaml
schemaCompatibilityStrategy: FullTransitive
```

### Schema Validation Enforcement

The `schemaValidationEnforced` field controls whether producers must provide a schema when publishing messages to topics within the namespace.

- **When enabled (`true`)**: Producers must provide a schema when publishing messages. Messages without schemas will be rejected. This ensures all data in the namespace has a defined structure and is recommended for production environments where data consistency is critical.

- **When disabled (`false`)**: Producers can publish messages with or without schemas. This allows for more flexibility in message publishing and is useful for development/testing environments or legacy integrations.

- **Default behavior**: If `schemaValidationEnforced` is not specified, the cluster's default schema validation enforcement setting will be used.

### Configuration Examples by Use Case

#### Development/Testing Environment
```yaml
schemaCompatibilityStrategy: AlwaysCompatible
schemaValidationEnforced: false
```
This configuration allows rapid schema iteration and flexible schema validation for experimentation.

#### Standard Production Environment
```yaml
schemaCompatibilityStrategy: Backward
schemaValidationEnforced: true
```
This provides a good balance between flexibility and safety, ensuring consumers can handle schema changes while enforcing schema validation for data consistency.

#### Mission-Critical Systems
```yaml
schemaCompatibilityStrategy: FullTransitive
schemaValidationEnforced: true
```
This configuration provides maximum compatibility guarantees with strict schema validation enforcement.

#### Legacy System Integration
```yaml
schemaCompatibilityStrategy: ForwardTransitive
schemaValidationEnforced: false
```
This ensures older systems can consume new data while allowing gradual migration without strict schema requirements.

## replicationClusters vs geoReplicationRefs

The `replicationClusters` and `geoReplicationRefs` fields serve different purposes in configuring replication for a Pulsar namespace:
Expand Down Expand Up @@ -105,6 +190,9 @@ spec:
backlogQuotaLimitTime: 24h
bundles: 16
messageTTL: 1h
# Schema management configuration
# schemaCompatibilityStrategy: Backward
# schemaValidationEnforced: true
# backlogQuotaRetentionPolicy: producer_request_hold
# maxProducersPerTopic: 2
# maxConsumersPerTopic: 2
Expand Down Expand Up @@ -144,7 +232,7 @@ Please note the following important points:

1. The fields `name` and `bundles` cannot be updated after the namespace is created. These are immutable properties of the namespace.

2. Other fields such as `backlogQuotaLimitSize`, `backlogQuotaLimitTime`, `messageTTL`, `maxProducersPerTopic`, `maxConsumersPerTopic`, `maxConsumersPerSubscription`, `retentionTime`, `retentionSize`, and `topicAutoCreationConfig` can be modified.
2. Other fields such as `backlogQuotaLimitSize`, `backlogQuotaLimitTime`, `messageTTL`, `maxProducersPerTopic`, `maxConsumersPerTopic`, `maxConsumersPerSubscription`, `retentionTime`, `retentionSize`, `topicAutoCreationConfig`, `schemaCompatibilityStrategy`, and `schemaValidationEnforced` can be modified.

3. If you want to change the `connectionRef`, ensure that the new PulsarConnection resource exists and is properly configured. Changing the `connectionRef` can have significant implications:

Expand Down
Loading