Skip to content

Commit 7b2795b

Browse files
authored
Magic
1 parent d5f17ab commit 7b2795b

File tree

2 files changed

+91
-0
lines changed

2 files changed

+91
-0
lines changed

common/buf/copy.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package buf
22

33
import (
44
"io"
5+
"sync"
56
"time"
67

78
"github.com/xtls/xray-core/common/errors"
@@ -113,6 +114,9 @@ func Copy(reader Reader, writer Writer, options ...CopyOption) error {
113114
for _, option := range options {
114115
option(&handler)
115116
}
117+
if sReader, ok := reader.(*SingleReader); ok {
118+
return copyV(sReader, writer, &handler)
119+
}
116120
err := copyInternal(reader, writer, &handler)
117121
if err != nil && errors.Cause(err) != io.EOF {
118122
return err
@@ -133,3 +137,85 @@ func CopyOnceTimeout(reader Reader, writer Writer, timeout time.Duration) error
133137
}
134138
return writer.WriteMultiBuffer(mb)
135139
}
140+
141+
func copyV(r *SingleReader, w Writer, handler *copyHandler) error {
142+
// max packet len is 8192, so buffer channel size is 512, about 4MB memory usage
143+
cache := make(chan *Buffer, 512)
144+
stopRead := make(chan struct{})
145+
var rErr error
146+
var wErr error
147+
wg := sync.WaitGroup{}
148+
wg.Add(2)
149+
// downlink
150+
go func() {
151+
defer wg.Done()
152+
defer close(cache)
153+
for {
154+
b, err := r.readBuffer()
155+
if err == nil {
156+
select {
157+
case cache <- b:
158+
// must be write error
159+
case <-stopRead:
160+
b.Release()
161+
return
162+
}
163+
} else {
164+
rErr = err
165+
select {
166+
case cache <- b:
167+
case <-stopRead:
168+
b.Release()
169+
}
170+
return
171+
}
172+
}
173+
}()
174+
// uplink
175+
go func() {
176+
defer wg.Done()
177+
for {
178+
b, ok := <-cache
179+
if !ok {
180+
return
181+
}
182+
var buffers = []*Buffer{b}
183+
for stop := false; !stop; {
184+
select {
185+
case b, ok := <-cache:
186+
if !ok {
187+
stop = true
188+
continue
189+
}
190+
buffers = append(buffers, b)
191+
default:
192+
stop = true
193+
}
194+
}
195+
mb := MultiBuffer(buffers)
196+
err := w.WriteMultiBuffer(mb)
197+
for _, handler := range handler.onData {
198+
handler(mb)
199+
}
200+
ReleaseMulti(mb)
201+
if err != nil {
202+
wErr = err
203+
close(stopRead)
204+
return
205+
}
206+
}
207+
}()
208+
wg.Wait()
209+
for range cache {
210+
// drain cache
211+
b := <-cache
212+
b.Release()
213+
}
214+
if wErr != nil {
215+
return writeError{wErr}
216+
}
217+
if rErr != nil && errors.Cause(rErr) != io.EOF {
218+
return readError{rErr}
219+
}
220+
return nil
221+
}

common/buf/reader.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,11 @@ func (r *SingleReader) ReadMultiBuffer() (MultiBuffer, error) {
159159
return MultiBuffer{b}, err
160160
}
161161

162+
func (r *SingleReader) readBuffer() (*Buffer, error) {
163+
b, err := ReadBuffer(r.Reader)
164+
return b, err
165+
}
166+
162167
// PacketReader is a Reader that read one Buffer every time.
163168
type PacketReader struct {
164169
io.Reader

0 commit comments

Comments
 (0)