Skip to content

Commit afa35c5

Browse files
committed
Add max topics per namespace methods
1 parent a5fb2fc commit afa35c5

File tree

2 files changed

+105
-0
lines changed

2 files changed

+105
-0
lines changed

pulsaradmin/pkg/admin/namespace.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,16 @@ type Namespaces interface {
154154
// GetMaxProducersPerTopic returns the maxProducersPerTopic for a namespace.
155155
GetMaxProducersPerTopic(namespace utils.NameSpaceName) (int, error)
156156

157+
// SetMaxTopicsPerNamespace sets maxTopicsPerNamespace for a namespace.
158+
SetMaxTopicsPerNamespace(namespace utils.NameSpaceName, max int) error
159+
160+
// GetMaxTopicsPerNamespace returns the maxTopicsPerNamespace for a namespace.
161+
GetMaxTopicsPerNamespace(namespace utils.NameSpaceName) (int, error)
162+
163+
// RemoveMaxTopicsPerNamespace removes maxTopicsPerNamespace configuration for a namespace,
164+
// defaulting to broker settings
165+
RemoveMaxTopicsPerNamespace(namespace utils.NameSpaceName) error
166+
157167
// GetNamespaceReplicationClusters returns the replication clusters for a namespace
158168
GetNamespaceReplicationClusters(namespace string) ([]string, error)
159169

@@ -994,5 +1004,22 @@ func (n *namespaces) GetProperties(namespace utils.NameSpaceName) (map[string]st
9941004

9951005
func (n *namespaces) RemoveProperties(namespace utils.NameSpaceName) error {
9961006
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "properties")
1007+
return n.pulsar.Client.Post(endpoint, nil)
1008+
}
1009+
1010+
func (n *namespaces) SetMaxTopicsPerNamespace(namespace utils.NameSpaceName, max int) error {
1011+
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "maxTopicsPerNamespace")
1012+
return n.pulsar.Client.Post(endpoint, max)
1013+
}
1014+
1015+
func (n *namespaces) GetMaxTopicsPerNamespace(namespace utils.NameSpaceName) (int, error) {
1016+
var result int
1017+
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "maxTopicsPerNamespace")
1018+
err := n.pulsar.Client.Get(endpoint, &result)
1019+
return result, err
1020+
}
1021+
1022+
func (n *namespaces) RemoveMaxTopicsPerNamespace(namespace utils.NameSpaceName) error {
1023+
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "maxTopicsPerNamespace")
9971024
return n.pulsar.Client.Delete(endpoint)
9981025
}

pulsaradmin/pkg/admin/namespace_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,7 @@ func TestNamespaces_SetSchemaCompatibilityStrategy(t *testing.T) {
447447
}
448448

449449
func TestNamespaces_GetSchemaCompatibilityStrategy(t *testing.T) {
450+
450451
config := &config.Config{}
451452
admin, err := New(config)
452453
require.NoError(t, err)
@@ -496,6 +497,56 @@ func TestNamespaces_GetSchemaCompatibilityStrategy(t *testing.T) {
496497
assert.Equal(t, utils.SchemaCompatibilityStrategyUndefined, defaultStrategy)
497498
}
498499

500+
func TestNamespaces_SetMaxTopicsPerNamespace(t *testing.T) {
501+
config := &config.Config{}
502+
admin, err := New(config)
503+
require.NoError(t, err)
504+
require.NotNil(t, admin)
505+
506+
tests := []struct {
507+
name string
508+
namespace string
509+
maxTopics int
510+
errReason string
511+
}{
512+
{
513+
name: "Set valid max topics per namespace",
514+
namespace: "public/default",
515+
maxTopics: 100,
516+
errReason: "",
517+
},
518+
{
519+
name: "Set invalid max topics per namespace",
520+
namespace: "public/default",
521+
maxTopics: -1,
522+
errReason: "maxTopicsPerNamespace must be 0 or more",
523+
},
524+
{
525+
name: "Set valid max topics per namespace: 0",
526+
namespace: "public/default",
527+
maxTopics: 0,
528+
errReason: "",
529+
},
530+
}
531+
for _, tt := range tests {
532+
t.Run(tt.name, func(t *testing.T) {
533+
namespace, _ := utils.GetNamespaceName(tt.namespace)
534+
err := admin.Namespaces().SetMaxTopicsPerNamespace(*namespace, tt.maxTopics)
535+
if tt.errReason == "" {
536+
assert.Equal(t, nil, err)
537+
538+
err = admin.Namespaces().RemoveMaxTopicsPerNamespace(*namespace)
539+
assert.Equal(t, nil, err)
540+
}
541+
if err != nil {
542+
restError := err.(rest.Error)
543+
assert.Equal(t, tt.errReason, restError.Reason)
544+
}
545+
})
546+
}
547+
548+
}
549+
499550
func TestNamespaces_Properties(t *testing.T) {
500551
config := &config.Config{}
501552
admin, err := New(config)
@@ -523,3 +574,30 @@ func TestNamespaces_Properties(t *testing.T) {
523574
assert.Equal(t, err, nil)
524575
assert.Equal(t, actualPropertiesAfterRemoveCall, map[string]string{})
525576
}
577+
578+
func TestNamespaces_GetMaxTopicsPerNamespace(t *testing.T) {
579+
580+
config := &config.Config{}
581+
admin, err := New(config)
582+
require.NoError(t, err)
583+
require.NotNil(t, admin)
584+
585+
namespace, _ := utils.GetNamespaceName("public/default")
586+
587+
// set the max topics per namespace and get it
588+
err = admin.Namespaces().SetMaxTopicsPerNamespace(*namespace, 100)
589+
assert.Equal(t, nil, err)
590+
maxTopics, err := admin.Namespaces().GetMaxTopicsPerNamespace(*namespace)
591+
assert.Equal(t, nil, err)
592+
expected := 100
593+
assert.Equal(t, expected, maxTopics)
594+
595+
// remove the max topics per namespace and get it
596+
err = admin.Namespaces().RemoveMaxTopicsPerNamespace(*namespace)
597+
assert.Equal(t, nil, err)
598+
599+
maxTopics, err = admin.Namespaces().GetMaxTopicsPerNamespace(*namespace)
600+
assert.Equal(t, nil, err)
601+
expected = 0
602+
assert.Equal(t, expected, maxTopics)
603+
}

0 commit comments

Comments
 (0)