Skip to content

Commit 7cdb7c0

Browse files
authored
support topic compaction (#331)
* support topic compaction * fix test * more error log * fix topic name * enable topicLevelPoliciesEnabled * Add logging for topic application errors and update test timeout settings - Added logging for creation and policy errors during topic application in `reconcile_topic.go`. - Increased timeout settings in tests for resource updates to ensure stability and reliability. * fix test
1 parent 07bf3fb commit 7cdb7c0

File tree

10 files changed

+131
-0
lines changed

10 files changed

+131
-0
lines changed

.ci/clusters/values.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ broker:
121121
managedLedgerDefaultAckQuorum: "1"
122122
enablePackagesManagement: "true"
123123
PULSAR_PREFIX_enablePackagesManagement: "true"
124+
PULSAR_PREFIX_topicLevelPoliciesEnabled: "true"
124125
resources:
125126
requests:
126127
memory: 256Mi

api/v1alpha1/pulsartopic_types.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,11 @@ type PulsarTopicSpec struct {
131131
// Deduplication controls whether to enable message deduplication for the topic.
132132
// +optional
133133
Deduplication *bool `json:"deduplication,omitempty"`
134+
135+
// CompactionThreshold specifies the size threshold in bytes for automatic topic compaction.
136+
// When the topic reaches this size, compaction will be triggered automatically.
137+
// +optional
138+
CompactionThreshold *int64 `json:"compactionThreshold,omitempty"`
134139
}
135140

136141
// SchemaInfo defines the Pulsar Schema for a topic.

api/v1alpha1/zz_generated.deepcopy.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

config/crd/bases/resource.streamnative.io_pulsartopics.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ spec:
9898
BacklogQuotaRetentionPolicy specifies the retention policy for messages when backlog quota is exceeded.
9999
Valid values are "producer_request_hold", "producer_exception", or "consumer_backlog_eviction".
100100
type: string
101+
compactionThreshold:
102+
description: |-
103+
CompactionThreshold specifies the size threshold in bytes for automatic topic compaction.
104+
When the topic reaches this size, compaction will be triggered automatically.
105+
format: int64
106+
type: integer
101107
connectionRef:
102108
description: |-
103109
ConnectionRef is the reference to the PulsarConnection resource

config/samples/resource_v1alpha1_pulsartopic.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,5 @@ spec:
3333
# backlogQuotaLimitTime: 24h
3434
# backlogQuotaLimitSize: 1Gi
3535
# backlogQuotaRetentionPolicy: producer_request_hold
36+
# compactionThreshold: 104857600 # 100MB in bytes
3637
lifecyclePolicy: CleanUpAfterDeletion

pkg/admin/impl.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,13 @@ func (p *PulsarAdminClient) applyTopicPolicies(topicName *utils.TopicName, param
296296
}
297297
}
298298

299+
if params.CompactionThreshold != nil {
300+
err = p.adminClient.Topics().SetCompactionThreshold(*topicName, *params.CompactionThreshold)
301+
if err != nil {
302+
return err
303+
}
304+
}
305+
299306
return nil
300307
}
301308

pkg/admin/interface.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ type TopicParams struct {
7676
BacklogQuotaRetentionPolicy *string
7777
ReplicationClusters []string
7878
Deduplication *bool
79+
CompactionThreshold *int64
7980
}
8081

8182
// ClusterParams indicate the parameters for creating a cluster

pkg/connection/reconcile_topic.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin
231231
}
232232

233233
creationErr, policyErr := pulsarAdmin.ApplyTopic(topic.Spec.Name, params)
234+
log.Info("Apply topic", "creationErr", creationErr, "policyErr", policyErr)
234235
if policyErr != nil {
235236
policyErrs = append(policyErrs, policyErr)
236237
}
@@ -288,6 +289,7 @@ func createTopicParams(topic *resourcev1alpha1.PulsarTopic) *admin.TopicParams {
288289
BacklogQuotaLimitSize: topic.Spec.BacklogQuotaLimitSize,
289290
BacklogQuotaRetentionPolicy: topic.Spec.BacklogQuotaRetentionPolicy,
290291
Deduplication: topic.Spec.Deduplication,
292+
CompactionThreshold: topic.Spec.CompactionThreshold,
291293
}
292294
}
293295

tests/operator/resources_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,91 @@ var _ = Describe("Resources", func() {
370370

371371
})
372372

373+
Context("PulsarTopic Compaction Threshold", Ordered, func() {
374+
var (
375+
compactionTopic *v1alphav1.PulsarTopic
376+
compactionTopicName string = "test-compaction-topic"
377+
compactionThreshold int64 = 104857600 // 100MB in bytes
378+
)
379+
380+
BeforeAll(func() {
381+
compactionTopic = utils.MakePulsarTopicWithCompactionThreshold(
382+
namespaceName,
383+
compactionTopicName,
384+
"persistent://public/default/compaction-test",
385+
pconnName,
386+
compactionThreshold,
387+
lifecyclePolicy,
388+
)
389+
})
390+
391+
It("should create topic with compaction threshold successfully", func() {
392+
err := k8sClient.Create(ctx, compactionTopic)
393+
Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue())
394+
})
395+
396+
It("should be ready", func() {
397+
Eventually(func() bool {
398+
t := &v1alphav1.PulsarTopic{}
399+
tns := types.NamespacedName{Namespace: namespaceName, Name: compactionTopicName}
400+
Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed())
401+
return v1alphav1.IsPulsarResourceReady(t)
402+
}, "20s", "100ms").Should(BeTrue())
403+
})
404+
405+
It("should update compaction threshold successfully", func() {
406+
newThreshold := int64(209715200) // 200MB in bytes
407+
408+
topic := &v1alphav1.PulsarTopic{}
409+
tns := types.NamespacedName{Namespace: namespaceName, Name: compactionTopicName}
410+
Expect(k8sClient.Get(ctx, tns, topic)).Should(Succeed())
411+
412+
topic.Spec.CompactionThreshold = &newThreshold
413+
err := k8sClient.Update(ctx, topic)
414+
Expect(err).Should(Succeed())
415+
})
416+
417+
It("should be ready after update", func() {
418+
Eventually(func() bool {
419+
t := &v1alphav1.PulsarTopic{}
420+
tns := types.NamespacedName{Namespace: namespaceName, Name: compactionTopicName}
421+
Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed())
422+
return v1alphav1.IsPulsarResourceReady(t)
423+
}, "20s", "100ms").Should(BeTrue())
424+
})
425+
426+
It("should remove compaction threshold when set to nil", func() {
427+
topic := &v1alphav1.PulsarTopic{}
428+
tns := types.NamespacedName{Namespace: namespaceName, Name: compactionTopicName}
429+
Expect(k8sClient.Get(ctx, tns, topic)).Should(Succeed())
430+
431+
topic.Spec.CompactionThreshold = nil
432+
err := k8sClient.Update(ctx, topic)
433+
Expect(err).Should(Succeed())
434+
})
435+
436+
It("should be ready after removing threshold", func() {
437+
Eventually(func() bool {
438+
t := &v1alphav1.PulsarTopic{}
439+
tns := types.NamespacedName{Namespace: namespaceName, Name: compactionTopicName}
440+
Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed())
441+
return v1alphav1.IsPulsarResourceReady(t)
442+
}, "20s", "100ms").Should(BeTrue())
443+
})
444+
445+
AfterAll(func() {
446+
// Clean up the compaction test topic after all tests complete
447+
if compactionTopic != nil {
448+
Eventually(func(g Gomega) {
449+
t := &v1alphav1.PulsarTopic{}
450+
tns := types.NamespacedName{Namespace: namespaceName, Name: compactionTopicName}
451+
g.Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed())
452+
g.Expect(k8sClient.Delete(ctx, t)).Should(Succeed())
453+
}).Should(Succeed())
454+
}
455+
})
456+
})
457+
373458
Context("PulsarPermission operation", Label("Permissions"), func() {
374459
It("should grant the pulsarpermission successfully", func() {
375460
err := k8sClient.Create(ctx, ppermission)

tests/utils/spec.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,24 @@ func MakePulsarTopic(namespace, name, topicName, connectionName string, policy v
107107
}
108108
}
109109

110+
// MakePulsarTopicWithCompactionThreshold will generate a object of PulsarTopic with compaction threshold
111+
func MakePulsarTopicWithCompactionThreshold(namespace, name, topicName, connectionName string, compactionThreshold int64, policy v1alpha1.PulsarResourceLifeCyclePolicy) *v1alpha1.PulsarTopic {
112+
return &v1alpha1.PulsarTopic{
113+
ObjectMeta: metav1.ObjectMeta{
114+
Namespace: namespace,
115+
Name: name,
116+
},
117+
Spec: v1alpha1.PulsarTopicSpec{
118+
ConnectionRef: corev1.LocalObjectReference{
119+
Name: connectionName,
120+
},
121+
Name: topicName,
122+
CompactionThreshold: &compactionThreshold,
123+
LifecyclePolicy: policy,
124+
},
125+
}
126+
}
127+
110128
// MakePulsarPermission will generate a object of PulsarPermission
111129
func MakePulsarPermission(namespace, name, resourceName, connectionName string, resourceType v1alpha1.PulsarResourceType,
112130
roles, actions []string, policy v1alpha1.PulsarResourceLifeCyclePolicy) *v1alpha1.PulsarPermission {

0 commit comments

Comments
 (0)