From d5f17ab4fc7ef26602fba794b073da4484e59f44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A3=8E=E6=89=87=E6=BB=91=E7=BF=94=E7=BF=BC?= Date: Thu, 11 Dec 2025 11:59:22 +0000 Subject: [PATCH 1/4] Try to optimize pipe performance --- transport/pipe/impl.go | 45 +++++++++++++++++------------------------- 1 file changed, 18 insertions(+), 27 deletions(-) diff --git a/transport/pipe/impl.go b/transport/pipe/impl.go index e5d678272ab9..81172906c656 100644 --- a/transport/pipe/impl.go +++ b/transport/pipe/impl.go @@ -3,7 +3,6 @@ package pipe import ( "errors" "io" - "runtime" "sync" "time" @@ -136,11 +135,10 @@ func (p *pipe) writeMultiBufferInternal(mb buf.MultiBuffer) error { if p.data == nil { p.data = mb - return nil + } else { + p.data, _ = buf.MergeMulti(p.data, mb) } - - p.data, _ = buf.MergeMulti(p.data, mb) - return errSlowDown + return nil } func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error { @@ -155,30 +153,23 @@ func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error { return nil } - if err == errSlowDown { - p.readSignal.Signal() - - // Yield current goroutine. Hopefully the reading counterpart can pick up the payload. - runtime.Gosched() - return nil - } - - if err == errBufferFull && p.option.discardOverflow { - buf.ReleaseMulti(mb) - return nil + if err == errBufferFull { + if p.option.discardOverflow { + buf.ReleaseMulti(mb) + return nil + } + select { + case <-p.writeSignal.Wait(): + continue + case <-p.done.Wait(): + buf.ReleaseMulti(mb) + return io.ErrClosedPipe + } } - if err != errBufferFull { - buf.ReleaseMulti(mb) - p.readSignal.Signal() - return err - } - - select { - case <-p.writeSignal.Wait(): - case <-p.done.Wait(): - return io.ErrClosedPipe - } + buf.ReleaseMulti(mb) + p.readSignal.Signal() + return err } } From 8c0d32f6f128dc56f964f16ed4a173666c89ebfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A3=8E=E6=89=87=E6=BB=91=E7=BF=94=E7=BF=BC?= Date: Sun, 21 Dec 2025 17:30:52 +0000 Subject: [PATCH 2/4] Magic --- common/buf/copy.go | 90 +++++++++++++++++++++++++++++++++++++++++++- common/buf/reader.go | 5 +++ 2 files changed, 94 insertions(+), 1 deletion(-) diff --git a/common/buf/copy.go b/common/buf/copy.go index 4cc3be881d88..55881ebd67a0 100644 --- a/common/buf/copy.go +++ b/common/buf/copy.go @@ -2,6 +2,7 @@ package buf import ( "io" + "sync" "time" "github.com/xtls/xray-core/common/errors" @@ -113,7 +114,12 @@ func Copy(reader Reader, writer Writer, options ...CopyOption) error { for _, option := range options { option(&handler) } - err := copyInternal(reader, writer, &handler) + var err error + if sReader, ok := reader.(*SingleReader); ok && false { + err = copyV(sReader, writer, &handler) + } else { + err = copyInternal(reader, writer, &handler) + } if err != nil && errors.Cause(err) != io.EOF { return err } @@ -133,3 +139,85 @@ func CopyOnceTimeout(reader Reader, writer Writer, timeout time.Duration) error } return writer.WriteMultiBuffer(mb) } + +func copyV(r *SingleReader, w Writer, handler *copyHandler) error { + // max packet len is 8192, so buffer channel size is 512, about 4MB memory usage + cache := make(chan *Buffer, 512) + stopRead := make(chan struct{}) + var rErr error + var wErr error + wg := sync.WaitGroup{} + wg.Add(2) + // downlink + go func() { + defer wg.Done() + defer close(cache) + for { + b, err := r.readBuffer() + if err == nil { + select { + case cache <- b: + // must be write error + case <-stopRead: + b.Release() + return + } + } else { + rErr = err + select { + case cache <- b: + case <-stopRead: + b.Release() + } + return + } + } + }() + // uplink + go func() { + defer wg.Done() + for { + b, ok := <-cache + if !ok { + return + } + var buffers = []*Buffer{b} + for stop := false; !stop; { + select { + case b, ok := <-cache: + if !ok { + stop = true + continue + } + buffers = append(buffers, b) + default: + stop = true + } + } + mb := MultiBuffer(buffers) + err := w.WriteMultiBuffer(mb) + for _, handler := range handler.onData { + handler(mb) + } + ReleaseMulti(mb) + if err != nil { + wErr = err + close(stopRead) + return + } + } + }() + wg.Wait() + for range cache { + // drain cache + b := <-cache + b.Release() + } + if wErr != nil { + return writeError{wErr} + } + if rErr != nil { + return readError{rErr} + } + return nil +} diff --git a/common/buf/reader.go b/common/buf/reader.go index 33d362d427af..ca00043b4785 100644 --- a/common/buf/reader.go +++ b/common/buf/reader.go @@ -159,6 +159,11 @@ func (r *SingleReader) ReadMultiBuffer() (MultiBuffer, error) { return MultiBuffer{b}, err } +func (r *SingleReader) readBuffer() (*Buffer, error) { + b, err := ReadBuffer(r.Reader) + return b, err +} + // PacketReader is a Reader that read one Buffer every time. type PacketReader struct { io.Reader From 42214c32875009905a4b37d846cf9a457a25c4f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A3=8E=E6=89=87=E6=BB=91=E7=BF=94=E7=BF=BC?= Date: Mon, 22 Dec 2025 02:44:45 +0000 Subject: [PATCH 3/4] Use bufferSize to calc channel buffer --- common/buf/copy.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/common/buf/copy.go b/common/buf/copy.go index 55881ebd67a0..6219b30bc2de 100644 --- a/common/buf/copy.go +++ b/common/buf/copy.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/xtls/xray-core/features/policy" "github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/signal" "github.com/xtls/xray-core/features/stats" @@ -141,8 +142,16 @@ func CopyOnceTimeout(reader Reader, writer Writer, timeout time.Duration) error } func copyV(r *SingleReader, w Writer, handler *copyHandler) error { - // max packet len is 8192, so buffer channel size is 512, about 4MB memory usage - cache := make(chan *Buffer, 512) + // channel buffer size is maxBuffer/maxPerPacketLen (ignore the case of many small packets) + // default buffer size: + // 0 in ARM MIPS MIPSLE + // 4kb in ARM64 MIPS64 MIPS64LE + // 512kb in others + channelBuffer := (policy.SessionDefault().Buffer.PerConnection)/Size + if channelBuffer <= 0 { + channelBuffer = 4 + } + cache := make(chan *Buffer, channelBuffer) stopRead := make(chan struct{}) var rErr error var wErr error From c6a76ff281fd38401f620a146cb14e6b95d48889 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A3=8E=E6=89=87=E6=BB=91=E7=BF=94=E7=BF=BC?= Date: Mon, 22 Dec 2025 05:13:52 +0000 Subject: [PATCH 4/4] format --- common/buf/copy.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/common/buf/copy.go b/common/buf/copy.go index 6219b30bc2de..9ec5b50e6142 100644 --- a/common/buf/copy.go +++ b/common/buf/copy.go @@ -5,9 +5,9 @@ import ( "sync" "time" - "github.com/xtls/xray-core/features/policy" "github.com/xtls/xray-core/common/errors" "github.com/xtls/xray-core/common/signal" + "github.com/xtls/xray-core/features/policy" "github.com/xtls/xray-core/features/stats" ) @@ -147,7 +147,7 @@ func copyV(r *SingleReader, w Writer, handler *copyHandler) error { // 0 in ARM MIPS MIPSLE // 4kb in ARM64 MIPS64 MIPS64LE // 512kb in others - channelBuffer := (policy.SessionDefault().Buffer.PerConnection)/Size + channelBuffer := (policy.SessionDefault().Buffer.PerConnection) / Size if channelBuffer <= 0 { channelBuffer = 4 } @@ -217,9 +217,8 @@ func copyV(r *SingleReader, w Writer, handler *copyHandler) error { } }() wg.Wait() - for range cache { - // drain cache - b := <-cache + // drain cache + for b := range cache { b.Release() } if wErr != nil {