diff --git a/pulsaradmin/pkg/admin/namespace.go b/pulsaradmin/pkg/admin/namespace.go index 6114f5ce79..d5d33f1222 100644 --- a/pulsaradmin/pkg/admin/namespace.go +++ b/pulsaradmin/pkg/admin/namespace.go @@ -94,10 +94,12 @@ type Namespaces interface { // GetNamespaceMessageTTLWithContext returns the message TTL for a namespace. Returns -1 if not set GetNamespaceMessageTTLWithContext(ctx context.Context, namespace string) (int, error) - // GetRetention returns the retention configuration for a namespace + // GetRetention returns the retention configuration for a namespace. + // Returns nil if the retention policy is not configured at the namespace level. GetRetention(namespace string) (*utils.RetentionPolicies, error) - // GetRetentionWithContext returns the retention configuration for a namespace + // GetRetentionWithContext returns the retention configuration for a namespace. + // Returns nil if the retention policy is not configured at the namespace level. GetRetentionWithContext(ctx context.Context, namespace string) (*utils.RetentionPolicies, error) // SetRetention sets the retention configuration for all the topics on a namespace @@ -132,10 +134,12 @@ type Namespaces interface { // RemoveBacklogQuotaWithContext removes a backlog quota policy from a namespace RemoveBacklogQuotaWithContext(ctx context.Context, namespace string) error - // GetTopicAutoCreation returns the topic auto-creation config for a namespace + // GetTopicAutoCreation returns the topic auto-creation config for a namespace. + // Returns nil if the topic auto-creation config is not configured at the namespace level. GetTopicAutoCreation(namespace utils.NameSpaceName) (*utils.TopicAutoCreationConfig, error) - // GetTopicAutoCreationWithContext returns the topic auto-creation config for a namespace + // GetTopicAutoCreationWithContext returns the topic auto-creation config for a namespace. + // Returns nil if the topic auto-creation config is not configured at the namespace level. GetTopicAutoCreationWithContext( ctx context.Context, namespace utils.NameSpaceName, @@ -392,10 +396,12 @@ type Namespaces interface { // SetPersistenceWithContext sets the persistence configuration for all the topics on a namespace SetPersistenceWithContext(ctx context.Context, namespace string, persistence utils.PersistencePolicies) error - // GetPersistence returns the persistence configuration for a namespace + // GetPersistence returns the persistence configuration for a namespace. + // Returns nil if the persistence policy is not configured at the namespace level. GetPersistence(namespace string) (*utils.PersistencePolicies, error) - // GetPersistenceWithContext returns the persistence configuration for a namespace + // GetPersistenceWithContext returns the persistence configuration for a namespace. + // Returns nil if the persistence policy is not configured at the namespace level. GetPersistenceWithContext(ctx context.Context, namespace string) (*utils.PersistencePolicies, error) // SetBookieAffinityGroup sets bookie affinity group for a namespace to isolate namespace write to bookies that are @@ -416,10 +422,12 @@ type Namespaces interface { // DeleteBookieAffinityGroupWithContext deletes bookie affinity group configured for a namespace DeleteBookieAffinityGroupWithContext(ctx context.Context, namespace string) error - // GetBookieAffinityGroup returns bookie affinity group configured for a namespace + // GetBookieAffinityGroup returns bookie affinity group configured for a namespace. + // Returns nil if the bookie affinity group is not configured at the namespace level. GetBookieAffinityGroup(namespace string) (*utils.BookieAffinityGroupData, error) - // GetBookieAffinityGroupWithContext returns bookie affinity group configured for a namespace + // GetBookieAffinityGroupWithContext returns bookie affinity group configured for a namespace. + // Returns nil if the bookie affinity group is not configured at the namespace level. GetBookieAffinityGroupWithContext(ctx context.Context, namespace string) (*utils.BookieAffinityGroupData, error) // Unload a namespace from the current serving broker @@ -905,8 +913,11 @@ func (n *namespaces) GetRetentionWithContext(ctx context.Context, namespace stri return nil, err } endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "retention") - err = n.pulsar.Client.GetWithContext(ctx, endpoint, &policy) - return &policy, err + body, err := n.pulsar.Client.GetBodyWithContext(ctx, endpoint, &policy) + if body != nil { + return &policy, err + } + return nil, err } func (n *namespaces) GetBacklogQuotaMap(namespace string) (map[utils.BacklogQuotaType]utils.BacklogQuota, error) { @@ -970,8 +981,11 @@ func (n *namespaces) GetTopicAutoCreationWithContext( ) (*utils.TopicAutoCreationConfig, error) { var topicAutoCreation utils.TopicAutoCreationConfig endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "autoTopicCreation") - err := n.pulsar.Client.GetWithContext(ctx, endpoint, &topicAutoCreation) - return &topicAutoCreation, err + body, err := n.pulsar.Client.GetBodyWithContext(ctx, endpoint, &topicAutoCreation) + if body != nil { + return &topicAutoCreation, err + } + return nil, err } func (n *namespaces) SetTopicAutoCreation(namespace utils.NameSpaceName, config utils.TopicAutoCreationConfig) error { @@ -1463,8 +1477,11 @@ func (n *namespaces) GetBookieAffinityGroupWithContext( return nil, err } endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "persistence", "bookieAffinity") - err = n.pulsar.Client.GetWithContext(ctx, endpoint, &data) - return &data, err + body, err := n.pulsar.Client.GetBodyWithContext(ctx, endpoint, &data) + if body != nil { + return &data, err + } + return nil, err } func (n *namespaces) GetPersistence(namespace string) (*utils.PersistencePolicies, error) { @@ -1481,8 +1498,11 @@ func (n *namespaces) GetPersistenceWithContext( return nil, err } endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "persistence") - err = n.pulsar.Client.GetWithContext(ctx, endpoint, &persistence) - return &persistence, err + body, err := n.pulsar.Client.GetBodyWithContext(ctx, endpoint, &persistence) + if body != nil { + return &persistence, err + } + return nil, err } func (n *namespaces) Unload(namespace string) error { diff --git a/pulsaradmin/pkg/admin/namespace_test.go b/pulsaradmin/pkg/admin/namespace_test.go index cfa44951c8..1ca235f50d 100644 --- a/pulsaradmin/pkg/admin/namespace_test.go +++ b/pulsaradmin/pkg/admin/namespace_test.go @@ -18,6 +18,7 @@ package admin import ( + "os" "testing" "time" @@ -156,23 +157,20 @@ func TestGetTopicAutoCreation(t *testing.T) { assert.Equal(t, nil, err) topicAutoCreation, err := admin.Namespaces().GetTopicAutoCreation(*namespace) assert.Equal(t, nil, err) + assert.NotNil(t, topicAutoCreation, "Expected non-nil when topic auto creation is configured") expected := utils.TopicAutoCreationConfig{ Allow: true, Type: utils.NonPartitioned, } assert.Equal(t, expected, *topicAutoCreation) - // remove the topic auto creation config and get it + // remove the topic auto creation config and get it - should return nil err = admin.Namespaces().RemoveTopicAutoCreation(*namespace) assert.Equal(t, nil, err) topicAutoCreation, err = admin.Namespaces().GetTopicAutoCreation(*namespace) assert.Equal(t, nil, err) - expected = utils.TopicAutoCreationConfig{ - Allow: false, - Type: "", - } - assert.Equal(t, expected, *topicAutoCreation) + assert.Nil(t, topicAutoCreation, "Expected nil when topic auto creation is not configured") } func TestRevokeSubPermission(t *testing.T) { @@ -763,3 +761,104 @@ func TestNamespaces_MaxProducersPerTopic(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 50, maxProducers) } + +func TestNamespaces_Retention(t *testing.T) { + config := &config.Config{} + admin, err := New(config) + require.NoError(t, err) + require.NotNil(t, admin) + + namespaceName := "public/default" + + // Initial state: policy not configured, should return nil + retention, err := admin.Namespaces().GetRetention(namespaceName) + assert.NoError(t, err) + assert.Nil(t, retention, "Expected nil when retention is not configured") + + // Set new retention policy + newRetention := utils.RetentionPolicies{ + RetentionSizeInMB: 1024, + RetentionTimeInMinutes: 60, + } + err = admin.Namespaces().SetRetention(namespaceName, newRetention) + assert.NoError(t, err) + + // Verify retention is set + retention, err = admin.Namespaces().GetRetention(namespaceName) + assert.NoError(t, err) + assert.NotNil(t, retention, "Expected non-nil when retention is configured") + assert.Equal(t, int64(1024), retention.RetentionSizeInMB) + assert.Equal(t, 60, retention.RetentionTimeInMinutes) +} + +func TestNamespaces_BookieAffinityGroup(t *testing.T) { + readFile, err := os.ReadFile("../../../integration-tests/tokens/admin-token") + require.NoError(t, err) + + config := &config.Config{ + Token: string(readFile), + } + admin, err := New(config) + require.NoError(t, err) + require.NotNil(t, admin) + + namespaceName := "public/default" + + // Initial state: policy not configured, should return nil + bookieAffinity, err := admin.Namespaces().GetBookieAffinityGroup(namespaceName) + assert.NoError(t, err) + assert.Nil(t, bookieAffinity, "Expected nil when bookie affinity group is not configured") + + // Set new bookie affinity group + newBookieAffinity := utils.BookieAffinityGroupData{ + BookkeeperAffinityGroupPrimary: "primary-group", + BookkeeperAffinityGroupSecondary: "secondary-group", + } + err = admin.Namespaces().SetBookieAffinityGroup(namespaceName, newBookieAffinity) + assert.NoError(t, err) + + // Verify bookie affinity group is set + bookieAffinity, err = admin.Namespaces().GetBookieAffinityGroup(namespaceName) + assert.NoError(t, err) + assert.NotNil(t, bookieAffinity, "Expected non-nil when bookie affinity group is configured") + assert.Equal(t, "primary-group", bookieAffinity.BookkeeperAffinityGroupPrimary) + assert.Equal(t, "secondary-group", bookieAffinity.BookkeeperAffinityGroupSecondary) + + // Remove bookie affinity group - should return nil + err = admin.Namespaces().DeleteBookieAffinityGroup(namespaceName) + assert.NoError(t, err) + bookieAffinity, err = admin.Namespaces().GetBookieAffinityGroup(namespaceName) + assert.NoError(t, err) + assert.Nil(t, bookieAffinity, "Expected nil after removing bookie affinity group") +} + +func TestNamespaces_Persistence(t *testing.T) { + config := &config.Config{} + admin, err := New(config) + require.NoError(t, err) + require.NotNil(t, admin) + + namespaceName := "public/default" + + // Initial state: policy not configured, should return nil + persistence, err := admin.Namespaces().GetPersistence(namespaceName) + assert.NoError(t, err) + assert.Nil(t, persistence, "Expected nil when persistence is not configured") + + // Set new persistence policy + newPersistence := utils.PersistencePolicies{ + BookkeeperEnsemble: 1, + BookkeeperWriteQuorum: 1, + BookkeeperAckQuorum: 1, + ManagedLedgerMaxMarkDeleteRate: 10.0, + } + err = admin.Namespaces().SetPersistence(namespaceName, newPersistence) + assert.NoError(t, err) + + // Verify persistence is set + persistence, err = admin.Namespaces().GetPersistence(namespaceName) + assert.NoError(t, err) + assert.NotNil(t, persistence, "Expected non-nil when persistence is configured") + assert.Equal(t, 1, persistence.BookkeeperEnsemble) + assert.Equal(t, 1, persistence.BookkeeperWriteQuorum) +} diff --git a/pulsaradmin/pkg/admin/topic.go b/pulsaradmin/pkg/admin/topic.go index 37bbe94aaa..de19513898 100644 --- a/pulsaradmin/pkg/admin/topic.go +++ b/pulsaradmin/pkg/admin/topic.go @@ -543,10 +543,12 @@ type Topics interface { // RemoveMaxUnackMessagesPerSubscriptionWithContext removes max unacked messages policy on subscription for a topic RemoveMaxUnackMessagesPerSubscriptionWithContext(context.Context, utils.TopicName) error - // GetPersistence returns the persistence policies for a topic + // GetPersistence returns the persistence policies for a topic. + // Returns nil if the persistence policy is not configured at the topic level. GetPersistence(utils.TopicName) (*utils.PersistenceData, error) - // GetPersistenceWithContext returns the persistence policies for a topic + // GetPersistenceWithContext returns the persistence policies for a topic. + // Returns nil if the persistence policy is not configured at the topic level. GetPersistenceWithContext(context.Context, utils.TopicName) (*utils.PersistenceData, error) // SetPersistence sets the persistence policies for a topic @@ -561,10 +563,12 @@ type Topics interface { // RemovePersistenceWithContext removes the persistence policies for a topic RemovePersistenceWithContext(context.Context, utils.TopicName) error - // GetDelayedDelivery returns the delayed delivery policy for a topic + // GetDelayedDelivery returns the delayed delivery policy for a topic. + // Returns nil if the delayed delivery policy is not configured at the topic level. GetDelayedDelivery(utils.TopicName) (*utils.DelayedDeliveryData, error) - // GetDelayedDeliveryWithContext returns the delayed delivery policy for a topic + // GetDelayedDeliveryWithContext returns the delayed delivery policy for a topic. + // Returns nil if the delayed delivery policy is not configured at the topic level. GetDelayedDeliveryWithContext(context.Context, utils.TopicName) (*utils.DelayedDeliveryData, error) // SetDelayedDelivery sets the delayed delivery policy on a topic @@ -579,10 +583,12 @@ type Topics interface { // RemoveDelayedDeliveryWithContext removes the delayed delivery policy on a topic RemoveDelayedDeliveryWithContext(context.Context, utils.TopicName) error - // GetDispatchRate returns message dispatch rate for a topic + // GetDispatchRate returns message dispatch rate for a topic. + // Returns nil if the dispatch rate is not configured at the topic level. GetDispatchRate(utils.TopicName) (*utils.DispatchRateData, error) - // GetDispatchRateWithContext returns message dispatch rate for a topic + // GetDispatchRateWithContext returns message dispatch rate for a topic. + // Returns nil if the dispatch rate is not configured at the topic level. GetDispatchRateWithContext(context.Context, utils.TopicName) (*utils.DispatchRateData, error) // SetDispatchRate sets message dispatch rate for a topic @@ -597,10 +603,12 @@ type Topics interface { // RemoveDispatchRateWithContext removes message dispatch rate for a topic RemoveDispatchRateWithContext(context.Context, utils.TopicName) error - // GetPublishRate returns message publish rate for a topic + // GetPublishRate returns message publish rate for a topic. + // Returns nil if the publish rate is not configured at the topic level. GetPublishRate(utils.TopicName) (*utils.PublishRateData, error) - // GetPublishRateWithContext returns message publish rate for a topic + // GetPublishRateWithContext returns message publish rate for a topic. + // Returns nil if the publish rate is not configured at the topic level. GetPublishRateWithContext(context.Context, utils.TopicName) (*utils.PublishRateData, error) // SetPublishRate sets message publish rate for a topic @@ -645,7 +653,8 @@ type Topics interface { // RemoveDeduplicationStatusWithContext removes the deduplication policy for a topic RemoveDeduplicationStatusWithContext(context.Context, utils.TopicName) error - // GetRetention returns the retention configuration for a topic + // GetRetention returns the retention configuration for a topic. + // Returns nil if the retention policy is not configured at the topic level. // // @param topic // topicName struct @@ -654,7 +663,8 @@ type Topics interface { // in namespace or broker level, if no policy set in topic level GetRetention(topic utils.TopicName, applied bool) (*utils.RetentionPolicies, error) - // GetRetentionWithContext returns the retention configuration for a topic + // GetRetentionWithContext returns the retention configuration for a topic. + // Returns nil if the retention policy is not configured at the topic level. // // @param ctx // context used for the request @@ -821,10 +831,12 @@ type Topics interface { // list of replication cluster id SetReplicationClustersWithContext(ctx context.Context, topic utils.TopicName, data []string) error - // GetSubscribeRate returns subscribe rate configuration for a topic + // GetSubscribeRate returns subscribe rate configuration for a topic. + // Returns nil if the subscribe rate is not configured at the topic level. GetSubscribeRate(utils.TopicName) (*utils.SubscribeRate, error) - // GetSubscribeRateWithContext returns subscribe rate configuration for a topic + // GetSubscribeRateWithContext returns subscribe rate configuration for a topic. + // Returns nil if the subscribe rate is not configured at the topic level. GetSubscribeRateWithContext(context.Context, utils.TopicName) (*utils.SubscribeRate, error) // SetSubscribeRate sets subscribe rate configuration for a topic @@ -839,10 +851,12 @@ type Topics interface { // RemoveSubscribeRateWithContext removes subscribe rate configuration for a topic RemoveSubscribeRateWithContext(context.Context, utils.TopicName) error - // GetSubscriptionDispatchRate returns subscription dispatch rate for a topic + // GetSubscriptionDispatchRate returns subscription dispatch rate for a topic. + // Returns nil if the subscription dispatch rate is not configured at the topic level. GetSubscriptionDispatchRate(utils.TopicName) (*utils.DispatchRateData, error) - // GetSubscriptionDispatchRateWithContext returns subscription dispatch rate for a topic + // GetSubscriptionDispatchRateWithContext returns subscription dispatch rate for a topic. + // Returns nil if the subscription dispatch rate is not configured at the topic level. GetSubscriptionDispatchRateWithContext(context.Context, utils.TopicName) (*utils.DispatchRateData, error) // SetSubscriptionDispatchRate sets subscription dispatch rate for a topic @@ -948,10 +962,12 @@ type Topics interface { // RemoveDeduplicationSnapshotIntervalWithContext removes deduplication snapshot interval for a topic RemoveDeduplicationSnapshotIntervalWithContext(context.Context, utils.TopicName) error - // GetReplicatorDispatchRate returns replicator dispatch rate for a topic + // GetReplicatorDispatchRate returns replicator dispatch rate for a topic. + // Returns nil if the replicator dispatch rate is not configured at the topic level. GetReplicatorDispatchRate(utils.TopicName) (*utils.DispatchRateData, error) - // GetReplicatorDispatchRateWithContext returns replicator dispatch rate for a topic + // GetReplicatorDispatchRateWithContext returns replicator dispatch rate for a topic. + // Returns nil if the replicator dispatch rate is not configured at the topic level. GetReplicatorDispatchRateWithContext(context.Context, utils.TopicName) (*utils.DispatchRateData, error) // SetReplicatorDispatchRate sets replicator dispatch rate for a topic @@ -966,10 +982,12 @@ type Topics interface { // RemoveReplicatorDispatchRateWithContext removes replicator dispatch rate for a topic RemoveReplicatorDispatchRateWithContext(context.Context, utils.TopicName) error - // GetOffloadPolicies returns offload policies for a topic + // GetOffloadPolicies returns offload policies for a topic. + // Returns nil if the offload policies are not configured at the topic level. GetOffloadPolicies(utils.TopicName) (*utils.OffloadPolicies, error) - // GetOffloadPoliciesWithContext returns offload policies for a topic + // GetOffloadPoliciesWithContext returns offload policies for a topic. + // Returns nil if the offload policies are not configured at the topic level. GetOffloadPoliciesWithContext(context.Context, utils.TopicName) (*utils.OffloadPolicies, error) // SetOffloadPolicies sets offload policies for a topic @@ -984,10 +1002,12 @@ type Topics interface { // RemoveOffloadPoliciesWithContext removes offload policies for a topic RemoveOffloadPoliciesWithContext(context.Context, utils.TopicName) error - // GetAutoSubscriptionCreation returns auto subscription creation override for a topic + // GetAutoSubscriptionCreation returns auto subscription creation override for a topic. + // Returns nil if the auto subscription creation override is not configured at the topic level. GetAutoSubscriptionCreation(utils.TopicName) (*utils.AutoSubscriptionCreationOverride, error) - // GetAutoSubscriptionCreationWithContext returns auto subscription creation override for a topic + // GetAutoSubscriptionCreationWithContext returns auto subscription creation override for a topic. + // Returns nil if the auto subscription creation override is not configured at the topic level. GetAutoSubscriptionCreationWithContext( context.Context, utils.TopicName, @@ -1629,8 +1649,11 @@ func (t *topics) GetPersistence(topic utils.TopicName) (*utils.PersistenceData, func (t *topics) GetPersistenceWithContext(ctx context.Context, topic utils.TopicName) (*utils.PersistenceData, error) { var persistenceData utils.PersistenceData endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "persistence") - err := t.pulsar.Client.GetWithContext(ctx, endpoint, &persistenceData) - return &persistenceData, err + body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, &persistenceData) + if body != nil { + return &persistenceData, err + } + return nil, err } func (t *topics) SetPersistence(topic utils.TopicName, persistenceData utils.PersistenceData) error { @@ -1665,8 +1688,11 @@ func (t *topics) GetDelayedDeliveryWithContext( ) (*utils.DelayedDeliveryData, error) { var delayedDeliveryData utils.DelayedDeliveryData endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "delayedDelivery") - err := t.pulsar.Client.GetWithContext(ctx, endpoint, &delayedDeliveryData) - return &delayedDeliveryData, err + body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, &delayedDeliveryData) + if body != nil { + return &delayedDeliveryData, err + } + return nil, err } func (t *topics) SetDelayedDelivery(topic utils.TopicName, delayedDeliveryData utils.DelayedDeliveryData) error { @@ -1701,8 +1727,11 @@ func (t *topics) GetDispatchRateWithContext( ) (*utils.DispatchRateData, error) { var dispatchRateData utils.DispatchRateData endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "dispatchRate") - err := t.pulsar.Client.GetWithContext(ctx, endpoint, &dispatchRateData) - return &dispatchRateData, err + body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, &dispatchRateData) + if body != nil { + return &dispatchRateData, err + } + return nil, err } func (t *topics) SetDispatchRate(topic utils.TopicName, dispatchRateData utils.DispatchRateData) error { @@ -1734,8 +1763,11 @@ func (t *topics) GetPublishRate(topic utils.TopicName) (*utils.PublishRateData, func (t *topics) GetPublishRateWithContext(ctx context.Context, topic utils.TopicName) (*utils.PublishRateData, error) { var publishRateData utils.PublishRateData endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "publishRate") - err := t.pulsar.Client.GetWithContext(ctx, endpoint, &publishRateData) - return &publishRateData, err + body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, &publishRateData) + if body != nil { + return &publishRateData, err + } + return nil, err } func (t *topics) SetPublishRate(topic utils.TopicName, publishRateData utils.PublishRateData) error { @@ -1800,10 +1832,13 @@ func (t *topics) GetRetentionWithContext( ) (*utils.RetentionPolicies, error) { var policy utils.RetentionPolicies endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "retention") - _, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx, endpoint, &policy, map[string]string{ + body, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx, endpoint, &policy, map[string]string{ "applied": strconv.FormatBool(applied), }, true) - return &policy, err + if body != nil { + return &policy, err + } + return nil, err } func (t *topics) RemoveRetention(topic utils.TopicName) error { @@ -1979,8 +2014,11 @@ func (t *topics) GetSubscribeRate(topic utils.TopicName) (*utils.SubscribeRate, func (t *topics) GetSubscribeRateWithContext(ctx context.Context, topic utils.TopicName) (*utils.SubscribeRate, error) { var subscribeRate utils.SubscribeRate endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscribeRate") - err := t.pulsar.Client.GetWithContext(ctx, endpoint, &subscribeRate) - return &subscribeRate, err + body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, &subscribeRate) + if body != nil { + return &subscribeRate, err + } + return nil, err } func (t *topics) SetSubscribeRate(topic utils.TopicName, subscribeRate utils.SubscribeRate) error { @@ -2015,8 +2053,11 @@ func (t *topics) GetSubscriptionDispatchRateWithContext( ) (*utils.DispatchRateData, error) { var dispatchRate utils.DispatchRateData endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscriptionDispatchRate") - err := t.pulsar.Client.GetWithContext(ctx, endpoint, &dispatchRate) - return &dispatchRate, err + body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, &dispatchRate) + if body != nil { + return &dispatchRate, err + } + return nil, err } func (t *topics) SetSubscriptionDispatchRate(topic utils.TopicName, dispatchRate utils.DispatchRateData) error { @@ -2212,8 +2253,11 @@ func (t *topics) GetReplicatorDispatchRateWithContext( ) (*utils.DispatchRateData, error) { var dispatchRate utils.DispatchRateData endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "replicatorDispatchRate") - err := t.pulsar.Client.GetWithContext(ctx, endpoint, &dispatchRate) - return &dispatchRate, err + body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, &dispatchRate) + if body != nil { + return &dispatchRate, err + } + return nil, err } func (t *topics) SetReplicatorDispatchRate(topic utils.TopicName, dispatchRate utils.DispatchRateData) error { @@ -2248,8 +2292,11 @@ func (t *topics) GetAutoSubscriptionCreationWithContext( ) (*utils.AutoSubscriptionCreationOverride, error) { var autoSubCreation utils.AutoSubscriptionCreationOverride endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "autoSubscriptionCreation") - err := t.pulsar.Client.GetWithContext(ctx, endpoint, &autoSubCreation) - return &autoSubCreation, err + body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, &autoSubCreation) + if body != nil { + return &autoSubCreation, err + } + return nil, err } func (t *topics) SetAutoSubscriptionCreation(topic utils.TopicName, @@ -2316,8 +2363,11 @@ func (t *topics) GetOffloadPoliciesWithContext( ) (*utils.OffloadPolicies, error) { var offloadPolicies utils.OffloadPolicies endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "offloadPolicies") - err := t.pulsar.Client.GetWithContext(ctx, endpoint, &offloadPolicies) - return &offloadPolicies, err + body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, &offloadPolicies) + if body != nil { + return &offloadPolicies, err + } + return nil, err } func (t *topics) SetOffloadPolicies(topic utils.TopicName, offloadPolicies utils.OffloadPolicies) error { diff --git a/pulsaradmin/pkg/admin/topic_test.go b/pulsaradmin/pkg/admin/topic_test.go index 8493cf76d9..f40c0bbc25 100644 --- a/pulsaradmin/pkg/admin/topic_test.go +++ b/pulsaradmin/pkg/admin/topic_test.go @@ -516,10 +516,10 @@ func TestRetention(t *testing.T) { err = admin.Topics().Create(*topicName, 4) assert.NoError(t, err) + // Initial state: policy not configured, should return nil topicRetentionPolicy, err := admin.Topics().GetRetention(*topicName, false) assert.NoError(t, err) - assert.Equal(t, int64(0), topicRetentionPolicy.RetentionSizeInMB) - assert.Equal(t, 0, topicRetentionPolicy.RetentionTimeInMinutes) + assert.Nil(t, topicRetentionPolicy, "Expected nil when retention policy is not configured") err = admin.Topics().SetRetention(*topicName, utils.RetentionPolicies{ RetentionSizeInMB: 20480, RetentionTimeInMinutes: 1440, @@ -540,13 +540,12 @@ func TestRetention(t *testing.T) { ) err = admin.Topics().RemoveRetention(*topicName) assert.NoError(t, err) + // After removal, should return nil (not configured) assert.Eventually( t, func() bool { topicRetentionPolicy, err = admin.Topics().GetRetention(*topicName, false) - return err == nil && - topicRetentionPolicy.RetentionSizeInMB == int64(0) && - topicRetentionPolicy.RetentionTimeInMinutes == 0 + return err == nil && topicRetentionPolicy == nil }, 10*time.Second, 100*time.Millisecond, @@ -565,12 +564,10 @@ func TestSubscribeRate(t *testing.T) { err = admin.Topics().Create(*topicName, 4) assert.NoError(t, err) - // Get default subscribe rate (adapt to actual server behavior) + // Initial state: policy not configured, should return nil initialSubscribeRate, err := admin.Topics().GetSubscribeRate(*topicName) assert.NoError(t, err) - // Store initial values for later comparison instead of assuming specific defaults - initialConsumerRate := initialSubscribeRate.SubscribeThrottlingRatePerConsumer - initialRatePeriod := initialSubscribeRate.RatePeriodInSecond + assert.Nil(t, initialSubscribeRate, "Expected nil when subscribe rate is not configured") // Set new subscribe rate newSubscribeRate := utils.SubscribeRate{ @@ -594,16 +591,14 @@ func TestSubscribeRate(t *testing.T) { 100*time.Millisecond, ) - // Remove subscribe rate policy + // Remove subscribe rate policy - should return nil err = admin.Topics().RemoveSubscribeRate(*topicName) assert.NoError(t, err) assert.Eventually( t, func() bool { subscribeRate, err := admin.Topics().GetSubscribeRate(*topicName) - return err == nil && - subscribeRate.SubscribeThrottlingRatePerConsumer == initialConsumerRate && - subscribeRate.RatePeriodInSecond == initialRatePeriod + return err == nil && subscribeRate == nil }, 10*time.Second, 100*time.Millisecond, @@ -622,14 +617,10 @@ func TestSubscriptionDispatchRate(t *testing.T) { err = admin.Topics().Create(*topicName, 4) assert.NoError(t, err) - // Get default subscription dispatch rate (adapt to actual server behavior) + // Initial state: policy not configured, should return nil initialDispatchRate, err := admin.Topics().GetSubscriptionDispatchRate(*topicName) assert.NoError(t, err) - // Store initial values for later comparison instead of assuming specific defaults - initialMsgRate := initialDispatchRate.DispatchThrottlingRateInMsg - initialByteRate := initialDispatchRate.DispatchThrottlingRateInByte - initialRatePeriod := initialDispatchRate.RatePeriodInSecond - initialRelativeToPublish := initialDispatchRate.RelativeToPublishRate + assert.Nil(t, initialDispatchRate, "Expected nil when subscription dispatch rate is not configured") // Set new subscription dispatch rate newDispatchRate := utils.DispatchRateData{ @@ -657,18 +648,14 @@ func TestSubscriptionDispatchRate(t *testing.T) { 100*time.Millisecond, ) - // Remove subscription dispatch rate policy + // Remove subscription dispatch rate policy - should return nil err = admin.Topics().RemoveSubscriptionDispatchRate(*topicName) assert.NoError(t, err) assert.Eventually( t, func() bool { dispatchRate, err := admin.Topics().GetSubscriptionDispatchRate(*topicName) - return err == nil && - dispatchRate.DispatchThrottlingRateInMsg == initialMsgRate && - dispatchRate.DispatchThrottlingRateInByte == initialByteRate && - dispatchRate.RatePeriodInSecond == initialRatePeriod && - dispatchRate.RelativeToPublishRate == initialRelativeToPublish + return err == nil && dispatchRate == nil }, 10*time.Second, 100*time.Millisecond, @@ -943,14 +930,10 @@ func TestReplicatorDispatchRate(t *testing.T) { err = admin.Topics().Create(*topicName, 4) assert.NoError(t, err) - // Get default replicator dispatch rate (adapt to actual server behavior) + // Initial state: policy not configured, should return nil initialDispatchRate, err := admin.Topics().GetReplicatorDispatchRate(*topicName) assert.NoError(t, err) - // Store initial values for later comparison instead of assuming specific defaults - initialMsgRate := initialDispatchRate.DispatchThrottlingRateInMsg - initialByteRate := initialDispatchRate.DispatchThrottlingRateInByte - initialRatePeriod := initialDispatchRate.RatePeriodInSecond - initialRelativeToPublish := initialDispatchRate.RelativeToPublishRate + assert.Nil(t, initialDispatchRate, "Expected nil when replicator dispatch rate is not configured") // Set new replicator dispatch rate newDispatchRate := utils.DispatchRateData{ @@ -978,18 +961,14 @@ func TestReplicatorDispatchRate(t *testing.T) { 100*time.Millisecond, ) - // Remove replicator dispatch rate policy + // Remove replicator dispatch rate policy - should return nil err = admin.Topics().RemoveReplicatorDispatchRate(*topicName) assert.NoError(t, err) assert.Eventually( t, func() bool { dispatchRate, err := admin.Topics().GetReplicatorDispatchRate(*topicName) - return err == nil && - dispatchRate.DispatchThrottlingRateInMsg == initialMsgRate && - dispatchRate.DispatchThrottlingRateInByte == initialByteRate && - dispatchRate.RatePeriodInSecond == initialRatePeriod && - dispatchRate.RelativeToPublishRate == initialRelativeToPublish + return err == nil && dispatchRate == nil }, 10*time.Second, 100*time.Millisecond, @@ -1008,12 +987,10 @@ func TestOffloadPolicies(t *testing.T) { err = admin.Topics().Create(*topicName, 4) assert.NoError(t, err) - // Get default offload policies + // Initial state: policy not configured, should return nil offloadPolicies, err := admin.Topics().GetOffloadPolicies(*topicName) assert.NoError(t, err) - // Default values should be empty/default - assert.Equal(t, "", offloadPolicies.ManagedLedgerOffloadDriver) - assert.Equal(t, 0, offloadPolicies.ManagedLedgerOffloadMaxThreads) + assert.Nil(t, offloadPolicies, "Expected nil when offload policies are not configured") // Set new offload policies newOffloadPolicies := utils.OffloadPolicies{ @@ -1045,16 +1022,14 @@ func TestOffloadPolicies(t *testing.T) { 100*time.Millisecond, ) - // Remove offload policies + // Remove offload policies - should return nil err = admin.Topics().RemoveOffloadPolicies(*topicName) assert.NoError(t, err) assert.Eventually( t, func() bool { offloadPolicies, err = admin.Topics().GetOffloadPolicies(*topicName) - return err == nil && - offloadPolicies.ManagedLedgerOffloadDriver == "" && - offloadPolicies.ManagedLedgerOffloadMaxThreads == 0 + return err == nil && offloadPolicies == nil }, 10*time.Second, 100*time.Millisecond, @@ -1073,10 +1048,10 @@ func TestAutoSubscriptionCreation(t *testing.T) { err = admin.Topics().Create(*topicName, 4) assert.NoError(t, err) - // Get default auto subscription creation + // Initial state: policy not configured, should return nil autoSubCreation, err := admin.Topics().GetAutoSubscriptionCreation(*topicName) assert.NoError(t, err) - assert.Equal(t, false, autoSubCreation.AllowAutoSubscriptionCreation) + assert.Nil(t, autoSubCreation, "Expected nil when auto subscription creation is not configured") // Set auto subscription creation to true newAutoSubCreation := utils.AutoSubscriptionCreationOverride{ @@ -1098,15 +1073,14 @@ func TestAutoSubscriptionCreation(t *testing.T) { 100*time.Millisecond, ) - // Remove auto subscription creation policy + // Remove auto subscription creation policy - should return nil err = admin.Topics().RemoveAutoSubscriptionCreation(*topicName) assert.NoError(t, err) assert.Eventually( t, func() bool { autoSubCreation, err = admin.Topics().GetAutoSubscriptionCreation(*topicName) - return err == nil && - autoSubCreation.AllowAutoSubscriptionCreation == false + return err == nil && autoSubCreation == nil }, 10*time.Second, 100*time.Millisecond, @@ -1369,3 +1343,214 @@ func TestTopics_MaxUnackMessagesPerSubscription(t *testing.T) { err = admin.Topics().RemoveMaxUnackMessagesPerSubscription(*topicName) assert.NoError(t, err) } + +func TestTopics_Persistence(t *testing.T) { + randomName := newTopicName() + topic := "persistent://public/default/" + randomName + cfg := &config.Config{} + admin, err := New(cfg) + assert.NoError(t, err) + assert.NotNil(t, admin) + topicName, err := utils.GetTopicName(topic) + assert.NoError(t, err) + err = admin.Topics().Create(*topicName, 4) + assert.NoError(t, err) + + // Initial state: policy not configured, should return nil + persistence, err := admin.Topics().GetPersistence(*topicName) + assert.NoError(t, err) + assert.Nil(t, persistence, "Expected nil when persistence is not configured") + + // Set new persistence policy + newPersistence := utils.PersistenceData{ + BookkeeperEnsemble: 1, + BookkeeperWriteQuorum: 1, + BookkeeperAckQuorum: 1, + ManagedLedgerMaxMarkDeleteRate: 10.0, + } + err = admin.Topics().SetPersistence(*topicName, newPersistence) + assert.NoError(t, err) + + // Verify persistence is set + assert.Eventually( + t, + func() bool { + persistence, err = admin.Topics().GetPersistence(*topicName) + return err == nil && persistence != nil && + persistence.BookkeeperEnsemble == 1 && + persistence.BookkeeperWriteQuorum == 1 + }, + 10*time.Second, + 100*time.Millisecond, + ) + + // Remove persistence policy - should return nil + err = admin.Topics().RemovePersistence(*topicName) + assert.NoError(t, err) + assert.Eventually( + t, + func() bool { + persistence, err = admin.Topics().GetPersistence(*topicName) + return err == nil && persistence == nil + }, + 10*time.Second, + 100*time.Millisecond, + ) +} + +func TestTopics_DelayedDelivery(t *testing.T) { + randomName := newTopicName() + topic := "persistent://public/default/" + randomName + cfg := &config.Config{} + admin, err := New(cfg) + assert.NoError(t, err) + assert.NotNil(t, admin) + topicName, err := utils.GetTopicName(topic) + assert.NoError(t, err) + err = admin.Topics().Create(*topicName, 4) + assert.NoError(t, err) + + // Initial state: policy not configured, should return nil + delayedDelivery, err := admin.Topics().GetDelayedDelivery(*topicName) + assert.NoError(t, err) + assert.Nil(t, delayedDelivery, "Expected nil when delayed delivery is not configured") + + // Set new delayed delivery policy + newDelayedDelivery := utils.DelayedDeliveryData{ + Active: true, + TickTime: 1000, + MaxDelayInMillis: 60000, + } + err = admin.Topics().SetDelayedDelivery(*topicName, newDelayedDelivery) + assert.NoError(t, err) + + // Verify delayed delivery is set + assert.Eventually( + t, + func() bool { + delayedDelivery, err = admin.Topics().GetDelayedDelivery(*topicName) + return err == nil && delayedDelivery != nil && + delayedDelivery.Active == true && + delayedDelivery.TickTime == 1000 + }, + 10*time.Second, + 100*time.Millisecond, + ) + + // Remove delayed delivery policy - should return nil + err = admin.Topics().RemoveDelayedDelivery(*topicName) + assert.NoError(t, err) + assert.Eventually( + t, + func() bool { + delayedDelivery, err = admin.Topics().GetDelayedDelivery(*topicName) + return err == nil && delayedDelivery == nil + }, + 10*time.Second, + 100*time.Millisecond, + ) +} + +func TestTopics_DispatchRate(t *testing.T) { + randomName := newTopicName() + topic := "persistent://public/default/" + randomName + cfg := &config.Config{} + admin, err := New(cfg) + assert.NoError(t, err) + assert.NotNil(t, admin) + topicName, err := utils.GetTopicName(topic) + assert.NoError(t, err) + err = admin.Topics().Create(*topicName, 4) + assert.NoError(t, err) + + // Initial state: policy not configured, should return nil + dispatchRate, err := admin.Topics().GetDispatchRate(*topicName) + assert.NoError(t, err) + assert.Nil(t, dispatchRate, "Expected nil when dispatch rate is not configured") + + // Set new dispatch rate + newDispatchRate := utils.DispatchRateData{ + DispatchThrottlingRateInMsg: 100, + DispatchThrottlingRateInByte: 1048576, + RatePeriodInSecond: 60, + RelativeToPublishRate: false, + } + err = admin.Topics().SetDispatchRate(*topicName, newDispatchRate) + assert.NoError(t, err) + + // Verify dispatch rate is set + assert.Eventually( + t, + func() bool { + dispatchRate, err = admin.Topics().GetDispatchRate(*topicName) + return err == nil && dispatchRate != nil && + dispatchRate.DispatchThrottlingRateInMsg == 100 + }, + 10*time.Second, + 100*time.Millisecond, + ) + + // Remove dispatch rate policy - should return nil + err = admin.Topics().RemoveDispatchRate(*topicName) + assert.NoError(t, err) + assert.Eventually( + t, + func() bool { + dispatchRate, err = admin.Topics().GetDispatchRate(*topicName) + return err == nil && dispatchRate == nil + }, + 10*time.Second, + 100*time.Millisecond, + ) +} + +func TestTopics_PublishRate(t *testing.T) { + randomName := newTopicName() + topic := "persistent://public/default/" + randomName + cfg := &config.Config{} + admin, err := New(cfg) + assert.NoError(t, err) + assert.NotNil(t, admin) + topicName, err := utils.GetTopicName(topic) + assert.NoError(t, err) + err = admin.Topics().Create(*topicName, 4) + assert.NoError(t, err) + + // Initial state: policy not configured, should return nil + publishRate, err := admin.Topics().GetPublishRate(*topicName) + assert.NoError(t, err) + assert.Nil(t, publishRate, "Expected nil when publish rate is not configured") + + // Set new publish rate + newPublishRate := utils.PublishRateData{ + PublishThrottlingRateInMsg: 200, + PublishThrottlingRateInByte: 2097152, + } + err = admin.Topics().SetPublishRate(*topicName, newPublishRate) + assert.NoError(t, err) + + // Verify publish rate is set + assert.Eventually( + t, + func() bool { + publishRate, err = admin.Topics().GetPublishRate(*topicName) + return err == nil && publishRate != nil && + publishRate.PublishThrottlingRateInMsg == 200 + }, + 10*time.Second, + 100*time.Millisecond, + ) + + // Remove publish rate policy - should return nil + err = admin.Topics().RemovePublishRate(*topicName) + assert.NoError(t, err) + assert.Eventually( + t, + func() bool { + publishRate, err = admin.Topics().GetPublishRate(*topicName) + return err == nil && publishRate == nil + }, + 10*time.Second, + 100*time.Millisecond, + ) +} diff --git a/pulsaradmin/pkg/rest/client.go b/pulsaradmin/pkg/rest/client.go index ffb0c29ad6..82e460b1f7 100644 --- a/pulsaradmin/pkg/rest/client.go +++ b/pulsaradmin/pkg/rest/client.go @@ -141,6 +141,10 @@ func (c *Client) GetWithContext(ctx context.Context, endpoint string, obj interf return err } +func (c *Client) GetBodyWithContext(ctx context.Context, endpoint string, obj interface{}) ([]byte, error) { + return c.GetWithQueryParamsWithContext(ctx, endpoint, obj, nil, true) +} + func (c *Client) GetWithQueryParams(endpoint string, obj interface{}, params map[string]string, decode bool) ([]byte, error) { return c.GetWithQueryParamsWithContext(context.Background(), endpoint, obj, params, decode) @@ -190,12 +194,14 @@ func (c *Client) GetWithOptionsWithContext( defer safeRespClose(resp) if obj != nil { - if err := decodeJSONBody(resp, &obj); err != nil { + body, err := decodeJSONWithBody(resp, &obj) + if err != nil { if err == io.EOF { return nil, nil } return nil, err } + return body, nil } else if !decode { if file != nil { _, err := io.Copy(file, resp.Body) @@ -533,6 +539,25 @@ func decodeJSONBody(resp *http.Response, out interface{}) error { return dec.Decode(out) } +// decodeJSONWithBody is used to JSON decode a body AND ALSO return the raw body bytes +func decodeJSONWithBody(resp *http.Response, out interface{}) ([]byte, error) { + // Read the body first so we can return it even after decoding + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if len(body) == 0 { + return nil, nil + } + + if err := json.Unmarshal(body, &out); err != nil { + return nil, err + } + + return body, nil +} + // safeRespClose is used to close a response body func safeRespClose(resp *http.Response) { if resp != nil {