Skip to content

Commit 9600267

Browse files
committed
feat(metrics): add endpoint server and phase1 gauge collectors
1 parent 3ac5f5d commit 9600267

File tree

19 files changed

+625
-20
lines changed

19 files changed

+625
-20
lines changed

cmd/run.go

Lines changed: 56 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ import (
2626
"github.com/daeuniverse/outbound/protocol/direct"
2727
"gopkg.in/natefinch/lumberjack.v2"
2828

29-
_ "net/http/pprof"
30-
3129
"github.com/daeuniverse/dae/cmd/internal"
3230
"github.com/daeuniverse/dae/common"
3331
"github.com/daeuniverse/dae/common/consts"
@@ -37,6 +35,7 @@ import (
3735
"github.com/daeuniverse/dae/control"
3836
"github.com/daeuniverse/dae/pkg/config_parser"
3937
"github.com/daeuniverse/dae/pkg/logger"
38+
"github.com/daeuniverse/dae/pkg/metrics"
4039
"github.com/mohae/deepcopy"
4140
"github.com/okzk/sdnotify"
4241
"github.com/sirupsen/logrus"
@@ -134,12 +133,25 @@ func Run(log *logrus.Logger, conf *config.Config, externGeoDataDirs []string) (e
134133
return err
135134
}
136135

137-
var pprofServer *http.Server
138-
if conf.Global.PprofPort != 0 {
139-
pprofAddr := fmt.Sprintf("localhost:%d", conf.Global.PprofPort)
140-
pprofServer = &http.Server{Addr: pprofAddr, Handler: nil}
141-
go pprofServer.ListenAndServe()
136+
metricsState := metrics.NewState()
137+
metricsState.SetControlPlane(c)
138+
metricsRegistry := metrics.NewRegistry(metricsState)
139+
140+
var endpointServer *http.Server
141+
startEndpointServer := func(cfg metrics.EndpointConfig) {
142+
if cfg.ListenAddress == "" {
143+
endpointServer = nil
144+
return
145+
}
146+
endpointServer = metrics.NewEndpointServer(cfg, metricsRegistry)
147+
go func(server *http.Server, endpointCfg metrics.EndpointConfig) {
148+
if e := metrics.StartEndpointServer(server, endpointCfg); e != nil && !errors.Is(e, http.ErrServerClosed) {
149+
log.WithError(e).Errorln("Endpoint server stopped with error")
150+
}
151+
}(endpointServer, cfg)
142152
}
153+
endpointCfg := endpointConfigFromGlobal(conf, log)
154+
startEndpointServer(endpointCfg)
143155

144156
// Serve tproxy TCP/UDP server util signals.
145157
var listener *control.Listener
@@ -259,6 +271,10 @@ loop:
259271
// Only keep dns cache when ip version preference not change.
260272
dnsCache = c.CloneDnsCache()
261273
}
274+
// Stop old DNS listener before creating new one to avoid port conflicts
275+
if err := c.StopDNSListener(); err != nil {
276+
log.Warnf("[Reload] Failed to stop old DNS listener: %v", err)
277+
}
262278
log.Warnln("[Reload] Load new control plane")
263279
newC, err := newControlPlane(log, obj, dnsCache, newConf, externGeoDataDirs)
264280
if err != nil {
@@ -290,21 +306,21 @@ loop:
290306
c = newC
291307
conf = newConf
292308
reloading = true
309+
metricsState.SetControlPlane(newC)
293310

294311
// Ready to close.
295312
if abortConnections {
296313
oldC.AbortConnections()
297314
}
298315
oldC.Close()
299316

300-
if pprofServer != nil {
301-
pprofServer.Shutdown(context.Background())
302-
pprofServer = nil
303-
}
304-
if newConf.Global.PprofPort != 0 {
305-
pprofAddr := fmt.Sprintf("localhost:%d", conf.Global.PprofPort)
306-
pprofServer = &http.Server{Addr: pprofAddr, Handler: nil}
307-
go pprofServer.ListenAndServe()
317+
newEndpointCfg := endpointConfigFromGlobal(newConf, log)
318+
if endpointConfigChanged(endpointCfg, newEndpointCfg) {
319+
if endpointServer != nil {
320+
_ = endpointServer.Shutdown(context.Background())
321+
}
322+
endpointCfg = newEndpointCfg
323+
startEndpointServer(endpointCfg)
308324
}
309325
case syscall.SIGHUP:
310326
// Ignore.
@@ -316,12 +332,37 @@ loop:
316332
}
317333
defer os.Remove(PidFilePath)
318334
defer control.GetDaeNetns().Close()
335+
if endpointServer != nil {
336+
_ = endpointServer.Shutdown(context.Background())
337+
}
319338
if e := c.Close(); e != nil {
320339
return fmt.Errorf("close control plane: %w", e)
321340
}
322341
return nil
323342
}
324343

344+
func endpointConfigFromGlobal(conf *config.Config, log *logrus.Logger) metrics.EndpointConfig {
345+
cfg := metrics.EndpointConfig{
346+
ListenAddress: conf.Global.EndpointListenAddress,
347+
Username: conf.Global.EndpointUsername,
348+
Password: conf.Global.EndpointPassword,
349+
TlsCertificate: conf.Global.EndpointTlsCertificate,
350+
TlsKey: conf.Global.EndpointTlsKey,
351+
PrometheusEnabled: conf.Global.EndpointPrometheusEnabled,
352+
PrometheusPath: conf.Global.EndpointPrometheusPath,
353+
PprofEnabled: conf.Global.PprofPort != 0,
354+
}
355+
if cfg.ListenAddress == "" && conf.Global.PprofPort != 0 {
356+
log.Warnln("pprof_port is deprecated, please use endpoint_listen_address instead")
357+
cfg.ListenAddress = fmt.Sprintf("localhost:%d", conf.Global.PprofPort)
358+
}
359+
return cfg
360+
}
361+
362+
func endpointConfigChanged(a, b metrics.EndpointConfig) bool {
363+
return a != b
364+
}
365+
325366
func newControlPlane(log *logrus.Logger, bpf interface{}, dnsCache map[string]*control.DnsCache, conf *config.Config, externGeoDataDirs []string) (c *control.ControlPlane, err error) {
326367
// Deep copy to prevent modification.
327368
conf = deepcopy.Copy(conf).(*config.Config)

component/outbound/dialer/alive_dialer_set.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,12 @@ func (a *AliveDialerSet) GetRand() *Dialer {
102102
return a.inorderedAliveDialerSet[ind]
103103
}
104104

105+
func (a *AliveDialerSet) AliveCount() int {
106+
a.mu.Lock()
107+
defer a.mu.Unlock()
108+
return len(a.inorderedAliveDialerSet)
109+
}
110+
105111
func (a *AliveDialerSet) SortingLatency(d *Dialer) time.Duration {
106112
return a.dialerToLatency[d] + a.dialerToLatencyOffset[d]
107113
}

component/outbound/dialer/connectivity_check.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,18 @@ func (d *Dialer) MustGetLatencies10(typ *NetworkType) *LatenciesN {
503503
return d.mustGetCollection(typ).Latencies10
504504
}
505505

506+
// GetCollectionState returns a snapshot of the dialer's health state for the given network type.
507+
func (d *Dialer) GetCollectionState(typ *NetworkType) (alive bool, lastLatency, avg10, movingAvg time.Duration) {
508+
d.collectionFineMu.Lock()
509+
col := d.mustGetCollection(typ)
510+
alive = col.Alive
511+
movingAvg = col.MovingAverage
512+
d.collectionFineMu.Unlock()
513+
lastLatency, _ = col.Latencies10.LastLatency()
514+
avg10, _ = col.Latencies10.AvgLatency()
515+
return
516+
}
517+
506518
// RegisterAliveDialerSet is thread-safe.
507519
func (d *Dialer) RegisterAliveDialerSet(a *AliveDialerSet) {
508520
if a == nil {

component/outbound/dialer_group.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,17 @@ func (g *DialerGroup) GetSelectionPolicy() (policy consts.DialerSelectionPolicy)
183183
return g.selectionPolicy.Policy
184184
}
185185

186+
func (g *DialerGroup) AliveDialerSets() [6]*dialer.AliveDialerSet {
187+
return g.aliveDialerSets
188+
}
189+
190+
func (g *DialerGroup) SelectionPolicyName() string {
191+
if g.selectionPolicy == nil {
192+
return ""
193+
}
194+
return string(g.selectionPolicy.Policy)
195+
}
196+
186197
func (d *DialerGroup) MustGetAliveDialerSet(typ *dialer.NetworkType) *dialer.AliveDialerSet {
187198
if typ.IsDns {
188199
switch typ.L4Proto {

config/config.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,25 @@ type Global struct {
4646
SniffingTimeout time.Duration `mapstructure:"sniffing_timeout" default:"100ms"`
4747
TlsImplementation string `mapstructure:"tls_implementation" default:"tls"`
4848
UtlsImitate string `mapstructure:"utls_imitate" default:"chrome_auto"`
49+
TlsFragment bool `mapstructure:"tls_fragment" default:"false"`
50+
TlsFragmentLength string `mapstructure:"tls_fragment_length" default:"50-100"`
51+
TlsFragmentInterval string `mapstructure:"tls_fragment_interval" default:"10-20"`
4952
PprofPort uint16 `mapstructure:"pprof_port" default:"0"`
5053
Mptcp bool `mapstructure:"mptcp" default:"false"`
5154
FallbackResolver string `mapstructure:"fallback_resolver" default:"8.8.8.8:53"`
5255
BandwidthMaxTx string `mapstructure:"bandwidth_max_tx" default:"0"`
5356
BandwidthMaxRx string `mapstructure:"bandwidth_max_rx" default:"0"`
57+
UDPHopInterval time.Duration `mapstructure:"udphop_interval" default:"30s"`
5458
DnsPerformanceLevel string `mapstructure:"dns_performance_level" default:"balanced"`
5559
DnsIngressManual DnsIngressManual `mapstructure:"dns_ingress_manual"`
60+
// Endpoint (metrics + diagnostics)
61+
EndpointListenAddress string `mapstructure:"endpoint_listen_address" default:""`
62+
EndpointUsername string `mapstructure:"endpoint_username" default:""`
63+
EndpointPassword string `mapstructure:"endpoint_password" default:""`
64+
EndpointTlsCertificate string `mapstructure:"endpoint_tls_certificate" default:""`
65+
EndpointTlsKey string `mapstructure:"endpoint_tls_key" default:""`
66+
EndpointPrometheusEnabled bool `mapstructure:"endpoint_prometheus_enabled" default:"false"`
67+
EndpointPrometheusPath string `mapstructure:"endpoint_prometheus_path" default:"/metrics"`
5668
}
5769

5870
type Utls struct {
@@ -121,10 +133,14 @@ type DnsRouting struct {
121133
}
122134
type KeyableString string
123135
type Dns struct {
124-
IpVersionPrefer int `mapstructure:"ipversion_prefer"`
125-
FixedDomainTtl []KeyableString `mapstructure:"fixed_domain_ttl"`
126-
Upstream []KeyableString `mapstructure:"upstream"`
127-
Routing DnsRouting `mapstructure:"routing"`
136+
IpVersionPrefer int `mapstructure:"ipversion_prefer"`
137+
FixedDomainTtl []KeyableString `mapstructure:"fixed_domain_ttl"`
138+
Upstream []KeyableString `mapstructure:"upstream"`
139+
Routing DnsRouting `mapstructure:"routing"`
140+
Bind string `mapstructure:"bind"`
141+
OptimisticCache bool `mapstructure:"optimistic_cache" default:"true"`
142+
OptimisticCacheTtl int `mapstructure:"optimistic_cache_ttl" default:"60"`
143+
MaxCacheSize int `mapstructure:"max_cache_size" default:"0"`
128144
}
129145

130146
type Routing struct {

control/control_plane.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,23 @@ func (c *ControlPlane) InjectBpf(bpf *bpfObjects) {
616616
c.core.InjectBpf(bpf)
617617
}
618618

619+
func (c *ControlPlane) Outbounds() []*outbound.DialerGroup {
620+
return c.outbounds
621+
}
622+
623+
func (c *ControlPlane) GetDnsController() *DnsController {
624+
return c.dnsController
625+
}
626+
627+
func (c *ControlPlane) CountTcpConnections() int {
628+
count := 0
629+
c.inConnections.Range(func(_, _ interface{}) bool {
630+
count++
631+
return true
632+
})
633+
return count
634+
}
635+
619636
func (c *ControlPlane) CloneDnsCache() map[string]*DnsCache {
620637
c.dnsController.dnsCacheMu.Lock()
621638
defer c.dnsController.dnsCacheMu.Unlock()

control/dns_control.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,27 @@ func (c *DnsController) cacheKey(qname string, qtype uint16) string {
132132
return dnsmessage.CanonicalName(qname) + strconv.Itoa(int(qtype))
133133
}
134134

135+
func (c *DnsController) CacheSize() int {
136+
c.dnsCacheMu.Lock()
137+
defer c.dnsCacheMu.Unlock()
138+
return len(c.dnsCache)
139+
}
140+
141+
// ConcurrencyInfo returns in-flight request count and limit.
142+
// This implementation uses handling map size as in-flight and has no hard limit.
143+
func (c *DnsController) ConcurrencyInfo() (inUse, limit int) {
144+
c.handling.Range(func(_, _ interface{}) bool {
145+
inUse++
146+
return true
147+
})
148+
return inUse, 0
149+
}
150+
151+
// ForwarderCacheInfo is empty on this branch because forwarders are not cached.
152+
func (c *DnsController) ForwarderCacheInfo() (count int, inFlightByUpstream map[string]int32) {
153+
return 0, map[string]int32{}
154+
}
155+
135156
func (c *DnsController) RemoveDnsRespCache(cacheKey string) {
136157
c.dnsCacheMu.Lock()
137158
_, ok := c.dnsCache[cacheKey]

control/udp_endpoint_pool.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,15 @@ func (p *UdpEndpointPool) Get(lAddr netip.AddrPort) (udpEndpoint *UdpEndpoint, o
138138
return _ue.(*UdpEndpoint), ok
139139
}
140140

141+
func (p *UdpEndpointPool) Count() int {
142+
count := 0
143+
p.pool.Range(func(_, _ interface{}) bool {
144+
count++
145+
return true
146+
})
147+
return count
148+
}
149+
141150
func (p *UdpEndpointPool) GetOrCreate(lAddr netip.AddrPort, createOption *UdpEndpointOptions) (udpEndpoint *UdpEndpoint, isNew bool, err error) {
142151
_ue, ok := p.pool.Load(lAddr)
143152
begin:

control/udp_task_pool.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ func NewUdpTaskPool() *UdpTaskPool {
5757
return p
5858
}
5959

60+
func (p *UdpTaskPool) Count() int {
61+
p.mu.Lock()
62+
defer p.mu.Unlock()
63+
return len(p.m)
64+
}
65+
6066
// EmitTask: Make sure packets with the same key (4 tuples) will be sent in order.
6167
func (p *UdpTaskPool) EmitTask(key string, task UdpTask) {
6268
p.mu.Lock()

example.dae

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,16 @@ global {
1212
# Set non-zero value to enable pprof.
1313
pprof_port: 0
1414

15+
# Endpoint configuration for metrics and diagnostics.
16+
# Set a listen address to enable the endpoint server (disabled by default).
17+
#endpoint_listen_address: '0.0.0.0:5556'
18+
#endpoint_username: ''
19+
#endpoint_password: ''
20+
#endpoint_tls_certificate: ''
21+
#endpoint_tls_key: ''
22+
#endpoint_prometheus_enabled: true
23+
#endpoint_prometheus_path: /metrics
24+
1525
# DNS ingress performance level.
1626
# Options: lean, balanced (default), aggressive, manual.
1727
# - lean: for low-power/embedded devices (32 workers)

0 commit comments

Comments
 (0)