Skip to content

Commit c0db482

Browse files
authored
Use -1 as sentinel value for namespace and topic admin commands (#1430)
Fixes #1429 ### Motivation Use -1 as a sentinel value for all the namespace / topic admin get commands that return an empty body to mean "unset" ### Modifications -1 is the default return for the various get commands: ##### Namespace Admin Methods - GetNamespaceMessageTTL / GetNamespaceMessageTTLWithContext - GetMaxConsumersPerTopic / GetMaxConsumersPerTopicWithContext - GetMaxProducersPerTopic / GetMaxProducersPerTopicWithContext - GetMaxConsumersPerSubscription / GetMaxConsumersPerSubscriptionWithContext - GetOffloadThreshold / GetOffloadThresholdWithContext - GetOffloadThresholdInSeconds / GetOffloadThresholdInSecondsWithContext - GetOffloadDeleteLag / GetOffloadDeleteLagWithContext - GetCompactionThreshold / GetCompactionThresholdWithContext ##### Topic Admin Methods - GetMessageTTL / GetMessageTTLWithContext - GetMaxProducers / GetMaxProducersWithContext - GetMaxConsumers / GetMaxConsumersWithContext - GetMaxUnackMessagesPerConsumer / GetMaxUnackMessagesPerConsumerWithContext - GetMaxUnackMessagesPerSubscription / GetMaxUnackMessagesPerSubscriptionWithContext - GetCompactionThreshold / GetCompactionThresholdWithContext - GetMaxConsumersPerSubscription / GetMaxConsumersPerSubscriptionWithContext - GetMaxMessageSize / GetMaxMessageSizeWithContext - GetMaxSubscriptionsPerTopic / GetMaxSubscriptionsPerTopicWithContext - GetDeduplicationSnapshotInterval / GetDeduplicationSnapshotIntervalWithContext
1 parent d99d2c0 commit c0db482

File tree

4 files changed

+443
-65
lines changed

4 files changed

+443
-65
lines changed

pulsaradmin/pkg/admin/namespace.go

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,10 @@ type Namespaces interface {
8888
// SetNamespaceMessageTTLWithContext sets the messages Time to Live for all the topics within a namespace
8989
SetNamespaceMessageTTLWithContext(ctx context.Context, namespace string, ttlInSeconds int) error
9090

91-
// GetNamespaceMessageTTL returns the message TTL for a namespace
91+
// GetNamespaceMessageTTL returns the message TTL for a namespace. Returns -1 if not set
9292
GetNamespaceMessageTTL(namespace string) (int, error)
9393

94-
// GetNamespaceMessageTTLWithContext returns the message TTL for a namespace
94+
// GetNamespaceMessageTTLWithContext returns the message TTL for a namespace. Returns -1 if not set
9595
GetNamespaceMessageTTLWithContext(ctx context.Context, namespace string) (int, error)
9696

9797
// GetRetention returns the retention configuration for a namespace
@@ -226,10 +226,11 @@ type Namespaces interface {
226226
// SetOffloadDeleteLagWithContext sets the offload deletion lag for a namespace
227227
SetOffloadDeleteLagWithContext(ctx context.Context, namespace utils.NameSpaceName, timeMs int64) error
228228

229-
// GetOffloadDeleteLag returns the offload deletion lag for a namespace, in milliseconds
229+
// GetOffloadDeleteLag returns the offload deletion lag for a namespace, in milliseconds. Returns -1 if not set
230230
GetOffloadDeleteLag(namespace utils.NameSpaceName) (int64, error)
231231

232-
// GetOffloadDeleteLagWithContext returns the offload deletion lag for a namespace, in milliseconds
232+
// GetOffloadDeleteLagWithContext returns the offload deletion lag for a namespace, in milliseconds.
233+
// Returns -1 if not set
233234
GetOffloadDeleteLagWithContext(ctx context.Context, namespace utils.NameSpaceName) (int64, error)
234235

235236
// SetOffloadThreshold sets the offloadThreshold for a namespace
@@ -238,10 +239,10 @@ type Namespaces interface {
238239
// SetOffloadThresholdWithContext sets the offloadThreshold for a namespace
239240
SetOffloadThresholdWithContext(ctx context.Context, namespace utils.NameSpaceName, threshold int64) error
240241

241-
// GetOffloadThreshold returns the offloadThreshold for a namespace
242+
// GetOffloadThreshold returns the offloadThreshold for a namespace. Returns -1 if not set
242243
GetOffloadThreshold(namespace utils.NameSpaceName) (int64, error)
243244

244-
// GetOffloadThresholdWithContext returns the offloadThreshold for a namespace
245+
// GetOffloadThresholdWithContext returns the offloadThreshold for a namespace. Returns -1 if not set
245246
GetOffloadThresholdWithContext(ctx context.Context, namespace utils.NameSpaceName) (int64, error)
246247

247248
// SetOffloadThresholdInSeconds sets the offloadThresholdInSeconds for a namespace
@@ -250,10 +251,10 @@ type Namespaces interface {
250251
// SetOffloadThresholdInSecondsWithContext sets the offloadThresholdInSeconds for a namespace
251252
SetOffloadThresholdInSecondsWithContext(ctx context.Context, namespace utils.NameSpaceName, threshold int64) error
252253

253-
// GetOffloadThresholdInSeconds returns the offloadThresholdInSeconds for a namespace
254+
// GetOffloadThresholdInSeconds returns the offloadThresholdInSeconds for a namespace. Returns -1 if not set
254255
GetOffloadThresholdInSeconds(namespace utils.NameSpaceName) (int64, error)
255256

256-
// GetOffloadThresholdInSecondsWithContext returns the offloadThresholdInSeconds for a namespace
257+
// GetOffloadThresholdInSecondsWithContext returns the offloadThresholdInSeconds for a namespace. Returns -1 if not set
257258
GetOffloadThresholdInSecondsWithContext(ctx context.Context, namespace utils.NameSpaceName) (int64, error)
258259

259260
// SetCompactionThreshold sets the compactionThreshold for a namespace
@@ -262,10 +263,10 @@ type Namespaces interface {
262263
// SetCompactionThresholdWithContext sets the compactionThreshold for a namespace
263264
SetCompactionThresholdWithContext(ctx context.Context, namespace utils.NameSpaceName, threshold int64) error
264265

265-
// GetCompactionThreshold returns the compactionThreshold for a namespace
266+
// GetCompactionThreshold returns the compactionThreshold for a namespace. Returns -1 if not set
266267
GetCompactionThreshold(namespace utils.NameSpaceName) (int64, error)
267268

268-
// GetCompactionThresholdWithContext returns the compactionThreshold for a namespace
269+
// GetCompactionThresholdWithContext returns the compactionThreshold for a namespace. Returns -1 if not set
269270
GetCompactionThresholdWithContext(ctx context.Context, namespace utils.NameSpaceName) (int64, error)
270271

271272
// SetMaxConsumersPerSubscription sets maxConsumersPerSubscription for a namespace.
@@ -276,10 +277,11 @@ type Namespaces interface {
276277
//nolint: revive // It's ok here to use a built-in function name (max)
277278
SetMaxConsumersPerSubscriptionWithContext(ctx context.Context, namespace utils.NameSpaceName, max int) error
278279

279-
// GetMaxConsumersPerSubscription returns the maxConsumersPerSubscription for a namespace.
280+
// GetMaxConsumersPerSubscription returns the maxConsumersPerSubscription for a namespace. Returns -1 if not set
280281
GetMaxConsumersPerSubscription(namespace utils.NameSpaceName) (int, error)
281282

282283
// GetMaxConsumersPerSubscriptionWithContext returns the maxConsumersPerSubscription for a namespace.
284+
// Returns -1 if not set
283285
GetMaxConsumersPerSubscriptionWithContext(ctx context.Context, namespace utils.NameSpaceName) (int, error)
284286

285287
// SetMaxConsumersPerTopic sets maxConsumersPerTopic for a namespace.
@@ -290,10 +292,10 @@ type Namespaces interface {
290292
//nolint: revive // It's ok here to use a built-in function name (max)
291293
SetMaxConsumersPerTopicWithContext(ctx context.Context, namespace utils.NameSpaceName, max int) error
292294

293-
// GetMaxConsumersPerTopic returns the maxProducersPerTopic for a namespace.
295+
// GetMaxConsumersPerTopic returns the maxProducersPerTopic for a namespace. Returns -1 if not set
294296
GetMaxConsumersPerTopic(namespace utils.NameSpaceName) (int, error)
295297

296-
// GetMaxConsumersPerTopicWithContext returns the maxProducersPerTopic for a namespace.
298+
// GetMaxConsumersPerTopicWithContext returns the maxProducersPerTopic for a namespace. Returns -1 if not set
297299
GetMaxConsumersPerTopicWithContext(ctx context.Context, namespace utils.NameSpaceName) (int, error)
298300

299301
// SetMaxProducersPerTopic sets maxProducersPerTopic for a namespace.
@@ -304,10 +306,10 @@ type Namespaces interface {
304306
//nolint: revive // It's ok here to use a built-in function name (max)
305307
SetMaxProducersPerTopicWithContext(ctx context.Context, namespace utils.NameSpaceName, max int) error
306308

307-
// GetMaxProducersPerTopic returns the maxProducersPerTopic for a namespace.
309+
// GetMaxProducersPerTopic returns the maxProducersPerTopic for a namespace. Returns -1 if not set
308310
GetMaxProducersPerTopic(namespace utils.NameSpaceName) (int, error)
309311

310-
// GetMaxProducersPerTopicWithContext returns the maxProducersPerTopic for a namespace.
312+
// GetMaxProducersPerTopicWithContext returns the maxProducersPerTopic for a namespace. Returns -1 if not set
311313
GetMaxProducersPerTopicWithContext(ctx context.Context, namespace utils.NameSpaceName) (int, error)
312314

313315
// SetMaxTopicsPerNamespace sets maxTopicsPerNamespace for a namespace.
@@ -851,7 +853,7 @@ func (n *namespaces) GetNamespaceMessageTTL(namespace string) (int, error) {
851853
}
852854

853855
func (n *namespaces) GetNamespaceMessageTTLWithContext(ctx context.Context, namespace string) (int, error) {
854-
var ttl int
856+
var ttl = -1
855857
nsName, err := utils.GetNamespaceName(namespace)
856858
if err != nil {
857859
return 0, err
@@ -1114,7 +1116,7 @@ func (n *namespaces) GetOffloadDeleteLag(namespace utils.NameSpaceName) (int64,
11141116
}
11151117

11161118
func (n *namespaces) GetOffloadDeleteLagWithContext(ctx context.Context, namespace utils.NameSpaceName) (int64, error) {
1117-
var result int64
1119+
var result int64 = -1
11181120
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "offloadDeletionLagMs")
11191121
err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
11201122
return result, err
@@ -1143,7 +1145,7 @@ func (n *namespaces) GetMaxConsumersPerSubscriptionWithContext(
11431145
ctx context.Context,
11441146
namespace utils.NameSpaceName,
11451147
) (int, error) {
1146-
var result int
1148+
var result = -1
11471149
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "maxConsumersPerSubscription")
11481150
err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
11491151
return result, err
@@ -1167,7 +1169,7 @@ func (n *namespaces) GetOffloadThreshold(namespace utils.NameSpaceName) (int64,
11671169
}
11681170

11691171
func (n *namespaces) GetOffloadThresholdWithContext(ctx context.Context, namespace utils.NameSpaceName) (int64, error) {
1170-
var result int64
1172+
var result int64 = -1
11711173
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "offloadThreshold")
11721174
err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
11731175
return result, err
@@ -1194,7 +1196,7 @@ func (n *namespaces) GetOffloadThresholdInSecondsWithContext(
11941196
ctx context.Context,
11951197
namespace utils.NameSpaceName,
11961198
) (int64, error) {
1197-
var result int64
1199+
var result int64 = -1
11981200
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "offloadThresholdInSeconds")
11991201
err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
12001202
return result, err
@@ -1223,7 +1225,7 @@ func (n *namespaces) GetMaxConsumersPerTopicWithContext(
12231225
ctx context.Context,
12241226
namespace utils.NameSpaceName,
12251227
) (int, error) {
1226-
var result int
1228+
var result = -1
12271229
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "maxConsumersPerTopic")
12281230
err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
12291231
return result, err
@@ -1250,7 +1252,7 @@ func (n *namespaces) GetCompactionThresholdWithContext(
12501252
ctx context.Context,
12511253
namespace utils.NameSpaceName,
12521254
) (int64, error) {
1253-
var result int64
1255+
var result int64 = -1
12541256
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "compactionThreshold")
12551257
err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
12561258
return result, err
@@ -1279,7 +1281,7 @@ func (n *namespaces) GetMaxProducersPerTopicWithContext(
12791281
ctx context.Context,
12801282
namespace utils.NameSpaceName,
12811283
) (int, error) {
1282-
var result int
1284+
var result = -1
12831285
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "maxProducersPerTopic")
12841286
err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
12851287
return result, err
@@ -2014,7 +2016,7 @@ func (n *namespaces) GetMaxTopicsPerNamespaceWithContext(
20142016
ctx context.Context,
20152017
namespace utils.NameSpaceName,
20162018
) (int, error) {
2017-
var result int
2019+
var result int // This method does not require a sentinel value of -1 since the API never returns empty
20182020
endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "maxTopicsPerNamespace")
20192021
err := n.pulsar.Client.GetWithContext(ctx, endpoint, &result)
20202022
return result, err

pulsaradmin/pkg/admin/namespace_test.go

Lines changed: 165 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,11 @@ func TestNamespaces_GetOffloadThresholdInSeconds(t *testing.T) {
333333

334334
namespace, _ := utils.GetNamespaceName("public/default")
335335

336-
// set the subscription expiration time and get it
336+
// Get default (should be -1)
337+
threshold, err := admin.Namespaces().GetOffloadThreshold(*namespace)
338+
assert.NoError(t, err)
339+
assert.Equal(t, int64(-1), threshold)
340+
337341
err = admin.Namespaces().SetOffloadThresholdInSeconds(*namespace,
338342
60)
339343
assert.Equal(t, nil, err)
@@ -599,3 +603,163 @@ func TestNamespaces_GetMaxTopicsPerNamespace(t *testing.T) {
599603
expected = 0
600604
assert.Equal(t, expected, maxTopics)
601605
}
606+
607+
func TestNamespaces_MessageTTL(t *testing.T) {
608+
config := &config.Config{}
609+
admin, err := New(config)
610+
require.NoError(t, err)
611+
require.NotNil(t, admin)
612+
613+
namespace, _ := utils.GetNamespaceName("public/default")
614+
615+
// Get default (should be -1)
616+
ttl, err := admin.Namespaces().GetNamespaceMessageTTL(namespace.String())
617+
assert.NoError(t, err)
618+
assert.Equal(t, -1, ttl)
619+
620+
// Set to 0 explicitly
621+
err = admin.Namespaces().SetNamespaceMessageTTL(namespace.String(), 0)
622+
assert.NoError(t, err)
623+
624+
// Verify returns 0
625+
ttl, err = admin.Namespaces().GetNamespaceMessageTTL(namespace.String())
626+
assert.NoError(t, err)
627+
assert.Equal(t, 0, ttl)
628+
629+
// Set to positive value
630+
err = admin.Namespaces().SetNamespaceMessageTTL(namespace.String(), 3600)
631+
assert.NoError(t, err)
632+
633+
// Verify returns value
634+
ttl, err = admin.Namespaces().GetNamespaceMessageTTL(namespace.String())
635+
assert.NoError(t, err)
636+
assert.Equal(t, 3600, ttl)
637+
}
638+
639+
func TestNamespaces_OffloadDeleteLag(t *testing.T) {
640+
config := &config.Config{}
641+
admin, err := New(config)
642+
require.NoError(t, err)
643+
require.NotNil(t, admin)
644+
645+
namespace, _ := utils.GetNamespaceName("public/default")
646+
647+
// Get default (should be -1)
648+
lag, err := admin.Namespaces().GetOffloadDeleteLag(*namespace)
649+
assert.NoError(t, err)
650+
assert.Equal(t, int64(-1), lag)
651+
652+
// Set to 0 explicitly
653+
err = admin.Namespaces().SetOffloadDeleteLag(*namespace, 0)
654+
assert.NoError(t, err)
655+
656+
// Verify returns 0
657+
lag, err = admin.Namespaces().GetOffloadDeleteLag(*namespace)
658+
assert.NoError(t, err)
659+
assert.Equal(t, int64(0), lag)
660+
661+
// Set to positive value
662+
err = admin.Namespaces().SetOffloadDeleteLag(*namespace, 1000)
663+
assert.NoError(t, err)
664+
665+
// Verify returns value
666+
lag, err = admin.Namespaces().GetOffloadDeleteLag(*namespace)
667+
assert.NoError(t, err)
668+
assert.Equal(t, int64(1000), lag)
669+
}
670+
671+
func TestNamespaces_MaxConsumersPerTopic(t *testing.T) {
672+
config := &config.Config{}
673+
admin, err := New(config)
674+
require.NoError(t, err)
675+
require.NotNil(t, admin)
676+
677+
namespace, _ := utils.GetNamespaceName("public/default")
678+
679+
// Get default (should be -1)
680+
maxConsumers, err := admin.Namespaces().GetMaxConsumersPerTopic(*namespace)
681+
assert.NoError(t, err)
682+
assert.Equal(t, -1, maxConsumers)
683+
684+
// Set to 0 explicitly
685+
err = admin.Namespaces().SetMaxConsumersPerTopic(*namespace, 0)
686+
assert.NoError(t, err)
687+
688+
// Verify returns 0
689+
maxConsumers, err = admin.Namespaces().GetMaxConsumersPerTopic(*namespace)
690+
assert.NoError(t, err)
691+
assert.Equal(t, 0, maxConsumers)
692+
693+
// Set to positive value
694+
err = admin.Namespaces().SetMaxConsumersPerTopic(*namespace, 100)
695+
assert.NoError(t, err)
696+
697+
// Verify returns value
698+
maxConsumers, err = admin.Namespaces().GetMaxConsumersPerTopic(*namespace)
699+
assert.NoError(t, err)
700+
assert.Equal(t, 100, maxConsumers)
701+
}
702+
703+
func TestNamespaces_CompactionThreshold(t *testing.T) {
704+
config := &config.Config{}
705+
admin, err := New(config)
706+
require.NoError(t, err)
707+
require.NotNil(t, admin)
708+
709+
namespace, _ := utils.GetNamespaceName("public/default")
710+
711+
// Get default (should be -1)
712+
threshold, err := admin.Namespaces().GetCompactionThreshold(*namespace)
713+
assert.NoError(t, err)
714+
assert.Equal(t, int64(-1), threshold)
715+
716+
// Set to 0 explicitly
717+
err = admin.Namespaces().SetCompactionThreshold(*namespace, 0)
718+
assert.NoError(t, err)
719+
720+
// Verify returns 0
721+
threshold, err = admin.Namespaces().GetCompactionThreshold(*namespace)
722+
assert.NoError(t, err)
723+
assert.Equal(t, int64(0), threshold)
724+
725+
// Set to positive value
726+
err = admin.Namespaces().SetCompactionThreshold(*namespace, 1024*1024) // 1MB
727+
assert.NoError(t, err)
728+
729+
// Verify returns value
730+
threshold, err = admin.Namespaces().GetCompactionThreshold(*namespace)
731+
assert.NoError(t, err)
732+
assert.Equal(t, int64(1024*1024), threshold)
733+
}
734+
735+
func TestNamespaces_MaxProducersPerTopic(t *testing.T) {
736+
config := &config.Config{}
737+
admin, err := New(config)
738+
require.NoError(t, err)
739+
require.NotNil(t, admin)
740+
741+
namespace, _ := utils.GetNamespaceName("public/default")
742+
743+
// Get default (should be -1)
744+
maxProducers, err := admin.Namespaces().GetMaxProducersPerTopic(*namespace)
745+
assert.NoError(t, err)
746+
assert.Equal(t, -1, maxProducers)
747+
748+
// Set to 0 explicitly
749+
err = admin.Namespaces().SetMaxProducersPerTopic(*namespace, 0)
750+
assert.NoError(t, err)
751+
752+
// Verify returns 0
753+
maxProducers, err = admin.Namespaces().GetMaxProducersPerTopic(*namespace)
754+
assert.NoError(t, err)
755+
assert.Equal(t, 0, maxProducers)
756+
757+
// Set to positive value
758+
err = admin.Namespaces().SetMaxProducersPerTopic(*namespace, 50)
759+
assert.NoError(t, err)
760+
761+
// Verify returns value
762+
maxProducers, err = admin.Namespaces().GetMaxProducersPerTopic(*namespace)
763+
assert.NoError(t, err)
764+
assert.Equal(t, 50, maxProducers)
765+
}

0 commit comments

Comments
 (0)