Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,4 @@ node_modules/
.chart-packages/

.cursor
.envrc
1 change: 1 addition & 0 deletions .licenserc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,6 @@ header:
- '**/*.json'
- '**/.helmignore'
- 'testbin/**'
- '.envrc'

comment: on-failure
8 changes: 6 additions & 2 deletions api/v1alpha1/pulsartopic_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,17 @@ type PulsarTopicSpec struct {

// RetentionTime specifies the minimum time to retain messages on the topic.
// Should be set in conjunction with RetentionSize for effective retention policy.
// Retention Quota must exceed configured backlog quota for topic
// Retention Quota must exceed configured backlog quota for topic.
// Use "-1" for infinite retention time.
// Valid formats: "1h", "30m", "5s", "-1"
// +optional
RetentionTime *utils.Duration `json:"retentionTime,omitempty"`

// RetentionSize specifies the maximum size of backlog retained on the topic.
// Should be set in conjunction with RetentionTime for effective retention policy.
// Retention Quota must exceed configured backlog quota for topic
// Retention Quota must exceed configured backlog quota for topic.
// Use "-1" for infinite retention size.
// Valid formats: "1Gi", "500Mi", "100M", "-1"
// +optional
RetentionSize *resource.Quantity `json:"retentionSize,omitempty"`

Expand Down
8 changes: 6 additions & 2 deletions config/crd/bases/resource.streamnative.io_pulsartopics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -434,14 +434,18 @@ spec:
description: |-
RetentionSize specifies the maximum size of backlog retained on the topic.
Should be set in conjunction with RetentionTime for effective retention policy.
Retention Quota must exceed configured backlog quota for topic
Retention Quota must exceed configured backlog quota for topic.
Use "-1" for infinite retention size.
Valid formats: "1Gi", "500Mi", "100M", "-1"
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
retentionTime:
description: |-
RetentionTime specifies the minimum time to retain messages on the topic.
Should be set in conjunction with RetentionSize for effective retention policy.
Retention Quota must exceed configured backlog quota for topic
Retention Quota must exceed configured backlog quota for topic.
Use "-1" for infinite retention time.
Valid formats: "1h", "30m", "5s", "-1"
type: string
schemaCompatibilityStrategy:
description: |-
Expand Down
50 changes: 46 additions & 4 deletions docs/pulsar_namespace.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ The `PulsarNamespace` resource defines a namespace in a Pulsar cluster. It allow
| `maxConsumersPerTopic` | Maximum number of consumers allowed on a single topic in the namespace. | No |
| `maxConsumersPerSubscription` | Maximum number of consumers allowed on a single subscription in the namespace. | No |
| `messageTTL` | Time to Live (TTL) for messages in the namespace. Messages older than this TTL will be automatically marked as consumed. | No |
| `retentionTime` | Minimum time to retain messages in the namespace. Should be set in conjunction with RetentionSize for effective retention policy. | No |
| `retentionSize` | Maximum size of backlog retained in the namespace. Should be set in conjunction with RetentionTime for effective retention policy. | No |
| `retentionTime` | Minimum time to retain messages in the namespace. Should be set in conjunction with RetentionSize for effective retention policy. Use "-1" for infinite retention time. | No |
| `retentionSize` | Maximum size of backlog retained in the namespace. Should be set in conjunction with RetentionTime for effective retention policy. Use "-1" for infinite retention size. | No |
| `backlogQuotaLimitTime` | Time limit for message backlog. Messages older than this limit will be removed or handled according to the retention policy. | No |
| `backlogQuotaLimitSize` | Size limit for message backlog. When the limit is reached, older messages will be removed or handled according to the retention policy. | No |
| `backlogQuotaRetentionPolicy` | Retention policy for messages when backlog quota is exceeded. Options: "producer_request_hold", "producer_exception", or "consumer_backlog_eviction". | No |
Expand Down Expand Up @@ -393,6 +393,48 @@ The `replicationClusters` and `geoReplicationRefs` fields serve different purpos

Choose `replicationClusters` for simpler, intra-instance replication, and `geoReplicationRefs` for more complex, inter-instance geo-replication scenarios. These fields are mutually exclusive; use only one depending on your replication requirements.

## Infinite Retention Configuration

The `retentionTime` and `retentionSize` fields support infinite retention by using the special value `"-1"`. This is equivalent to passing -1 to Pulsar admin APIs and provides unlimited retention capabilities for all topics within the namespace.

### Infinite Retention Time

To set infinite retention time for the namespace, use the value `"-1"` for the `retentionTime` field:

```yaml
spec:
retentionTime: "-1" # Messages will be retained indefinitely regardless of age
retentionSize: "100Gi" # Still limited by size
```

### Infinite Retention Size

To set infinite retention size for the namespace, use the value `"-1"` for the `retentionSize` field:

```yaml
spec:
retentionTime: "30d" # Still limited by time
retentionSize: "-1" # No size limit for message retention
```

### Complete Infinite Retention

For completely unlimited retention (both time and size), set both fields to `"-1"`:

```yaml
spec:
retentionTime: "-1" # Infinite time retention
retentionSize: "-1" # Infinite size retention
```

**Important Notes:**
- The `"-1"` value is case-sensitive and must be quoted in YAML
- Infinite retention should be used carefully as it can lead to unlimited storage consumption
- Retention quota must exceed configured backlog quota for the namespace
- These settings apply to all topics within the namespace by default
- Individual topics can override namespace-level retention settings
- Consider the storage and cost implications before enabling infinite retention

## Create A Pulsar Namespace

1. Define a namespace named `test-tenant/testns` by using the YAML file and save the YAML file `namespace.yaml`.
Expand Down Expand Up @@ -466,8 +508,8 @@ spec:
# maxProducersPerTopic: 2
# maxConsumersPerTopic: 2
# maxConsumersPerSubscription: 2
# retentionTime: 20h
# retentionSize: 2Gi
# retentionTime: 20h # or "-1" for infinite retention time
# retentionSize: 2Gi # or "-1" for infinite retention size
# lifecyclePolicy: CleanUpAfterDeletion
# topicAutoCreationConfig:
# allow: true
Expand Down
47 changes: 43 additions & 4 deletions docs/pulsar_topic.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ The `PulsarTopic` resource defines a topic in a Pulsar cluster. It allows you to
| `messageTTL` | Time to Live (TTL) for messages in the topic. Messages older than this TTL will be automatically marked as consumed. | No |
| `maxUnAckedMessagesPerConsumer` | Maximum number of unacknowledged messages allowed per consumer. | No |
| `maxUnAckedMessagesPerSubscription` | Maximum number of unacknowledged messages allowed per subscription. | No |
| `retentionTime` | Minimum time to retain messages in the topic. Should be set in conjunction with retentionSize for effective retention policy. | No |
| `retentionSize` | Maximum size of backlog retained in the topic. Should be set in conjunction with retentionTime for effective retention policy. | No |
| `retentionTime` | Minimum time to retain messages in the topic. Should be set in conjunction with retentionSize for effective retention policy. Use "-1" for infinite retention time. | No |
| `retentionSize` | Maximum size of backlog retained in the topic. Should be set in conjunction with retentionTime for effective retention policy. Use "-1" for infinite retention size. | No |
| `backlogQuotaLimitTime` | Time limit for message backlog. Messages older than this limit will be removed or handled according to the retention policy. | No |
| `backlogQuotaLimitSize` | Size limit for message backlog. When the limit is reached, older messages will be removed or handled according to the retention policy. | No |
| `backlogQuotaRetentionPolicy` | Retention policy for messages when backlog quota is exceeded. Options: "producer_request_hold", "producer_exception", or "consumer_backlog_eviction". | No |
Expand Down Expand Up @@ -68,6 +68,45 @@ The `replicationClusters` and `geoReplicationRefs` fields serve different purpos

Choose `replicationClusters` for simpler, intra-instance replication, and `geoReplicationRefs` for more complex, inter-instance geo-replication scenarios. These fields are mutually exclusive; use only one depending on your replication requirements.

## Infinite Retention Configuration

The `retentionTime` and `retentionSize` fields support infinite retention by using the special value `"-1"`. This is equivalent to passing -1 to Pulsar admin APIs and provides unlimited retention capabilities.

### Infinite Retention Time

To set infinite retention time, use the value `"-1"` for the `retentionTime` field:

```yaml
spec:
retentionTime: "-1" # Messages will be retained indefinitely regardless of age
retentionSize: "10Gi" # Still limited by size
```

### Infinite Retention Size

To set infinite retention size, use the value `"-1"` for the `retentionSize` field:

```yaml
spec:
retentionTime: "7d" # Still limited by time
retentionSize: "-1" # No size limit for message retention
```

### Complete Infinite Retention

For completely unlimited retention (both time and size), set both fields to `"-1"`:

```yaml
spec:
retentionTime: "-1" # Infinite time retention
retentionSize: "-1" # Infinite size retention
```

**Important Notes:**
- The `"-1"` value is case-sensitive and must be quoted in YAML
- Infinite retention should be used carefully as it can lead to unlimited storage consumption
- Retention quota must exceed configured backlog quota for the topic
- Consider the storage and cost implications before enabling infinite retention

## Create A Pulsar Topic

Expand All @@ -89,8 +128,8 @@ spec:
# messageTTL:
# maxUnAckedMessagesPerConsumer:
# maxUnAckedMessagesPerSubscription:
# retentionTime: 20h
# retentionSize: 2Gi
# retentionTime: 20h # or "-1" for infinite retention time
# retentionSize: 2Gi # or "-1" for infinite retention size
# backlogQuotaLimitTime: 24h
# backlogQuotaLimitSize: 1Gi
# backlogQuotaRetentionPolicy: producer_request_hold
Expand Down
36 changes: 26 additions & 10 deletions pkg/admin/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,14 +284,22 @@ func (p *PulsarAdminClient) applyTopicPolicies(topicName *utils.TopicName, param
retentionTime := -1
retentionSize := -1
if params.RetentionTime != nil {
t, err := params.RetentionTime.Parse()
if err != nil {
return err
if params.RetentionTime.IsInfinite() {
retentionTime = -1 // Infinite retention time
} else {
t, err := params.RetentionTime.Parse()
if err != nil {
return err
}
retentionTime = int(t.Minutes())
}
retentionTime = int(t.Minutes())
}
if params.RetentionSize != nil {
retentionSize = int(params.RetentionSize.ScaledValue(resource.Mega))
if rutils.IsInfiniteQuantity(params.RetentionSize) {
retentionSize = -1 // Infinite retention size
} else {
retentionSize = int(params.RetentionSize.ScaledValue(resource.Mega))
}
}
retentionPolicy := utils.NewRetentionPolicies(retentionTime, retentionSize)
err = p.adminClient.Topics().SetRetention(*topicName, retentionPolicy)
Expand Down Expand Up @@ -646,14 +654,22 @@ func (p *PulsarAdminClient) applyNamespacePolicies(completeNSName string, params
retentionTime := -1
retentionSize := -1
if params.RetentionTime != nil {
t, err := params.RetentionTime.Parse()
if err != nil {
return err
if params.RetentionTime.IsInfinite() {
retentionTime = -1 // Infinite retention time
} else {
t, err := params.RetentionTime.Parse()
if err != nil {
return err
}
retentionTime = int(t.Minutes())
}
retentionTime = int(t.Minutes())
}
if params.RetentionSize != nil {
retentionSize = int(params.RetentionSize.ScaledValue(resource.Mega))
if rutils.IsInfiniteQuantity(params.RetentionSize) {
retentionSize = -1 // Infinite retention size
} else {
retentionSize = int(params.RetentionSize.ScaledValue(resource.Mega))
}
}
retentionPolicy := utils.NewRetentionPolicies(retentionTime, retentionSize)
err = p.adminClient.Namespaces().SetRetention(completeNSName, retentionPolicy)
Expand Down
59 changes: 56 additions & 3 deletions pkg/utils/duration.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,74 @@
package utils

import (
"fmt"
"time"

str2duration "github.com/xhit/go-str2duration/v2"
)

// Duration represents a elapsed time in string.
// Supports standard duration formats (e.g., "1h", "30m", "5s") and special value "-1" for infinite duration.
type Duration string

const (
// InfiniteDurationValue represents the special value for infinite duration
InfiniteDurationValue = "-1"
)

// Parse parses a duration from string.
// Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h", "d", "w".
// Special value "-1" represents infinite duration and returns -1 nanosecond.
func (d *Duration) Parse() (time.Duration, error) {
var res time.Duration
res, err := str2duration.ParseDuration(string(*d))
s := string(*d)

// Handle infinite duration special case
if s == InfiniteDurationValue {
return time.Duration(-1), nil
}

// Parse normal duration
res, err := str2duration.ParseDuration(s)
if err != nil {
return res, err
return 0, fmt.Errorf("invalid duration format '%s': %w", s, err)
}

// Ensure duration is positive for non-infinite values
if res < 0 {
return 0, fmt.Errorf("duration must be positive or -1 for infinite, got: %s", s)
}

return res, nil
}

// IsInfinite returns true if the duration represents infinite duration ("-1").
func (d *Duration) IsInfinite() bool {
return string(*d) == InfiniteDurationValue
}

// ToSeconds returns the duration in seconds.
// Returns -1 for infinite duration, or the actual seconds for finite duration.
func (d *Duration) ToSeconds() (int64, error) {
if d.IsInfinite() {
return -1, nil
}

duration, err := d.Parse()
if err != nil {
return 0, err
}

return int64(duration.Seconds()), nil
}

// String returns the string representation of the duration.
func (d Duration) String() string {
return string(d)
}

// Validate validates the duration format.
// Accepts standard duration formats and the special value "-1".
func (d *Duration) Validate() error {
_, err := d.Parse()
return err
}
Loading