diff --git a/pkg/ctl/topic/stats.go b/pkg/ctl/topic/stats.go index 93458a59..a3d248f7 100644 --- a/pkg/ctl/topic/stats.go +++ b/pkg/ctl/topic/stats.go @@ -142,8 +142,12 @@ func GetStatsCmd(vc *cmdutils.VerbCmd) { var partition bool var perPartition bool + var getPreciseBacklog bool + var subscriptionBacklogSize bool + var getEarliestTimeInBacklog bool + vc.SetRunFuncWithNameArg(func() error { - return doGetStats(vc, partition, perPartition) + return doGetStats(vc, partition, perPartition, getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog) }, "the topic name is not specified or the topic name is specified more than one") vc.FlagSetGroup.InFlagSet("Stats", func(set *pflag.FlagSet) { @@ -151,11 +155,17 @@ func GetStatsCmd(vc *cmdutils.VerbCmd) { "Get the partitioned topic stats") set.BoolVarP(&perPartition, "per-partition", "", false, "Get the per partition topic stats") + set.BoolVarP(&getPreciseBacklog, "get-precise-backlog", "", false, + "Get the precise backlog size") + set.BoolVarP(&subscriptionBacklogSize, "get-subscription-backlog-size", "", true, + "Get the backlog size for each subscription") + set.BoolVarP(&getEarliestTimeInBacklog, "get-earliest-time-in-backlog", "", false, + "Get the earliest time in backlog") }) vc.EnableOutputFlagSet() } -func doGetStats(vc *cmdutils.VerbCmd, partitionedTopic, perPartition bool) error { +func doGetStats(vc *cmdutils.VerbCmd, partitionedTopic, perPartition bool, getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog bool) error { // for testing if vc.NameError != nil { return vc.NameError @@ -166,10 +176,16 @@ func doGetStats(vc *cmdutils.VerbCmd, partitionedTopic, perPartition bool) error return err } + getStatsOptions := utils.GetStatsOptions{ + GetPreciseBacklog: getPreciseBacklog, + SubscriptionBacklogSize: subscriptionBacklogSize, + GetEarliestTimeInBacklog: getEarliestTimeInBacklog, + } + admin := cmdutils.NewPulsarClient() if partitionedTopic { - stats, err := admin.Topics().GetPartitionedStats(*topic, perPartition) + stats, err := admin.Topics().GetPartitionedStatsWithOption(*topic, perPartition, getStatsOptions) if err == nil { oc := cmdutils.NewOutputContent().WithObject(stats) err = vc.OutputConfig.WriteOutput(vc.Command.OutOrStdout(), oc) @@ -177,7 +193,7 @@ func doGetStats(vc *cmdutils.VerbCmd, partitionedTopic, perPartition bool) error return err } - topicStats, err := admin.Topics().GetStats(*topic) + topicStats, err := admin.Topics().GetStatsWithOption(*topic, getStatsOptions) if err == nil { oc := cmdutils.NewOutputContent().WithObject(topicStats) err = vc.OutputConfig.WriteOutput(vc.Command.OutOrStdout(), oc) diff --git a/pkg/ctl/topic/stats_test.go b/pkg/ctl/topic/stats_test.go index 0e8015b1..6c3002a5 100644 --- a/pkg/ctl/topic/stats_test.go +++ b/pkg/ctl/topic/stats_test.go @@ -180,3 +180,150 @@ func TestGetNonPartitionedTopicStatsError(t *testing.T) { assert.NotNil(t, execErr) assert.Contains(t, execErr.Error(), "code: 404 reason: Partitioned Topic") } + +func TestGetStatsWithPreciseBacklog(t *testing.T) { + args := []string{"create", "test-stats-precise-backlog", "0"} + _, execErr, _, _ := TestTopicCommands(CreateTopicCmd, args) + assert.Nil(t, execErr) + + args = []string{"stats", "--get-precise-backlog", "test-stats-precise-backlog"} + out, execErr, _, _ := TestTopicCommands(GetStatsCmd, args) + assert.Nil(t, execErr) + + var stats utils.TopicStats + err := json.Unmarshal(out.Bytes(), &stats) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, defaultStats, stats) +} + +func TestGetStatsWithoutSubscriptionBacklogSize(t *testing.T) { + args := []string{"create", "test-stats-no-subscription-backlog", "0"} + _, execErr, _, _ := TestTopicCommands(CreateTopicCmd, args) + assert.Nil(t, execErr) + + args = []string{"stats", "--get-subscription-backlog-size=false", "test-stats-no-subscription-backlog"} + out, execErr, _, _ := TestTopicCommands(GetStatsCmd, args) + assert.Nil(t, execErr) + + var stats utils.TopicStats + err := json.Unmarshal(out.Bytes(), &stats) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, defaultStats, stats) +} + +func TestGetStatsWithEarliestTimeInBacklog(t *testing.T) { + args := []string{"create", "test-stats-earliest-time-backlog", "0"} + _, execErr, _, _ := TestTopicCommands(CreateTopicCmd, args) + assert.Nil(t, execErr) + + args = []string{"stats", "--get-earliest-time-in-backlog", "test-stats-earliest-time-backlog"} + out, execErr, _, _ := TestTopicCommands(GetStatsCmd, args) + assert.Nil(t, execErr) + + var stats utils.TopicStats + err := json.Unmarshal(out.Bytes(), &stats) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, defaultStats, stats) +} + +func TestGetStatsWithMultipleNewFlags(t *testing.T) { + args := []string{"create", "test-stats-multiple-flags", "0"} + _, execErr, _, _ := TestTopicCommands(CreateTopicCmd, args) + assert.Nil(t, execErr) + + args = []string{"stats", "--get-precise-backlog", "--get-earliest-time-in-backlog", "test-stats-multiple-flags"} + out, execErr, _, _ := TestTopicCommands(GetStatsCmd, args) + assert.Nil(t, execErr) + + var stats utils.TopicStats + err := json.Unmarshal(out.Bytes(), &stats) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, defaultStats, stats) +} + +func TestGetPartitionedStatsWithNewFlags(t *testing.T) { + args := []string{"create", "test-partitioned-stats-new-flags", "2"} + _, execErr, _, _ := TestTopicCommands(CreateTopicCmd, args) + assert.Nil(t, execErr) + + args = []string{"stats", "--partitioned-topic", "--get-precise-backlog", "test-partitioned-stats-new-flags"} + out, execErr, _, _ := TestTopicCommands(GetStatsCmd, args) + assert.Nil(t, execErr) + + var stats utils.PartitionedTopicStats + err := json.Unmarshal(out.Bytes(), &stats) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, float64(0), stats.MsgRateIn) + assert.Equal(t, float64(0), stats.MsgRateOut) + assert.Equal(t, float64(0), stats.MsgThroughputIn) + assert.Equal(t, float64(0), stats.MsgThroughputOut) + assert.Equal(t, float64(0), stats.AverageMsgSize) + assert.Equal(t, int64(0), stats.StorageSize) + assert.Equal(t, 0, len(stats.Publishers)) + assert.Equal(t, 0, len(stats.Subscriptions)) + assert.Equal(t, 0, len(stats.Replication)) + assert.Equal(t, "", stats.DeDuplicationStatus) + assert.Equal(t, 2, stats.Metadata.Partitions) + assert.Equal(t, 0, len(stats.Partitions)) +} + +func TestGetPerPartitionStatsWithNewFlags(t *testing.T) { + args := []string{"create", "test-per-part-stats-new-flags", "1"} + _, execErr, _, _ := TestTopicCommands(CreateTopicCmd, args) + assert.Nil(t, execErr) + + args = []string{"stats", "--partitioned-topic", "--per-partition", "--get-earliest-time-in-backlog", "test-per-part-stats-new-flags"} + out, execErr, _, _ := TestTopicCommands(GetStatsCmd, args) + assert.Nil(t, execErr) + + var stats utils.PartitionedTopicStats + err := json.Unmarshal(out.Bytes(), &stats) + if err != nil { + t.Fatal(err) + } + + defaultStats := utils.PartitionedTopicStats{ + MsgRateIn: 0, + MsgRateOut: 0, + MsgThroughputIn: 0, + MsgThroughputOut: 0, + AverageMsgSize: 0, + StorageSize: 0, + Publishers: []utils.PublisherStats{}, + Subscriptions: map[string]utils.SubscriptionStats{}, + Replication: map[string]utils.ReplicatorStats{}, + DeDuplicationStatus: "", + Metadata: utils.PartitionedTopicMetadata{Partitions: 1}, + Partitions: map[string]utils.TopicStats{ + "persistent://public/default/test-per-part-stats-new-flags-partition-0": { + MsgRateIn: 0, + MsgRateOut: 0, + MsgThroughputIn: 0, + MsgThroughputOut: 0, + AverageMsgSize: 0, + StorageSize: 0, + Publishers: []utils.PublisherStats{}, + Subscriptions: map[string]utils.SubscriptionStats{}, + Replication: map[string]utils.ReplicatorStats{}, + DeDuplicationStatus: "Disabled", + }, + }, + } + + assert.Equal(t, defaultStats, stats) +}