Skip to content

Commit aa1c86a

Browse files
authored
Chore v20 (#871)
* remove envoy mod * feature: Add xds metrics
1 parent 925c977 commit aa1c86a

File tree

3 files changed

+34
-10
lines changed

3 files changed

+34
-10
lines changed

.asf.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ notifications:
2020
jira_options: link label
2121

2222
github:
23-
homepage: "https://dubbo-kubernetes.github.io/dsm-docs/"
23+
homepage: "https://dubbo-kubernetes.github.io/"
2424
description: "Dubbo Service Mesh for Kubernetes."
2525
features:
2626
# Enable wiki for documentation

dubbod/discovery/pkg/model/push_context.go

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package model
1818

1919
import (
2020
"cmp"
21+
"encoding/json"
2122
"sort"
2223
"strings"
2324
"sync"
@@ -1153,14 +1154,11 @@ func firstDestinationRule(csr *consolidatedSubRules, hostname host.Name) *networ
11531154
return nil
11541155
}
11551156

1156-
func ConfigNamesOfKind(configs sets.Set[ConfigKey], k kind.Kind) sets.String {
1157-
ret := sets.New[string]()
1158-
1159-
for conf := range configs {
1160-
if conf.Kind == k {
1161-
ret.Insert(conf.Name)
1162-
}
1157+
func (ps *PushContext) StatusJSON() ([]byte, error) {
1158+
if ps == nil {
1159+
return []byte{'{', '}'}, nil
11631160
}
1164-
1165-
return ret
1161+
ps.proxyStatusMutex.RLock()
1162+
defer ps.proxyStatusMutex.RUnlock()
1163+
return json.MarshalIndent(ps.ProxyStatus, "", " ")
11661164
}

dubbod/discovery/pkg/xds/discovery.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ import (
3636
"google.golang.org/grpc"
3737
)
3838

39+
var periodicRefreshMetrics = 10 * time.Second
40+
3941
var processStartTime = time.Now()
4042

4143
type DiscoveryServer struct {
@@ -104,6 +106,7 @@ func (s *DiscoveryServer) Register(rpcs *grpc.Server) {
104106
func (s *DiscoveryServer) Start(stopCh <-chan struct{}) {
105107
go s.handleUpdates(stopCh)
106108
go s.sendPushes(stopCh)
109+
go s.periodicRefreshMetrics(stopCh)
107110
go s.Cache.Run(stopCh)
108111
}
109112

@@ -306,6 +309,29 @@ func doSendPushes(stopCh <-chan struct{}, semaphore chan struct{}, queue *PushQu
306309
}
307310
}
308311

312+
func (s *DiscoveryServer) periodicRefreshMetrics(stopCh <-chan struct{}) {
313+
ticker := time.NewTicker(periodicRefreshMetrics)
314+
defer ticker.Stop()
315+
for {
316+
select {
317+
case <-ticker.C:
318+
push := s.globalPushContext()
319+
model.LastPushMutex.Lock()
320+
if model.LastPushStatus != push {
321+
model.LastPushStatus = push
322+
push.UpdateMetrics()
323+
out, _ := model.LastPushStatus.StatusJSON()
324+
if string(out) != "{}" {
325+
log.Infof("Push Status: %s", string(out))
326+
}
327+
}
328+
model.LastPushMutex.Unlock()
329+
case <-stopCh:
330+
return
331+
}
332+
}
333+
}
334+
309335
func (s *DiscoveryServer) dropCacheForRequest(req *model.PushRequest) {
310336
// If we don't know what updated, cannot safely cache. Clear the whole cache
311337
if req.Forced {

0 commit comments

Comments
 (0)