Skip to content

Commit f7c7693

Browse files
committed
Add application layer keepalive probe for czar
1 parent ce509c4 commit f7c7693

File tree

2 files changed

+33
-1
lines changed

2 files changed

+33
-1
lines changed

protocol/czar/engine.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,17 +41,29 @@ import (
4141
// +-----+-----+-----+-----+
4242
// | Sid | 2 | 0/1 | Rsv |
4343
// +-----+-----+-----+-----+
44+
//
45+
// Keep alive probe and reply.
46+
//
47+
// +-----+-----+-----+-----+
48+
// | 0x0 | 3 | 0/1 | Rsv |
49+
// +-----+-----+-----+-----+
4450

4551
// Conf is acting as package level configuration.
4652
var Conf = struct {
4753
// The newly created stream has a higher write priority.
4854
FastWriteDuration time.Duration
55+
// The duration a connection needs to be idle before mux begins sending out keep-alive probe.
56+
IdleProbeDuration time.Duration
57+
// If no data is read for more than this time, the connection is closed.
58+
IdleReplyDuration time.Duration
4959
// Packet size. Since the size of the packet header is 4, this value must be greater than 4. If the value is too
5060
// small, the transmission efficiency will be reduced, and if it is too large, the concurrency capability of mux
5161
// will be reduced.
5262
PacketSize int
5363
}{
5464
FastWriteDuration: time.Second * 8,
65+
IdleProbeDuration: time.Second * 120,
66+
IdleReplyDuration: time.Second * 128,
5567
PacketSize: 2048,
5668
}
5769

protocol/czar/mux.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,9 +204,21 @@ func (m *Mux) Recv() {
204204
idx uint8
205205
msg []byte
206206
old *Stream
207+
prb = time.AfterFunc(Conf.IdleProbeDuration, func() {
208+
if m.pri.Pri(0, func() error {
209+
return doa.Err(m.con.Write([]byte{0x00, 0x03, 0x00, 0x00}))
210+
}) != nil {
211+
m.Close()
212+
}
213+
})
214+
rst = time.AfterFunc(Conf.IdleReplyDuration, func() {
215+
m.Close()
216+
})
207217
stm *Stream
208218
)
209219
for {
220+
prb.Reset(Conf.IdleProbeDuration)
221+
rst.Reset(Conf.IdleReplyDuration)
210222
_, err = io.ReadFull(m.con, buf[:4])
211223
if err != nil {
212224
m.rer.Put(err)
@@ -247,7 +259,15 @@ func (m *Mux) Recv() {
247259
stm.Esolc()
248260
old = NewWither(idx, m)
249261
m.usb[idx] = old
250-
case cmd >= 0x03:
262+
case cmd == 0x03:
263+
switch buf[2] {
264+
case 0x00:
265+
m.pri.Pri(0, func() error {
266+
return doa.Err(m.con.Write([]byte{0x00, 0x03, 0x01, 0x00}))
267+
})
268+
case 0x01:
269+
}
270+
case cmd >= 0x04:
251271
// Packet format error, connection closed.
252272
m.con.Close()
253273
}

0 commit comments

Comments
 (0)