Skip to content

Commit 60795d6

Browse files
committed
fix: align namespace and topic policies operation
1 parent b73fd3b commit 60795d6

File tree

10 files changed

+200
-52
lines changed

10 files changed

+200
-52
lines changed

api/v1alpha1/pulsarnamespace_types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ type PulsarNamespaceSpec struct {
209209

210210
// BacklogQuotaRetentionPolicy specifies the retention policy for messages when backlog quota is exceeded.
211211
// Valid values are "producer_request_hold", "producer_exception", or "consumer_backlog_eviction".
212+
// +kubebuilder:validation:Enum=producer_request_hold;producer_exception;consumer_backlog_eviction
212213
// +optional
213214
BacklogQuotaRetentionPolicy *string `json:"backlogQuotaRetentionPolicy,omitempty"`
214215

api/v1alpha1/pulsartopic_types.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,16 @@ type PulsarTopicSpec struct {
111111

112112
// BacklogQuotaRetentionPolicy specifies the retention policy for messages when backlog quota is exceeded.
113113
// Valid values are "producer_request_hold", "producer_exception", or "consumer_backlog_eviction".
114+
// +kubebuilder:validation:Enum=producer_request_hold;producer_exception;consumer_backlog_eviction
114115
// +optional
115116
BacklogQuotaRetentionPolicy *string `json:"backlogQuotaRetentionPolicy,omitempty"`
116117

118+
// BacklogQuotaType controls how the backlog quota is enforced.
119+
// "destination_storage" limits backlog by size (in bytes), while "message_age" limits by time.
120+
// +kubebuilder:validation:Enum=destination_storage;message_age
121+
// +optional
122+
BacklogQuotaType *string `json:"backlogQuotaType,omitempty"`
123+
117124
// SchemaInfo defines the schema for the topic, if any.
118125
// +optional
119126
SchemaInfo *SchemaInfo `json:"schemaInfo,omitempty"`

api/v1alpha1/zz_generated.deepcopy.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsarnamespaces.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ spec:
9999
description: |-
100100
BacklogQuotaRetentionPolicy specifies the retention policy for messages when backlog quota is exceeded.
101101
Valid values are "producer_request_hold", "producer_exception", or "consumer_backlog_eviction".
102+
enum:
103+
- producer_request_hold
104+
- producer_exception
105+
- consumer_backlog_eviction
102106
type: string
103107
backlogQuotaType:
104108
description: |-

charts/pulsar-resources-operator/crds/resource.streamnative.io_pulsartopics.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,18 @@ spec:
105105
description: |-
106106
BacklogQuotaRetentionPolicy specifies the retention policy for messages when backlog quota is exceeded.
107107
Valid values are "producer_request_hold", "producer_exception", or "consumer_backlog_eviction".
108+
enum:
109+
- producer_request_hold
110+
- producer_exception
111+
- consumer_backlog_eviction
112+
type: string
113+
backlogQuotaType:
114+
description: |-
115+
BacklogQuotaType controls how the backlog quota is enforced.
116+
"destination_storage" limits backlog by size (in bytes), while "message_age" limits by time.
117+
enum:
118+
- destination_storage
119+
- message_age
108120
type: string
109121
compactionThreshold:
110122
description: |-

config/crd/bases/resource.streamnative.io_pulsarnamespaces.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ spec:
9999
description: |-
100100
BacklogQuotaRetentionPolicy specifies the retention policy for messages when backlog quota is exceeded.
101101
Valid values are "producer_request_hold", "producer_exception", or "consumer_backlog_eviction".
102+
enum:
103+
- producer_request_hold
104+
- producer_exception
105+
- consumer_backlog_eviction
102106
type: string
103107
backlogQuotaType:
104108
description: |-

config/crd/bases/resource.streamnative.io_pulsartopics.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,18 @@ spec:
105105
description: |-
106106
BacklogQuotaRetentionPolicy specifies the retention policy for messages when backlog quota is exceeded.
107107
Valid values are "producer_request_hold", "producer_exception", or "consumer_backlog_eviction".
108+
enum:
109+
- producer_request_hold
110+
- producer_exception
111+
- consumer_backlog_eviction
112+
type: string
113+
backlogQuotaType:
114+
description: |-
115+
BacklogQuotaType controls how the backlog quota is enforced.
116+
"destination_storage" limits backlog by size (in bytes), while "message_age" limits by time.
117+
enum:
118+
- destination_storage
119+
- message_age
108120
type: string
109121
compactionThreshold:
110122
description: |-

pkg/admin/impl.go

Lines changed: 152 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -306,29 +306,14 @@ func (p *PulsarAdminClient) applyTopicPolicies(topicName *utils.TopicName, param
306306
retentionPolicy = &policy
307307
}
308308

309-
var backlogQuotaPolicy *utils.BacklogQuota
310-
var backlogQuotaType utils.BacklogQuotaType
311-
if (params.BacklogQuotaLimitTime != nil || params.BacklogQuotaLimitSize != nil) &&
312-
params.BacklogQuotaRetentionPolicy != nil {
313-
backlogTime := int64(-1)
314-
backlogSize := int64(-1)
315-
if params.BacklogQuotaLimitTime != nil {
316-
t, err := params.BacklogQuotaLimitTime.Parse()
317-
if err != nil {
318-
return err
319-
}
320-
backlogTime = int64(t.Seconds())
321-
backlogQuotaType = utils.MessageAge
322-
}
323-
if params.BacklogQuotaLimitSize != nil {
324-
backlogSize = params.BacklogQuotaLimitSize.Value()
325-
backlogQuotaType = utils.DestinationStorage
326-
}
327-
backlogQuotaPolicy = &utils.BacklogQuota{
328-
LimitTime: backlogTime,
329-
LimitSize: backlogSize,
330-
Policy: utils.RetentionPolicy(*params.BacklogQuotaRetentionPolicy),
331-
}
309+
backlogQuotaPolicy, backlogQuotaType, err := buildBacklogQuota(
310+
params.BacklogQuotaLimitTime,
311+
params.BacklogQuotaLimitSize,
312+
params.BacklogQuotaRetentionPolicy,
313+
params.BacklogQuotaType,
314+
)
315+
if err != nil {
316+
return err
332317
}
333318

334319
switch {
@@ -619,6 +604,56 @@ func isRetentionBacklogOrderingError(err error) bool {
619604
return strings.Contains(err.Error(), "Retention Quota must exceed configured backlog quota")
620605
}
621606

607+
func buildBacklogQuota(limitTime *utils.Duration, limitSize *resource.Quantity, retentionPolicyStr *string,
608+
backlogQuotaTypeStr *string) (*utils.BacklogQuota, utils.BacklogQuotaType, error) {
609+
if limitTime == nil && limitSize == nil && retentionPolicyStr == nil && backlogQuotaTypeStr == nil {
610+
return nil, "", nil
611+
}
612+
613+
if retentionPolicyStr == nil {
614+
return nil, "", fmt.Errorf("backlogQuotaRetentionPolicy is required when configuring backlog quota")
615+
}
616+
retentionPolicy, err := utils.ParseRetentionPolicy(*retentionPolicyStr)
617+
if err != nil {
618+
return nil, "", err
619+
}
620+
621+
backlogQuotaType := utils.DestinationStorage
622+
if backlogQuotaTypeStr != nil {
623+
backlogQuotaType, err = utils.ParseBacklogQuotaType(*backlogQuotaTypeStr)
624+
if err != nil {
625+
return nil, "", err
626+
}
627+
}
628+
629+
backlogQuota := utils.BacklogQuota{
630+
LimitTime: -1,
631+
LimitSize: -1,
632+
Policy: retentionPolicy,
633+
}
634+
635+
switch backlogQuotaType {
636+
case utils.DestinationStorage:
637+
if limitSize == nil {
638+
return nil, "", fmt.Errorf("backlogQuotaLimitSize is required when backlogQuotaType is %s", utils.DestinationStorage)
639+
}
640+
backlogQuota.LimitSize = limitSize.Value()
641+
case utils.MessageAge:
642+
if limitTime == nil {
643+
return nil, "", fmt.Errorf("backlogQuotaLimitTime is required when backlogQuotaType is %s", utils.MessageAge)
644+
}
645+
t, err := limitTime.Parse()
646+
if err != nil {
647+
return nil, "", err
648+
}
649+
backlogQuota.LimitTime = int64(t.Seconds())
650+
default:
651+
return nil, "", fmt.Errorf("unsupported backlog quota type %s", backlogQuotaType)
652+
}
653+
654+
return &backlogQuota, backlogQuotaType, nil
655+
}
656+
622657
// GetTopicClusters get the assigned clusters of the topic to the local default cluster
623658
func (p *PulsarAdminClient) GetTopicClusters(name string, persistent *bool) ([]string, error) {
624659
completeTopicName := MakeCompleteTopicName(name, persistent)
@@ -1117,35 +1152,17 @@ func (p *PulsarAdminClient) applyNamespacePolicies(completeNSName string, params
11171152
}
11181153
}
11191154

1120-
if (params.BacklogQuotaLimitTime != nil || params.BacklogQuotaLimitSize != nil) &&
1121-
params.BacklogQuotaRetentionPolicy != nil {
1122-
backlogTime := int64(-1)
1123-
backlogSize := int64(-1)
1124-
if params.BacklogQuotaLimitTime != nil {
1125-
t, err := params.BacklogQuotaLimitTime.Parse()
1126-
if err != nil {
1127-
return err
1128-
}
1129-
backlogTime = int64(t.Seconds())
1130-
}
1131-
if params.BacklogQuotaLimitSize != nil {
1132-
backlogSize = params.BacklogQuotaLimitSize.Value()
1133-
}
1134-
backlogQuotaPolicy := utils.BacklogQuota{
1135-
LimitTime: backlogTime,
1136-
LimitSize: backlogSize,
1137-
Policy: utils.RetentionPolicy(*params.BacklogQuotaRetentionPolicy),
1138-
}
1139-
1140-
var backlogQuotaType utils.BacklogQuotaType
1141-
if params.BacklogQuotaType != nil {
1142-
backlogQuotaType, err = utils.ParseBacklogQuotaType(*params.BacklogQuotaType)
1143-
if err != nil {
1144-
return err
1145-
}
1146-
}
1147-
err = p.adminClient.Namespaces().SetBacklogQuota(completeNSName, backlogQuotaPolicy, backlogQuotaType)
1148-
if err != nil {
1155+
backlogQuotaPolicy, backlogQuotaType, err := buildBacklogQuota(
1156+
params.BacklogQuotaLimitTime,
1157+
params.BacklogQuotaLimitSize,
1158+
params.BacklogQuotaRetentionPolicy,
1159+
params.BacklogQuotaType,
1160+
)
1161+
if err != nil {
1162+
return err
1163+
}
1164+
if backlogQuotaPolicy != nil {
1165+
if err := p.adminClient.Namespaces().SetBacklogQuota(completeNSName, *backlogQuotaPolicy, backlogQuotaType); err != nil {
11491166
return err
11501167
}
11511168
}
@@ -1188,6 +1205,35 @@ func (p *PulsarAdminClient) applyNamespacePolicies(completeNSName string, params
11881205
return err
11891206
}
11901207
}
1208+
// Handle persistence policies
1209+
if params.PersistencePolicies != nil {
1210+
var markDeleteRate float64
1211+
if params.PersistencePolicies.ManagedLedgerMaxMarkDeleteRate != nil {
1212+
var err error
1213+
markDeleteRate, err = strconv.ParseFloat(*params.PersistencePolicies.ManagedLedgerMaxMarkDeleteRate, 64)
1214+
if err != nil {
1215+
return err
1216+
}
1217+
}
1218+
1219+
persistenceData := utils.PersistencePolicies{
1220+
ManagedLedgerMaxMarkDeleteRate: markDeleteRate,
1221+
}
1222+
if params.PersistencePolicies.BookkeeperEnsemble != nil {
1223+
persistenceData.BookkeeperEnsemble = int(*params.PersistencePolicies.BookkeeperEnsemble)
1224+
}
1225+
if params.PersistencePolicies.BookkeeperWriteQuorum != nil {
1226+
persistenceData.BookkeeperWriteQuorum = int(*params.PersistencePolicies.BookkeeperWriteQuorum)
1227+
}
1228+
if params.PersistencePolicies.BookkeeperAckQuorum != nil {
1229+
persistenceData.BookkeeperAckQuorum = int(*params.PersistencePolicies.BookkeeperAckQuorum)
1230+
}
1231+
1232+
err = p.adminClient.Namespaces().SetPersistence(completeNSName, persistenceData)
1233+
if err != nil {
1234+
return err
1235+
}
1236+
}
11911237
if params.BookieAffinityGroup != nil {
11921238
err = p.adminClient.Namespaces().SetBookieAffinityGroup(completeNSName, utils.BookieAffinityGroupData{
11931239
BookkeeperAffinityGroupPrimary: params.BookieAffinityGroup.BookkeeperAffinityGroupPrimary,
@@ -1243,6 +1289,30 @@ func (p *PulsarAdminClient) applyNamespacePolicies(completeNSName string, params
12431289
}
12441290
}
12451291

1292+
// Handle inactive topic policies
1293+
if params.InactiveTopicPolicies != nil {
1294+
inactiveTopicPolicies := utils.InactiveTopicPolicies{}
1295+
if params.InactiveTopicPolicies.InactiveTopicDeleteMode != nil {
1296+
deleteMode := utils.InactiveTopicDeleteMode(*params.InactiveTopicPolicies.InactiveTopicDeleteMode)
1297+
inactiveTopicPolicies.InactiveTopicDeleteMode = &deleteMode
1298+
}
1299+
if params.InactiveTopicPolicies.MaxInactiveDurationInSeconds != nil {
1300+
inactiveTopicPolicies.MaxInactiveDurationSeconds = int(*params.InactiveTopicPolicies.MaxInactiveDurationInSeconds)
1301+
}
1302+
if params.InactiveTopicPolicies.DeleteWhileInactive != nil {
1303+
inactiveTopicPolicies.DeleteWhileInactive = *params.InactiveTopicPolicies.DeleteWhileInactive
1304+
}
1305+
err = p.adminClient.Namespaces().SetInactiveTopicPolicies(*naName, inactiveTopicPolicies)
1306+
if err != nil {
1307+
return err
1308+
}
1309+
} else {
1310+
err = p.adminClient.Namespaces().RemoveInactiveTopicPolicies(*naName)
1311+
if err != nil && !IsNotFound(err) {
1312+
return err
1313+
}
1314+
}
1315+
12461316
// Handle dispatch rate limiting
12471317
if params.DispatchRate != nil {
12481318
rate := utils.DispatchRate{
@@ -1363,6 +1433,25 @@ func (p *PulsarAdminClient) applyNamespacePolicies(completeNSName string, params
13631433
}
13641434
}
13651435

1436+
// Handle subscription expiration time
1437+
if params.SubscriptionExpirationTime != nil {
1438+
if params.SubscriptionExpirationTime.IsInfinite() {
1439+
// Remove explicit expiration to inherit broker defaults
1440+
if err := p.adminClient.Namespaces().RemoveSubscriptionExpirationTime(*naName); err != nil {
1441+
return err
1442+
}
1443+
} else {
1444+
duration, err := params.SubscriptionExpirationTime.Parse()
1445+
if err != nil {
1446+
return err
1447+
}
1448+
expirationMinutes := int(duration.Minutes())
1449+
if err := p.adminClient.Namespaces().SetSubscriptionExpirationTime(*naName, expirationMinutes); err != nil {
1450+
return err
1451+
}
1452+
}
1453+
}
1454+
13661455
// Handle schema auto-update policy
13671456
if params.IsAllowAutoUpdateSchema != nil {
13681457
err = p.adminClient.Namespaces().SetIsAllowAutoUpdateSchema(*naName, *params.IsAllowAutoUpdateSchema)
@@ -1379,6 +1468,17 @@ func (p *PulsarAdminClient) applyNamespacePolicies(completeNSName string, params
13791468
}
13801469
}
13811470

1471+
// Handle namespace properties
1472+
if len(params.Properties) > 0 {
1473+
if err := p.adminClient.Namespaces().UpdateProperties(*naName, params.Properties); err != nil {
1474+
return err
1475+
}
1476+
} else if params.Properties != nil {
1477+
if err := p.adminClient.Namespaces().RemoveProperties(*naName); err != nil {
1478+
return err
1479+
}
1480+
}
1481+
13821482
// Handle encryption requirement
13831483
if params.EncryptionRequired != nil {
13841484
err = p.adminClient.Namespaces().SetEncryptionRequiredStatus(*naName, *params.EncryptionRequired)

pkg/admin/interface.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ type TopicParams struct {
9090
BacklogQuotaLimitTime *utils.Duration
9191
BacklogQuotaLimitSize *resource.Quantity
9292
BacklogQuotaRetentionPolicy *string
93+
BacklogQuotaType *string
9394
ReplicationClusters []string
9495
Deduplication *bool
9596
CompactionThreshold *int64

pkg/connection/reconcile_topic.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,7 @@ func createTopicParams(topic *resourcev1alpha1.PulsarTopic) *admin.TopicParams {
327327
BacklogQuotaLimitTime: topic.Spec.BacklogQuotaLimitTime,
328328
BacklogQuotaLimitSize: topic.Spec.BacklogQuotaLimitSize,
329329
BacklogQuotaRetentionPolicy: topic.Spec.BacklogQuotaRetentionPolicy,
330+
BacklogQuotaType: topic.Spec.BacklogQuotaType,
330331
Deduplication: topic.Spec.Deduplication,
331332
CompactionThreshold: topic.Spec.CompactionThreshold,
332333
PersistencePolicies: topic.Spec.PersistencePolicies,
@@ -374,6 +375,7 @@ func summarizeTopicParamsForLogging(params *admin.TopicParams) map[string]interf
374375
addIfNotNil(summary, "backlogQuotaLimitTime", params.BacklogQuotaLimitTime)
375376
addIfNotNil(summary, "backlogQuotaLimitSize", params.BacklogQuotaLimitSize)
376377
addIfNotNil(summary, "backlogQuotaRetentionPolicy", params.BacklogQuotaRetentionPolicy)
378+
addIfNotNil(summary, "backlogQuotaType", params.BacklogQuotaType)
377379
addIfNotNil(summary, "deduplication", params.Deduplication)
378380
addIfNotNil(summary, "compactionThreshold", params.CompactionThreshold)
379381
addIfNotNil(summary, "persistencePolicies", params.PersistencePolicies)

0 commit comments

Comments
 (0)