Skip to content

Commit a54e1f2

Browse files
authored
Remove redundant stats in mux and bridge dispatcher (#5466)
Fixes #5446
1 parent 5d94a62 commit a54e1f2

File tree

5 files changed

+16
-31
lines changed

5 files changed

+16
-31
lines changed

app/dispatcher/default.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ func (d *DefaultDispatcher) getLink(ctx context.Context) (*transport.Link, *tran
196196
return inboundLink, outboundLink
197197
}
198198

199-
func (d *DefaultDispatcher) WrapLink(ctx context.Context, link *transport.Link) *transport.Link {
199+
func WrapLink(ctx context.Context, policyManager policy.Manager, statsManager stats.Manager, link *transport.Link) *transport.Link {
200200
sessionInbound := session.InboundFromContext(ctx)
201201
var user *protocol.MemoryUser
202202
if sessionInbound != nil {
@@ -206,16 +206,16 @@ func (d *DefaultDispatcher) WrapLink(ctx context.Context, link *transport.Link)
206206
link.Reader = &buf.TimeoutWrapperReader{Reader: link.Reader}
207207

208208
if user != nil && len(user.Email) > 0 {
209-
p := d.policy.ForLevel(user.Level)
209+
p := policyManager.ForLevel(user.Level)
210210
if p.Stats.UserUplink {
211211
name := "user>>>" + user.Email + ">>>traffic>>>uplink"
212-
if c, _ := stats.GetOrRegisterCounter(d.stats, name); c != nil {
212+
if c, _ := stats.GetOrRegisterCounter(statsManager, name); c != nil {
213213
link.Reader.(*buf.TimeoutWrapperReader).Counter = c
214214
}
215215
}
216216
if p.Stats.UserDownlink {
217217
name := "user>>>" + user.Email + ">>>traffic>>>downlink"
218-
if c, _ := stats.GetOrRegisterCounter(d.stats, name); c != nil {
218+
if c, _ := stats.GetOrRegisterCounter(statsManager, name); c != nil {
219219
link.Writer = &SizeStatWriter{
220220
Counter: c,
221221
Writer: link.Writer,
@@ -224,7 +224,7 @@ func (d *DefaultDispatcher) WrapLink(ctx context.Context, link *transport.Link)
224224
}
225225
if p.Stats.UserOnline {
226226
name := "user>>>" + user.Email + ">>>online"
227-
if om, _ := stats.GetOrRegisterOnlineMap(d.stats, name); om != nil {
227+
if om, _ := stats.GetOrRegisterOnlineMap(statsManager, name); om != nil {
228228
sessionInbounds := session.InboundFromContext(ctx)
229229
userIP := sessionInbounds.Source.Address.String()
230230
om.AddIP(userIP)
@@ -357,7 +357,7 @@ func (d *DefaultDispatcher) DispatchLink(ctx context.Context, destination net.De
357357
content = new(session.Content)
358358
ctx = session.ContextWithContent(ctx, content)
359359
}
360-
outbound = d.WrapLink(ctx, outbound)
360+
outbound = WrapLink(ctx, d.policy, d.stats, outbound)
361361
sniffingRequest := content.SniffingRequest
362362
if !sniffingRequest.Enabled {
363363
d.routedDispatch(ctx, outbound, destination)
@@ -449,6 +449,7 @@ func sniffer(ctx context.Context, cReader *cachedReader, metadataOnly bool, netw
449449
}
450450
return contentResult, contentErr
451451
}
452+
452453
func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *transport.Link, destination net.Destination) {
453454
outbounds := session.OutboundsFromContext(ctx)
454455
ob := outbounds[len(outbounds)-1]

app/reverse/bridge.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -229,10 +229,6 @@ func (w *BridgeWorker) DispatchLink(ctx context.Context, dest net.Destination, l
229229
}
230230
return w.Dispatcher.DispatchLink(ctx, dest, link)
231231
}
232-
233-
if d, ok := w.Dispatcher.(routing.WrapLinkDispatcher); ok {
234-
link = d.WrapLink(ctx, link)
235-
}
236232
w.handleInternalConn(link)
237233

238234
return nil

common/mux/server.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,6 @@ func (s *Server) DispatchLink(ctx context.Context, dest net.Destination, link *t
6363
if dest.Address != muxCoolAddress {
6464
return s.dispatcher.DispatchLink(ctx, dest, link)
6565
}
66-
if d, ok := s.dispatcher.(routing.WrapLinkDispatcher); ok {
67-
link = d.WrapLink(ctx, link)
68-
}
6966
worker, err := NewServerWorker(ctx, s.dispatcher, link)
7067
if err != nil {
7168
return err

features/routing/dispatcher.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,3 @@ type Dispatcher interface {
2626
func DispatcherType() interface{} {
2727
return (*Dispatcher)(nil)
2828
}
29-
30-
// Just for type assertion
31-
type WrapLinkDispatcher interface {
32-
Dispatcher
33-
WrapLink(ctx context.Context, link *transport.Link) *transport.Link
34-
}

proxy/vless/inbound/inbound.go

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"time"
1313
"unsafe"
1414

15+
"github.com/xtls/xray-core/app/dispatcher"
1516
"github.com/xtls/xray-core/app/reverse"
1617
"github.com/xtls/xray-core/common"
1718
"github.com/xtls/xray-core/common/buf"
@@ -31,6 +32,7 @@ import (
3132
"github.com/xtls/xray-core/features/outbound"
3233
"github.com/xtls/xray-core/features/policy"
3334
"github.com/xtls/xray-core/features/routing"
35+
"github.com/xtls/xray-core/features/stats"
3436
"github.com/xtls/xray-core/proxy"
3537
"github.com/xtls/xray-core/proxy/vless"
3638
"github.com/xtls/xray-core/proxy/vless/encoding"
@@ -72,10 +74,11 @@ func init() {
7274
type Handler struct {
7375
inboundHandlerManager feature_inbound.Manager
7476
policyManager policy.Manager
77+
stats stats.Manager
7578
validator vless.Validator
7679
decryption *encryption.ServerInstance
7780
outboundHandlerManager outbound.Manager
78-
wrapLink func(ctx context.Context, link *transport.Link) *transport.Link
81+
defaultDispatcher routing.Dispatcher
7982
ctx context.Context
8083
fallbacks map[string]map[string]map[string]*Fallback // or nil
8184
// regexps map[string]*regexp.Regexp // or nil
@@ -84,16 +87,13 @@ type Handler struct {
8487
// New creates a new VLess inbound handler.
8588
func New(ctx context.Context, config *Config, dc dns.Client, validator vless.Validator) (*Handler, error) {
8689
v := core.MustFromContext(ctx)
87-
var wrapLinkFunc func(ctx context.Context, link *transport.Link) *transport.Link
88-
if dispatcher, ok := v.GetFeature(routing.DispatcherType()).(routing.WrapLinkDispatcher); ok {
89-
wrapLinkFunc = dispatcher.WrapLink
90-
}
9190
handler := &Handler{
9291
inboundHandlerManager: v.GetFeature(feature_inbound.ManagerType()).(feature_inbound.Manager),
9392
policyManager: v.GetFeature(policy.ManagerType()).(policy.Manager),
93+
stats: v.GetFeature(stats.ManagerType()).(stats.Manager),
9494
validator: validator,
9595
outboundHandlerManager: v.GetFeature(outbound.ManagerType()).(outbound.Manager),
96-
wrapLink: wrapLinkFunc,
96+
defaultDispatcher: v.GetFeature(routing.DispatcherType()).(routing.Dispatcher),
9797
ctx: ctx,
9898
}
9999

@@ -264,7 +264,7 @@ func (*Handler) Network() []net.Network {
264264
}
265265

266266
// Process implements proxy.Inbound.Process().
267-
func (h *Handler) Process(ctx context.Context, network net.Network, connection stat.Connection, dispatcher routing.Dispatcher) error {
267+
func (h *Handler) Process(ctx context.Context, network net.Network, connection stat.Connection, dispatch routing.Dispatcher) error {
268268
iConn := stat.TryUnwrapStatsConn(connection)
269269

270270
if h.decryption != nil {
@@ -623,13 +623,10 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s
623623
if err != nil {
624624
return err
625625
}
626-
if h.wrapLink == nil {
627-
return errors.New("VLESS reverse must have a dispatcher that implemented routing.WrapLinkDispatcher")
628-
}
629-
return r.NewMux(ctx, h.wrapLink(ctx, &transport.Link{Reader: clientReader, Writer: clientWriter}))
626+
return r.NewMux(ctx, dispatcher.WrapLink(ctx, h.policyManager, h.stats, &transport.Link{Reader: clientReader, Writer: clientWriter}))
630627
}
631628

632-
if err := dispatcher.DispatchLink(ctx, request.Destination(), &transport.Link{
629+
if err := dispatch.DispatchLink(ctx, request.Destination(), &transport.Link{
633630
Reader: clientReader,
634631
Writer: clientWriter},
635632
); err != nil {

0 commit comments

Comments
 (0)