Skip to content

Commit 30c615a

Browse files
committed
Try to optimize pipe performance (#5409)
1 parent fa64775 commit 30c615a

File tree

3 files changed

+120
-28
lines changed

3 files changed

+120
-28
lines changed

common/buf/copy.go

Lines changed: 97 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ package buf
22

33
import (
44
"io"
5+
"sync"
56
"time"
67

78
"github.com/xtls/xray-core/common/errors"
89
"github.com/xtls/xray-core/common/signal"
10+
"github.com/xtls/xray-core/features/policy"
911
"github.com/xtls/xray-core/features/stats"
1012
)
1113

@@ -113,7 +115,12 @@ func Copy(reader Reader, writer Writer, options ...CopyOption) error {
113115
for _, option := range options {
114116
option(&handler)
115117
}
116-
err := copyInternal(reader, writer, &handler)
118+
var err error
119+
if sReader, ok := reader.(*SingleReader); ok {
120+
err = copyV(sReader, writer, &handler)
121+
} else {
122+
err = copyInternal(reader, writer, &handler)
123+
}
117124
if err != nil && errors.Cause(err) != io.EOF {
118125
return err
119126
}
@@ -133,3 +140,92 @@ func CopyOnceTimeout(reader Reader, writer Writer, timeout time.Duration) error
133140
}
134141
return writer.WriteMultiBuffer(mb)
135142
}
143+
144+
func copyV(r *SingleReader, w Writer, handler *copyHandler) error {
145+
// channel buffer size is maxBuffer/maxPerPacketLen (ignore the case of many small packets)
146+
// default buffer size:
147+
// 0 in ARM MIPS MIPSLE
148+
// 4kb in ARM64 MIPS64 MIPS64LE
149+
// 512kb in others
150+
channelBuffer := (policy.SessionDefault().Buffer.PerConnection) / Size
151+
if channelBuffer <= 0 {
152+
channelBuffer = 4
153+
}
154+
cache := make(chan *Buffer, channelBuffer)
155+
stopRead := make(chan struct{})
156+
var rErr error
157+
var wErr error
158+
wg := sync.WaitGroup{}
159+
wg.Add(2)
160+
// downlink
161+
go func() {
162+
defer wg.Done()
163+
defer close(cache)
164+
for {
165+
b, err := r.readBuffer()
166+
if err == nil {
167+
select {
168+
case cache <- b:
169+
// must be write error
170+
case <-stopRead:
171+
b.Release()
172+
return
173+
}
174+
} else {
175+
rErr = err
176+
select {
177+
case cache <- b:
178+
case <-stopRead:
179+
b.Release()
180+
}
181+
return
182+
}
183+
}
184+
}()
185+
// uplink
186+
go func() {
187+
defer wg.Done()
188+
for {
189+
b, ok := <-cache
190+
if !ok {
191+
return
192+
}
193+
var buffers = []*Buffer{b}
194+
for stop := false; !stop; {
195+
select {
196+
case b, ok := <-cache:
197+
if !ok {
198+
stop = true
199+
continue
200+
}
201+
buffers = append(buffers, b)
202+
default:
203+
stop = true
204+
}
205+
}
206+
mb := MultiBuffer(buffers)
207+
err := w.WriteMultiBuffer(mb)
208+
for _, handler := range handler.onData {
209+
handler(mb)
210+
}
211+
ReleaseMulti(mb)
212+
if err != nil {
213+
wErr = err
214+
close(stopRead)
215+
return
216+
}
217+
}
218+
}()
219+
wg.Wait()
220+
// drain cache
221+
for b := range cache {
222+
b.Release()
223+
}
224+
if wErr != nil {
225+
return writeError{wErr}
226+
}
227+
if rErr != nil {
228+
return readError{rErr}
229+
}
230+
return nil
231+
}

common/buf/reader.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,11 @@ func (r *SingleReader) ReadMultiBuffer() (MultiBuffer, error) {
159159
return MultiBuffer{b}, err
160160
}
161161

162+
func (r *SingleReader) readBuffer() (*Buffer, error) {
163+
b, err := ReadBuffer(r.Reader)
164+
return b, err
165+
}
166+
162167
// PacketReader is a Reader that read one Buffer every time.
163168
type PacketReader struct {
164169
io.Reader

transport/pipe/impl.go

Lines changed: 18 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package pipe
33
import (
44
"errors"
55
"io"
6-
"runtime"
76
"sync"
87
"time"
98

@@ -136,11 +135,10 @@ func (p *pipe) writeMultiBufferInternal(mb buf.MultiBuffer) error {
136135

137136
if p.data == nil {
138137
p.data = mb
139-
return nil
138+
} else {
139+
p.data, _ = buf.MergeMulti(p.data, mb)
140140
}
141-
142-
p.data, _ = buf.MergeMulti(p.data, mb)
143-
return errSlowDown
141+
return nil
144142
}
145143

146144
func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error {
@@ -155,30 +153,23 @@ func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error {
155153
return nil
156154
}
157155

158-
if err == errSlowDown {
159-
p.readSignal.Signal()
160-
161-
// Yield current goroutine. Hopefully the reading counterpart can pick up the payload.
162-
runtime.Gosched()
163-
return nil
164-
}
165-
166-
if err == errBufferFull && p.option.discardOverflow {
167-
buf.ReleaseMulti(mb)
168-
return nil
156+
if err == errBufferFull {
157+
if p.option.discardOverflow {
158+
buf.ReleaseMulti(mb)
159+
return nil
160+
}
161+
select {
162+
case <-p.writeSignal.Wait():
163+
continue
164+
case <-p.done.Wait():
165+
buf.ReleaseMulti(mb)
166+
return io.ErrClosedPipe
167+
}
169168
}
170169

171-
if err != errBufferFull {
172-
buf.ReleaseMulti(mb)
173-
p.readSignal.Signal()
174-
return err
175-
}
176-
177-
select {
178-
case <-p.writeSignal.Wait():
179-
case <-p.done.Wait():
180-
return io.ErrClosedPipe
181-
}
170+
buf.ReleaseMulti(mb)
171+
p.readSignal.Signal()
172+
return err
182173
}
183174
}
184175

0 commit comments

Comments
 (0)