Skip to content

Commit d68b0c1

Browse files
committed
Fixed bug
1 parent 9dd4337 commit d68b0c1

File tree

2 files changed

+27
-44
lines changed

2 files changed

+27
-44
lines changed

internal/transporter/internal/client/conn.go

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -159,18 +159,15 @@ func (c *Conn) read(conn net.Conn) {
159159

160160
c.lastHeartbeatTime.Store(xtime.Now().Unix())
161161

162-
isHeartbeat, _, seq := protocol.ParseBuffer(buf)
163-
164-
if isHeartbeat {
165-
continue
166-
}
167-
168-
call, ok := c.pending.extract(seq)
169-
if !ok {
170-
continue
162+
if isHeartbeat, _, seq := protocol.ParseBuffer(buf.Bytes()); isHeartbeat {
163+
buf.Release()
164+
} else {
165+
if call, ok := c.pending.extract(seq); ok {
166+
call <- buf
167+
} else {
168+
buf.Release()
169+
}
171170
}
172-
173-
call <- buf
174171
}
175172
}
176173
}

internal/transporter/internal/protocol/reader.go

Lines changed: 19 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -9,41 +9,30 @@ import (
99
)
1010

1111
// ReadMessage 读取消息
12-
func ReadMessage(reader io.Reader) (isHeartbeat bool, route uint8, seq uint64, data []byte, err error) {
12+
func ReadMessage(reader io.Reader) (bool, uint8, uint64, []byte, error) {
1313
buf := buffer.MallocBytes(defaultSizeBytes)
1414
defer buf.Release()
1515

16-
if _, err = io.ReadFull(reader, buf.Bytes()); err != nil {
17-
return
16+
if _, err := io.ReadFull(reader, buf.Bytes()); err != nil {
17+
return false, 0, 0, nil, err
1818
}
1919

2020
size := binary.BigEndian.Uint32(buf.Bytes())
2121

2222
if size == 0 {
23-
err = errors.ErrInvalidMessage
24-
return
23+
return false, 0, 0, nil, errors.ErrInvalidMessage
2524
}
2625

27-
data = make([]byte, defaultSizeBytes+size)
26+
data := make([]byte, defaultSizeBytes+size)
2827
copy(data[:defaultSizeBytes], buf.Bytes())
2928

30-
if _, err = io.ReadFull(reader, data[defaultSizeBytes:]); err != nil {
31-
return
32-
}
33-
34-
header := data[defaultSizeBytes : defaultSizeBytes+defaultHeaderBytes][0]
35-
36-
isHeartbeat = header&heartbeatBit == heartbeatBit
37-
38-
if isHeartbeat {
39-
return
29+
if _, err := io.ReadFull(reader, data[defaultSizeBytes:]); err != nil {
30+
return false, 0, 0, nil, err
4031
}
4132

42-
route = data[defaultSizeBytes+defaultHeaderBytes : defaultSizeBytes+defaultHeaderBytes+defaultRouteBytes][0]
33+
isHeartbeat, route, seq := ParseBuffer(data)
4334

44-
seq = binary.BigEndian.Uint64(data[defaultSizeBytes+defaultHeaderBytes+defaultRouteBytes : defaultSizeBytes+defaultHeaderBytes+defaultRouteBytes+8])
45-
46-
return
35+
return isHeartbeat, route, seq, data, nil
4736
}
4837

4938
// ReadBuffer 以buffer的形式读取消息
@@ -75,18 +64,15 @@ func ReaderBuffer(reader io.Reader) (buffer.Buffer, error) {
7564
}
7665

7766
// ParseBuffer 解析buffer
78-
func ParseBuffer(buf buffer.Buffer) (isHeartbeat bool, route uint8, seq uint64) {
79-
data := buf.Bytes()
80-
header := data[defaultSizeBytes+defaultHeaderBytes : defaultSizeBytes+defaultHeaderBytes+defaultRouteBytes][0]
81-
82-
isHeartbeat = header&heartbeatBit == heartbeatBit
83-
84-
if isHeartbeat {
85-
return
67+
func ParseBuffer(data []byte) (bool, uint8, uint64) {
68+
if header := data[defaultSizeBytes : defaultSizeBytes+defaultHeaderBytes][0]; header&heartbeatBit == heartbeatBit {
69+
return true, 0, 0
70+
} else {
71+
var (
72+
route = data[defaultSizeBytes+defaultHeaderBytes : defaultSizeBytes+defaultHeaderBytes+defaultRouteBytes][0]
73+
seq = binary.BigEndian.Uint64(data[defaultSizeBytes+defaultHeaderBytes+defaultRouteBytes : defaultSizeBytes+defaultHeaderBytes+defaultRouteBytes+8])
74+
)
75+
76+
return false, route, seq
8677
}
87-
88-
route = data[defaultSizeBytes+defaultHeaderBytes : defaultSizeBytes+defaultHeaderBytes+defaultRouteBytes][0]
89-
seq = binary.BigEndian.Uint64(data[defaultSizeBytes+defaultHeaderBytes+defaultRouteBytes : defaultSizeBytes+defaultHeaderBytes+defaultRouteBytes+8])
90-
91-
return
9278
}

0 commit comments

Comments
 (0)