Skip to content

Commit 2aa26a3

Browse files
authored
feat: support infinite retention policy for PulsarTopic and PulsarNamespace (#339)
* feat: enhance retention policies in PulsarTopic and PulsarNamespace * fix lint
1 parent 13712f4 commit 2aa26a3

File tree

13 files changed

+907
-25
lines changed

13 files changed

+907
-25
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,4 @@ node_modules/
5151
.chart-packages/
5252

5353
.cursor
54+
.envrc

.licenserc.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,5 +39,6 @@ header:
3939
- '**/*.json'
4040
- '**/.helmignore'
4141
- 'testbin/**'
42+
- '.envrc'
4243

4344
comment: on-failure

api/v1alpha1/pulsartopic_types.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,17 @@ type PulsarTopicSpec struct {
8585

8686
// RetentionTime specifies the minimum time to retain messages on the topic.
8787
// Should be set in conjunction with RetentionSize for effective retention policy.
88-
// Retention Quota must exceed configured backlog quota for topic
88+
// Retention Quota must exceed configured backlog quota for topic.
89+
// Use "-1" for infinite retention time.
90+
// Valid formats: "1h", "30m", "5s", "-1"
8991
// +optional
9092
RetentionTime *utils.Duration `json:"retentionTime,omitempty"`
9193

9294
// RetentionSize specifies the maximum size of backlog retained on the topic.
9395
// Should be set in conjunction with RetentionTime for effective retention policy.
94-
// Retention Quota must exceed configured backlog quota for topic
96+
// Retention Quota must exceed configured backlog quota for topic.
97+
// Use "-1" for infinite retention size.
98+
// Valid formats: "1Gi", "500Mi", "100M", "-1"
9599
// +optional
96100
RetentionSize *resource.Quantity `json:"retentionSize,omitempty"`
97101

config/crd/bases/resource.streamnative.io_pulsartopics.yaml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -434,14 +434,18 @@ spec:
434434
description: |-
435435
RetentionSize specifies the maximum size of backlog retained on the topic.
436436
Should be set in conjunction with RetentionTime for effective retention policy.
437-
Retention Quota must exceed configured backlog quota for topic
437+
Retention Quota must exceed configured backlog quota for topic.
438+
Use "-1" for infinite retention size.
439+
Valid formats: "1Gi", "500Mi", "100M", "-1"
438440
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
439441
x-kubernetes-int-or-string: true
440442
retentionTime:
441443
description: |-
442444
RetentionTime specifies the minimum time to retain messages on the topic.
443445
Should be set in conjunction with RetentionSize for effective retention policy.
444-
Retention Quota must exceed configured backlog quota for topic
446+
Retention Quota must exceed configured backlog quota for topic.
447+
Use "-1" for infinite retention time.
448+
Valid formats: "1h", "30m", "5s", "-1"
445449
type: string
446450
schemaCompatibilityStrategy:
447451
description: |-

docs/pulsar_namespace.md

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ The `PulsarNamespace` resource defines a namespace in a Pulsar cluster. It allow
1616
| `maxConsumersPerTopic` | Maximum number of consumers allowed on a single topic in the namespace. | No |
1717
| `maxConsumersPerSubscription` | Maximum number of consumers allowed on a single subscription in the namespace. | No |
1818
| `messageTTL` | Time to Live (TTL) for messages in the namespace. Messages older than this TTL will be automatically marked as consumed. | No |
19-
| `retentionTime` | Minimum time to retain messages in the namespace. Should be set in conjunction with RetentionSize for effective retention policy. | No |
20-
| `retentionSize` | Maximum size of backlog retained in the namespace. Should be set in conjunction with RetentionTime for effective retention policy. | No |
19+
| `retentionTime` | Minimum time to retain messages in the namespace. Should be set in conjunction with RetentionSize for effective retention policy. Use "-1" for infinite retention time. | No |
20+
| `retentionSize` | Maximum size of backlog retained in the namespace. Should be set in conjunction with RetentionTime for effective retention policy. Use "-1" for infinite retention size. | No |
2121
| `backlogQuotaLimitTime` | Time limit for message backlog. Messages older than this limit will be removed or handled according to the retention policy. | No |
2222
| `backlogQuotaLimitSize` | Size limit for message backlog. When the limit is reached, older messages will be removed or handled according to the retention policy. | No |
2323
| `backlogQuotaRetentionPolicy` | Retention policy for messages when backlog quota is exceeded. Options: "producer_request_hold", "producer_exception", or "consumer_backlog_eviction". | No |
@@ -393,6 +393,48 @@ The `replicationClusters` and `geoReplicationRefs` fields serve different purpos
393393

394394
Choose `replicationClusters` for simpler, intra-instance replication, and `geoReplicationRefs` for more complex, inter-instance geo-replication scenarios. These fields are mutually exclusive; use only one depending on your replication requirements.
395395

396+
## Infinite Retention Configuration
397+
398+
The `retentionTime` and `retentionSize` fields support infinite retention by using the special value `"-1"`. This is equivalent to passing -1 to Pulsar admin APIs and provides unlimited retention capabilities for all topics within the namespace.
399+
400+
### Infinite Retention Time
401+
402+
To set infinite retention time for the namespace, use the value `"-1"` for the `retentionTime` field:
403+
404+
```yaml
405+
spec:
406+
retentionTime: "-1" # Messages will be retained indefinitely regardless of age
407+
retentionSize: "100Gi" # Still limited by size
408+
```
409+
410+
### Infinite Retention Size
411+
412+
To set infinite retention size for the namespace, use the value `"-1"` for the `retentionSize` field:
413+
414+
```yaml
415+
spec:
416+
retentionTime: "30d" # Still limited by time
417+
retentionSize: "-1" # No size limit for message retention
418+
```
419+
420+
### Complete Infinite Retention
421+
422+
For completely unlimited retention (both time and size), set both fields to `"-1"`:
423+
424+
```yaml
425+
spec:
426+
retentionTime: "-1" # Infinite time retention
427+
retentionSize: "-1" # Infinite size retention
428+
```
429+
430+
**Important Notes:**
431+
- The `"-1"` value is case-sensitive and must be quoted in YAML
432+
- Infinite retention should be used carefully as it can lead to unlimited storage consumption
433+
- Retention quota must exceed configured backlog quota for the namespace
434+
- These settings apply to all topics within the namespace by default
435+
- Individual topics can override namespace-level retention settings
436+
- Consider the storage and cost implications before enabling infinite retention
437+
396438
## Create A Pulsar Namespace
397439

398440
1. Define a namespace named `test-tenant/testns` by using the YAML file and save the YAML file `namespace.yaml`.
@@ -466,8 +508,8 @@ spec:
466508
# maxProducersPerTopic: 2
467509
# maxConsumersPerTopic: 2
468510
# maxConsumersPerSubscription: 2
469-
# retentionTime: 20h
470-
# retentionSize: 2Gi
511+
# retentionTime: 20h # or "-1" for infinite retention time
512+
# retentionSize: 2Gi # or "-1" for infinite retention size
471513
# lifecyclePolicy: CleanUpAfterDeletion
472514
# topicAutoCreationConfig:
473515
# allow: true

docs/pulsar_topic.md

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ The `PulsarTopic` resource defines a topic in a Pulsar cluster. It allows you to
1919
| `messageTTL` | Time to Live (TTL) for messages in the topic. Messages older than this TTL will be automatically marked as consumed. | No |
2020
| `maxUnAckedMessagesPerConsumer` | Maximum number of unacknowledged messages allowed per consumer. | No |
2121
| `maxUnAckedMessagesPerSubscription` | Maximum number of unacknowledged messages allowed per subscription. | No |
22-
| `retentionTime` | Minimum time to retain messages in the topic. Should be set in conjunction with retentionSize for effective retention policy. | No |
23-
| `retentionSize` | Maximum size of backlog retained in the topic. Should be set in conjunction with retentionTime for effective retention policy. | No |
22+
| `retentionTime` | Minimum time to retain messages in the topic. Should be set in conjunction with retentionSize for effective retention policy. Use "-1" for infinite retention time. | No |
23+
| `retentionSize` | Maximum size of backlog retained in the topic. Should be set in conjunction with retentionTime for effective retention policy. Use "-1" for infinite retention size. | No |
2424
| `backlogQuotaLimitTime` | Time limit for message backlog. Messages older than this limit will be removed or handled according to the retention policy. | No |
2525
| `backlogQuotaLimitSize` | Size limit for message backlog. When the limit is reached, older messages will be removed or handled according to the retention policy. | No |
2626
| `backlogQuotaRetentionPolicy` | Retention policy for messages when backlog quota is exceeded. Options: "producer_request_hold", "producer_exception", or "consumer_backlog_eviction". | No |
@@ -68,6 +68,45 @@ The `replicationClusters` and `geoReplicationRefs` fields serve different purpos
6868

6969
Choose `replicationClusters` for simpler, intra-instance replication, and `geoReplicationRefs` for more complex, inter-instance geo-replication scenarios. These fields are mutually exclusive; use only one depending on your replication requirements.
7070

71+
## Infinite Retention Configuration
72+
73+
The `retentionTime` and `retentionSize` fields support infinite retention by using the special value `"-1"`. This is equivalent to passing -1 to Pulsar admin APIs and provides unlimited retention capabilities.
74+
75+
### Infinite Retention Time
76+
77+
To set infinite retention time, use the value `"-1"` for the `retentionTime` field:
78+
79+
```yaml
80+
spec:
81+
retentionTime: "-1" # Messages will be retained indefinitely regardless of age
82+
retentionSize: "10Gi" # Still limited by size
83+
```
84+
85+
### Infinite Retention Size
86+
87+
To set infinite retention size, use the value `"-1"` for the `retentionSize` field:
88+
89+
```yaml
90+
spec:
91+
retentionTime: "7d" # Still limited by time
92+
retentionSize: "-1" # No size limit for message retention
93+
```
94+
95+
### Complete Infinite Retention
96+
97+
For completely unlimited retention (both time and size), set both fields to `"-1"`:
98+
99+
```yaml
100+
spec:
101+
retentionTime: "-1" # Infinite time retention
102+
retentionSize: "-1" # Infinite size retention
103+
```
104+
105+
**Important Notes:**
106+
- The `"-1"` value is case-sensitive and must be quoted in YAML
107+
- Infinite retention should be used carefully as it can lead to unlimited storage consumption
108+
- Retention quota must exceed configured backlog quota for the topic
109+
- Consider the storage and cost implications before enabling infinite retention
71110

72111
## Create A Pulsar Topic
73112

@@ -89,8 +128,8 @@ spec:
89128
# messageTTL:
90129
# maxUnAckedMessagesPerConsumer:
91130
# maxUnAckedMessagesPerSubscription:
92-
# retentionTime: 20h
93-
# retentionSize: 2Gi
131+
# retentionTime: 20h # or "-1" for infinite retention time
132+
# retentionSize: 2Gi # or "-1" for infinite retention size
94133
# backlogQuotaLimitTime: 24h
95134
# backlogQuotaLimitSize: 1Gi
96135
# backlogQuotaRetentionPolicy: producer_request_hold

pkg/admin/impl.go

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -284,14 +284,22 @@ func (p *PulsarAdminClient) applyTopicPolicies(topicName *utils.TopicName, param
284284
retentionTime := -1
285285
retentionSize := -1
286286
if params.RetentionTime != nil {
287-
t, err := params.RetentionTime.Parse()
288-
if err != nil {
289-
return err
287+
if params.RetentionTime.IsInfinite() {
288+
retentionTime = -1 // Infinite retention time
289+
} else {
290+
t, err := params.RetentionTime.Parse()
291+
if err != nil {
292+
return err
293+
}
294+
retentionTime = int(t.Minutes())
290295
}
291-
retentionTime = int(t.Minutes())
292296
}
293297
if params.RetentionSize != nil {
294-
retentionSize = int(params.RetentionSize.ScaledValue(resource.Mega))
298+
if rutils.IsInfiniteQuantity(params.RetentionSize) {
299+
retentionSize = -1 // Infinite retention size
300+
} else {
301+
retentionSize = int(params.RetentionSize.ScaledValue(resource.Mega))
302+
}
295303
}
296304
retentionPolicy := utils.NewRetentionPolicies(retentionTime, retentionSize)
297305
err = p.adminClient.Topics().SetRetention(*topicName, retentionPolicy)
@@ -646,14 +654,22 @@ func (p *PulsarAdminClient) applyNamespacePolicies(completeNSName string, params
646654
retentionTime := -1
647655
retentionSize := -1
648656
if params.RetentionTime != nil {
649-
t, err := params.RetentionTime.Parse()
650-
if err != nil {
651-
return err
657+
if params.RetentionTime.IsInfinite() {
658+
retentionTime = -1 // Infinite retention time
659+
} else {
660+
t, err := params.RetentionTime.Parse()
661+
if err != nil {
662+
return err
663+
}
664+
retentionTime = int(t.Minutes())
652665
}
653-
retentionTime = int(t.Minutes())
654666
}
655667
if params.RetentionSize != nil {
656-
retentionSize = int(params.RetentionSize.ScaledValue(resource.Mega))
668+
if rutils.IsInfiniteQuantity(params.RetentionSize) {
669+
retentionSize = -1 // Infinite retention size
670+
} else {
671+
retentionSize = int(params.RetentionSize.ScaledValue(resource.Mega))
672+
}
657673
}
658674
retentionPolicy := utils.NewRetentionPolicies(retentionTime, retentionSize)
659675
err = p.adminClient.Namespaces().SetRetention(completeNSName, retentionPolicy)

pkg/utils/duration.go

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,74 @@
1515
package utils
1616

1717
import (
18+
"fmt"
1819
"time"
1920

2021
str2duration "github.com/xhit/go-str2duration/v2"
2122
)
2223

2324
// Duration represents a elapsed time in string.
25+
// Supports standard duration formats (e.g., "1h", "30m", "5s") and special value "-1" for infinite duration.
2426
type Duration string
2527

28+
const (
29+
// InfiniteDurationValue represents the special value for infinite duration
30+
InfiniteDurationValue = "-1"
31+
)
32+
2633
// Parse parses a duration from string.
2734
// Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h", "d", "w".
35+
// Special value "-1" represents infinite duration and returns -1 nanosecond.
2836
func (d *Duration) Parse() (time.Duration, error) {
29-
var res time.Duration
30-
res, err := str2duration.ParseDuration(string(*d))
37+
s := string(*d)
38+
39+
// Handle infinite duration special case
40+
if s == InfiniteDurationValue {
41+
return time.Duration(-1), nil
42+
}
43+
44+
// Parse normal duration
45+
res, err := str2duration.ParseDuration(s)
3146
if err != nil {
32-
return res, err
47+
return 0, fmt.Errorf("invalid duration format '%s': %w", s, err)
3348
}
49+
50+
// Ensure duration is positive for non-infinite values
51+
if res < 0 {
52+
return 0, fmt.Errorf("duration must be positive or -1 for infinite, got: %s", s)
53+
}
54+
3455
return res, nil
3556
}
57+
58+
// IsInfinite returns true if the duration represents infinite duration ("-1").
59+
func (d *Duration) IsInfinite() bool {
60+
return string(*d) == InfiniteDurationValue
61+
}
62+
63+
// ToSeconds returns the duration in seconds.
64+
// Returns -1 for infinite duration, or the actual seconds for finite duration.
65+
func (d *Duration) ToSeconds() (int64, error) {
66+
if d.IsInfinite() {
67+
return -1, nil
68+
}
69+
70+
duration, err := d.Parse()
71+
if err != nil {
72+
return 0, err
73+
}
74+
75+
return int64(duration.Seconds()), nil
76+
}
77+
78+
// String returns the string representation of the duration.
79+
func (d Duration) String() string {
80+
return string(d)
81+
}
82+
83+
// Validate validates the duration format.
84+
// Accepts standard duration formats and the special value "-1".
85+
func (d *Duration) Validate() error {
86+
_, err := d.Parse()
87+
return err
88+
}

0 commit comments

Comments
 (0)