Skip to content

Commit 8c0d32f

Browse files
authored
Magic
1 parent d5f17ab commit 8c0d32f

File tree

2 files changed

+94
-1
lines changed

2 files changed

+94
-1
lines changed

common/buf/copy.go

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package buf
22

33
import (
44
"io"
5+
"sync"
56
"time"
67

78
"github.com/xtls/xray-core/common/errors"
@@ -113,7 +114,12 @@ func Copy(reader Reader, writer Writer, options ...CopyOption) error {
113114
for _, option := range options {
114115
option(&handler)
115116
}
116-
err := copyInternal(reader, writer, &handler)
117+
var err error
118+
if sReader, ok := reader.(*SingleReader); ok && false {
119+
err = copyV(sReader, writer, &handler)
120+
} else {
121+
err = copyInternal(reader, writer, &handler)
122+
}
117123
if err != nil && errors.Cause(err) != io.EOF {
118124
return err
119125
}
@@ -133,3 +139,85 @@ func CopyOnceTimeout(reader Reader, writer Writer, timeout time.Duration) error
133139
}
134140
return writer.WriteMultiBuffer(mb)
135141
}
142+
143+
func copyV(r *SingleReader, w Writer, handler *copyHandler) error {
144+
// max packet len is 8192, so buffer channel size is 512, about 4MB memory usage
145+
cache := make(chan *Buffer, 512)
146+
stopRead := make(chan struct{})
147+
var rErr error
148+
var wErr error
149+
wg := sync.WaitGroup{}
150+
wg.Add(2)
151+
// downlink
152+
go func() {
153+
defer wg.Done()
154+
defer close(cache)
155+
for {
156+
b, err := r.readBuffer()
157+
if err == nil {
158+
select {
159+
case cache <- b:
160+
// must be write error
161+
case <-stopRead:
162+
b.Release()
163+
return
164+
}
165+
} else {
166+
rErr = err
167+
select {
168+
case cache <- b:
169+
case <-stopRead:
170+
b.Release()
171+
}
172+
return
173+
}
174+
}
175+
}()
176+
// uplink
177+
go func() {
178+
defer wg.Done()
179+
for {
180+
b, ok := <-cache
181+
if !ok {
182+
return
183+
}
184+
var buffers = []*Buffer{b}
185+
for stop := false; !stop; {
186+
select {
187+
case b, ok := <-cache:
188+
if !ok {
189+
stop = true
190+
continue
191+
}
192+
buffers = append(buffers, b)
193+
default:
194+
stop = true
195+
}
196+
}
197+
mb := MultiBuffer(buffers)
198+
err := w.WriteMultiBuffer(mb)
199+
for _, handler := range handler.onData {
200+
handler(mb)
201+
}
202+
ReleaseMulti(mb)
203+
if err != nil {
204+
wErr = err
205+
close(stopRead)
206+
return
207+
}
208+
}
209+
}()
210+
wg.Wait()
211+
for range cache {
212+
// drain cache
213+
b := <-cache
214+
b.Release()
215+
}
216+
if wErr != nil {
217+
return writeError{wErr}
218+
}
219+
if rErr != nil {
220+
return readError{rErr}
221+
}
222+
return nil
223+
}

common/buf/reader.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,11 @@ func (r *SingleReader) ReadMultiBuffer() (MultiBuffer, error) {
159159
return MultiBuffer{b}, err
160160
}
161161

162+
func (r *SingleReader) readBuffer() (*Buffer, error) {
163+
b, err := ReadBuffer(r.Reader)
164+
return b, err
165+
}
166+
162167
// PacketReader is a Reader that read one Buffer every time.
163168
type PacketReader struct {
164169
io.Reader

0 commit comments

Comments
 (0)