Skip to content

Commit 8d8c6f9

Browse files
authored
feat: enhance Topic with missing policies (#337)
* feat: add new topic policies including persistence, delivery, dispatch, publish, and inactive topic management * feat: add comprehensive tests for PulsarTopic policies including persistence, delayed delivery, dispatch rate, publish rate, and inactive topic policies * feat: add advanced topic-level policies to PulsarTopic documentation, including compaction, persistence, delayed delivery, dispatch rate, publish rate, and inactive topic management
1 parent 3a13e37 commit 8d8c6f9

File tree

10 files changed

+1014
-2
lines changed

10 files changed

+1014
-2
lines changed

api/v1alpha1/pulsartopic_types.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,42 @@ type PulsarTopicSpec struct {
136136
// When the topic reaches this size, compaction will be triggered automatically.
137137
// +optional
138138
CompactionThreshold *int64 `json:"compactionThreshold,omitempty"`
139+
140+
// PersistencePolicies defines the persistence configuration for the topic.
141+
// This controls how data is stored and replicated in BookKeeper.
142+
// +optional
143+
PersistencePolicies *PersistencePolicies `json:"persistencePolicies,omitempty"`
144+
145+
// DelayedDelivery defines the delayed delivery policy for the topic.
146+
// This allows messages to be delivered with a delay.
147+
// +optional
148+
DelayedDelivery *DelayedDeliveryData `json:"delayedDelivery,omitempty"`
149+
150+
// DispatchRate defines the message dispatch rate limiting policy for the topic.
151+
// This controls the rate at which messages are delivered to consumers.
152+
// +optional
153+
DispatchRate *DispatchRate `json:"dispatchRate,omitempty"`
154+
155+
// PublishRate defines the message publish rate limiting policy for the topic.
156+
// This controls the rate at which producers can publish messages.
157+
// +optional
158+
PublishRate *PublishRate `json:"publishRate,omitempty"`
159+
160+
// InactiveTopicPolicies defines the inactive topic cleanup policy for the topic.
161+
// This controls how inactive topics are automatically cleaned up.
162+
// +optional
163+
InactiveTopicPolicies *InactiveTopicPolicies `json:"inactiveTopicPolicies,omitempty"`
164+
}
165+
166+
// DelayedDeliveryData defines the delayed delivery policy for a topic
167+
type DelayedDeliveryData struct {
168+
// Active determines whether delayed delivery is enabled for the topic
169+
// +optional
170+
Active *bool `json:"active,omitempty"`
171+
172+
// TickTimeMillis specifies the tick time for delayed message delivery in milliseconds
173+
// +optional
174+
TickTimeMillis *int64 `json:"tickTimeMillis,omitempty"`
139175
}
140176

141177
// SchemaInfo defines the Pulsar Schema for a topic.

api/v1alpha1/zz_generated.deepcopy.go

Lines changed: 50 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

config/crd/bases/resource.streamnative.io_pulsartopics.yaml

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,46 @@ spec:
126126
description: Deduplication controls whether to enable message deduplication
127127
for the topic.
128128
type: boolean
129+
delayedDelivery:
130+
description: |-
131+
DelayedDelivery defines the delayed delivery policy for the topic.
132+
This allows messages to be delivered with a delay.
133+
properties:
134+
active:
135+
description: Active determines whether delayed delivery is enabled
136+
for the topic
137+
type: boolean
138+
tickTimeMillis:
139+
description: TickTimeMillis specifies the tick time for delayed
140+
message delivery in milliseconds
141+
format: int64
142+
type: integer
143+
type: object
144+
dispatchRate:
145+
description: |-
146+
DispatchRate defines the message dispatch rate limiting policy for the topic.
147+
This controls the rate at which messages are delivered to consumers.
148+
properties:
149+
dispatchThrottlingRateInByte:
150+
description: |-
151+
DispatchThrottlingRateInByte specifies the maximum number of bytes per second allowed
152+
-1 means unlimited
153+
format: int64
154+
type: integer
155+
dispatchThrottlingRateInMsg:
156+
description: |-
157+
DispatchThrottlingRateInMsg specifies the maximum number of messages per second allowed
158+
-1 means unlimited
159+
format: int32
160+
type: integer
161+
ratePeriodInSecond:
162+
default: 1
163+
description: RatePeriodInSecond specifies the time window in seconds
164+
for rate calculation
165+
format: int32
166+
minimum: 1
167+
type: integer
168+
type: object
129169
geoReplicationRefs:
130170
description: |-
131171
GeoReplicationRefs is a list of references to PulsarGeoReplication resources,
@@ -151,6 +191,30 @@ spec:
151191
type: object
152192
x-kubernetes-map-type: atomic
153193
type: array
194+
inactiveTopicPolicies:
195+
description: |-
196+
InactiveTopicPolicies defines the inactive topic cleanup policy for the topic.
197+
This controls how inactive topics are automatically cleaned up.
198+
properties:
199+
deleteWhileInactive:
200+
description: DeleteWhileInactive specifies whether to delete topics
201+
while they are inactive
202+
type: boolean
203+
inactiveTopicDeleteMode:
204+
description: |-
205+
InactiveTopicDeleteMode specifies how inactive topics should be handled
206+
Valid values: "delete_when_no_subscriptions", "delete_when_subscriptions_caught_up"
207+
enum:
208+
- delete_when_no_subscriptions
209+
- delete_when_subscriptions_caught_up
210+
type: string
211+
maxInactiveDurationInSeconds:
212+
description: MaxInactiveDurationInSeconds specifies how long a
213+
topic can remain inactive before being deleted
214+
format: int32
215+
minimum: 1
216+
type: integer
217+
type: object
154218
lifecyclePolicy:
155219
description: |-
156220
LifecyclePolicy determines whether to keep or delete the Pulsar topic
@@ -196,12 +260,63 @@ spec:
196260
Set to 0 for a non-partitioned topic.
197261
format: int32
198262
type: integer
263+
persistencePolicies:
264+
description: |-
265+
PersistencePolicies defines the persistence configuration for the topic.
266+
This controls how data is stored and replicated in BookKeeper.
267+
properties:
268+
bookkeeperAckQuorum:
269+
description: |-
270+
BookkeeperAckQuorum specifies the number of replicas to wait for acknowledgment
271+
Must be <= BookkeeperWriteQuorum
272+
format: int32
273+
minimum: 1
274+
type: integer
275+
bookkeeperEnsemble:
276+
description: |-
277+
BookkeeperEnsemble specifies the number of bookies to use for a ledger
278+
This determines the replication factor for storing data
279+
format: int32
280+
minimum: 1
281+
type: integer
282+
bookkeeperWriteQuorum:
283+
description: |-
284+
BookkeeperWriteQuorum specifies the number of replicas to write for each entry
285+
Must be <= BookkeeperEnsemble
286+
format: int32
287+
minimum: 1
288+
type: integer
289+
managedLedgerMaxMarkDeleteRate:
290+
description: |-
291+
ManagedLedgerMaxMarkDeleteRate specifies the maximum rate at which mark-delete operations can be performed
292+
This helps control the rate of acknowledgment processing
293+
Value should be a decimal number as string (e.g., "1.5", "2.0")
294+
type: string
295+
type: object
199296
persistent:
200297
default: true
201298
description: |-
202299
Persistent determines if the topic is persistent (true) or non-persistent (false).
203300
Defaults to true if not specified.
204301
type: boolean
302+
publishRate:
303+
description: |-
304+
PublishRate defines the message publish rate limiting policy for the topic.
305+
This controls the rate at which producers can publish messages.
306+
properties:
307+
publishThrottlingRateInByte:
308+
description: |-
309+
PublishThrottlingRateInByte specifies the maximum number of bytes per second that producers can publish
310+
-1 means unlimited
311+
format: int64
312+
type: integer
313+
publishThrottlingRateInMsg:
314+
description: |-
315+
PublishThrottlingRateInMsg specifies the maximum number of messages per second that producers can publish
316+
-1 means unlimited
317+
format: int32
318+
type: integer
319+
type: object
205320
replicationClusters:
206321
description: |-
207322
ReplicationClusters is the list of clusters to which the topic is replicated

0 commit comments

Comments
 (0)