diff --git a/protocol/czar/engine.go b/protocol/czar/engine.go index b385b3b..883bdf4 100644 --- a/protocol/czar/engine.go +++ b/protocol/czar/engine.go @@ -46,12 +46,17 @@ import ( var Conf = struct { // The newly created stream has a higher write priority. FastWriteDuration time.Duration + // If no new data is received for a period of time, close the conn. + IdleProbeDuration time.Duration + IdleReplyDuration time.Duration // Packet size. Since the size of the packet header is 4, this value must be greater than 4. If the value is too // small, the transmission efficiency will be reduced, and if it is too large, the concurrency capability of mux // will be reduced. PacketSize int }{ FastWriteDuration: time.Second * 8, + IdleProbeDuration: time.Second * 120, + IdleReplyDuration: time.Second * 128, PacketSize: 2048, } diff --git a/protocol/czar/mux.go b/protocol/czar/mux.go index 20b1f91..4333472 100644 --- a/protocol/czar/mux.go +++ b/protocol/czar/mux.go @@ -204,9 +204,21 @@ func (m *Mux) Recv() { idx uint8 msg []byte old *Stream + prb = time.AfterFunc(Conf.IdleProbeDuration, func() { + if m.pri.Pri(0, func() error { + return doa.Err(m.con.Write([]byte{0x00, 0x03, 0x00, 0x00})) + }) != nil { + m.Close() + } + }) + rst = time.AfterFunc(Conf.IdleReplyDuration, func() { + m.Close() + }) stm *Stream ) for { + prb.Reset(Conf.IdleProbeDuration) + rst.Reset(Conf.IdleReplyDuration) _, err = io.ReadFull(m.con, buf[:4]) if err != nil { m.rer.Put(err) @@ -247,7 +259,15 @@ func (m *Mux) Recv() { stm.Esolc() old = NewWither(idx, m) m.usb[idx] = old - case cmd >= 0x03: + case cmd == 0x03: + switch buf[2] { + case 0x00: + m.pri.Pri(0, func() error { + return doa.Err(m.con.Write([]byte{0x00, 0x03, 0x01, 0x00})) + }) + case 0x01: + } + case cmd >= 0x04: // Packet format error, connection closed. m.con.Close() }