Skip to content

Commit 70858d3

Browse files
authored
Merge pull request #120 from SenseUnit/bwlimit_copy_offload
Offload copy for bwlimit forwarder
2 parents 62195c7 + 511481b commit 70858d3

File tree

1 file changed

+35
-30
lines changed

1 file changed

+35
-30
lines changed

forward/bwlimit.go

Lines changed: 35 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@ import (
44
"context"
55
"errors"
66
"io"
7+
"time"
78

89
"github.com/zeebo/xxh3"
910
"golang.org/x/time/rate"
1011
)
1112

12-
const copyBufSize = 128 * 1024
13+
const copyChunkSize = 128 * 1024
1314

1415
type BWLimit struct {
1516
d []rate.Limiter
@@ -20,7 +21,7 @@ func NewBWLimit(bytesPerSecond float64, buckets uint, separate bool) *BWLimit {
2021
if buckets == 0 {
2122
buckets = 1
2223
}
23-
lim := *(rate.NewLimiter(rate.Limit(bytesPerSecond), copyBufSize))
24+
lim := *(rate.NewLimiter(rate.Limit(bytesPerSecond), copyChunkSize))
2425
d := make([]rate.Limiter, buckets)
2526
for i := range d {
2627
d[i] = lim
@@ -38,40 +39,44 @@ func NewBWLimit(bytesPerSecond float64, buckets uint, separate bool) *BWLimit {
3839
}
3940
}
4041

41-
var errInvalidWrite = errors.New("invalid write result")
42-
4342
func (l *BWLimit) copy(ctx context.Context, rl *rate.Limiter, dst io.Writer, src io.Reader) (written int64, err error) {
44-
buf := make([]byte, copyBufSize)
43+
lim := &io.LimitedReader{
44+
R: src,
45+
N: copyChunkSize,
46+
}
47+
var n int64
4548
for {
46-
nr, er := src.Read(buf)
47-
if nr > 0 {
48-
if e := rl.WaitN(ctx, nr); e != nil {
49-
err = e
50-
break
51-
}
52-
nw, ew := dst.Write(buf[0:nr])
53-
if nw < 0 || nr < nw {
54-
nw = 0
55-
if ew == nil {
56-
ew = errInvalidWrite
57-
}
58-
}
59-
written += int64(nw)
60-
if ew != nil {
61-
err = ew
62-
break
63-
}
64-
if nr != nw {
65-
err = io.ErrShortWrite
66-
break
49+
t := time.Now()
50+
r := rl.ReserveN(t, copyChunkSize)
51+
if !r.OK() {
52+
err = errors.New("can't get rate limit reservation")
53+
return
54+
}
55+
delay := r.DelayFrom(t)
56+
if delay > 0 {
57+
select {
58+
case <-time.After(delay):
59+
case <-ctx.Done():
60+
err = ctx.Err()
61+
return
6762
}
6863
}
69-
if er != nil {
70-
if er != io.EOF {
71-
err = er
64+
n, err = io.Copy(dst, lim)
65+
written += n
66+
if n < copyChunkSize {
67+
r.CancelAt(t)
68+
if n > 0 {
69+
rl.ReserveN(t, int(n))
7270
}
73-
break
7471
}
72+
if err != nil {
73+
return
74+
}
75+
if lim.N > 0 {
76+
// EOF from underlying stream
77+
return
78+
}
79+
lim.N = copyChunkSize
7580
}
7681
return written, err
7782
}

0 commit comments

Comments
 (0)