Skip to content

Commit 5ea29f6

Browse files
committed
support topic compaction
1 parent 19b2bc4 commit 5ea29f6

File tree

9 files changed

+172
-0
lines changed

9 files changed

+172
-0
lines changed

api/v1alpha1/pulsartopic_types.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,11 @@ type PulsarTopicSpec struct {
131131
// Deduplication controls whether to enable message deduplication for the topic.
132132
// +optional
133133
Deduplication *bool `json:"deduplication,omitempty"`
134+
135+
// CompactionThreshold specifies the size threshold in bytes for automatic topic compaction.
136+
// When the topic reaches this size, compaction will be triggered automatically.
137+
// +optional
138+
CompactionThreshold *int64 `json:"compactionThreshold,omitempty"`
134139
}
135140

136141
// SchemaInfo defines the Pulsar Schema for a topic.

api/v1alpha1/zz_generated.deepcopy.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

config/crd/bases/resource.streamnative.io_pulsartopics.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ spec:
9898
BacklogQuotaRetentionPolicy specifies the retention policy for messages when backlog quota is exceeded.
9999
Valid values are "producer_request_hold", "producer_exception", or "consumer_backlog_eviction".
100100
type: string
101+
compactionThreshold:
102+
description: |-
103+
CompactionThreshold specifies the size threshold in bytes for automatic topic compaction.
104+
When the topic reaches this size, compaction will be triggered automatically.
105+
format: int64
106+
type: integer
101107
connectionRef:
102108
description: |-
103109
ConnectionRef is the reference to the PulsarConnection resource

config/samples/resource_v1alpha1_pulsartopic.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,5 @@ spec:
3333
# backlogQuotaLimitTime: 24h
3434
# backlogQuotaLimitSize: 1Gi
3535
# backlogQuotaRetentionPolicy: producer_request_hold
36+
# compactionThreshold: 104857600 # 100MB in bytes
3637
lifecyclePolicy: CleanUpAfterDeletion

pkg/admin/impl.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,13 @@ func (p *PulsarAdminClient) applyTopicPolicies(topicName *utils.TopicName, param
296296
}
297297
}
298298

299+
if params.CompactionThreshold != nil {
300+
err = p.adminClient.Topics().SetCompactionThreshold(*topicName, *params.CompactionThreshold)
301+
if err != nil {
302+
return err
303+
}
304+
}
305+
299306
return nil
300307
}
301308

pkg/admin/interface.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ type TopicParams struct {
7676
BacklogQuotaRetentionPolicy *string
7777
ReplicationClusters []string
7878
Deduplication *bool
79+
CompactionThreshold *int64
7980
}
8081

8182
// ClusterParams indicate the parameters for creating a cluster

pkg/connection/reconcile_topic.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,7 @@ func createTopicParams(topic *resourcev1alpha1.PulsarTopic) *admin.TopicParams {
288288
BacklogQuotaLimitSize: topic.Spec.BacklogQuotaLimitSize,
289289
BacklogQuotaRetentionPolicy: topic.Spec.BacklogQuotaRetentionPolicy,
290290
Deduplication: topic.Spec.Deduplication,
291+
CompactionThreshold: topic.Spec.CompactionThreshold,
291292
}
292293
}
293294

tests/operator/resources_test.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,134 @@ var _ = Describe("Resources", func() {
370370

371371
})
372372

373+
Context("PulsarTopic Compaction Threshold", Ordered, func() {
374+
var (
375+
compactionTopic *v1alphav1.PulsarTopic
376+
compactionTopicName string = "test-compaction-topic"
377+
compactionThreshold int64 = 104857600 // 100MB in bytes
378+
)
379+
380+
BeforeEach(func() {
381+
compactionTopic = utils.MakePulsarTopicWithCompactionThreshold(
382+
namespaceName,
383+
compactionTopicName,
384+
"persistent://cloud/stage/compaction-test",
385+
pconnName,
386+
compactionThreshold,
387+
lifecyclePolicy,
388+
)
389+
})
390+
391+
It("should create topic with compaction threshold successfully", func() {
392+
err := k8sClient.Create(ctx, compactionTopic)
393+
Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue())
394+
})
395+
396+
It("should be ready", func() {
397+
Eventually(func() bool {
398+
t := &v1alphav1.PulsarTopic{}
399+
tns := types.NamespacedName{Namespace: namespaceName, Name: compactionTopicName}
400+
Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed())
401+
return v1alphav1.IsPulsarResourceReady(t)
402+
}, "20s", "100ms").Should(BeTrue())
403+
})
404+
405+
It("should have compaction threshold set in Pulsar", func() {
406+
Eventually(func(g Gomega) {
407+
podName := fmt.Sprintf("%s-broker-0", brokerName)
408+
containerName := fmt.Sprintf("%s-broker", brokerName)
409+
stdout, _, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName,
410+
"./bin/pulsar-admin topics get-compaction-threshold "+compactionTopic.Spec.Name)
411+
g.Expect(err).Should(Succeed())
412+
g.Expect(stdout).Should(Not(BeEmpty()))
413+
// The output should contain the threshold value in bytes
414+
g.Expect(stdout).Should(ContainSubstring("104857600"))
415+
}, "20s", "100ms").Should(Succeed())
416+
})
417+
418+
It("should update compaction threshold successfully", func() {
419+
newThreshold := int64(209715200) // 200MB in bytes
420+
421+
topic := &v1alphav1.PulsarTopic{}
422+
tns := types.NamespacedName{Namespace: namespaceName, Name: compactionTopicName}
423+
Expect(k8sClient.Get(ctx, tns, topic)).Should(Succeed())
424+
425+
topic.Spec.CompactionThreshold = &newThreshold
426+
err := k8sClient.Update(ctx, topic)
427+
Expect(err).Should(Succeed())
428+
})
429+
430+
It("should be ready after update", func() {
431+
Eventually(func() bool {
432+
t := &v1alphav1.PulsarTopic{}
433+
tns := types.NamespacedName{Namespace: namespaceName, Name: compactionTopicName}
434+
Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed())
435+
return v1alphav1.IsPulsarResourceReady(t)
436+
}, "20s", "100ms").Should(BeTrue())
437+
})
438+
439+
It("should have updated compaction threshold in Pulsar", func() {
440+
Eventually(func(g Gomega) {
441+
podName := fmt.Sprintf("%s-broker-0", brokerName)
442+
containerName := fmt.Sprintf("%s-broker", brokerName)
443+
stdout, _, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName,
444+
"./bin/pulsar-admin topics get-compaction-threshold "+compactionTopic.Spec.Name)
445+
g.Expect(err).Should(Succeed())
446+
g.Expect(stdout).Should(Not(BeEmpty()))
447+
// The output should contain the new threshold value in bytes
448+
g.Expect(stdout).Should(ContainSubstring("209715200"))
449+
}, "20s", "100ms").Should(Succeed())
450+
})
451+
452+
It("should remove compaction threshold when set to nil", func() {
453+
topic := &v1alphav1.PulsarTopic{}
454+
tns := types.NamespacedName{Namespace: namespaceName, Name: compactionTopicName}
455+
Expect(k8sClient.Get(ctx, tns, topic)).Should(Succeed())
456+
457+
topic.Spec.CompactionThreshold = nil
458+
err := k8sClient.Update(ctx, topic)
459+
Expect(err).Should(Succeed())
460+
})
461+
462+
It("should be ready after removing threshold", func() {
463+
Eventually(func() bool {
464+
t := &v1alphav1.PulsarTopic{}
465+
tns := types.NamespacedName{Namespace: namespaceName, Name: compactionTopicName}
466+
Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed())
467+
return v1alphav1.IsPulsarResourceReady(t)
468+
}, "20s", "100ms").Should(BeTrue())
469+
})
470+
471+
It("should have no compaction threshold in Pulsar after removal", func() {
472+
Eventually(func(g Gomega) {
473+
podName := fmt.Sprintf("%s-broker-0", brokerName)
474+
containerName := fmt.Sprintf("%s-broker", brokerName)
475+
stdout, stderr, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName,
476+
"./bin/pulsar-admin topics get-compaction-threshold "+compactionTopic.Spec.Name)
477+
// When no compaction threshold is set, the command might fail or return empty/default value
478+
// We need to check both success and failure cases
479+
if err != nil {
480+
// Command failed, which might be expected when no threshold is set
481+
g.Expect(stderr).Should(ContainSubstring("not found"))
482+
} else {
483+
// Command succeeded but should show no threshold or default value
484+
g.Expect(stdout).Should(SatisfyAny(
485+
BeEmpty(),
486+
ContainSubstring("0"),
487+
ContainSubstring("-1"),
488+
))
489+
}
490+
}, "20s", "100ms").Should(Succeed())
491+
})
492+
493+
AfterEach(func() {
494+
// Clean up the compaction test topic
495+
if compactionTopic != nil {
496+
k8sClient.Delete(ctx, compactionTopic)
497+
}
498+
})
499+
})
500+
373501
Context("PulsarPermission operation", Label("Permissions"), func() {
374502
It("should grant the pulsarpermission successfully", func() {
375503
err := k8sClient.Create(ctx, ppermission)

tests/utils/spec.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,24 @@ func MakePulsarTopic(namespace, name, topicName, connectionName string, policy v
107107
}
108108
}
109109

110+
// MakePulsarTopicWithCompactionThreshold will generate a object of PulsarTopic with compaction threshold
111+
func MakePulsarTopicWithCompactionThreshold(namespace, name, topicName, connectionName string, compactionThreshold int64, policy v1alpha1.PulsarResourceLifeCyclePolicy) *v1alpha1.PulsarTopic {
112+
return &v1alpha1.PulsarTopic{
113+
ObjectMeta: metav1.ObjectMeta{
114+
Namespace: namespace,
115+
Name: name,
116+
},
117+
Spec: v1alpha1.PulsarTopicSpec{
118+
ConnectionRef: corev1.LocalObjectReference{
119+
Name: connectionName,
120+
},
121+
Name: topicName,
122+
CompactionThreshold: &compactionThreshold,
123+
LifecyclePolicy: policy,
124+
},
125+
}
126+
}
127+
110128
// MakePulsarPermission will generate a object of PulsarPermission
111129
func MakePulsarPermission(namespace, name, resourceName, connectionName string, resourceType v1alpha1.PulsarResourceType,
112130
roles, actions []string, policy v1alpha1.PulsarResourceLifeCyclePolicy) *v1alpha1.PulsarPermission {

0 commit comments

Comments
 (0)