Skip to content

Commit f87b3cf

Browse files
Merge pull request #19 from GetStream/improve-workflows
get metrics for all clusters, not just for the active one
2 parents 93935a3 + 8be91b6 commit f87b3cf

File tree

3 files changed

+75
-43
lines changed

3 files changed

+75
-43
lines changed

cmd/main.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,16 @@ func main() {
4444
srv.AddCluster(ctx, cluster.Name, cluster.PDAddrs)
4545
}
4646

47-
// Start metrics monitor with dynamic PD address from active cluster
47+
// Start metrics monitor for all clusters
4848
metrics := services.NewMonitor(
49-
srv.GetActivePDAddr,
50-
srv.GetActiveClusterName,
49+
func() []services.ClusterInfo {
50+
clusters := srv.GetAllClusters()
51+
result := make([]services.ClusterInfo, len(clusters))
52+
for i, c := range clusters {
53+
result[i] = services.ClusterInfo{Name: c.Name, PDAddr: c.PDAddr}
54+
}
55+
return result
56+
},
5157
5*time.Second,
5258
cache,
5359
)

pkg/server/server.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,29 @@ func (s *Server) GetActivePDAddr() string {
126126
return ""
127127
}
128128

// ClusterInfo holds basic cluster information for metrics polling.
type ClusterInfo struct {
	Name   string // human-readable cluster name, used as the cache key suffix
	PDAddr string // address of the cluster's first PD endpoint
}
134+
135+
// GetAllClusters returns info for all clusters (for metrics polling)
136+
func (s *Server) GetAllClusters() []ClusterInfo {
137+
s.mu.RLock()
138+
defer s.mu.RUnlock()
139+
140+
clusters := make([]ClusterInfo, 0, len(s.clusters))
141+
for _, conn := range s.clusters {
142+
if len(conn.PDAddrs) > 0 {
143+
clusters = append(clusters, ClusterInfo{
144+
Name: conn.Name,
145+
PDAddr: conn.PDAddrs[0],
146+
})
147+
}
148+
}
149+
return clusters
150+
}
151+
129152
// Close closes all cluster connections
130153
func (s *Server) Close() {
131154
s.mu.Lock()

pkg/services/metrics.go

Lines changed: 43 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -13,31 +13,34 @@ import (
1313
"github.com/GetStream/tikv-ui/pkg/utils"
1414
)
1515

// ClusterInfo holds basic cluster information for metrics polling.
// NOTE(review): this mirrors the identically named type in pkg/server —
// presumably duplicated to avoid an import cycle; confirm before merging.
type ClusterInfo struct {
	Name   string // cluster name, also the cache-key suffix ("metrics:<name>")
	PDAddr string // PD endpoint to query for store information
}
1622
type Monitor struct {
17-
getPDAddr func() string
18-
getClusterName func() string
19-
interval time.Duration
20-
client *http.Client
21-
cache *utils.Cache
23+
getClusters func() []ClusterInfo
24+
interval time.Duration
25+
client *http.Client
26+
cache *utils.Cache
2227
}
2328

24-
func NewMonitor(getPDAddr func() string, getClusterName func() string, interval time.Duration, cache *utils.Cache) *Monitor {
29+
func NewMonitor(getClusters func() []ClusterInfo, interval time.Duration, cache *utils.Cache) *Monitor {
2530
return &Monitor{
26-
getPDAddr: getPDAddr,
27-
getClusterName: getClusterName,
28-
interval: interval,
29-
cache: cache,
31+
getClusters: getClusters,
32+
interval: interval,
33+
cache: cache,
3034
client: &http.Client{
31-
Timeout: 10 * time.Second,
35+
Timeout: 60 * time.Second,
3236
},
3337
}
3438
}
3539

3640
func (m *Monitor) Start(ctx context.Context) {
3741
ticker := time.NewTicker(m.interval)
3842

39-
m.pollStores(ctx)
40-
m.pollTiKVMetrics(ctx)
43+
m.pollAllClusters(ctx)
4144

4245
go func() {
4346
defer ticker.Stop()
@@ -47,28 +50,34 @@ func (m *Monitor) Start(ctx context.Context) {
4750
case <-ctx.Done():
4851
return
4952
case <-ticker.C:
50-
go m.pollStores(ctx)
51-
go m.pollTiKVMetrics(ctx)
53+
go m.pollAllClusters(ctx)
5254
}
5355
}
5456
}()
5557
}
5658

57-
func (m *Monitor) pollStores(ctx context.Context) {
58-
pdAddr := m.getPDAddr()
59-
if pdAddr == "" {
60-
log.Printf("pd metrics: no active PD address")
59+
func (m *Monitor) pollAllClusters(ctx context.Context) {
60+
clusters := m.getClusters()
61+
if len(clusters) == 0 {
62+
log.Printf("metrics: no clusters available")
6163
return
6264
}
6365

64-
clusterName := m.getClusterName()
65-
if clusterName == "" {
66-
log.Printf("pd metrics: no active cluster name")
66+
for _, cluster := range clusters {
67+
m.pollStores(ctx, cluster)
68+
m.pollTiKVMetrics(ctx, cluster.Name)
69+
}
70+
}
71+
72+
func (m *Monitor) pollStores(ctx context.Context, cluster ClusterInfo) {
73+
if cluster.PDAddr == "" {
74+
log.Printf("pd metrics [%s]: no PD address", cluster.Name)
6775
return
6876
}
69-
storesURL := pdAddr + "/pd/api/v1/stores"
70-
if !strings.HasPrefix(pdAddr, "http") {
71-
storesURL = "http://" + pdAddr + "/pd/api/v1/stores"
77+
78+
storesURL := cluster.PDAddr + "/pd/api/v1/stores"
79+
if !strings.HasPrefix(cluster.PDAddr, "http") {
80+
storesURL = "http://" + cluster.PDAddr + "/pd/api/v1/stores"
7281
}
7382
req, err := http.NewRequestWithContext(
7483
ctx,
@@ -77,52 +86,46 @@ func (m *Monitor) pollStores(ctx context.Context) {
7786
nil,
7887
)
7988
if err != nil {
80-
log.Printf("pd metrics: request error: %v", err)
89+
log.Printf("pd metrics [%s]: request error: %v", cluster.Name, err)
8190
return
8291
}
8392

8493
resp, err := m.client.Do(req)
8594
if err != nil {
86-
log.Printf("pd metrics: http error: %v", err)
95+
log.Printf("pd metrics [%s]: http error: %v", cluster.Name, err)
8796
return
8897
}
8998
defer resp.Body.Close()
9099

91100
if resp.StatusCode != http.StatusOK {
92-
log.Printf("pd metrics: bad status %d", resp.StatusCode)
101+
log.Printf("pd metrics [%s]: bad status %d", cluster.Name, resp.StatusCode)
93102
return
94103
}
95104

96105
var data types.PDStoresResponse
97106
if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
98-
log.Printf("pd metrics: decode error: %v", err)
107+
log.Printf("pd metrics [%s]: decode error: %v", cluster.Name, err)
99108
return
100109
}
101110

102111
sort.Slice(data.Stores, func(i, j int) bool {
103112
return data.Stores[i].Store.ID < data.Stores[j].Store.ID
104113
})
105114

106-
m.cache.Set("metrics:"+clusterName, "pd", data)
115+
m.cache.Set("metrics:"+cluster.Name, "pd", data)
107116
}
108117

109-
func (m *Monitor) pollTiKVMetrics(ctx context.Context) {
110-
clusterName := m.getClusterName()
111-
if clusterName == "" {
112-
log.Printf("tikv metrics: no active cluster name")
113-
return
114-
}
115-
118+
func (m *Monitor) pollTiKVMetrics(ctx context.Context, clusterName string) {
116119
// Get stores from cache
117120
cached, ok := m.cache.Get("metrics:"+clusterName, "pd")
118121
if !ok {
119-
log.Printf("tikv metrics: no PD stores in cache for cluster %s", clusterName)
122+
log.Printf("tikv metrics [%s]: no PD stores in cache", clusterName)
120123
return
121124
}
122125

123126
pdData, ok := cached.(types.PDStoresResponse)
124127
if !ok {
125-
log.Printf("tikv metrics: invalid PD cache data")
128+
log.Printf("tikv metrics [%s]: invalid PD cache data", clusterName)
126129
return
127130
}
128131

@@ -147,7 +150,7 @@ func (m *Monitor) pollTiKVMetrics(ctx context.Context) {
147150
}
148151
newMetrics, err := m.fetchNodeMetrics(ctx, metricsURL, statusAddr)
149152
if err != nil {
150-
log.Printf("tikv metrics: node %s error: %v", statusAddr, err)
153+
log.Printf("tikv metrics [%s]: node %s error: %v", clusterName, statusAddr, err)
151154
continue
152155
}
153156

0 commit comments

Comments
 (0)