diff --git a/api/v1alpha1/pulsartopic_types.go b/api/v1alpha1/pulsartopic_types.go index ca3a7688..872581e2 100644 --- a/api/v1alpha1/pulsartopic_types.go +++ b/api/v1alpha1/pulsartopic_types.go @@ -136,6 +136,42 @@ type PulsarTopicSpec struct { // When the topic reaches this size, compaction will be triggered automatically. // +optional CompactionThreshold *int64 `json:"compactionThreshold,omitempty"` + + // PersistencePolicies defines the persistence configuration for the topic. + // This controls how data is stored and replicated in BookKeeper. + // +optional + PersistencePolicies *PersistencePolicies `json:"persistencePolicies,omitempty"` + + // DelayedDelivery defines the delayed delivery policy for the topic. + // This allows messages to be delivered with a delay. + // +optional + DelayedDelivery *DelayedDeliveryData `json:"delayedDelivery,omitempty"` + + // DispatchRate defines the message dispatch rate limiting policy for the topic. + // This controls the rate at which messages are delivered to consumers. + // +optional + DispatchRate *DispatchRate `json:"dispatchRate,omitempty"` + + // PublishRate defines the message publish rate limiting policy for the topic. + // This controls the rate at which producers can publish messages. + // +optional + PublishRate *PublishRate `json:"publishRate,omitempty"` + + // InactiveTopicPolicies defines the inactive topic cleanup policy for the topic. + // This controls how inactive topics are automatically cleaned up. + // +optional + InactiveTopicPolicies *InactiveTopicPolicies `json:"inactiveTopicPolicies,omitempty"` +} + +// DelayedDeliveryData defines the delayed delivery policy for a topic +type DelayedDeliveryData struct { + // Active determines whether delayed delivery is enabled for the topic + // +optional + Active *bool `json:"active,omitempty"` + + // TickTimeMillis specifies the tick time for delayed message delivery in milliseconds + // +optional + TickTimeMillis *int64 `json:"tickTimeMillis,omitempty"` } // SchemaInfo defines the Pulsar Schema for a topic. diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 3733f7b8..54b3eea0 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -808,6 +808,31 @@ func (in *CryptoConfig) DeepCopy() *CryptoConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DelayedDeliveryData) DeepCopyInto(out *DelayedDeliveryData) { + *out = *in + if in.Active != nil { + in, out := &in.Active, &out.Active + *out = new(bool) + **out = **in + } + if in.TickTimeMillis != nil { + in, out := &in.TickTimeMillis, &out.TickTimeMillis + *out = new(int64) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DelayedDeliveryData. +func (in *DelayedDeliveryData) DeepCopy() *DelayedDeliveryData { + if in == nil { + return nil + } + out := new(DelayedDeliveryData) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DispatchRate) DeepCopyInto(out *DispatchRate) { *out = *in @@ -2922,6 +2947,31 @@ func (in *PulsarTopicSpec) DeepCopyInto(out *PulsarTopicSpec) { *out = new(int64) **out = **in } + if in.PersistencePolicies != nil { + in, out := &in.PersistencePolicies, &out.PersistencePolicies + *out = new(PersistencePolicies) + (*in).DeepCopyInto(*out) + } + if in.DelayedDelivery != nil { + in, out := &in.DelayedDelivery, &out.DelayedDelivery + *out = new(DelayedDeliveryData) + (*in).DeepCopyInto(*out) + } + if in.DispatchRate != nil { + in, out := &in.DispatchRate, &out.DispatchRate + *out = new(DispatchRate) + (*in).DeepCopyInto(*out) + } + if in.PublishRate != nil { + in, out := &in.PublishRate, &out.PublishRate + *out = new(PublishRate) + (*in).DeepCopyInto(*out) + } + if in.InactiveTopicPolicies != nil { + in, out := &in.InactiveTopicPolicies, &out.InactiveTopicPolicies + *out = new(InactiveTopicPolicies) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PulsarTopicSpec. diff --git a/config/crd/bases/resource.streamnative.io_pulsartopics.yaml b/config/crd/bases/resource.streamnative.io_pulsartopics.yaml index 84870a8e..91996d86 100644 --- a/config/crd/bases/resource.streamnative.io_pulsartopics.yaml +++ b/config/crd/bases/resource.streamnative.io_pulsartopics.yaml @@ -126,6 +126,46 @@ spec: description: Deduplication controls whether to enable message deduplication for the topic. type: boolean + delayedDelivery: + description: |- + DelayedDelivery defines the delayed delivery policy for the topic. + This allows messages to be delivered with a delay. + properties: + active: + description: Active determines whether delayed delivery is enabled + for the topic + type: boolean + tickTimeMillis: + description: TickTimeMillis specifies the tick time for delayed + message delivery in milliseconds + format: int64 + type: integer + type: object + dispatchRate: + description: |- + DispatchRate defines the message dispatch rate limiting policy for the topic. + This controls the rate at which messages are delivered to consumers. + properties: + dispatchThrottlingRateInByte: + description: |- + DispatchThrottlingRateInByte specifies the maximum number of bytes per second allowed + -1 means unlimited + format: int64 + type: integer + dispatchThrottlingRateInMsg: + description: |- + DispatchThrottlingRateInMsg specifies the maximum number of messages per second allowed + -1 means unlimited + format: int32 + type: integer + ratePeriodInSecond: + default: 1 + description: RatePeriodInSecond specifies the time window in seconds + for rate calculation + format: int32 + minimum: 1 + type: integer + type: object geoReplicationRefs: description: |- GeoReplicationRefs is a list of references to PulsarGeoReplication resources, @@ -151,6 +191,30 @@ spec: type: object x-kubernetes-map-type: atomic type: array + inactiveTopicPolicies: + description: |- + InactiveTopicPolicies defines the inactive topic cleanup policy for the topic. + This controls how inactive topics are automatically cleaned up. + properties: + deleteWhileInactive: + description: DeleteWhileInactive specifies whether to delete topics + while they are inactive + type: boolean + inactiveTopicDeleteMode: + description: |- + InactiveTopicDeleteMode specifies how inactive topics should be handled + Valid values: "delete_when_no_subscriptions", "delete_when_subscriptions_caught_up" + enum: + - delete_when_no_subscriptions + - delete_when_subscriptions_caught_up + type: string + maxInactiveDurationInSeconds: + description: MaxInactiveDurationInSeconds specifies how long a + topic can remain inactive before being deleted + format: int32 + minimum: 1 + type: integer + type: object lifecyclePolicy: description: |- LifecyclePolicy determines whether to keep or delete the Pulsar topic @@ -196,12 +260,63 @@ spec: Set to 0 for a non-partitioned topic. format: int32 type: integer + persistencePolicies: + description: |- + PersistencePolicies defines the persistence configuration for the topic. + This controls how data is stored and replicated in BookKeeper. + properties: + bookkeeperAckQuorum: + description: |- + BookkeeperAckQuorum specifies the number of replicas to wait for acknowledgment + Must be <= BookkeeperWriteQuorum + format: int32 + minimum: 1 + type: integer + bookkeeperEnsemble: + description: |- + BookkeeperEnsemble specifies the number of bookies to use for a ledger + This determines the replication factor for storing data + format: int32 + minimum: 1 + type: integer + bookkeeperWriteQuorum: + description: |- + BookkeeperWriteQuorum specifies the number of replicas to write for each entry + Must be <= BookkeeperEnsemble + format: int32 + minimum: 1 + type: integer + managedLedgerMaxMarkDeleteRate: + description: |- + ManagedLedgerMaxMarkDeleteRate specifies the maximum rate at which mark-delete operations can be performed + This helps control the rate of acknowledgment processing + Value should be a decimal number as string (e.g., "1.5", "2.0") + type: string + type: object persistent: default: true description: |- Persistent determines if the topic is persistent (true) or non-persistent (false). Defaults to true if not specified. type: boolean + publishRate: + description: |- + PublishRate defines the message publish rate limiting policy for the topic. + This controls the rate at which producers can publish messages. + properties: + publishThrottlingRateInByte: + description: |- + PublishThrottlingRateInByte specifies the maximum number of bytes per second that producers can publish + -1 means unlimited + format: int64 + type: integer + publishThrottlingRateInMsg: + description: |- + PublishThrottlingRateInMsg specifies the maximum number of messages per second that producers can publish + -1 means unlimited + format: int32 + type: integer + type: object replicationClusters: description: |- ReplicationClusters is the list of clusters to which the topic is replicated diff --git a/docs/pulsar_topic.md b/docs/pulsar_topic.md index 74d19f24..76788111 100644 --- a/docs/pulsar_topic.md +++ b/docs/pulsar_topic.md @@ -29,6 +29,12 @@ The `PulsarTopic` resource defines a topic in a Pulsar cluster. It allows you to | `geoReplicationRefs` | List of references to PulsarGeoReplication resources, used to enable geo-replication at the topic level. | No | | `replicationClusters` | List of clusters to which the topic is replicated. Use only if replicating clusters within the same Pulsar instance. | No | | `deduplication` | whether to enable message deduplication for the topic. | No | +| `compactionThreshold` | Size threshold in bytes for automatic topic compaction. When the topic reaches this size, compaction will be triggered automatically. | No | +| `persistencePolicies` | Persistence configuration for the topic, controlling how data is stored and replicated in BookKeeper. See [persistencePolicies](#persistencePolicies) for more details. | No | +| `delayedDelivery` | Delayed delivery policy for the topic, allowing messages to be delivered with a delay. See [delayedDelivery](#delayedDelivery) for more details. | No | +| `dispatchRate` | Message dispatch rate limiting policy for the topic, controlling the rate at which messages are delivered to consumers. See [dispatchRate](#dispatchRate) for more details. | No | +| `publishRate` | Message publish rate limiting policy for the topic, controlling the rate at which producers can publish messages. See [publishRate](#publishRate) for more details. | No | +| `inactiveTopicPolicies` | Inactive topic cleanup policy for the topic, controlling how inactive topics are automatically cleaned up. See [inactiveTopicPolicies](#inactiveTopicPolicies) for more details. | No | Note: Valid time units for duration fields are "s" (seconds), "m" (minutes), "h" (hours), "d" (days), "w" (weeks). @@ -77,6 +83,26 @@ spec: # backlogQuotaLimitSize: 1Gi # backlogQuotaRetentionPolicy: producer_request_hold # lifecyclePolicy: CleanUpAfterDeletion +# compactionThreshold: 104857600 # 100MB +# persistencePolicies: +# bookkeeperEnsemble: 3 +# bookkeeperWriteQuorum: 2 +# bookkeeperAckQuorum: 2 +# managedLedgerMaxMarkDeleteRate: "1.0" +# delayedDelivery: +# active: true +# tickTimeMillis: 1000 +# dispatchRate: +# dispatchThrottlingRateInMsg: 1000 +# dispatchThrottlingRateInByte: 1048576 +# ratePeriodInSecond: 1 +# publishRate: +# publishThrottlingRateInMsg: 2000 +# publishThrottlingRateInByte: 2097152 +# inactiveTopicPolicies: +# inactiveTopicDeleteMode: "delete_when_no_subscriptions" +# maxInactiveDurationInSeconds: 3600 +# deleteWhileInactive: true ``` 2. Apply the YAML file to create the topic. @@ -104,7 +130,7 @@ Important notes when updating a Pulsar topic: 1. The fields `name` and `persistent` are immutable and cannot be updated after the topic is created. -2. Other fields such as `partitions`, `maxProducers`, `maxConsumers`, `messageTTL`, `retentionTime`, `retentionSize`, `backlogQuotaLimitTime`, `backlogQuotaLimitSize`, and `backlogQuotaRetentionPolicy` can be modified. +2. Other fields such as `partitions`, `maxProducers`, `maxConsumers`, `messageTTL`, `retentionTime`, `retentionSize`, `backlogQuotaLimitTime`, `backlogQuotaLimitSize`, `backlogQuotaRetentionPolicy`, `compactionThreshold`, `persistencePolicies`, `delayedDelivery`, `dispatchRate`, `publishRate`, and `inactiveTopicPolicies` can be modified. 3. If you want to change the `connectionRef`, ensure that the new PulsarConnection resource exists and is properly configured. Changing the `connectionRef` can have significant implications: @@ -142,6 +168,103 @@ If you want to delete the topic in the pulsar cluster, you can use the following pulsarctl topics delete persistent://test-tenant/testns/topic123 ``` +## Topic-Level Policies + +The PulsarTopic resource supports several advanced topic-level policies that provide fine-grained control over topic behavior. + +### persistencePolicies + +The `persistencePolicies` field configures how data is stored and replicated in BookKeeper, the storage layer for Pulsar. + +| Field | Description | Type | Required | +|-------|-------------|------|----------| +| `bookkeeperEnsemble` | Number of bookies to store ledger data across | int32 | No | +| `bookkeeperWriteQuorum` | Number of replicas to write for each ledger entry | int32 | No | +| `bookkeeperAckQuorum` | Number of replicas that must acknowledge writes | int32 | No | +| `managedLedgerMaxMarkDeleteRate` | Rate limit for mark-delete operations as a string (e.g., "1.0", "2.5") | string | No | + +**Example:** +```yaml +spec: + persistencePolicies: + bookkeeperEnsemble: 3 + bookkeeperWriteQuorum: 2 + bookkeeperAckQuorum: 2 + managedLedgerMaxMarkDeleteRate: "1.5" +``` + +### delayedDelivery + +The `delayedDelivery` field configures delayed message delivery, allowing messages to be delivered after a specified delay. + +| Field | Description | Type | Required | +|-------|-------------|------|----------| +| `active` | Whether delayed delivery is enabled for the topic | bool | No | +| `tickTimeMillis` | Tick time for delayed message delivery in milliseconds | int64 | No | + +**Example:** +```yaml +spec: + delayedDelivery: + active: true + tickTimeMillis: 1000 # 1 second +``` + +### dispatchRate + +The `dispatchRate` field configures rate limiting for message delivery to consumers. + +| Field | Description | Type | Required | +|-------|-------------|------|----------| +| `dispatchThrottlingRateInMsg` | Maximum number of messages dispatched per rate period | int32 | No | +| `dispatchThrottlingRateInByte` | Maximum number of bytes dispatched per rate period | int64 | No | +| `ratePeriodInSecond` | Rate period in seconds | int32 | No | + +**Example:** +```yaml +spec: + dispatchRate: + dispatchThrottlingRateInMsg: 1000 + dispatchThrottlingRateInByte: 1048576 # 1MB + ratePeriodInSecond: 1 +``` + +### publishRate + +The `publishRate` field configures rate limiting for message publishing from producers. + +| Field | Description | Type | Required | +|-------|-------------|------|----------| +| `publishThrottlingRateInMsg` | Maximum number of messages published per rate period | int32 | No | +| `publishThrottlingRateInByte` | Maximum number of bytes published per rate period | int64 | No | + +**Example:** +```yaml +spec: + publishRate: + publishThrottlingRateInMsg: 2000 + publishThrottlingRateInByte: 2097152 # 2MB +``` + +### inactiveTopicPolicies + +The `inactiveTopicPolicies` field configures automatic cleanup of inactive topics. + +| Field | Description | Type | Required | +|-------|-------------|------|----------| +| `inactiveTopicDeleteMode` | How to delete inactive topics: "delete_when_no_subscriptions" or "delete_when_subscriptions_caught_up" | string | No | +| `maxInactiveDurationInSeconds` | Maximum time in seconds a topic can be inactive before deletion | int32 | No | +| `deleteWhileInactive` | Whether to delete the topic while it's inactive | bool | No | + +**Example:** +```yaml +spec: + inactiveTopicPolicies: + inactiveTopicDeleteMode: "delete_when_no_subscriptions" + maxInactiveDurationInSeconds: 3600 # 1 hour + deleteWhileInactive: true +``` + ## SchemaInfo The `schemaInfo` field in the PulsarTopic specification allows you to define the schema for the topic. For more details about Pulsar schemas, refer to the [official documentation](https://pulsar.apache.org/docs/schema-understand/). @@ -204,4 +327,79 @@ spec: "owner": "pulsar" ``` -This example defines a JSON schema with two fields, `ID` and `Name`, both of which are required. The `type` field is set to `JSON`, indicating that the schema is in JSON format. The `schema` field contains the actual JSON schema definition. The `properties` field is optional and can be used to add any application-specific logic. \ No newline at end of file +This example defines a JSON schema with two fields, `ID` and `Name`, both of which are required. The `type` field is set to `JSON`, indicating that the schema is in JSON format. The `schema` field contains the actual JSON schema definition. The `properties` field is optional and can be used to add any application-specific logic. + +## Complete Example with Advanced Policies + +Here's a comprehensive example of a PulsarTopic resource that demonstrates the use of the new topic-level policies: + +```yaml +apiVersion: resource.streamnative.io/v1alpha1 +kind: PulsarTopic +metadata: + name: "advanced-pulsar-topic" + namespace: production +spec: + name: persistent://production-tenant/high-throughput/events + connectionRef: + name: "production-pulsar-connection" + persistent: true + partitions: 4 + maxProducers: 10 + maxConsumers: 20 + messageTTL: 7d + retentionTime: 30d + retentionSize: 100Gi + compactionThreshold: 1073741824 # 1GB + + # Advanced persistence configuration for high availability + persistencePolicies: + bookkeeperEnsemble: 5 + bookkeeperWriteQuorum: 3 + bookkeeperAckQuorum: 2 + managedLedgerMaxMarkDeleteRate: "2.0" + + # Enable delayed delivery for scheduled messages + delayedDelivery: + active: true + tickTimeMillis: 1000 + + # Rate limiting for consumer message delivery + dispatchRate: + dispatchThrottlingRateInMsg: 10000 + dispatchThrottlingRateInByte: 10485760 # 10MB + ratePeriodInSecond: 1 + + # Rate limiting for producer message publishing + publishRate: + publishThrottlingRateInMsg: 5000 + publishThrottlingRateInByte: 5242880 # 5MB + + # Automatic cleanup of inactive topics + inactiveTopicPolicies: + inactiveTopicDeleteMode: "delete_when_no_subscriptions" + maxInactiveDurationInSeconds: 86400 # 24 hours + deleteWhileInactive: false + + # Message deduplication + deduplication: true + + # Schema definition + schemaInfo: + type: "JSON" + schema: "{\"type\":\"record\",\"name\":\"Event\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"data\",\"type\":\"string\"}]}" + properties: + "version": "1.0" + "owner": "data-platform-team" + + lifecyclePolicy: CleanUpAfterDeletion +``` + +This example demonstrates a production-ready topic configuration with: +- High availability persistence settings (5 bookies ensemble, 3 write quorum) +- Rate limiting for both producers and consumers +- Delayed delivery capability for scheduled messages +- Automatic topic compaction at 1GB +- Inactive topic cleanup after 24 hours +- Message deduplication enabled +- JSON schema enforcement \ No newline at end of file diff --git a/go.work.sum b/go.work.sum index e30cc6d3..141cfed7 100644 --- a/go.work.sum +++ b/go.work.sum @@ -1469,6 +1469,7 @@ github.com/intel/goresctrl v0.3.0/go.mod h1:fdz3mD85cmP9sHD8JUlrNWAxvwM86CrbmVXl github.com/intel/goresctrl v0.5.0 h1:kcDhjE3ZF/mNrJuRzLS3LY2Hp6atFaF1XVFBT7SVL2g= github.com/intel/goresctrl v0.5.0/go.mod h1:mIe63ggylWYr0cU/l8n11FAkesqfvuP3oktIsxvu0T0= github.com/jawher/mow.cli v1.2.0 h1:e6ViPPy+82A/NFF/cfbq3Lr6q4JHKT9tyHwTCcUQgQw= +github.com/jawher/mow.cli v1.2.0/go.mod h1:y+pcA3jBAdo/GIZx/0rFjw/K2bVEODP9rfZOfaiq8Ko= github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc= diff --git a/pkg/admin/impl.go b/pkg/admin/impl.go index c3285f7d..3405752b 100644 --- a/pkg/admin/impl.go +++ b/pkg/admin/impl.go @@ -303,6 +303,84 @@ func (p *PulsarAdminClient) applyTopicPolicies(topicName *utils.TopicName, param } } + // Handle persistence policies + if params.PersistencePolicies != nil { + // Parse ManagedLedgerMaxMarkDeleteRate from string to float64 + var markDeleteRate float64 + if params.PersistencePolicies.ManagedLedgerMaxMarkDeleteRate != nil { + var err error + markDeleteRate, err = strconv.ParseFloat(*params.PersistencePolicies.ManagedLedgerMaxMarkDeleteRate, 64) + if err != nil { + return err + } + } + + persistenceData := utils.PersistenceData{ + BookkeeperEnsemble: int64(*params.PersistencePolicies.BookkeeperEnsemble), + BookkeeperWriteQuorum: int64(*params.PersistencePolicies.BookkeeperWriteQuorum), + BookkeeperAckQuorum: int64(*params.PersistencePolicies.BookkeeperAckQuorum), + ManagedLedgerMaxMarkDeleteRate: markDeleteRate, + } + err = p.adminClient.Topics().SetPersistence(*topicName, persistenceData) + if err != nil { + return err + } + } + + // Handle delayed delivery + if params.DelayedDelivery != nil { + delayedDeliveryData := utils.DelayedDeliveryData{ + Active: *params.DelayedDelivery.Active, + } + if params.DelayedDelivery.TickTimeMillis != nil { + // Convert milliseconds to seconds (float64) + delayedDeliveryData.TickTime = float64(*params.DelayedDelivery.TickTimeMillis) / 1000.0 + } + err = p.adminClient.Topics().SetDelayedDelivery(*topicName, delayedDeliveryData) + if err != nil { + return err + } + } + + // Handle dispatch rate + if params.DispatchRate != nil { + dispatchRateData := utils.DispatchRateData{ + DispatchThrottlingRateInMsg: int64(*params.DispatchRate.DispatchThrottlingRateInMsg), + DispatchThrottlingRateInByte: *params.DispatchRate.DispatchThrottlingRateInByte, + RatePeriodInSecond: int64(*params.DispatchRate.RatePeriodInSecond), + } + err = p.adminClient.Topics().SetDispatchRate(*topicName, dispatchRateData) + if err != nil { + return err + } + } + + // Handle publish rate + if params.PublishRate != nil { + publishRateData := utils.PublishRateData{ + PublishThrottlingRateInMsg: int64(*params.PublishRate.PublishThrottlingRateInMsg), + PublishThrottlingRateInByte: *params.PublishRate.PublishThrottlingRateInByte, + } + err = p.adminClient.Topics().SetPublishRate(*topicName, publishRateData) + if err != nil { + return err + } + } + + // Handle inactive topic policies + if params.InactiveTopicPolicies != nil { + deleteMode := utils.InactiveTopicDeleteMode(*params.InactiveTopicPolicies.InactiveTopicDeleteMode) + inactiveTopicPolicies := utils.InactiveTopicPolicies{ + InactiveTopicDeleteMode: &deleteMode, + MaxInactiveDurationSeconds: int(*params.InactiveTopicPolicies.MaxInactiveDurationInSeconds), + DeleteWhileInactive: *params.InactiveTopicPolicies.DeleteWhileInactive, + } + err = p.adminClient.Topics().SetInactiveTopicPolicies(*topicName, inactiveTopicPolicies) + if err != nil { + return err + } + } + return nil } diff --git a/pkg/admin/interface.go b/pkg/admin/interface.go index 46918b59..d10d0c49 100644 --- a/pkg/admin/interface.go +++ b/pkg/admin/interface.go @@ -93,6 +93,11 @@ type TopicParams struct { ReplicationClusters []string Deduplication *bool CompactionThreshold *int64 + PersistencePolicies *v1alpha1.PersistencePolicies + DelayedDelivery *v1alpha1.DelayedDeliveryData + DispatchRate *v1alpha1.DispatchRate + PublishRate *v1alpha1.PublishRate + InactiveTopicPolicies *v1alpha1.InactiveTopicPolicies } // ClusterParams indicate the parameters for creating a cluster diff --git a/pkg/connection/reconcile_topic.go b/pkg/connection/reconcile_topic.go index 3c636baf..bd055915 100644 --- a/pkg/connection/reconcile_topic.go +++ b/pkg/connection/reconcile_topic.go @@ -290,6 +290,11 @@ func createTopicParams(topic *resourcev1alpha1.PulsarTopic) *admin.TopicParams { BacklogQuotaRetentionPolicy: topic.Spec.BacklogQuotaRetentionPolicy, Deduplication: topic.Spec.Deduplication, CompactionThreshold: topic.Spec.CompactionThreshold, + PersistencePolicies: topic.Spec.PersistencePolicies, + DelayedDelivery: topic.Spec.DelayedDelivery, + DispatchRate: topic.Spec.DispatchRate, + PublishRate: topic.Spec.PublishRate, + InactiveTopicPolicies: topic.Spec.InactiveTopicPolicies, } } diff --git a/tests/operator/resources_test.go b/tests/operator/resources_test.go index f8924b2b..3dc5b178 100644 --- a/tests/operator/resources_test.go +++ b/tests/operator/resources_test.go @@ -504,6 +504,380 @@ var _ = Describe("Resources", func() { }) }) + Context("PulsarTopic Persistence Policies", Ordered, func() { + var ( + persistenceTopic *v1alphav1.PulsarTopic + persistenceTopicName string = "test-persistence-topic" + ) + + BeforeAll(func() { + persistenceTopic = utils.MakePulsarTopicWithPersistencePolicies( + namespaceName, + persistenceTopicName, + "persistent://public/default/persistence-test", + pconnName, + lifecyclePolicy, + ) + }) + + It("should create topic with persistence policies successfully", func() { + err := k8sClient.Create(ctx, persistenceTopic) + Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue()) + }) + + It("should be ready", func() { + Eventually(func() bool { + t := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: persistenceTopicName} + Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + return v1alphav1.IsPulsarResourceReady(t) + }, "20s", "100ms").Should(BeTrue()) + }) + + It("should have correct persistence policies configuration", func() { + topic := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: persistenceTopicName} + Expect(k8sClient.Get(ctx, tns, topic)).Should(Succeed()) + + // Verify PersistencePolicies + Expect(topic.Spec.PersistencePolicies).ShouldNot(BeNil()) + Expect(*topic.Spec.PersistencePolicies.BookkeeperEnsemble).Should(Equal(int32(3))) + Expect(*topic.Spec.PersistencePolicies.BookkeeperWriteQuorum).Should(Equal(int32(2))) + Expect(*topic.Spec.PersistencePolicies.BookkeeperAckQuorum).Should(Equal(int32(2))) + Expect(*topic.Spec.PersistencePolicies.ManagedLedgerMaxMarkDeleteRate).Should(Equal("2.0")) + }) + + It("should update persistence policies successfully", func() { + topic := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: persistenceTopicName} + Expect(k8sClient.Get(ctx, tns, topic)).Should(Succeed()) + + // Update persistence policies + topic.Spec.PersistencePolicies.BookkeeperEnsemble = pointer.Int32(5) + topic.Spec.PersistencePolicies.BookkeeperWriteQuorum = pointer.Int32(3) + topic.Spec.PersistencePolicies.BookkeeperAckQuorum = pointer.Int32(3) + err := k8sClient.Update(ctx, topic) + Expect(err).Should(Succeed()) + }) + + It("should be ready after update", func() { + Eventually(func() bool { + t := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: persistenceTopicName} + Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + return v1alphav1.IsPulsarResourceReady(t) + }, "20s", "100ms").Should(BeTrue()) + }) + + AfterAll(func() { + if persistenceTopic != nil { + Eventually(func(g Gomega) { + t := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: persistenceTopicName} + g.Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + g.Expect(k8sClient.Delete(ctx, t)).Should(Succeed()) + }).Should(Succeed()) + } + }) + }) + + Context("PulsarTopic Delayed Delivery", Ordered, func() { + var ( + delayedDeliveryTopic *v1alphav1.PulsarTopic + delayedDeliveryTopicName string = "test-delayed-delivery-topic" + ) + + BeforeAll(func() { + delayedDeliveryTopic = utils.MakePulsarTopicWithDelayedDelivery( + namespaceName, + delayedDeliveryTopicName, + "persistent://public/default/delayed-delivery-test", + pconnName, + lifecyclePolicy, + ) + }) + + It("should create topic with delayed delivery successfully", func() { + err := k8sClient.Create(ctx, delayedDeliveryTopic) + Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue()) + }) + + It("should be ready", func() { + Eventually(func() bool { + t := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: delayedDeliveryTopicName} + Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + return v1alphav1.IsPulsarResourceReady(t) + }, "20s", "100ms").Should(BeTrue()) + }) + + It("should have correct delayed delivery configuration", func() { + topic := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: delayedDeliveryTopicName} + Expect(k8sClient.Get(ctx, tns, topic)).Should(Succeed()) + + // Verify DelayedDelivery + Expect(topic.Spec.DelayedDelivery).ShouldNot(BeNil()) + Expect(*topic.Spec.DelayedDelivery.Active).Should(Equal(true)) + Expect(*topic.Spec.DelayedDelivery.TickTimeMillis).Should(Equal(int64(1000))) + }) + + It("should update delayed delivery configuration successfully", func() { + topic := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: delayedDeliveryTopicName} + Expect(k8sClient.Get(ctx, tns, topic)).Should(Succeed()) + + // Update delayed delivery + topic.Spec.DelayedDelivery.TickTimeMillis = pointer.Int64(2000) + err := k8sClient.Update(ctx, topic) + Expect(err).Should(Succeed()) + }) + + It("should be ready after update", func() { + Eventually(func() bool { + t := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: delayedDeliveryTopicName} + Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + return v1alphav1.IsPulsarResourceReady(t) + }, "20s", "100ms").Should(BeTrue()) + }) + + AfterAll(func() { + if delayedDeliveryTopic != nil { + Eventually(func(g Gomega) { + t := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: delayedDeliveryTopicName} + g.Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + g.Expect(k8sClient.Delete(ctx, t)).Should(Succeed()) + }).Should(Succeed()) + } + }) + }) + + Context("PulsarTopic Dispatch Rate", Ordered, func() { + var ( + dispatchRateTopic *v1alphav1.PulsarTopic + dispatchRateTopicName string = "test-dispatch-rate-topic" + ) + + BeforeAll(func() { + dispatchRateTopic = utils.MakePulsarTopicWithDispatchRate( + namespaceName, + dispatchRateTopicName, + "persistent://public/default/dispatch-rate-test", + pconnName, + lifecyclePolicy, + ) + }) + + It("should create topic with dispatch rate successfully", func() { + err := k8sClient.Create(ctx, dispatchRateTopic) + Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue()) + }) + + It("should be ready", func() { + Eventually(func() bool { + t := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: dispatchRateTopicName} + Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + return v1alphav1.IsPulsarResourceReady(t) + }, "20s", "100ms").Should(BeTrue()) + }) + + It("should have correct dispatch rate configuration", func() { + topic := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: dispatchRateTopicName} + Expect(k8sClient.Get(ctx, tns, topic)).Should(Succeed()) + + // Verify DispatchRate + Expect(topic.Spec.DispatchRate).ShouldNot(BeNil()) + Expect(*topic.Spec.DispatchRate.DispatchThrottlingRateInMsg).Should(Equal(int32(500))) + Expect(*topic.Spec.DispatchRate.DispatchThrottlingRateInByte).Should(Equal(int64(524288))) + Expect(*topic.Spec.DispatchRate.RatePeriodInSecond).Should(Equal(int32(1))) + }) + + It("should update dispatch rate successfully", func() { + topic := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: dispatchRateTopicName} + Expect(k8sClient.Get(ctx, tns, topic)).Should(Succeed()) + + // Update dispatch rate + topic.Spec.DispatchRate.DispatchThrottlingRateInMsg = pointer.Int32(1000) + topic.Spec.DispatchRate.DispatchThrottlingRateInByte = pointer.Int64(1048576) + err := k8sClient.Update(ctx, topic) + Expect(err).Should(Succeed()) + }) + + It("should be ready after update", func() { + Eventually(func() bool { + t := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: dispatchRateTopicName} + Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + return v1alphav1.IsPulsarResourceReady(t) + }, "20s", "100ms").Should(BeTrue()) + }) + + AfterAll(func() { + if dispatchRateTopic != nil { + Eventually(func(g Gomega) { + t := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: dispatchRateTopicName} + g.Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + g.Expect(k8sClient.Delete(ctx, t)).Should(Succeed()) + }).Should(Succeed()) + } + }) + }) + + Context("PulsarTopic Publish Rate", Ordered, func() { + var ( + publishRateTopic *v1alphav1.PulsarTopic + publishRateTopicName string = "test-publish-rate-topic" + ) + + BeforeAll(func() { + publishRateTopic = utils.MakePulsarTopicWithPublishRate( + namespaceName, + publishRateTopicName, + "persistent://public/default/publish-rate-test", + pconnName, + lifecyclePolicy, + ) + }) + + It("should create topic with publish rate successfully", func() { + err := k8sClient.Create(ctx, publishRateTopic) + Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue()) + }) + + It("should be ready", func() { + Eventually(func() bool { + t := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: publishRateTopicName} + Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + return v1alphav1.IsPulsarResourceReady(t) + }, "20s", "100ms").Should(BeTrue()) + }) + + It("should have correct publish rate configuration", func() { + topic := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: publishRateTopicName} + Expect(k8sClient.Get(ctx, tns, topic)).Should(Succeed()) + + // Verify PublishRate + Expect(topic.Spec.PublishRate).ShouldNot(BeNil()) + Expect(*topic.Spec.PublishRate.PublishThrottlingRateInMsg).Should(Equal(int32(1000))) + Expect(*topic.Spec.PublishRate.PublishThrottlingRateInByte).Should(Equal(int64(1048576))) + }) + + It("should update publish rate successfully", func() { + topic := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: publishRateTopicName} + Expect(k8sClient.Get(ctx, tns, topic)).Should(Succeed()) + + // Update publish rate + topic.Spec.PublishRate.PublishThrottlingRateInMsg = pointer.Int32(2000) + topic.Spec.PublishRate.PublishThrottlingRateInByte = pointer.Int64(2097152) + err := k8sClient.Update(ctx, topic) + Expect(err).Should(Succeed()) + }) + + It("should be ready after update", func() { + Eventually(func() bool { + t := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: publishRateTopicName} + Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + return v1alphav1.IsPulsarResourceReady(t) + }, "20s", "100ms").Should(BeTrue()) + }) + + AfterAll(func() { + if publishRateTopic != nil { + Eventually(func(g Gomega) { + t := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: publishRateTopicName} + g.Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + g.Expect(k8sClient.Delete(ctx, t)).Should(Succeed()) + }).Should(Succeed()) + } + }) + }) + + Context("PulsarTopic Inactive Topic Policies", Ordered, func() { + var ( + inactiveTopicPoliciesTopic *v1alphav1.PulsarTopic + inactiveTopicPoliciesTopicName string = "test-inactive-topic-policies-topic" + ) + + BeforeAll(func() { + inactiveTopicPoliciesTopic = utils.MakePulsarTopicWithInactiveTopicPolicies( + namespaceName, + inactiveTopicPoliciesTopicName, + "persistent://public/default/inactive-topic-policies-test", + pconnName, + lifecyclePolicy, + ) + }) + + It("should create topic with inactive topic policies successfully", func() { + err := k8sClient.Create(ctx, inactiveTopicPoliciesTopic) + Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue()) + }) + + It("should be ready", func() { + Eventually(func() bool { + t := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: inactiveTopicPoliciesTopicName} + Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + return v1alphav1.IsPulsarResourceReady(t) + }, "20s", "100ms").Should(BeTrue()) + }) + + It("should have correct inactive topic policies configuration", func() { + topic := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: inactiveTopicPoliciesTopicName} + Expect(k8sClient.Get(ctx, tns, topic)).Should(Succeed()) + + // Verify InactiveTopicPolicies + Expect(topic.Spec.InactiveTopicPolicies).ShouldNot(BeNil()) + Expect(*topic.Spec.InactiveTopicPolicies.InactiveTopicDeleteMode).Should(Equal("delete_when_no_subscriptions")) + Expect(*topic.Spec.InactiveTopicPolicies.MaxInactiveDurationInSeconds).Should(Equal(int32(1800))) + Expect(*topic.Spec.InactiveTopicPolicies.DeleteWhileInactive).Should(Equal(true)) + }) + + It("should update inactive topic policies successfully", func() { + topic := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: inactiveTopicPoliciesTopicName} + Expect(k8sClient.Get(ctx, tns, topic)).Should(Succeed()) + + // Update inactive topic policies + topic.Spec.InactiveTopicPolicies.MaxInactiveDurationInSeconds = pointer.Int32(3600) + topic.Spec.InactiveTopicPolicies.DeleteWhileInactive = pointer.Bool(false) + err := k8sClient.Update(ctx, topic) + Expect(err).Should(Succeed()) + }) + + It("should be ready after update", func() { + Eventually(func() bool { + t := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: inactiveTopicPoliciesTopicName} + Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + return v1alphav1.IsPulsarResourceReady(t) + }, "20s", "100ms").Should(BeTrue()) + }) + + AfterAll(func() { + if inactiveTopicPoliciesTopic != nil { + Eventually(func(g Gomega) { + t := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: inactiveTopicPoliciesTopicName} + g.Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + g.Expect(k8sClient.Delete(ctx, t)).Should(Succeed()) + }).Should(Succeed()) + } + }) + }) + Context("PulsarNamespace Rate Limiting", Ordered, func() { var ( rateLimitingNamespace *v1alphav1.PulsarNamespace diff --git a/tests/utils/spec.go b/tests/utils/spec.go index 944f6170..d5947f83 100644 --- a/tests/utils/spec.go +++ b/tests/utils/spec.go @@ -301,6 +301,156 @@ func MakePulsarTopicWithCompactionThreshold(namespace, name, topicName, connecti } } +// MakePulsarTopicWithPersistencePolicies will generate a PulsarTopic with persistence configurations +func MakePulsarTopicWithPersistencePolicies(namespace, name, topicName, connectionName string, policy v1alpha1.PulsarResourceLifeCyclePolicy) *v1alpha1.PulsarTopic { + return &v1alpha1.PulsarTopic{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + Spec: v1alpha1.PulsarTopicSpec{ + ConnectionRef: corev1.LocalObjectReference{ + Name: connectionName, + }, + Name: topicName, + LifecyclePolicy: policy, + PersistencePolicies: &v1alpha1.PersistencePolicies{ + BookkeeperEnsemble: ptr.To[int32](3), + BookkeeperWriteQuorum: ptr.To[int32](2), + BookkeeperAckQuorum: ptr.To[int32](2), + ManagedLedgerMaxMarkDeleteRate: ptr.To("2.0"), + }, + }, + } +} + +// MakePulsarTopicWithDelayedDelivery will generate a PulsarTopic with delayed delivery configuration +func MakePulsarTopicWithDelayedDelivery(namespace, name, topicName, connectionName string, policy v1alpha1.PulsarResourceLifeCyclePolicy) *v1alpha1.PulsarTopic { + return &v1alpha1.PulsarTopic{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + Spec: v1alpha1.PulsarTopicSpec{ + ConnectionRef: corev1.LocalObjectReference{ + Name: connectionName, + }, + Name: topicName, + LifecyclePolicy: policy, + DelayedDelivery: &v1alpha1.DelayedDeliveryData{ + Active: ptr.To(true), + TickTimeMillis: ptr.To[int64](1000), // 1 second + }, + }, + } +} + +// MakePulsarTopicWithDispatchRate will generate a PulsarTopic with dispatch rate configuration +func MakePulsarTopicWithDispatchRate(namespace, name, topicName, connectionName string, policy v1alpha1.PulsarResourceLifeCyclePolicy) *v1alpha1.PulsarTopic { + return &v1alpha1.PulsarTopic{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + Spec: v1alpha1.PulsarTopicSpec{ + ConnectionRef: corev1.LocalObjectReference{ + Name: connectionName, + }, + Name: topicName, + LifecyclePolicy: policy, + DispatchRate: &v1alpha1.DispatchRate{ + DispatchThrottlingRateInMsg: ptr.To[int32](500), + DispatchThrottlingRateInByte: ptr.To[int64](524288), // 512KB + RatePeriodInSecond: ptr.To[int32](1), + }, + }, + } +} + +// MakePulsarTopicWithPublishRate will generate a PulsarTopic with publish rate configuration +func MakePulsarTopicWithPublishRate(namespace, name, topicName, connectionName string, policy v1alpha1.PulsarResourceLifeCyclePolicy) *v1alpha1.PulsarTopic { + return &v1alpha1.PulsarTopic{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + Spec: v1alpha1.PulsarTopicSpec{ + ConnectionRef: corev1.LocalObjectReference{ + Name: connectionName, + }, + Name: topicName, + LifecyclePolicy: policy, + PublishRate: &v1alpha1.PublishRate{ + PublishThrottlingRateInMsg: ptr.To[int32](1000), + PublishThrottlingRateInByte: ptr.To[int64](1048576), // 1MB + }, + }, + } +} + +// MakePulsarTopicWithInactiveTopicPolicies will generate a PulsarTopic with inactive topic policies +func MakePulsarTopicWithInactiveTopicPolicies(namespace, name, topicName, connectionName string, policy v1alpha1.PulsarResourceLifeCyclePolicy) *v1alpha1.PulsarTopic { + return &v1alpha1.PulsarTopic{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + Spec: v1alpha1.PulsarTopicSpec{ + ConnectionRef: corev1.LocalObjectReference{ + Name: connectionName, + }, + Name: topicName, + LifecyclePolicy: policy, + InactiveTopicPolicies: &v1alpha1.InactiveTopicPolicies{ + InactiveTopicDeleteMode: ptr.To("delete_when_no_subscriptions"), + MaxInactiveDurationInSeconds: ptr.To[int32](1800), // 30 minutes + DeleteWhileInactive: ptr.To(true), + }, + }, + } +} + +// MakePulsarTopicWithAllNewPolicies will generate a PulsarTopic with all new policy configurations +func MakePulsarTopicWithAllNewPolicies(namespace, name, topicName, connectionName string, policy v1alpha1.PulsarResourceLifeCyclePolicy) *v1alpha1.PulsarTopic { + return &v1alpha1.PulsarTopic{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + Spec: v1alpha1.PulsarTopicSpec{ + ConnectionRef: corev1.LocalObjectReference{ + Name: connectionName, + }, + Name: topicName, + LifecyclePolicy: policy, + PersistencePolicies: &v1alpha1.PersistencePolicies{ + BookkeeperEnsemble: ptr.To[int32](5), + BookkeeperWriteQuorum: ptr.To[int32](3), + BookkeeperAckQuorum: ptr.To[int32](2), + ManagedLedgerMaxMarkDeleteRate: ptr.To("1.5"), + }, + DelayedDelivery: &v1alpha1.DelayedDeliveryData{ + Active: ptr.To(true), + TickTimeMillis: ptr.To[int64](2000), // 2 seconds + }, + DispatchRate: &v1alpha1.DispatchRate{ + DispatchThrottlingRateInMsg: ptr.To[int32](750), + DispatchThrottlingRateInByte: ptr.To[int64](786432), // 768KB + RatePeriodInSecond: ptr.To[int32](1), + }, + PublishRate: &v1alpha1.PublishRate{ + PublishThrottlingRateInMsg: ptr.To[int32](1500), + PublishThrottlingRateInByte: ptr.To[int64](1572864), // 1.5MB + }, + InactiveTopicPolicies: &v1alpha1.InactiveTopicPolicies{ + InactiveTopicDeleteMode: ptr.To("delete_when_subscriptions_caught_up"), + MaxInactiveDurationInSeconds: ptr.To[int32](3600), // 1 hour + DeleteWhileInactive: ptr.To(false), + }, + }, + } +} + // MakePulsarPermission will generate a object of PulsarPermission func MakePulsarPermission(namespace, name, resourceName, connectionName string, resourceType v1alpha1.PulsarResourceType, roles, actions []string, policy v1alpha1.PulsarResourceLifeCyclePolicy) *v1alpha1.PulsarPermission {