Skip to content

File tree

6 files changed

+712
-43
lines changed

6 files changed

+712
-43
lines changed

pulsaradmin/pkg/admin/namespace.go

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,14 +90,24 @@ type Namespaces interface {
9090
// GetSchemaValidationEnforced returns schema validation enforced for namespace
9191
GetSchemaValidationEnforced(namespace utils.NameSpaceName) (bool, error)
9292

93-
// SetSchemaAutoUpdateCompatibilityStrategy sets the strategy used to check the a new schema provided
93+
// SetSchemaAutoUpdateCompatibilityStrategy sets the strategy used to check the new schema provided
9494
// by a producer is compatible with the current schema before it is installed
9595
SetSchemaAutoUpdateCompatibilityStrategy(namespace utils.NameSpaceName,
96+
strategy utils.SchemaAutoUpdateCompatibilityStrategy) error
97+
98+
// GetSchemaAutoUpdateCompatibilityStrategy returns the strategy used to check the new schema provided
99+
// by a producer is compatible with the current schema before it is installed
100+
GetSchemaAutoUpdateCompatibilityStrategy(namespace utils.NameSpaceName) (
101+
utils.SchemaAutoUpdateCompatibilityStrategy, error)
102+
103+
// SetSchemaCompatibilityStrategy sets the strategy used to check the new schema provided
104+
// by a producer is compatible with the current schema before it is installed
105+
SetSchemaCompatibilityStrategy(namespace utils.NameSpaceName,
96106
strategy utils.SchemaCompatibilityStrategy) error
97107

98-
// GetSchemaAutoUpdateCompatibilityStrategy returns the strategy used to check the a new schema provided
108+
// GetSchemaCompatibilityStrategy returns the strategy used to check the new schema provided
99109
// by a producer is compatible with the current schema before it is installed
100-
GetSchemaAutoUpdateCompatibilityStrategy(namespace utils.NameSpaceName) (utils.SchemaCompatibilityStrategy, error)
110+
GetSchemaCompatibilityStrategy(namespace utils.NameSpaceName) (utils.SchemaCompatibilityStrategy, error)
101111

102112
// ClearOffloadDeleteLag clears the offload deletion lag for a namespace.
103113
ClearOffloadDeleteLag(namespace utils.NameSpaceName) error
@@ -497,13 +507,13 @@ func (n *namespaces) GetSchemaValidationEnforced(namespace utils.NameSpaceName)
497507
}
498508

499509
func (n *namespaces) SetSchemaAutoUpdateCompatibilityStrategy(namespace utils.NameSpaceName,
500-
strategy utils.SchemaCompatibilityStrategy) error {
510+
strategy utils.SchemaAutoUpdateCompatibilityStrategy) error {
501511
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "schemaAutoUpdateCompatibilityStrategy")
502512
return n.pulsar.Client.Put(endpoint, strategy.String())
503513
}
504514

505515
func (n *namespaces) GetSchemaAutoUpdateCompatibilityStrategy(namespace utils.NameSpaceName) (
506-
utils.SchemaCompatibilityStrategy, error) {
516+
utils.SchemaAutoUpdateCompatibilityStrategy, error) {
507517
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "schemaAutoUpdateCompatibilityStrategy")
508518
b, err := n.pulsar.Client.GetWithQueryParams(endpoint, nil, nil, false)
509519
if err != nil {
@@ -516,6 +526,26 @@ func (n *namespaces) GetSchemaAutoUpdateCompatibilityStrategy(namespace utils.Na
516526
return s, nil
517527
}
518528

529+
func (n *namespaces) SetSchemaCompatibilityStrategy(namespace utils.NameSpaceName,
530+
strategy utils.SchemaCompatibilityStrategy) error {
531+
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "schemaCompatibilityStrategy")
532+
return n.pulsar.Client.Put(endpoint, strategy.String())
533+
}
534+
535+
func (n *namespaces) GetSchemaCompatibilityStrategy(namespace utils.NameSpaceName) (
536+
utils.SchemaCompatibilityStrategy, error) {
537+
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "schemaCompatibilityStrategy")
538+
b, err := n.pulsar.Client.GetWithQueryParams(endpoint, nil, nil, false)
539+
if err != nil {
540+
return "", err
541+
}
542+
s, err := utils.ParseSchemaCompatibilityStrategy(strings.ReplaceAll(string(b), "\"", ""))
543+
if err != nil {
544+
return "", err
545+
}
546+
return s, nil
547+
}
548+
519549
func (n *namespaces) ClearOffloadDeleteLag(namespace utils.NameSpaceName) error {
520550
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "offloadDeletionLagMs")
521551
return n.pulsar.Client.Delete(endpoint)

pulsaradmin/pkg/admin/namespace_test.go

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package admin
1919

2020
import (
2121
"testing"
22+
"time"
2223

2324
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
2425
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
@@ -341,3 +342,156 @@ func TestNamespaces_GetOffloadThresholdInSeconds(t *testing.T) {
341342
expected := int64(60)
342343
assert.Equal(t, expected, offloadThresholdInSeconds)
343344
}
345+
346+
func TestNamespaces_SetSchemaCompatibilityStrategy(t *testing.T) {
347+
config := &config.Config{}
348+
admin, err := New(config)
349+
require.NoError(t, err)
350+
require.NotNil(t, admin)
351+
352+
tests := []struct {
353+
name string
354+
namespace string
355+
strategy utils.SchemaCompatibilityStrategy
356+
errReason string
357+
}{
358+
{
359+
name: "Set Undefined strategy",
360+
namespace: "public/default",
361+
strategy: utils.SchemaCompatibilityStrategyUndefined,
362+
errReason: "",
363+
},
364+
{
365+
name: "Set AlwaysIncompatible strategy",
366+
namespace: "public/default",
367+
strategy: utils.SchemaCompatibilityStrategyAlwaysIncompatible,
368+
errReason: "",
369+
},
370+
{
371+
name: "Set AlwaysCompatible strategy",
372+
namespace: "public/default",
373+
strategy: utils.SchemaCompatibilityStrategyAlwaysCompatible,
374+
errReason: "",
375+
},
376+
{
377+
name: "Set Backward strategy",
378+
namespace: "public/default",
379+
strategy: utils.SchemaCompatibilityStrategyBackward,
380+
errReason: "",
381+
},
382+
{
383+
name: "Set Forward strategy",
384+
namespace: "public/default",
385+
strategy: utils.SchemaCompatibilityStrategyForward,
386+
errReason: "",
387+
},
388+
{
389+
name: "Set Full strategy",
390+
namespace: "public/default",
391+
strategy: utils.SchemaCompatibilityStrategyFull,
392+
errReason: "",
393+
},
394+
{
395+
name: "Set BackwardTransitive strategy",
396+
namespace: "public/default",
397+
strategy: utils.SchemaCompatibilityStrategyBackwardTransitive,
398+
errReason: "",
399+
},
400+
{
401+
name: "Set ForwardTransitive strategy",
402+
namespace: "public/default",
403+
strategy: utils.SchemaCompatibilityStrategyForwardTransitive,
404+
errReason: "",
405+
},
406+
{
407+
name: "Set FullTransitive strategy",
408+
namespace: "public/default",
409+
strategy: utils.SchemaCompatibilityStrategyFullTransitive,
410+
errReason: "",
411+
},
412+
{
413+
name: "Set strategy on non-existent namespace",
414+
namespace: "public/nonexist",
415+
strategy: utils.SchemaCompatibilityStrategyFull,
416+
errReason: "Namespace does not exist",
417+
},
418+
{
419+
name: "Set strategy on non-existent tenant",
420+
namespace: "non-exist/default",
421+
strategy: utils.SchemaCompatibilityStrategyFull,
422+
errReason: "Tenant does not exist",
423+
},
424+
}
425+
426+
for _, tt := range tests {
427+
t.Run(tt.name, func(t *testing.T) {
428+
namespace, _ := utils.GetNamespaceName(tt.namespace)
429+
err := admin.Namespaces().SetSchemaCompatibilityStrategy(*namespace, tt.strategy)
430+
431+
// Skip test if network connection fails (Pulsar server not running)
432+
if err != nil {
433+
if _, ok := err.(rest.Error); !ok {
434+
t.Skipf("Skipping test due to network error: %v", err)
435+
}
436+
}
437+
438+
if tt.errReason == "" {
439+
assert.Equal(t, nil, err)
440+
} else {
441+
if restError, ok := err.(rest.Error); ok {
442+
assert.Equal(t, tt.errReason, restError.Reason)
443+
}
444+
}
445+
})
446+
}
447+
}
448+
449+
func TestNamespaces_GetSchemaCompatibilityStrategy(t *testing.T) {
450+
config := &config.Config{}
451+
admin, err := New(config)
452+
require.NoError(t, err)
453+
require.NotNil(t, admin)
454+
455+
namespace, _ := utils.GetNamespaceName("public/default")
456+
457+
// Test setting and getting different strategies
458+
testStrategies := []utils.SchemaCompatibilityStrategy{
459+
utils.SchemaCompatibilityStrategyFull,
460+
utils.SchemaCompatibilityStrategyBackward,
461+
utils.SchemaCompatibilityStrategyForward,
462+
utils.SchemaCompatibilityStrategyAlwaysCompatible,
463+
}
464+
465+
for _, strategy := range testStrategies {
466+
// Set the schema compatibility strategy
467+
err = admin.Namespaces().SetSchemaCompatibilityStrategy(*namespace, strategy)
468+
if err != nil {
469+
t.Skipf("Skipping test due to connection error: %v", err)
470+
}
471+
472+
// Wait for the strategy to be set
473+
time.Sleep(5 * time.Second)
474+
475+
// Get and verify the strategy
476+
retrievedStrategy, err := admin.Namespaces().GetSchemaCompatibilityStrategy(*namespace)
477+
if err != nil {
478+
t.Skipf("Skipping test due to connection error: %v", err)
479+
}
480+
assert.Equal(t, strategy, retrievedStrategy)
481+
}
482+
483+
// Test getting default strategy (should be Undefined after reset)
484+
err = admin.Namespaces().SetSchemaCompatibilityStrategy(*namespace, utils.SchemaCompatibilityStrategyUndefined)
485+
if err != nil {
486+
t.Skipf("Skipping test due to connection error: %v", err)
487+
}
488+
489+
// Wait for the strategy to be set
490+
time.Sleep(5 * time.Second)
491+
492+
defaultStrategy, err := admin.Namespaces().GetSchemaCompatibilityStrategy(*namespace)
493+
if err != nil {
494+
t.Skipf("Skipping test due to connection error: %v", err)
495+
}
496+
assert.Equal(t, utils.SchemaCompatibilityStrategyUndefined, defaultStrategy)
497+
}

pulsaradmin/pkg/admin/schema_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,12 +133,12 @@ func TestSchemas_CreateSchemaBySchemaInfo(t *testing.T) {
133133
compatibility, err := admin.Schemas().TestCompatibilityWithSchemaInfo(topic, schemaInfo)
134134
assert.NoError(t, err)
135135
assert.Equal(t, compatibility.IsCompatibility, true)
136-
assert.Equal(t, compatibility.SchemaCompatibilityStrategy, utils.SchemaCompatibilityStrategy("FULL"))
136+
assert.Equal(t, compatibility.SchemaCompatibilityStrategy, utils.SchemaCompatibilityStrategyFull)
137137

138138
compatibility, err = admin.Schemas().TestCompatibilityWithPostSchemaPayload(topic, schemaPayload)
139139
assert.NoError(t, err)
140140
assert.Equal(t, compatibility.IsCompatibility, true)
141-
assert.Equal(t, compatibility.SchemaCompatibilityStrategy, utils.SchemaCompatibilityStrategy("FULL"))
141+
assert.Equal(t, compatibility.SchemaCompatibilityStrategy, utils.SchemaCompatibilityStrategyFull)
142142

143143
err = admin.Schemas().ForceDeleteSchema(topic)
144144
assert.NoError(t, err)

pulsaradmin/pkg/utils/policies.go

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -47,33 +47,36 @@ type Policies struct {
4747
PublishMaxMessageRate map[string]PublishRate `json:"publishMaxMessageRate"`
4848
ClusterSubscribeRate map[string]SubscribeRate `json:"clusterSubscribeRate"`
4949
TopicAutoCreationConfig *TopicAutoCreationConfig `json:"autoTopicCreationOverride"`
50-
SchemaCompatibilityStrategy SchemaCompatibilityStrategy `json:"schema_auto_update_compatibility_strategy"`
51-
AuthPolicies AuthPolicies `json:"auth_policies"`
52-
SubscriptionAuthMode SubscriptionAuthMode `json:"subscription_auth_mode"`
53-
IsAllowAutoUpdateSchema *bool `json:"is_allow_auto_update_schema"`
50+
//nolint:lll
51+
SchemaAutoUpdateCompatibilityStrategy SchemaAutoUpdateCompatibilityStrategy `json:"schema_auto_update_compatibility_strategy"`
52+
SchemaCompatibilityStrategy SchemaCompatibilityStrategy `json:"schema_compatibility_strategy"`
53+
AuthPolicies AuthPolicies `json:"auth_policies"`
54+
SubscriptionAuthMode SubscriptionAuthMode `json:"subscription_auth_mode"`
55+
IsAllowAutoUpdateSchema *bool `json:"is_allow_auto_update_schema"`
5456
}
5557

5658
func NewDefaultPolicies() *Policies {
5759
return &Policies{
58-
AuthPolicies: *NewAuthPolicies(),
59-
ReplicationClusters: make([]string, 0, 10),
60-
BacklogQuotaMap: make(map[BacklogQuotaType]BacklogQuota),
61-
TopicDispatchRate: make(map[string]DispatchRate),
62-
SubscriptionDispatchRate: make(map[string]DispatchRate),
63-
ReplicatorDispatchRate: make(map[string]DispatchRate),
64-
PublishMaxMessageRate: make(map[string]PublishRate),
65-
ClusterSubscribeRate: make(map[string]SubscribeRate),
66-
LatencyStatsSampleRate: make(map[string]int),
67-
MessageTTLInSeconds: nil,
68-
Deleted: false,
69-
EncryptionRequired: false,
70-
SubscriptionAuthMode: None,
71-
MaxProducersPerTopic: nil,
72-
MaxConsumersPerSubscription: nil,
73-
MaxConsumersPerTopic: nil,
74-
CompactionThreshold: nil,
75-
OffloadThreshold: -1,
76-
SchemaCompatibilityStrategy: Full,
77-
SchemaValidationEnforced: false,
60+
AuthPolicies: *NewAuthPolicies(),
61+
ReplicationClusters: make([]string, 0, 10),
62+
BacklogQuotaMap: make(map[BacklogQuotaType]BacklogQuota),
63+
TopicDispatchRate: make(map[string]DispatchRate),
64+
SubscriptionDispatchRate: make(map[string]DispatchRate),
65+
ReplicatorDispatchRate: make(map[string]DispatchRate),
66+
PublishMaxMessageRate: make(map[string]PublishRate),
67+
ClusterSubscribeRate: make(map[string]SubscribeRate),
68+
LatencyStatsSampleRate: make(map[string]int),
69+
MessageTTLInSeconds: nil,
70+
Deleted: false,
71+
EncryptionRequired: false,
72+
SubscriptionAuthMode: None,
73+
MaxProducersPerTopic: nil,
74+
MaxConsumersPerSubscription: nil,
75+
MaxConsumersPerTopic: nil,
76+
CompactionThreshold: nil,
77+
OffloadThreshold: -1,
78+
SchemaCompatibilityStrategy: SchemaCompatibilityStrategyUndefined,
79+
SchemaAutoUpdateCompatibilityStrategy: Full,
80+
SchemaValidationEnforced: false,
7881
}
7982
}

pulsaradmin/pkg/utils/schema_strategy.go

Lines changed: 57 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,24 @@
1717

1818
package utils
1919

20-
import "github.com/pkg/errors"
20+
import (
21+
"github.com/pkg/errors"
22+
)
2123

22-
type SchemaCompatibilityStrategy string
24+
type SchemaAutoUpdateCompatibilityStrategy string
2325

2426
const (
25-
AutoUpdateDisabled SchemaCompatibilityStrategy = "AutoUpdateDisabled"
26-
Backward SchemaCompatibilityStrategy = "Backward"
27-
Forward SchemaCompatibilityStrategy = "Forward"
28-
Full SchemaCompatibilityStrategy = "Full"
29-
AlwaysCompatible SchemaCompatibilityStrategy = "AlwaysCompatible"
30-
BackwardTransitive SchemaCompatibilityStrategy = "BackwardTransitive"
31-
ForwardTransitive SchemaCompatibilityStrategy = "ForwardTransitive"
32-
FullTransitive SchemaCompatibilityStrategy = "FullTransitive"
27+
AutoUpdateDisabled SchemaAutoUpdateCompatibilityStrategy = "AutoUpdateDisabled"
28+
Backward SchemaAutoUpdateCompatibilityStrategy = "Backward"
29+
Forward SchemaAutoUpdateCompatibilityStrategy = "Forward"
30+
Full SchemaAutoUpdateCompatibilityStrategy = "Full"
31+
AlwaysCompatible SchemaAutoUpdateCompatibilityStrategy = "AlwaysCompatible"
32+
BackwardTransitive SchemaAutoUpdateCompatibilityStrategy = "BackwardTransitive"
33+
ForwardTransitive SchemaAutoUpdateCompatibilityStrategy = "ForwardTransitive"
34+
FullTransitive SchemaAutoUpdateCompatibilityStrategy = "FullTransitive"
3335
)
3436

35-
func ParseSchemaAutoUpdateCompatibilityStrategy(str string) (SchemaCompatibilityStrategy, error) {
37+
func ParseSchemaAutoUpdateCompatibilityStrategy(str string) (SchemaAutoUpdateCompatibilityStrategy, error) {
3638
switch str {
3739
case "AutoUpdateDisabled":
3840
return AutoUpdateDisabled, nil
@@ -51,7 +53,50 @@ func ParseSchemaAutoUpdateCompatibilityStrategy(str string) (SchemaCompatibility
5153
case "FullTransitive":
5254
return FullTransitive, nil
5355
default:
54-
return "", errors.Errorf("Invalid auth strategy %s", str)
56+
return "", errors.Errorf("Invalid schema auto update compatibility strategy %s", str)
57+
}
58+
}
59+
60+
func (s SchemaAutoUpdateCompatibilityStrategy) String() string {
61+
return string(s)
62+
}
63+
64+
type SchemaCompatibilityStrategy string
65+
66+
const (
67+
SchemaCompatibilityStrategyUndefined SchemaCompatibilityStrategy = "UNDEFINED"
68+
SchemaCompatibilityStrategyAlwaysIncompatible SchemaCompatibilityStrategy = "ALWAYS_INCOMPATIBLE"
69+
SchemaCompatibilityStrategyAlwaysCompatible SchemaCompatibilityStrategy = "ALWAYS_COMPATIBLE"
70+
SchemaCompatibilityStrategyBackward SchemaCompatibilityStrategy = "BACKWARD"
71+
SchemaCompatibilityStrategyForward SchemaCompatibilityStrategy = "FORWARD"
72+
SchemaCompatibilityStrategyFull SchemaCompatibilityStrategy = "FULL"
73+
SchemaCompatibilityStrategyBackwardTransitive SchemaCompatibilityStrategy = "BACKWARD_TRANSITIVE"
74+
SchemaCompatibilityStrategyForwardTransitive SchemaCompatibilityStrategy = "FORWARD_TRANSITIVE"
75+
SchemaCompatibilityStrategyFullTransitive SchemaCompatibilityStrategy = "FULL_TRANSITIVE"
76+
)
77+
78+
func ParseSchemaCompatibilityStrategy(str string) (SchemaCompatibilityStrategy, error) {
79+
switch str {
80+
case "UNDEFINED":
81+
return SchemaCompatibilityStrategyUndefined, nil
82+
case "ALWAYS_INCOMPATIBLE":
83+
return SchemaCompatibilityStrategyAlwaysIncompatible, nil
84+
case "ALWAYS_COMPATIBLE":
85+
return SchemaCompatibilityStrategyAlwaysCompatible, nil
86+
case "BACKWARD":
87+
return SchemaCompatibilityStrategyBackward, nil
88+
case "FORWARD":
89+
return SchemaCompatibilityStrategyForward, nil
90+
case "FULL":
91+
return SchemaCompatibilityStrategyFull, nil
92+
case "BACKWARD_TRANSITIVE":
93+
return SchemaCompatibilityStrategyBackwardTransitive, nil
94+
case "FORWARD_TRANSITIVE":
95+
return SchemaCompatibilityStrategyForwardTransitive, nil
96+
case "FULL_TRANSITIVE":
97+
return SchemaCompatibilityStrategyFullTransitive, nil
98+
default:
99+
return "", errors.Errorf("Invalid schema compatibility strategy %s", str)
55100
}
56101
}
57102

0 commit comments

Comments
 (0)