Skip to content

Commit 56fc43a

Browse files
authored
Optimized RequestChan for Client Session (#1186)
* proxy: rename RequestChan.Len => Buffered * proxy: set DefaultRequestChanBuffer=128 * proxy: replacing tasks channel with RequestChan, default buffer size=1024 * proxy: set default session_max_pipeline=10000
1 parent a262e54 commit 56fc43a

File tree

5 files changed

+48
-38
lines changed

5 files changed

+48
-38
lines changed

config/proxy.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ session_send_timeout = "30s"
7979

8080
# Make sure this is higher than the max number of requests for each pipeline request, or your client may be blocked.
8181
# Set session pipeline buffer size.
82-
session_max_pipeline = 1024
82+
session_max_pipeline = 10000
8383

8484
# Set session tcp keepalive period. (0 to disable)
8585
session_keepalive_period = "75s"

pkg/proxy/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ session_send_timeout = "30s"
9595
9696
# Make sure this is higher than the max number of requests for each pipeline request, or your client may be blocked.
9797
# Set session pipeline buffer size.
98-
session_max_pipeline = 1024
98+
session_max_pipeline = 10000
9999
100100
# Set session tcp keepalive period. (0 to disable)
101101
session_keepalive_period = "75s"

pkg/proxy/request.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"unsafe"
99

1010
"github.com/CodisLabs/codis/pkg/proxy/redis"
11-
"github.com/CodisLabs/codis/pkg/utils/math2"
1211
"github.com/CodisLabs/codis/pkg/utils/sync2/atomic2"
1312
)
1413

@@ -68,15 +67,18 @@ type RequestChan struct {
6867
closed bool
6968
}
7069

71-
const MinRequestChanBuffer = 128
70+
const DefaultRequestChanBuffer = 128
7271

7372
func NewRequestChan() *RequestChan {
74-
return NewRequestChanBuffer(MinRequestChanBuffer)
73+
return NewRequestChanBuffer(0)
7574
}
7675

7776
func NewRequestChanBuffer(n int) *RequestChan {
77+
if n <= 0 {
78+
n = DefaultRequestChanBuffer
79+
}
7880
var ch = &RequestChan{
79-
buff: make([]*Request, math2.MaxInt(n, MinRequestChanBuffer)),
81+
buff: make([]*Request, n),
8082
}
8183
ch.cond = sync.NewCond(&ch.lock)
8284
return ch
@@ -91,7 +93,7 @@ func (c *RequestChan) Close() {
9193
c.lock.Unlock()
9294
}
9395

94-
func (c *RequestChan) Len() int {
96+
func (c *RequestChan) Buffered() int {
9597
c.lock.Lock()
9698
n := len(c.data)
9799
c.lock.Unlock()
@@ -139,7 +141,7 @@ func (c *RequestChan) lockedPopFront() (*Request, bool) {
139141
}
140142

141143
func (c *RequestChan) IsEmpty() bool {
142-
return c.Len() == 0
144+
return c.Buffered() == 0
143145
}
144146

145147
func (c *RequestChan) PopFrontAll(onRequest func(r *Request) error) error {

pkg/proxy/request_test.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
)
1515

1616
func TestRequestChan1(t *testing.T) {
17-
var ch = NewRequestChanBuffer(512)
17+
var ch = NewRequestChanBuffer(0)
1818
for i := 0; i < 8192; i++ {
1919
n := ch.PushBack(&Request{UnixNano: int64(i)})
2020
assert.Must(n == i+1)
@@ -23,7 +23,7 @@ func TestRequestChan1(t *testing.T) {
2323
r, ok := ch.PopFront()
2424
assert.Must(ok && r.UnixNano == int64(i))
2525
}
26-
assert.Must(ch.Len() == 0)
26+
assert.Must(ch.Buffered() == 0)
2727

2828
ch.Close()
2929

@@ -39,13 +39,13 @@ func TestRequestChan2(t *testing.T) {
3939
}
4040
ch.Close()
4141

42-
assert.Must(ch.Len() == 8192)
42+
assert.Must(ch.Buffered() == 8192)
4343

4444
for i := 0; i < 8192; i++ {
4545
r, ok := ch.PopFront()
4646
assert.Must(ok && r.UnixNano == int64(i))
4747
}
48-
assert.Must(ch.Len() == 0)
48+
assert.Must(ch.Buffered() == 0)
4949

5050
_, ok := ch.PopFront()
5151
assert.Must(!ok)
@@ -55,10 +55,12 @@ func TestRequestChan3(t *testing.T) {
5555
var wg sync.WaitGroup
5656
var ch = NewRequestChanBuffer(512)
5757

58+
const n = 1000 * 1000 * 4
59+
5860
wg.Add(1)
5961
go func() {
6062
defer wg.Done()
61-
for i := 0; i < 8192; i++ {
63+
for i := 0; i < n; i++ {
6264
ch.PushBack(&Request{UnixNano: int64(i)})
6365
if i%1024 == 0 {
6466
runtime.Gosched()
@@ -69,9 +71,12 @@ func TestRequestChan3(t *testing.T) {
6971
wg.Add(1)
7072
go func() {
7173
defer wg.Done()
72-
for i := 0; i < 8192; i++ {
74+
for i := 0; i < n; i++ {
7375
r, ok := ch.PopFront()
7476
assert.Must(ok && r.UnixNano == int64(i))
77+
if i%4096 == 0 {
78+
runtime.Gosched()
79+
}
7580
}
7681
}()
7782

pkg/proxy/session.go

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"github.com/CodisLabs/codis/pkg/proxy/redis"
1616
"github.com/CodisLabs/codis/pkg/utils/errors"
1717
"github.com/CodisLabs/codis/pkg/utils/log"
18-
"github.com/CodisLabs/codis/pkg/utils/math2"
1918
"github.com/CodisLabs/codis/pkg/utils/sync2/atomic2"
2019
)
2120

@@ -104,9 +103,8 @@ func (s *Session) CloseWithError(err error) error {
104103
}
105104

106105
var (
107-
ErrTooManySessions = errors.New("too many sessions")
108-
ErrRouterNotOnline = errors.New("router is not online")
109-
106+
ErrRouterNotOnline = errors.New("router is not online")
107+
ErrTooManySessions = errors.New("too many sessions")
110108
ErrTooManyPipelinedRequests = errors.New("too many pipelined requests")
111109
)
112110

@@ -132,7 +130,7 @@ func (s *Session) Start(d *Router) {
132130
return
133131
}
134132

135-
tasks := make(chan *Request, math2.MaxInt(1, s.config.SessionMaxPipeline))
133+
tasks := NewRequestChanBuffer(1024)
136134

137135
go func() {
138136
s.loopWriter(tasks)
@@ -141,17 +139,20 @@ func (s *Session) Start(d *Router) {
141139

142140
go func() {
143141
s.loopReader(tasks, d)
144-
close(tasks)
142+
tasks.Close()
145143
}()
146144
})
147145
}
148146

149-
func (s *Session) loopReader(tasks chan<- *Request, d *Router) (err error) {
147+
func (s *Session) loopReader(tasks *RequestChan, d *Router) (err error) {
150148
defer func() {
151149
s.CloseReaderWithError(err)
152150
}()
153151

154-
var sensitive = s.config.SessionBreakOnFailure
152+
var (
153+
breakOnFailure = s.config.SessionBreakOnFailure
154+
maxPipelineLen = s.config.SessionMaxPipeline
155+
)
155156

156157
for !s.quit {
157158
multi, err := s.Conn.DecodeMultiBulk()
@@ -160,6 +161,10 @@ func (s *Session) loopReader(tasks chan<- *Request, d *Router) (err error) {
160161
}
161162
s.incrOpTotal()
162163

164+
if tasks.Buffered() > maxPipelineLen {
165+
return ErrTooManyPipelinedRequests
166+
}
167+
163168
start := time.Now()
164169
s.LastOpUnix = start.Unix()
165170
s.Ops++
@@ -170,59 +175,57 @@ func (s *Session) loopReader(tasks chan<- *Request, d *Router) (err error) {
170175
r.Database = s.database
171176
r.UnixNano = start.UnixNano()
172177

173-
if len(tasks) == cap(tasks) {
174-
return ErrTooManyPipelinedRequests
175-
}
176178
if err := s.handleRequest(r, d); err != nil {
177179
r.Resp = redis.NewErrorf("ERR handle request, %s", err)
178-
tasks <- r
179-
if sensitive {
180+
tasks.PushBack(r)
181+
if breakOnFailure {
180182
return err
181183
}
182184
} else {
183-
tasks <- r
185+
tasks.PushBack(r)
184186
}
185187
}
186188
return nil
187189
}
188190

189-
func (s *Session) loopWriter(tasks <-chan *Request) (err error) {
191+
func (s *Session) loopWriter(tasks *RequestChan) (err error) {
190192
defer func() {
191193
s.CloseWithError(err)
192-
for r := range tasks {
194+
tasks.PopFrontAllVoid(func(r *Request) {
193195
s.incrOpFails(r, nil)
194-
}
196+
})
195197
s.flushOpStats(true)
196198
}()
197199

198-
var sensitive = s.config.SessionBreakOnFailure
200+
var breakOnFailure = s.config.SessionBreakOnFailure
199201

200202
p := s.Conn.FlushEncoder()
201203
p.MaxInterval = time.Millisecond
202204
p.MaxBuffered = 256
203205

204-
for r := range tasks {
206+
return tasks.PopFrontAll(func(r *Request) error {
205207
resp, err := s.handleResponse(r)
206208
if err != nil {
207209
resp = redis.NewErrorf("ERR handle response, %s", err)
208-
if sensitive {
210+
if breakOnFailure {
209211
s.Conn.Encode(resp, true)
210212
return s.incrOpFails(r, err)
211213
}
212214
}
213215
if err := p.Encode(resp); err != nil {
214216
return s.incrOpFails(r, err)
215217
}
216-
if err := p.Flush(len(tasks) == 0); err != nil {
218+
fflush := tasks.IsEmpty()
219+
if err := p.Flush(fflush); err != nil {
217220
return s.incrOpFails(r, err)
218221
} else {
219222
s.incrOpStats(r, resp.Type)
220223
}
221-
if len(tasks) == 0 {
224+
if fflush {
222225
s.flushOpStats(false)
223226
}
224-
}
225-
return nil
227+
return nil
228+
})
226229
}
227230

228231
func (s *Session) handleResponse(r *Request) (*redis.Resp, error) {

0 commit comments

Comments
 (0)