Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .ci/clusters/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ broker:
managedLedgerDefaultAckQuorum: "1"
enablePackagesManagement: "true"
PULSAR_PREFIX_enablePackagesManagement: "true"
PULSAR_PREFIX_topicLevelPoliciesEnabled: "true"
resources:
requests:
memory: 256Mi
Expand Down
5 changes: 5 additions & 0 deletions api/v1alpha1/pulsartopic_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ type PulsarTopicSpec struct {
// Deduplication controls whether to enable message deduplication for the topic.
// +optional
Deduplication *bool `json:"deduplication,omitempty"`

// CompactionThreshold specifies the size threshold in bytes for automatic topic compaction.
// When the topic reaches this size, compaction will be triggered automatically.
// +optional
CompactionThreshold *int64 `json:"compactionThreshold,omitempty"`
}

// SchemaInfo defines the Pulsar Schema for a topic.
Expand Down
5 changes: 5 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions config/crd/bases/resource.streamnative.io_pulsartopics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ spec:
BacklogQuotaRetentionPolicy specifies the retention policy for messages when backlog quota is exceeded.
Valid values are "producer_request_hold", "producer_exception", or "consumer_backlog_eviction".
type: string
compactionThreshold:
description: |-
CompactionThreshold specifies the size threshold in bytes for automatic topic compaction.
When the topic reaches this size, compaction will be triggered automatically.
format: int64
type: integer
connectionRef:
description: |-
ConnectionRef is the reference to the PulsarConnection resource
Expand Down
1 change: 1 addition & 0 deletions config/samples/resource_v1alpha1_pulsartopic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ spec:
# backlogQuotaLimitTime: 24h
# backlogQuotaLimitSize: 1Gi
# backlogQuotaRetentionPolicy: producer_request_hold
# compactionThreshold: 104857600 # 100MB in bytes
lifecyclePolicy: CleanUpAfterDeletion
7 changes: 7 additions & 0 deletions pkg/admin/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,13 @@ func (p *PulsarAdminClient) applyTopicPolicies(topicName *utils.TopicName, param
}
}

if params.CompactionThreshold != nil {
err = p.adminClient.Topics().SetCompactionThreshold(*topicName, *params.CompactionThreshold)
if err != nil {
return err
}
}

return nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/admin/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type TopicParams struct {
BacklogQuotaRetentionPolicy *string
ReplicationClusters []string
Deduplication *bool
CompactionThreshold *int64
}

// ClusterParams indicate the parameters for creating a cluster
Expand Down
2 changes: 2 additions & 0 deletions pkg/connection/reconcile_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin
}

creationErr, policyErr := pulsarAdmin.ApplyTopic(topic.Spec.Name, params)
log.Info("Apply topic", "creationErr", creationErr, "policyErr", policyErr)
if policyErr != nil {
policyErrs = append(policyErrs, policyErr)
}
Expand Down Expand Up @@ -288,6 +289,7 @@ func createTopicParams(topic *resourcev1alpha1.PulsarTopic) *admin.TopicParams {
BacklogQuotaLimitSize: topic.Spec.BacklogQuotaLimitSize,
BacklogQuotaRetentionPolicy: topic.Spec.BacklogQuotaRetentionPolicy,
Deduplication: topic.Spec.Deduplication,
CompactionThreshold: topic.Spec.CompactionThreshold,
}
}

Expand Down
85 changes: 85 additions & 0 deletions tests/operator/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,91 @@ var _ = Describe("Resources", func() {

})

Context("PulsarTopic Compaction Threshold", Ordered, func() {
var (
compactionTopic *v1alphav1.PulsarTopic
compactionTopicName string = "test-compaction-topic"
compactionThreshold int64 = 104857600 // 100MB in bytes
)

BeforeAll(func() {
compactionTopic = utils.MakePulsarTopicWithCompactionThreshold(
namespaceName,
compactionTopicName,
"persistent://public/default/compaction-test",
pconnName,
compactionThreshold,
lifecyclePolicy,
)
})

It("should create topic with compaction threshold successfully", func() {
err := k8sClient.Create(ctx, compactionTopic)
Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue())
})

It("should be ready", func() {
Eventually(func() bool {
t := &v1alphav1.PulsarTopic{}
tns := types.NamespacedName{Namespace: namespaceName, Name: compactionTopicName}
Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed())
return v1alphav1.IsPulsarResourceReady(t)
}, "20s", "100ms").Should(BeTrue())
})

It("should update compaction threshold successfully", func() {
newThreshold := int64(209715200) // 200MB in bytes

topic := &v1alphav1.PulsarTopic{}
tns := types.NamespacedName{Namespace: namespaceName, Name: compactionTopicName}
Expect(k8sClient.Get(ctx, tns, topic)).Should(Succeed())

topic.Spec.CompactionThreshold = &newThreshold
err := k8sClient.Update(ctx, topic)
Expect(err).Should(Succeed())
})

It("should be ready after update", func() {
Eventually(func() bool {
t := &v1alphav1.PulsarTopic{}
tns := types.NamespacedName{Namespace: namespaceName, Name: compactionTopicName}
Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed())
return v1alphav1.IsPulsarResourceReady(t)
}, "20s", "100ms").Should(BeTrue())
})

It("should remove compaction threshold when set to nil", func() {
topic := &v1alphav1.PulsarTopic{}
tns := types.NamespacedName{Namespace: namespaceName, Name: compactionTopicName}
Expect(k8sClient.Get(ctx, tns, topic)).Should(Succeed())

topic.Spec.CompactionThreshold = nil
err := k8sClient.Update(ctx, topic)
Expect(err).Should(Succeed())
})

It("should be ready after removing threshold", func() {
Eventually(func() bool {
t := &v1alphav1.PulsarTopic{}
tns := types.NamespacedName{Namespace: namespaceName, Name: compactionTopicName}
Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed())
return v1alphav1.IsPulsarResourceReady(t)
}, "20s", "100ms").Should(BeTrue())
})

AfterAll(func() {
// Clean up the compaction test topic after all tests complete
if compactionTopic != nil {
Eventually(func(g Gomega) {
t := &v1alphav1.PulsarTopic{}
tns := types.NamespacedName{Namespace: namespaceName, Name: compactionTopicName}
g.Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed())
g.Expect(k8sClient.Delete(ctx, t)).Should(Succeed())
}).Should(Succeed())
}
})
})

Context("PulsarPermission operation", Label("Permissions"), func() {
It("should grant the pulsarpermission successfully", func() {
err := k8sClient.Create(ctx, ppermission)
Expand Down
18 changes: 18 additions & 0 deletions tests/utils/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,24 @@ func MakePulsarTopic(namespace, name, topicName, connectionName string, policy v
}
}

// MakePulsarTopicWithCompactionThreshold will generate a object of PulsarTopic with compaction threshold
func MakePulsarTopicWithCompactionThreshold(namespace, name, topicName, connectionName string, compactionThreshold int64, policy v1alpha1.PulsarResourceLifeCyclePolicy) *v1alpha1.PulsarTopic {
return &v1alpha1.PulsarTopic{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: name,
},
Spec: v1alpha1.PulsarTopicSpec{
ConnectionRef: corev1.LocalObjectReference{
Name: connectionName,
},
Name: topicName,
CompactionThreshold: &compactionThreshold,
LifecyclePolicy: policy,
},
}
}

// MakePulsarPermission will generate a object of PulsarPermission
func MakePulsarPermission(namespace, name, resourceName, connectionName string, resourceType v1alpha1.PulsarResourceType,
roles, actions []string, policy v1alpha1.PulsarResourceLifeCyclePolicy) *v1alpha1.PulsarPermission {
Expand Down