Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions pulsaradmin/pkg/admin/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,16 @@ type Namespaces interface {
// GetMaxProducersPerTopic returns the maxProducersPerTopic for a namespace.
GetMaxProducersPerTopic(namespace utils.NameSpaceName) (int, error)

// SetMaxTopicsPerNamespace sets maxTopicsPerNamespace for a namespace.
SetMaxTopicsPerNamespace(namespace utils.NameSpaceName, max int) error

// GetMaxTopicsPerNamespace returns the maxTopicsPerNamespace for a namespace.
GetMaxTopicsPerNamespace(namespace utils.NameSpaceName) (int, error)

// RemoveMaxTopicsPerNamespace removes maxTopicsPerNamespace configuration for a namespace,
// defaulting to broker settings
RemoveMaxTopicsPerNamespace(namespace utils.NameSpaceName) error

// GetNamespaceReplicationClusters returns the replication clusters for a namespace
GetNamespaceReplicationClusters(namespace string) ([]string, error)

Expand Down Expand Up @@ -996,3 +1006,20 @@ func (n *namespaces) RemoveProperties(namespace utils.NameSpaceName) error {
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "properties")
return n.pulsar.Client.Delete(endpoint)
}

func (n *namespaces) SetMaxTopicsPerNamespace(namespace utils.NameSpaceName, max int) error {
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "maxTopicsPerNamespace")
return n.pulsar.Client.Post(endpoint, max)
}

func (n *namespaces) GetMaxTopicsPerNamespace(namespace utils.NameSpaceName) (int, error) {
var result int
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "maxTopicsPerNamespace")
err := n.pulsar.Client.Get(endpoint, &result)
return result, err
}

func (n *namespaces) RemoveMaxTopicsPerNamespace(namespace utils.NameSpaceName) error {
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "maxTopicsPerNamespace")
return n.pulsar.Client.Delete(endpoint)
}
76 changes: 76 additions & 0 deletions pulsaradmin/pkg/admin/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,3 +523,79 @@ func TestNamespaces_Properties(t *testing.T) {
assert.Equal(t, err, nil)
assert.Equal(t, actualPropertiesAfterRemoveCall, map[string]string{})
}

func TestNamespaces_SetMaxTopicsPerNamespace(t *testing.T) {
config := &config.Config{}
admin, err := New(config)
require.NoError(t, err)
require.NotNil(t, admin)

tests := []struct {
name string
namespace string
maxTopics int
errReason string
}{
{
name: "Set valid max topics per namespace",
namespace: "public/default",
maxTopics: 100,
errReason: "",
},
{
name: "Set invalid max topics per namespace",
namespace: "public/default",
maxTopics: -1,
errReason: "maxTopicsPerNamespace must be 0 or more",
},
{
name: "Set valid max topics per namespace: 0",
namespace: "public/default",
maxTopics: 0,
errReason: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
namespace, _ := utils.GetNamespaceName(tt.namespace)
err := admin.Namespaces().SetMaxTopicsPerNamespace(*namespace, tt.maxTopics)
if tt.errReason == "" {
assert.Equal(t, nil, err)

err = admin.Namespaces().RemoveMaxTopicsPerNamespace(*namespace)
assert.Equal(t, nil, err)
}
if err != nil {
restError := err.(rest.Error)
assert.Equal(t, tt.errReason, restError.Reason)
}
})
}
}

func TestNamespaces_GetMaxTopicsPerNamespace(t *testing.T) {

config := &config.Config{}
admin, err := New(config)
require.NoError(t, err)
require.NotNil(t, admin)

namespace, _ := utils.GetNamespaceName("public/default")

// set the max topics per namespace and get it
err = admin.Namespaces().SetMaxTopicsPerNamespace(*namespace, 100)
assert.Equal(t, nil, err)
maxTopics, err := admin.Namespaces().GetMaxTopicsPerNamespace(*namespace)
assert.Equal(t, nil, err)
expected := 100
assert.Equal(t, expected, maxTopics)

// remove the max topics per namespace and get it
err = admin.Namespaces().RemoveMaxTopicsPerNamespace(*namespace)
assert.Equal(t, nil, err)

maxTopics, err = admin.Namespaces().GetMaxTopicsPerNamespace(*namespace)
assert.Equal(t, nil, err)
expected = 0
assert.Equal(t, expected, maxTopics)
}