Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 36 additions & 16 deletions pulsaradmin/pkg/admin/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,12 @@ type Namespaces interface {
// GetNamespaceMessageTTLWithContext returns the message TTL for a namespace. Returns -1 if not set
GetNamespaceMessageTTLWithContext(ctx context.Context, namespace string) (int, error)

// GetRetention returns the retention configuration for a namespace
// GetRetention returns the retention configuration for a namespace.
// Returns nil if the retention policy is not configured at the namespace level.
GetRetention(namespace string) (*utils.RetentionPolicies, error)

// GetRetentionWithContext returns the retention configuration for a namespace
// GetRetentionWithContext returns the retention configuration for a namespace.
// Returns nil if the retention policy is not configured at the namespace level.
GetRetentionWithContext(ctx context.Context, namespace string) (*utils.RetentionPolicies, error)

// SetRetention sets the retention configuration for all the topics on a namespace
Expand Down Expand Up @@ -132,10 +134,12 @@ type Namespaces interface {
// RemoveBacklogQuotaWithContext removes a backlog quota policy from a namespace
RemoveBacklogQuotaWithContext(ctx context.Context, namespace string) error

// GetTopicAutoCreation returns the topic auto-creation config for a namespace
// GetTopicAutoCreation returns the topic auto-creation config for a namespace.
// Returns nil if the topic auto-creation config is not configured at the namespace level.
GetTopicAutoCreation(namespace utils.NameSpaceName) (*utils.TopicAutoCreationConfig, error)

// GetTopicAutoCreationWithContext returns the topic auto-creation config for a namespace
// GetTopicAutoCreationWithContext returns the topic auto-creation config for a namespace.
// Returns nil if the topic auto-creation config is not configured at the namespace level.
GetTopicAutoCreationWithContext(
ctx context.Context,
namespace utils.NameSpaceName,
Expand Down Expand Up @@ -392,10 +396,12 @@ type Namespaces interface {
// SetPersistenceWithContext sets the persistence configuration for all the topics on a namespace
SetPersistenceWithContext(ctx context.Context, namespace string, persistence utils.PersistencePolicies) error

// GetPersistence returns the persistence configuration for a namespace
// GetPersistence returns the persistence configuration for a namespace.
// Returns nil if the persistence policy is not configured at the namespace level.
GetPersistence(namespace string) (*utils.PersistencePolicies, error)

// GetPersistenceWithContext returns the persistence configuration for a namespace
// GetPersistenceWithContext returns the persistence configuration for a namespace.
// Returns nil if the persistence policy is not configured at the namespace level.
GetPersistenceWithContext(ctx context.Context, namespace string) (*utils.PersistencePolicies, error)

// SetBookieAffinityGroup sets bookie affinity group for a namespace to isolate namespace write to bookies that are
Expand All @@ -416,10 +422,12 @@ type Namespaces interface {
// DeleteBookieAffinityGroupWithContext deletes bookie affinity group configured for a namespace
DeleteBookieAffinityGroupWithContext(ctx context.Context, namespace string) error

// GetBookieAffinityGroup returns bookie affinity group configured for a namespace
// GetBookieAffinityGroup returns bookie affinity group configured for a namespace.
// Returns nil if the bookie affinity group is not configured at the namespace level.
GetBookieAffinityGroup(namespace string) (*utils.BookieAffinityGroupData, error)

// GetBookieAffinityGroupWithContext returns bookie affinity group configured for a namespace
// GetBookieAffinityGroupWithContext returns bookie affinity group configured for a namespace.
// Returns nil if the bookie affinity group is not configured at the namespace level.
GetBookieAffinityGroupWithContext(ctx context.Context, namespace string) (*utils.BookieAffinityGroupData, error)

// Unload a namespace from the current serving broker
Expand Down Expand Up @@ -905,8 +913,11 @@ func (n *namespaces) GetRetentionWithContext(ctx context.Context, namespace stri
return nil, err
}
endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "retention")
err = n.pulsar.Client.GetWithContext(ctx, endpoint, &policy)
return &policy, err
body, err := n.pulsar.Client.GetBodyWithContext(ctx, endpoint, &policy)
if body != nil {
return &policy, err
}
return nil, err
}

func (n *namespaces) GetBacklogQuotaMap(namespace string) (map[utils.BacklogQuotaType]utils.BacklogQuota, error) {
Expand Down Expand Up @@ -970,8 +981,11 @@ func (n *namespaces) GetTopicAutoCreationWithContext(
) (*utils.TopicAutoCreationConfig, error) {
var topicAutoCreation utils.TopicAutoCreationConfig
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "autoTopicCreation")
err := n.pulsar.Client.GetWithContext(ctx, endpoint, &topicAutoCreation)
return &topicAutoCreation, err
body, err := n.pulsar.Client.GetBodyWithContext(ctx, endpoint, &topicAutoCreation)
if body != nil {
return &topicAutoCreation, err
}
return nil, err
}

func (n *namespaces) SetTopicAutoCreation(namespace utils.NameSpaceName, config utils.TopicAutoCreationConfig) error {
Expand Down Expand Up @@ -1463,8 +1477,11 @@ func (n *namespaces) GetBookieAffinityGroupWithContext(
return nil, err
}
endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "persistence", "bookieAffinity")
err = n.pulsar.Client.GetWithContext(ctx, endpoint, &data)
return &data, err
body, err := n.pulsar.Client.GetBodyWithContext(ctx, endpoint, &data)
if body != nil {
return &data, err
}
return nil, err
}

func (n *namespaces) GetPersistence(namespace string) (*utils.PersistencePolicies, error) {
Expand All @@ -1481,8 +1498,11 @@ func (n *namespaces) GetPersistenceWithContext(
return nil, err
}
endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "persistence")
err = n.pulsar.Client.GetWithContext(ctx, endpoint, &persistence)
return &persistence, err
body, err := n.pulsar.Client.GetBodyWithContext(ctx, endpoint, &persistence)
if body != nil {
return &persistence, err
}
return nil, err
}

func (n *namespaces) Unload(namespace string) error {
Expand Down
111 changes: 105 additions & 6 deletions pulsaradmin/pkg/admin/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package admin

import (
"os"
"testing"
"time"

Expand Down Expand Up @@ -156,23 +157,20 @@ func TestGetTopicAutoCreation(t *testing.T) {
assert.Equal(t, nil, err)
topicAutoCreation, err := admin.Namespaces().GetTopicAutoCreation(*namespace)
assert.Equal(t, nil, err)
assert.NotNil(t, topicAutoCreation, "Expected non-nil when topic auto creation is configured")
expected := utils.TopicAutoCreationConfig{
Allow: true,
Type: utils.NonPartitioned,
}
assert.Equal(t, expected, *topicAutoCreation)

// remove the topic auto creation config and get it
// remove the topic auto creation config and get it - should return nil
err = admin.Namespaces().RemoveTopicAutoCreation(*namespace)
assert.Equal(t, nil, err)

topicAutoCreation, err = admin.Namespaces().GetTopicAutoCreation(*namespace)
assert.Equal(t, nil, err)
expected = utils.TopicAutoCreationConfig{
Allow: false,
Type: "",
}
assert.Equal(t, expected, *topicAutoCreation)
assert.Nil(t, topicAutoCreation, "Expected nil when topic auto creation is not configured")
}

func TestRevokeSubPermission(t *testing.T) {
Expand Down Expand Up @@ -763,3 +761,104 @@ func TestNamespaces_MaxProducersPerTopic(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 50, maxProducers)
}

func TestNamespaces_Retention(t *testing.T) {
config := &config.Config{}
admin, err := New(config)
require.NoError(t, err)
require.NotNil(t, admin)

namespaceName := "public/default"

// Initial state: policy not configured, should return nil
retention, err := admin.Namespaces().GetRetention(namespaceName)
assert.NoError(t, err)
assert.Nil(t, retention, "Expected nil when retention is not configured")

// Set new retention policy
newRetention := utils.RetentionPolicies{
RetentionSizeInMB: 1024,
RetentionTimeInMinutes: 60,
}
err = admin.Namespaces().SetRetention(namespaceName, newRetention)
assert.NoError(t, err)

// Verify retention is set
retention, err = admin.Namespaces().GetRetention(namespaceName)
assert.NoError(t, err)
assert.NotNil(t, retention, "Expected non-nil when retention is configured")
assert.Equal(t, int64(1024), retention.RetentionSizeInMB)
assert.Equal(t, 60, retention.RetentionTimeInMinutes)
}

func TestNamespaces_BookieAffinityGroup(t *testing.T) {
readFile, err := os.ReadFile("../../../integration-tests/tokens/admin-token")
require.NoError(t, err)

config := &config.Config{
Token: string(readFile),
}
admin, err := New(config)
require.NoError(t, err)
require.NotNil(t, admin)

namespaceName := "public/default"

// Initial state: policy not configured, should return nil
bookieAffinity, err := admin.Namespaces().GetBookieAffinityGroup(namespaceName)
assert.NoError(t, err)
assert.Nil(t, bookieAffinity, "Expected nil when bookie affinity group is not configured")

// Set new bookie affinity group
newBookieAffinity := utils.BookieAffinityGroupData{
BookkeeperAffinityGroupPrimary: "primary-group",
BookkeeperAffinityGroupSecondary: "secondary-group",
}
err = admin.Namespaces().SetBookieAffinityGroup(namespaceName, newBookieAffinity)
assert.NoError(t, err)

// Verify bookie affinity group is set
bookieAffinity, err = admin.Namespaces().GetBookieAffinityGroup(namespaceName)
assert.NoError(t, err)
assert.NotNil(t, bookieAffinity, "Expected non-nil when bookie affinity group is configured")
assert.Equal(t, "primary-group", bookieAffinity.BookkeeperAffinityGroupPrimary)
assert.Equal(t, "secondary-group", bookieAffinity.BookkeeperAffinityGroupSecondary)

// Remove bookie affinity group - should return nil
err = admin.Namespaces().DeleteBookieAffinityGroup(namespaceName)
assert.NoError(t, err)
bookieAffinity, err = admin.Namespaces().GetBookieAffinityGroup(namespaceName)
assert.NoError(t, err)
assert.Nil(t, bookieAffinity, "Expected nil after removing bookie affinity group")
}

func TestNamespaces_Persistence(t *testing.T) {
config := &config.Config{}
admin, err := New(config)
require.NoError(t, err)
require.NotNil(t, admin)

namespaceName := "public/default"

// Initial state: policy not configured, should return nil
persistence, err := admin.Namespaces().GetPersistence(namespaceName)
assert.NoError(t, err)
assert.Nil(t, persistence, "Expected nil when persistence is not configured")

// Set new persistence policy
newPersistence := utils.PersistencePolicies{
BookkeeperEnsemble: 1,
BookkeeperWriteQuorum: 1,
BookkeeperAckQuorum: 1,
ManagedLedgerMaxMarkDeleteRate: 10.0,
}
err = admin.Namespaces().SetPersistence(namespaceName, newPersistence)
assert.NoError(t, err)

// Verify persistence is set
persistence, err = admin.Namespaces().GetPersistence(namespaceName)
assert.NoError(t, err)
assert.NotNil(t, persistence, "Expected non-nil when persistence is configured")
assert.Equal(t, 1, persistence.BookkeeperEnsemble)
assert.Equal(t, 1, persistence.BookkeeperWriteQuorum)
}
Loading