Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions pkg/ctl/topic/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,20 +142,30 @@ func GetStatsCmd(vc *cmdutils.VerbCmd) {

var partition bool
var perPartition bool
var getPreciseBacklog bool
var subscriptionBacklogSize bool
var getEarliestTimeInBacklog bool

vc.SetRunFuncWithNameArg(func() error {
return doGetStats(vc, partition, perPartition)
return doGetStats(vc, partition, perPartition, getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog)
}, "the topic name is not specified or the topic name is specified more than one")

vc.FlagSetGroup.InFlagSet("Stats", func(set *pflag.FlagSet) {
set.BoolVarP(&partition, "partitioned-topic", "p", false,
"Get the partitioned topic stats")
set.BoolVarP(&perPartition, "per-partition", "", false,
"Get the per partition topic stats")
set.BoolVarP(&getPreciseBacklog, "get-precise-backlog", "", false,
"Get the precise backlog size")
set.BoolVarP(&subscriptionBacklogSize, "get-subscription-backlog-size", "", true,
"Get the backlog size for each subscription")
set.BoolVarP(&getEarliestTimeInBacklog, "get-earliest-time-in-backlog", "", false,
"Get the earliest time in backlog")
})
vc.EnableOutputFlagSet()
}

func doGetStats(vc *cmdutils.VerbCmd, partitionedTopic, perPartition bool) error {
func doGetStats(vc *cmdutils.VerbCmd, partitionedTopic, perPartition bool, getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog bool) error {
// for testing
if vc.NameError != nil {
return vc.NameError
Expand All @@ -166,18 +176,24 @@ func doGetStats(vc *cmdutils.VerbCmd, partitionedTopic, perPartition bool) error
return err
}

getStatsOptions := utils.GetStatsOptions{
GetPreciseBacklog: getPreciseBacklog,
SubscriptionBacklogSize: subscriptionBacklogSize,
GetEarliestTimeInBacklog: getEarliestTimeInBacklog,
}

admin := cmdutils.NewPulsarClient()

if partitionedTopic {
stats, err := admin.Topics().GetPartitionedStats(*topic, perPartition)
stats, err := admin.Topics().GetPartitionedStatsWithOption(*topic, perPartition, getStatsOptions)
if err == nil {
oc := cmdutils.NewOutputContent().WithObject(stats)
err = vc.OutputConfig.WriteOutput(vc.Command.OutOrStdout(), oc)
}
return err
}

topicStats, err := admin.Topics().GetStats(*topic)
topicStats, err := admin.Topics().GetStatsWithOption(*topic, getStatsOptions)
if err == nil {
oc := cmdutils.NewOutputContent().WithObject(topicStats)
err = vc.OutputConfig.WriteOutput(vc.Command.OutOrStdout(), oc)
Expand Down
147 changes: 147 additions & 0 deletions pkg/ctl/topic/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,150 @@ func TestGetNonPartitionedTopicStatsError(t *testing.T) {
assert.NotNil(t, execErr)
assert.Contains(t, execErr.Error(), "code: 404 reason: Partitioned Topic")
}

func TestGetStatsWithPreciseBacklog(t *testing.T) {
args := []string{"create", "test-stats-precise-backlog", "0"}
_, execErr, _, _ := TestTopicCommands(CreateTopicCmd, args)
assert.Nil(t, execErr)

args = []string{"stats", "--get-precise-backlog", "test-stats-precise-backlog"}
out, execErr, _, _ := TestTopicCommands(GetStatsCmd, args)
assert.Nil(t, execErr)

var stats utils.TopicStats
err := json.Unmarshal(out.Bytes(), &stats)
if err != nil {
t.Fatal(err)
}

assert.Equal(t, defaultStats, stats)
}

func TestGetStatsWithoutSubscriptionBacklogSize(t *testing.T) {
args := []string{"create", "test-stats-no-subscription-backlog", "0"}
_, execErr, _, _ := TestTopicCommands(CreateTopicCmd, args)
assert.Nil(t, execErr)

args = []string{"stats", "--get-subscription-backlog-size=false", "test-stats-no-subscription-backlog"}
out, execErr, _, _ := TestTopicCommands(GetStatsCmd, args)
assert.Nil(t, execErr)

var stats utils.TopicStats
err := json.Unmarshal(out.Bytes(), &stats)
if err != nil {
t.Fatal(err)
}

assert.Equal(t, defaultStats, stats)
}

func TestGetStatsWithEarliestTimeInBacklog(t *testing.T) {
args := []string{"create", "test-stats-earliest-time-backlog", "0"}
_, execErr, _, _ := TestTopicCommands(CreateTopicCmd, args)
assert.Nil(t, execErr)

args = []string{"stats", "--get-earliest-time-in-backlog", "test-stats-earliest-time-backlog"}
out, execErr, _, _ := TestTopicCommands(GetStatsCmd, args)
assert.Nil(t, execErr)

var stats utils.TopicStats
err := json.Unmarshal(out.Bytes(), &stats)
if err != nil {
t.Fatal(err)
}

assert.Equal(t, defaultStats, stats)
}

func TestGetStatsWithMultipleNewFlags(t *testing.T) {
args := []string{"create", "test-stats-multiple-flags", "0"}
_, execErr, _, _ := TestTopicCommands(CreateTopicCmd, args)
assert.Nil(t, execErr)

args = []string{"stats", "--get-precise-backlog", "--get-earliest-time-in-backlog", "test-stats-multiple-flags"}
out, execErr, _, _ := TestTopicCommands(GetStatsCmd, args)
assert.Nil(t, execErr)

var stats utils.TopicStats
err := json.Unmarshal(out.Bytes(), &stats)
if err != nil {
t.Fatal(err)
}

assert.Equal(t, defaultStats, stats)
}

func TestGetPartitionedStatsWithNewFlags(t *testing.T) {
args := []string{"create", "test-partitioned-stats-new-flags", "2"}
_, execErr, _, _ := TestTopicCommands(CreateTopicCmd, args)
assert.Nil(t, execErr)

args = []string{"stats", "--partitioned-topic", "--get-precise-backlog", "test-partitioned-stats-new-flags"}
out, execErr, _, _ := TestTopicCommands(GetStatsCmd, args)
assert.Nil(t, execErr)

var stats utils.PartitionedTopicStats
err := json.Unmarshal(out.Bytes(), &stats)
if err != nil {
t.Fatal(err)
}

assert.Equal(t, float64(0), stats.MsgRateIn)
assert.Equal(t, float64(0), stats.MsgRateOut)
assert.Equal(t, float64(0), stats.MsgThroughputIn)
assert.Equal(t, float64(0), stats.MsgThroughputOut)
assert.Equal(t, float64(0), stats.AverageMsgSize)
assert.Equal(t, int64(0), stats.StorageSize)
assert.Equal(t, 0, len(stats.Publishers))
assert.Equal(t, 0, len(stats.Subscriptions))
assert.Equal(t, 0, len(stats.Replication))
assert.Equal(t, "", stats.DeDuplicationStatus)
assert.Equal(t, 2, stats.Metadata.Partitions)
assert.Equal(t, 0, len(stats.Partitions))
}

func TestGetPerPartitionStatsWithNewFlags(t *testing.T) {
args := []string{"create", "test-per-part-stats-new-flags", "1"}
_, execErr, _, _ := TestTopicCommands(CreateTopicCmd, args)
assert.Nil(t, execErr)

args = []string{"stats", "--partitioned-topic", "--per-partition", "--get-earliest-time-in-backlog", "test-per-part-stats-new-flags"}
out, execErr, _, _ := TestTopicCommands(GetStatsCmd, args)
assert.Nil(t, execErr)

var stats utils.PartitionedTopicStats
err := json.Unmarshal(out.Bytes(), &stats)
if err != nil {
t.Fatal(err)
}

defaultStats := utils.PartitionedTopicStats{
MsgRateIn: 0,
MsgRateOut: 0,
MsgThroughputIn: 0,
MsgThroughputOut: 0,
AverageMsgSize: 0,
StorageSize: 0,
Publishers: []utils.PublisherStats{},
Subscriptions: map[string]utils.SubscriptionStats{},
Replication: map[string]utils.ReplicatorStats{},
DeDuplicationStatus: "",
Metadata: utils.PartitionedTopicMetadata{Partitions: 1},
Partitions: map[string]utils.TopicStats{
"persistent://public/default/test-per-part-stats-new-flags-partition-0": {
MsgRateIn: 0,
MsgRateOut: 0,
MsgThroughputIn: 0,
MsgThroughputOut: 0,
AverageMsgSize: 0,
StorageSize: 0,
Publishers: []utils.PublisherStats{},
Subscriptions: map[string]utils.SubscriptionStats{},
Replication: map[string]utils.ReplicatorStats{},
DeDuplicationStatus: "Disabled",
},
},
}

assert.Equal(t, defaultStats, stats)
}
Loading