Skip to content

Commit 9da4f5f

Browse files
committed
feat: chunk add end reason field
1 parent 12ede78 commit 9da4f5f

File tree

3 files changed

+68
-6
lines changed

3 files changed

+68
-6
lines changed

chunk.go

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,49 @@
11
package wkproto
22

3+
type EndReason uint8
4+
5+
// EndReason constants define why a stream was completed
6+
const (
7+
// EndReasonSuccess indicates the stream completed successfully (default)
8+
EndReasonSuccess EndReason = 0
9+
// EndReasonTimeout indicates the stream ended due to inactivity timeout
10+
EndReasonTimeout EndReason = 1
11+
// EndReasonError indicates the stream ended due to an error
12+
EndReasonError EndReason = 2
13+
// EndReasonCancelled indicates the stream was manually cancelled
14+
EndReasonCancelled EndReason = 3
15+
// EndReasonForce indicates the stream was forcefully ended (e.g., channel closure)
16+
EndReasonForce EndReason = 4
17+
)
18+
19+
func (e EndReason) String() string {
20+
switch e {
21+
case EndReasonSuccess:
22+
return "success"
23+
case EndReasonTimeout:
24+
return "timeout"
25+
case EndReasonError:
26+
return "error"
27+
case EndReasonCancelled:
28+
return "cancelled"
29+
case EndReasonForce:
30+
return "force"
31+
default:
32+
return "unknown"
33+
}
34+
}
35+
36+
func (e EndReason) Value() uint8 {
37+
return uint8(e)
38+
}
39+
340
// ChunkPacket 消息块
441
type ChunkPacket struct {
542
Framer
6-
MessageID int64 // 消息ID(同个消息多个块的消息ID相同)
7-
ChunkID uint64 // 块ID(顺序递增)
8-
Payload []byte // 消息内容
43+
MessageID int64 // 消息ID(同个消息多个块的消息ID相同)
44+
ChunkID uint64 // 块ID(顺序递增)
45+
EndReason EndReason // 结束原因
46+
Payload []byte // 消息内容
947
}
1048

1149
// GetPacketType 获得包类型
@@ -24,6 +62,7 @@ func (c *ChunkPacket) SizeWithProtoVersion(protVersion uint8) int {
2462
func encodeChunk(chunkPacket *ChunkPacket, enc *Encoder, _ uint8) error {
2563
enc.WriteInt64(chunkPacket.MessageID)
2664
enc.WriteUint64(chunkPacket.ChunkID)
65+
enc.WriteUint8(chunkPacket.EndReason.Value())
2766
enc.WriteBytes(chunkPacket.Payload)
2867
return nil
2968
}
@@ -41,6 +80,12 @@ func decodeChunk(frame Frame, data []byte, _ uint8) (Frame, error) {
4180
return nil, err
4281
}
4382

83+
var endReason uint8
84+
if endReason, err = dec.Uint8(); err != nil {
85+
return nil, err
86+
}
87+
chunkPacket.EndReason = EndReason(endReason)
88+
4489
if chunkPacket.Payload, err = dec.BinaryAll(); err != nil {
4590
return nil, err
4691
}
@@ -53,6 +98,7 @@ func encodeChunkSize(packet *ChunkPacket, _ uint8) int {
5398
size := 0
5499
size += MessageIDByteSize // 消息ID
55100
size += ChunkIDByteSize // 块ID
101+
size += EndReasonByteSize // 结束原因
56102
size += len(packet.Payload) // 消息内容
57103
return size
58104
}

chunk_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ func TestChunkEncodeAndDecode(t *testing.T) {
1111
MessageID: 1,
1212
ChunkID: 1,
1313
Payload: []byte("hello world"),
14+
EndReason: 1,
1415
}
1516

1617
codec := New()

common.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,16 @@ type Framer struct {
1313
SyncOnce bool // 此消息只被同步或被消费一次
1414
DUP bool // 是否是重发消息
1515
HasServerVersion bool // 是否有服务端版本 connack包用到
16-
17-
FrameSize int64
16+
End bool // 消息块:是否是最后一包
17+
FrameSize int64
1818
}
1919

2020
// ToFixHeaderUint8 ToFixHeaderUint8
2121
func ToFixHeaderUint8(f Frame) uint8 {
22+
23+
if f.GetFrameType() == Chunk {
24+
return byte(int(f.GetFrameType()<<4) | encodeBool(f.GetEnd()))
25+
}
2226
typeAndFlags := encodeBool(f.GetDUP())<<3 | encodeBool(f.GetsyncOnce())<<2 | encodeBool(f.GetRedDot())<<1 | encodeBool(f.GetNoPersist())
2327
if f.GetFrameType() == CONNACK {
2428
typeAndFlags = encodeBool(f.GetHasServerVersion())
@@ -34,8 +38,12 @@ func FramerFromUint8(v uint8) Framer {
3438
p.SyncOnce = (v >> 2 & 0x01) > 0
3539
p.DUP = (v >> 3 & 0x01) > 0
3640
p.FrameType = FrameType(v >> 4)
37-
if p.FrameType == CONNACK {
41+
42+
switch p.FrameType {
43+
case CONNACK:
3844
p.HasServerVersion = (v & 0x01) > 0
45+
case Chunk:
46+
p.End = (v & 0x01) > 0
3947
}
4048

4149
return p
@@ -79,6 +87,10 @@ func (f Framer) GetHasServerVersion() bool {
7987
return f.HasServerVersion
8088
}
8189

90+
func (f Framer) GetEnd() bool {
91+
return f.End
92+
}
93+
8294
func (f Framer) String() string {
8395
return fmt.Sprintf("packetType: %s remainingLength:%d NoPersist:%v redDot:%v syncOnce:%v DUP:%v", f.GetFrameType().String(), f.RemainingLength, f.NoPersist, f.RedDot, f.SyncOnce, f.DUP)
8496
}
@@ -327,6 +339,8 @@ type Frame interface {
327339
GetDUP() bool
328340
GetFrameSize() int64 // 总个frame的大小(不参与编码解码)
329341
GetHasServerVersion() bool // 是否有服务端版本 connack包用到
342+
343+
GetEnd() bool
330344
}
331345

332346
type Channel struct {
@@ -353,6 +367,7 @@ const (
353367
ExpireByteSize = 4
354368
NodeIdByteSize = 8
355369
ChunkIDByteSize = 8
370+
EndReasonByteSize = 1
356371
)
357372

358373
const (

0 commit comments

Comments
 (0)