Skip to content

Commit 2698566

Browse files
chyezhclaude
andcommitted
enhance: enrich show replica/resource-group display with session hostname
- show replica: display RW/RO Query Nodes and RW/RO Streaming Nodes with hostname from session info, one node per line with tabwriter alignment - show resource-group: display full ResourceGroupConfig (request, limit, transfer_from, transfer_to, node_filter) and nodes with hostname - show wal-distribution: replace StreamingNode address with hostname - show wal-recovery-storage: replace StreamingNode address with hostname - All node displays show "NotFound" when session is missing Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Signed-off-by: chyezh <chyezh@outlook.com>
1 parent 79ce64f commit 2698566

File tree

4 files changed

+210
-44
lines changed

4 files changed

+210
-44
lines changed

states/etcd/show/replica.go

Lines changed: 85 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ package show
33
import (
44
"context"
55
"fmt"
6-
"sort"
6+
"slices"
77
"strings"
8+
"text/tabwriter"
89

910
"github.com/cockroachdb/errors"
1011
"github.com/samber/lo"
@@ -45,29 +46,42 @@ func (c *ComponentShow) ReplicaCommand(ctx context.Context, p *ReplicaParam) (*f
4546
return nil, errors.Wrap(err, "failed to list replica info")
4647
}
4748

49+
sessions, err := common.ListSessions(ctx, c.client, c.metaPath)
50+
if err != nil {
51+
return nil, errors.Wrap(err, "failed to list sessions")
52+
}
53+
4854
rs := framework.NewListResult[Replicas](replicas)
4955
rs.collections = lo.SliceToMap(collections, func(c *models.Collection) (int64, *models.Collection) { return c.GetProto().GetID(), c })
56+
rs.sessionMap = lo.SliceToMap(sessions, func(s *models.Session) (int64, *models.Session) { return s.ServerID, s })
5057
return framework.NewPresetResultSet(rs, framework.NameFormat(p.Format)), nil
5158
}
5259

5360
type Replicas struct {
5461
framework.ListResultSet[*models.Replica]
5562
collections map[int64]*models.Collection
63+
sessionMap map[int64]*models.Session
5664
}
5765

5866
func (rs *Replicas) PrintAs(format framework.Format) string {
5967
switch format {
6068
case framework.FormatDefault, framework.FormatPlain:
6169
sb := &strings.Builder{}
70+
tw := tabwriter.NewWriter(sb, 0, 0, 2, ' ', 0)
6271
groups := lo.GroupBy(rs.Data, func(replica *models.Replica) int64 { return replica.GetProto().CollectionID })
6372
for collectionID, replicas := range groups {
64-
fmt.Fprintf(sb, "CollectionID: %d\t CollectionName:%s\n", collectionID, rs.collections[collectionID].GetProto().Schema.Name)
73+
collName := ""
74+
if coll, ok := rs.collections[collectionID]; ok {
75+
collName = coll.GetProto().Schema.Name
76+
}
77+
fmt.Fprintf(tw, "CollectionID: %d\tCollectionName: %s\n", collectionID, collName)
6578
for _, replica := range replicas {
66-
rs.printReplica(sb, replica)
79+
rs.printReplica(tw, replica)
6780
}
68-
fmt.Fprintln(sb, "================================================================================")
69-
fmt.Fprintln(sb)
81+
fmt.Fprintln(tw, "================================================================================")
82+
fmt.Fprintln(tw)
7083
}
84+
tw.Flush()
7185
return sb.String()
7286
case framework.FormatJSON:
7387
return rs.printAsJSON()
@@ -76,18 +90,26 @@ func (rs *Replicas) PrintAs(format framework.Format) string {
7690
return ""
7791
}
7892

93+
type nodeInfoJSON struct {
94+
NodeID int64 `json:"node_id"`
95+
HostName string `json:"hostname"`
96+
}
97+
7998
func (rs *Replicas) printAsJSON() string {
8099
type ShardReplicaJSON struct {
81-
Shard string `json:"shard"`
82-
Nodes []int64 `json:"nodes"`
100+
Shard string `json:"shard"`
101+
Nodes []nodeInfoJSON `json:"nodes"`
83102
}
84103

85104
type ReplicaJSON struct {
86-
ReplicaID int64 `json:"replica_id"`
87-
CollectionID int64 `json:"collection_id"`
88-
ResourceGroup string `json:"resource_group"`
89-
Nodes []int64 `json:"nodes"`
90-
ShardReplicas []ShardReplicaJSON `json:"shard_replicas,omitempty"`
105+
ReplicaID int64 `json:"replica_id"`
106+
CollectionID int64 `json:"collection_id"`
107+
ResourceGroup string `json:"resource_group"`
108+
RwNodes []nodeInfoJSON `json:"rw_nodes"`
109+
RoNodes []nodeInfoJSON `json:"ro_nodes,omitempty"`
110+
RwStreamingNodes []nodeInfoJSON `json:"rw_streaming_nodes,omitempty"`
111+
RoStreamingNodes []nodeInfoJSON `json:"ro_streaming_nodes,omitempty"`
112+
ShardReplicas []ShardReplicaJSON `json:"shard_replicas,omitempty"`
91113
}
92114

93115
type CollectionReplicasJSON struct {
@@ -120,15 +142,18 @@ func (rs *Replicas) printAsJSON() string {
120142
for shard, shardReplica := range replica.ChannelNodeInfos {
121143
shardReplicas = append(shardReplicas, ShardReplicaJSON{
122144
Shard: shard,
123-
Nodes: shardReplica.RwNodes,
145+
Nodes: rs.toNodeInfoJSONs(shardReplica.RwNodes),
124146
})
125147
}
126148
replicaJSONs = append(replicaJSONs, ReplicaJSON{
127-
ReplicaID: replica.ID,
128-
CollectionID: replica.CollectionID,
129-
ResourceGroup: replica.ResourceGroup,
130-
Nodes: replica.Nodes,
131-
ShardReplicas: shardReplicas,
149+
ReplicaID: replica.ID,
150+
CollectionID: replica.CollectionID,
151+
ResourceGroup: replica.ResourceGroup,
152+
RwNodes: rs.toNodeInfoJSONs(replica.Nodes),
153+
RoNodes: rs.toNodeInfoJSONs(replica.RoNodes),
154+
RwStreamingNodes: rs.toNodeInfoJSONs(replica.RwSqNodes),
155+
RoStreamingNodes: rs.toNodeInfoJSONs(replica.RoSqNodes),
156+
ShardReplicas: shardReplicas,
132157
})
133158
}
134159

@@ -142,14 +167,49 @@ func (rs *Replicas) printAsJSON() string {
142167
return framework.MarshalJSON(output)
143168
}
144169

145-
func (rs *Replicas) printReplica(sb *strings.Builder, r *models.Replica) {
170+
func (rs *Replicas) toNodeInfoJSONs(nodeIDs []int64) []nodeInfoJSON {
171+
if len(nodeIDs) == 0 {
172+
return nil
173+
}
174+
result := make([]nodeInfoJSON, 0, len(nodeIDs))
175+
for _, id := range nodeIDs {
176+
result = append(result, nodeInfoJSON{NodeID: id, HostName: rs.getHostName(id)})
177+
}
178+
return result
179+
}
180+
181+
func (rs *Replicas) getHostName(nodeID int64) string {
182+
if sess, ok := rs.sessionMap[nodeID]; ok {
183+
return sess.HostName
184+
}
185+
return "NotFound"
186+
}
187+
188+
func (rs *Replicas) printReplica(w *tabwriter.Writer, r *models.Replica) {
146189
replica := r.GetProto()
147-
fmt.Fprintln(sb, "================================================================================")
148-
fmt.Fprintf(sb, "ReplicaID: %d \n", replica.ID)
149-
fmt.Fprintf(sb, "ResourceGroup: %s\n", replica.ResourceGroup)
150-
sort.Slice(replica.Nodes, func(i, j int) bool { return replica.Nodes[i] < replica.Nodes[j] })
151-
fmt.Fprintf(sb, "All Nodes:%v\n", replica.Nodes)
190+
fmt.Fprintln(w, "================================================================================")
191+
fmt.Fprintf(w, "ReplicaID: %d\n", replica.ID)
192+
fmt.Fprintf(w, "ResourceGroup: %s\n", replica.ResourceGroup)
193+
rs.printNodeList(w, "RW Query Nodes", replica.Nodes)
194+
rs.printNodeList(w, "RO Query Nodes", replica.RoNodes)
195+
rs.printNodeList(w, "RW Streaming Nodes", replica.RwSqNodes)
196+
rs.printNodeList(w, "RO Streaming Nodes", replica.RoSqNodes)
152197
for shard, shardReplica := range replica.ChannelNodeInfos {
153-
fmt.Fprintf(sb, "-- Shard Replica: Shard (%s) Nodes:%v\n", shard, shardReplica.RwNodes)
198+
fmt.Fprintf(w, "-- Shard (%s) Nodes:\n", shard)
199+
for _, id := range shardReplica.RwNodes {
200+
fmt.Fprintf(w, " - %d\t%s\n", id, rs.getHostName(id))
201+
}
202+
}
203+
}
204+
205+
func (rs *Replicas) printNodeList(w *tabwriter.Writer, label string, nodeIDs []int64) {
206+
if len(nodeIDs) == 0 {
207+
return
208+
}
209+
sorted := append([]int64{}, nodeIDs...)
210+
slices.Sort(sorted)
211+
fmt.Fprintf(w, "%s (%d):\n", label, len(sorted))
212+
for _, id := range sorted {
213+
fmt.Fprintf(w, " - %d\t%s\n", id, rs.getHostName(id))
154214
}
155215
}

states/etcd/show/resource_group.go

Lines changed: 94 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,18 @@ package show
33
import (
44
"context"
55
"fmt"
6+
"slices"
67
"strings"
8+
"text/tabwriter"
9+
10+
"github.com/cockroachdb/errors"
11+
"github.com/samber/lo"
712

813
"github.com/milvus-io/birdwatcher/framework"
914
"github.com/milvus-io/birdwatcher/models"
1015
"github.com/milvus-io/birdwatcher/states/etcd/common"
16+
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
17+
"github.com/milvus-io/milvus-proto/go-api/v2/rgpb"
1118
)
1219

1320
type ResourceGroupParam struct {
@@ -23,21 +30,62 @@ func (c *ComponentShow) ResourceGroupCommand(ctx context.Context, p *ResourceGro
2330
return nil, err
2431
}
2532

26-
return framework.NewPresetResultSet(framework.NewListResult[ResourceGroups](rgs), framework.NameFormat(p.Format)), nil
33+
sessions, err := common.ListSessions(ctx, c.client, c.metaPath)
34+
if err != nil {
35+
return nil, errors.Wrap(err, "failed to list sessions")
36+
}
37+
38+
rs := framework.NewListResult[ResourceGroups](rgs)
39+
rs.sessionMap = lo.SliceToMap(sessions, func(s *models.Session) (int64, *models.Session) { return s.ServerID, s })
40+
return framework.NewPresetResultSet(rs, framework.NameFormat(p.Format)), nil
2741
}
2842

2943
type ResourceGroups struct {
3044
framework.ListResultSet[*models.ResourceGroup]
45+
sessionMap map[int64]*models.Session
46+
}
47+
48+
func (rs *ResourceGroups) getHostName(nodeID int64) string {
49+
if sess, ok := rs.sessionMap[nodeID]; ok {
50+
return sess.HostName
51+
}
52+
return "NotFound"
3153
}
3254

3355
func (rs *ResourceGroups) PrintAs(format framework.Format) string {
3456
switch format {
3557
case framework.FormatDefault, framework.FormatPlain:
3658
sb := &strings.Builder{}
59+
tw := tabwriter.NewWriter(sb, 0, 0, 2, ' ', 0)
3760
for _, info := range rs.Data {
3861
rg := info.GetProto()
39-
fmt.Fprintf(sb, "Resource Group Name: %s\tCapacity[Legacy]: %d\tNodes: %v\tLimit: %d\tRequest: %d\n", rg.GetName(), rg.GetCapacity(), rg.GetNodes(), rg.GetConfig().GetLimits().GetNodeNum(), rg.GetConfig().GetRequests().GetNodeNum())
62+
fmt.Fprintf(tw, "Resource Group: %s\n", rg.GetName())
63+
// Config
64+
cfg := rg.GetConfig()
65+
fmt.Fprintf(tw, " Config:\n")
66+
fmt.Fprintf(tw, " Request: %d\tLimit: %d\n", cfg.GetRequests().GetNodeNum(), cfg.GetLimits().GetNodeNum())
67+
if transferFrom := cfg.GetTransferFrom(); len(transferFrom) > 0 {
68+
fmt.Fprintf(tw, " TransferFrom: %v\n", lo.Map(transferFrom, func(t *rgpb.ResourceGroupTransfer, _ int) string { return t.GetResourceGroup() }))
69+
}
70+
if transferTo := cfg.GetTransferTo(); len(transferTo) > 0 {
71+
fmt.Fprintf(tw, " TransferTo: %v\n", lo.Map(transferTo, func(t *rgpb.ResourceGroupTransfer, _ int) string { return t.GetResourceGroup() }))
72+
}
73+
if nodeFilter := cfg.GetNodeFilter(); nodeFilter != nil && len(nodeFilter.GetNodeLabels()) > 0 {
74+
labels := lo.Map(nodeFilter.GetNodeLabels(), func(kv *commonpb.KeyValuePair, _ int) string {
75+
return fmt.Sprintf("%s=%s", kv.GetKey(), kv.GetValue())
76+
})
77+
fmt.Fprintf(tw, " NodeFilter: [%s]\n", strings.Join(labels, ", "))
78+
}
79+
// Nodes
80+
nodes := append([]int64{}, rg.GetNodes()...)
81+
slices.Sort(nodes)
82+
fmt.Fprintf(tw, " Nodes (%d):\n", len(nodes))
83+
for _, nodeID := range nodes {
84+
fmt.Fprintf(tw, " - %d\t%s\n", nodeID, rs.getHostName(nodeID))
85+
}
86+
fmt.Fprintln(tw)
4087
}
88+
tw.Flush()
4189
fmt.Fprintf(sb, "--- Total Resource Group(s): %d\n", len(rs.Data))
4290
return sb.String()
4391
case framework.FormatJSON:
@@ -47,12 +95,23 @@ func (rs *ResourceGroups) PrintAs(format framework.Format) string {
4795
}
4896

4997
func (rs *ResourceGroups) printAsJSON() string {
98+
type NodeInfoJSON struct {
99+
NodeID int64 `json:"node_id"`
100+
HostName string `json:"hostname"`
101+
}
102+
103+
type ConfigJSON struct {
104+
RequestNodeNum int32 `json:"request_node_num"`
105+
LimitNodeNum int32 `json:"limit_node_num"`
106+
TransferFrom []string `json:"transfer_from,omitempty"`
107+
TransferTo []string `json:"transfer_to,omitempty"`
108+
NodeLabels []string `json:"node_labels,omitempty"`
109+
}
110+
50111
type ResourceGroupJSON struct {
51-
Name string `json:"name"`
52-
CapacityLegacy int32 `json:"capacity_legacy"`
53-
Nodes []int64 `json:"nodes"`
54-
LimitNodeNum int32 `json:"limit_node_num"`
55-
RequestNodeNum int32 `json:"request_node_num"`
112+
Name string `json:"name"`
113+
Config ConfigJSON `json:"config"`
114+
Nodes []NodeInfoJSON `json:"nodes"`
56115
}
57116

58117
type OutputJSON struct {
@@ -67,12 +126,35 @@ func (rs *ResourceGroups) printAsJSON() string {
67126

68127
for _, info := range rs.Data {
69128
rg := info.GetProto()
129+
cfg := rg.GetConfig()
130+
131+
cfgJSON := ConfigJSON{
132+
RequestNodeNum: cfg.GetRequests().GetNodeNum(),
133+
LimitNodeNum: cfg.GetLimits().GetNodeNum(),
134+
}
135+
if transferFrom := cfg.GetTransferFrom(); len(transferFrom) > 0 {
136+
cfgJSON.TransferFrom = lo.Map(transferFrom, func(t *rgpb.ResourceGroupTransfer, _ int) string { return t.GetResourceGroup() })
137+
}
138+
if transferTo := cfg.GetTransferTo(); len(transferTo) > 0 {
139+
cfgJSON.TransferTo = lo.Map(transferTo, func(t *rgpb.ResourceGroupTransfer, _ int) string { return t.GetResourceGroup() })
140+
}
141+
if nodeFilter := cfg.GetNodeFilter(); nodeFilter != nil && len(nodeFilter.GetNodeLabels()) > 0 {
142+
cfgJSON.NodeLabels = lo.Map(nodeFilter.GetNodeLabels(), func(kv *commonpb.KeyValuePair, _ int) string {
143+
return fmt.Sprintf("%s=%s", kv.GetKey(), kv.GetValue())
144+
})
145+
}
146+
147+
sortedNodes := append([]int64{}, rg.GetNodes()...)
148+
slices.Sort(sortedNodes)
149+
nodes := make([]NodeInfoJSON, 0, len(sortedNodes))
150+
for _, nodeID := range sortedNodes {
151+
nodes = append(nodes, NodeInfoJSON{NodeID: nodeID, HostName: rs.getHostName(nodeID)})
152+
}
153+
70154
output.ResourceGroups = append(output.ResourceGroups, ResourceGroupJSON{
71-
Name: rg.GetName(),
72-
CapacityLegacy: rg.GetCapacity(),
73-
Nodes: rg.GetNodes(),
74-
LimitNodeNum: rg.GetConfig().GetLimits().GetNodeNum(),
75-
RequestNodeNum: rg.GetConfig().GetRequests().GetNodeNum(),
155+
Name: rg.GetName(),
156+
Config: cfgJSON,
157+
Nodes: nodes,
76158
})
77159
}
78160

states/etcd/show/wal_distribution.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ import (
88
"time"
99

1010
"github.com/jedib0t/go-pretty/v6/table"
11+
"github.com/samber/lo"
1112

1213
"github.com/milvus-io/birdwatcher/framework"
14+
"github.com/milvus-io/birdwatcher/models"
1315
"github.com/milvus-io/birdwatcher/states/etcd/common"
1416
"github.com/milvus-io/milvus/pkg/v2/proto/streamingpb"
1517
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
@@ -27,6 +29,12 @@ func (c *ComponentShow) WalDistributionCommand(ctx context.Context, p *WALDistri
2729
return err
2830
}
2931

32+
sessions, err := common.ListSessions(ctx, c.client, c.metaPath)
33+
if err != nil {
34+
return err
35+
}
36+
sessionMap := lo.SliceToMap(sessions, func(s *models.Session) (int64, *models.Session) { return s.ServerID, s })
37+
3038
t := table.NewWriter()
3139
t.SetTitle("WAL Distribution At Coordinator")
3240
t.SetOutputMirror(os.Stdout)
@@ -37,31 +45,38 @@ func (c *ComponentShow) WalDistributionCommand(ctx context.Context, p *WALDistri
3745
t.AppendHeader(header)
3846
for _, meta := range metas {
3947
channelInfo := types.NewPChannelInfoFromProto(meta.Channel)
40-
assignedTo := types.NewStreamingNodeInfoFromProto(meta.Node)
48+
nodeInfo := types.NewStreamingNodeInfoFromProto(meta.Node)
4149
lastAssignTimestamp := time.Unix(int64(meta.LastAssignTimestampSeconds), 0)
4250
row := table.Row{
4351
channelInfo,
44-
assignedTo,
52+
formatStreamingNode(nodeInfo, sessionMap),
4553
strings.TrimPrefix(meta.State.String(), "PCHANNEL_META_STATE_"),
4654
lastAssignTimestamp,
4755
}
4856
if p.WithHistory {
49-
row = append(row, c.formatHistory(meta.Histories))
57+
row = append(row, c.formatHistory(meta.Histories, sessionMap))
5058
}
5159
t.AppendRow(row)
5260
}
5361
t.Render()
5462
return nil
5563
}
5664

57-
func (c *ComponentShow) formatHistory(histories []*streamingpb.PChannelAssignmentLog) string {
65+
func (c *ComponentShow) formatHistory(histories []*streamingpb.PChannelAssignmentLog, sessionMap map[int64]*models.Session) string {
5866
if len(histories) == 0 {
5967
return ""
6068
}
6169
ss := make([]string, 0, len(histories))
6270
for _, history := range histories {
63-
assignedTo := types.NewStreamingNodeInfoFromProto(history.Node)
64-
ss = append(ss, fmt.Sprintf("%s@%d->%s", types.AccessMode(history.AccessMode).String(), history.Term, assignedTo.String()))
71+
nodeInfo := types.NewStreamingNodeInfoFromProto(history.Node)
72+
ss = append(ss, fmt.Sprintf("%s@%d->%s", types.AccessMode(history.AccessMode).String(), history.Term, formatStreamingNode(nodeInfo, sessionMap)))
6573
}
6674
return strings.Join(ss, "\n")
6775
}
76+
77+
func formatStreamingNode(node types.StreamingNodeInfo, sessionMap map[int64]*models.Session) string {
78+
if sess, ok := sessionMap[node.ServerID]; ok {
79+
return fmt.Sprintf("%d(%s)", node.ServerID, sess.HostName)
80+
}
81+
return fmt.Sprintf("%d(NotFound)", node.ServerID)
82+
}

0 commit comments

Comments
 (0)