Skip to content

Commit 0014d47

Browse files
wangshao1wangshaoyi
andauthored
fix proxy log print error && request time stat error (#3107)
Co-authored-by: wangshaoyi <[email protected]>
1 parent e981599 commit 0014d47

File tree

5 files changed

+39
-26
lines changed

5 files changed

+39
-26
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ src/build_version.cc
6565
build/
6666
buildtrees
6767
deps
68-
pkg
68+
#pkg
6969

7070
#develop container
7171
.devcontainer

codis/pkg/proxy/backend.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"strconv"
1111
"strings"
1212
"sync"
13+
"sync/atomic"
1314
"time"
1415

1516
"pika/codis/v2/pkg/proxy/redis"
@@ -84,14 +85,14 @@ func (bc *BackendConn) KeepAlive() bool {
8485
}
8586
switch bc.state.Int64() {
8687
default:
87-
m := &Request{}
88+
m := &Request{ReceiveTime: new(int64), SendToPikaTime: new(int64), ReceiveFromPikaTime: new(int64)}
8889
m.Multi = []*redis.Resp{
8990
redis.NewBulkBytes([]byte("PING")),
9091
}
9192
bc.PushBack(m)
9293

9394
case stateDataStale:
94-
m := &Request{}
95+
m := &Request{ReceiveTime: new(int64), SendToPikaTime: new(int64), ReceiveFromPikaTime: new(int64)}
9596
m.Multi = []*redis.Resp{
9697
redis.NewBulkBytes([]byte("INFO")),
9798
}
@@ -284,7 +285,7 @@ func (bc *BackendConn) loopReader(tasks <-chan *Request, c *redis.Conn, round in
284285
}()
285286
for r := range tasks {
286287
resp, err := c.Decode()
287-
r.ReceiveFromServerTime = time.Now().UnixNano()
288+
atomic.StoreInt64(r.ReceiveFromPikaTime, time.Now().UnixNano())
288289
if err != nil {
289290
return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err))
290291
}
@@ -364,7 +365,7 @@ func (bc *BackendConn) loopWriter(round int) (err error) {
364365
} else {
365366
tasks <- r
366367
}
367-
r.SendToServerTime = time.Now().UnixNano()
368+
atomic.CompareAndSwapInt64(r.SendToPikaTime, 0, time.Now().UnixNano())
368369
}
369370
return nil
370371
}

codis/pkg/proxy/request.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package proxy
55

66
import (
77
"sync"
8+
"sync/atomic"
89
"unsafe"
910

1011
"pika/codis/v2/pkg/proxy/redis"
@@ -21,11 +22,11 @@ type Request struct {
2122
OpStr string
2223
OpFlag
2324

24-
Database int32
25-
ReceiveTime int64
26-
SendToServerTime int64
27-
ReceiveFromServerTime int64
28-
TasksLen int64
25+
Database int32
26+
ReceiveTime *int64
27+
SendToPikaTime *int64
28+
ReceiveFromPikaTime *int64
29+
TasksLen int64
2930

3031
*redis.Resp
3132
Err error
@@ -47,14 +48,16 @@ func (r *Request) MakeSubRequest(n int) []Request {
4748
x.Broken = r.Broken
4849
x.Database = r.Database
4950
x.ReceiveTime = r.ReceiveTime
51+
x.SendToPikaTime = r.SendToPikaTime
52+
x.ReceiveFromPikaTime = r.ReceiveFromPikaTime
5053
}
5154
return sub
5255
}
5356

5457
const GOLDEN_RATIO_PRIME_32 = 0x9e370001
5558

5659
func (r *Request) Seed16() uint {
57-
h32 := uint32(r.ReceiveTime) + uint32(uintptr(unsafe.Pointer(r)))
60+
h32 := uint32(atomic.LoadInt64(r.ReceiveTime)) + uint32(uintptr(unsafe.Pointer(r)))
5861
h32 *= GOLDEN_RATIO_PRIME_32
5962
return uint(h32 >> 16)
6063
}

codis/pkg/proxy/session.go

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"strconv"
1111
"strings"
1212
"sync"
13+
"sync/atomic"
1314
"time"
1415

1516
"pika/codis/v2/pkg/models"
@@ -184,7 +185,10 @@ func (s *Session) loopReader(tasks *RequestChan, d *Router) (err error) {
184185
r.Multi = multi
185186
r.Batch = &sync.WaitGroup{}
186187
r.Database = s.database
187-
r.ReceiveTime = start.UnixNano()
188+
r.ReceiveTime = new(int64)
189+
r.SendToPikaTime = new(int64)
190+
r.ReceiveFromPikaTime = new(int64)
191+
*r.ReceiveTime = time.Now().UnixNano()
188192
r.TasksLen = int64(tasksLen)
189193

190194
if err := s.handleRequest(r, d); err != nil {
@@ -230,14 +234,16 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) {
230234
if err := p.Encode(resp); err != nil {
231235
return s.incrOpFails(r, err)
232236
}
237+
nowTime := time.Now().UnixNano()
238+
receiveTime := atomic.LoadInt64(r.ReceiveTime)
239+
duration := int64((nowTime - receiveTime) / 1e3)
240+
233241
fflush := tasks.IsEmpty()
234242
if err := p.Flush(fflush); err != nil {
235243
return s.incrOpFails(r, err)
236244
} else {
237-
s.incrOpStats(r, resp.Type)
245+
s.incrOpStats(r, resp.Type, duration)
238246
}
239-
nowTime := time.Now().UnixNano()
240-
duration := int64((nowTime - r.ReceiveTime) / 1e3)
241247
s.updateMaxDelay(duration, r)
242248
if fflush {
243249
s.flushOpStats(false)
@@ -248,19 +254,22 @@ func (s *Session) loopWriter(tasks *RequestChan) (err error) {
248254
//Record the waiting time from receiving the request from the client to sending it to the backend server
249255
//the waiting time from sending the request to the backend server to receiving the response from the server
250256
//the waiting time from receiving the server response to sending it to the client
257+
sendToPikaTime := atomic.LoadInt64(r.SendToPikaTime)
258+
receiveFromPikaTime := atomic.LoadInt64(r.ReceiveFromPikaTime)
259+
251260
var d0, d1, d2 int64 = -1, -1, -1
252-
if r.SendToServerTime > 0 {
253-
d0 = int64((r.SendToServerTime - r.ReceiveTime) / 1e3)
261+
if sendToPikaTime > 0 {
262+
d0 = int64((sendToPikaTime - receiveTime) / 1e3)
254263
}
255-
if r.SendToServerTime > 0 && r.ReceiveFromServerTime > 0 {
256-
d1 = int64((r.ReceiveFromServerTime - r.SendToServerTime) / 1e3)
264+
if sendToPikaTime > 0 && receiveFromPikaTime > 0 {
265+
d1 = int64((receiveFromPikaTime - sendToPikaTime) / 1e3)
257266
}
258-
if r.ReceiveFromServerTime > 0 {
259-
d2 = int64((nowTime - r.ReceiveFromServerTime) / 1e3)
267+
if receiveFromPikaTime > 0 {
268+
d2 = int64((nowTime - receiveFromPikaTime) / 1e3)
260269
}
261270
index := getWholeCmd(r.Multi, cmd)
262-
log.Errorf("%s remote:%s, start_time(us):%d, duration(us): [%d, %d, %d], %d, tasksLen:%d, command:[%s].",
263-
time.Unix(r.ReceiveTime/1e9, 0).Format("2006-01-02 15:04:05"), s.Conn.RemoteAddr(), r.ReceiveTime/1e3, d0, d1, d2, duration, r.TasksLen, string(cmd[:index]))
271+
log.Warnf("%s remote:%s, start_time(us):%d, duration(us): [%d, %d, %d], %d, tasksLen:%d, command:[%s].",
272+
time.Unix(receiveTime/1e9, 0).Format("2006-01-02 15:04:05"), s.Conn.RemoteAddr(), receiveTime/1e3, d0, d1, d2, duration, r.TasksLen, string(cmd[:index]))
264273
}
265274
return nil
266275
})
@@ -675,10 +684,10 @@ func (s *Session) getOpStats(opstr string) *opStats {
675684
return e
676685
}
677686

678-
func (s *Session) incrOpStats(r *Request, t redis.RespType) {
687+
func (s *Session) incrOpStats(r *Request, t redis.RespType, duration int64) {
679688
e := s.getOpStats(r.OpStr)
680689
e.calls.Incr()
681-
e.nsecs.Add(time.Now().UnixNano() - r.ReceiveTime)
690+
e.nsecs.Add(duration)
682691
switch t {
683692
case redis.TypeError:
684693
e.redis.errors.Incr()

codis/pkg/utils/log/log.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ func New(writer io.Writer, prefix string) *Logger {
138138
}
139139
return &Logger{
140140
out: out,
141-
log: log.New(out, prefix, LstdFlags|Lshortfile),
141+
log: log.New(out, prefix, LstdFlags|Lshortfile|Lmicroseconds),
142142
level: LevelAll,
143143
trace: LevelError,
144144
}

0 commit comments

Comments
 (0)