Skip to content

Commit 2f91a11

Browse files
committed
Optimizing internal rpc
1 parent 4c1c3d1 commit 2f91a11

File tree

5 files changed

+34
-31
lines changed

5 files changed

+34
-31
lines changed

internal/transporter/internal/client/client.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,13 @@ import (
88

99
"github.com/dobyte/due/v2/core/buffer"
1010
"github.com/dobyte/due/v2/errors"
11+
"github.com/dobyte/due/v2/log"
1112
"golang.org/x/sync/errgroup"
1213
)
1314

1415
const (
1516
defaultTimeout = 3 * time.Second // 调用超时时间
16-
defaultConnNum = 20 // 默认连接数
17+
defaultConnNum = 10 // 默认连接数
1718
)
1819

1920
type chWrite struct {
@@ -57,6 +58,7 @@ func (c *Client) Establish() error {
5758
conn := newConn(c, c.disorderlyQueue)
5859

5960
if err := conn.dial(); err != nil {
61+
log.Warnf("conn dial failed: %v", err)
6062
return err
6163
}
6264

internal/transporter/internal/client/conn.go

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ type Conn struct {
3333
func newConn(cli *Client, queue chan *chWrite) *Conn {
3434
c := &Conn{}
3535
c.cli = cli
36-
c.state.Store(def.ConnClosed)
36+
c.state.Store(def.ConnHanged)
3737
c.pending = newPending()
3838
c.orderlyQueue = make(chan *chWrite, 4096)
3939
c.disorderlyQueue = queue
@@ -97,30 +97,26 @@ func (c *Conn) send(ch *chWrite, isOrderly ...bool) error {
9797
}
9898

9999
// 处理连接
100-
func (c *Conn) process(conn net.Conn) (err error) {
100+
func (c *Conn) process(conn net.Conn) error {
101101
c.ctx, c.cancel = context.WithCancel(context.Background())
102102
c.state.Store(def.ConnOpened)
103103
c.lastHeartbeatTime.Store(xtime.Now().Unix())
104104

105105
go c.read(conn)
106106

107-
defer func() {
108-
if err != nil {
109-
c.close()
110-
}
111-
}()
112-
113-
if err = c.handshake(conn); err != nil {
114-
return
115-
}
107+
if err := c.handshake(conn); err != nil {
108+
c.close()
116109

117-
go c.write(conn)
110+
return err
111+
} else {
112+
go c.write(conn)
118113

119-
return
114+
return nil
115+
}
120116
}
121117

122118
// 握手
123-
func (c *Conn) handshake(conn net.Conn) (err error) {
119+
func (c *Conn) handshake(conn net.Conn) error {
124120
var (
125121
seq = uint64(1)
126122
call = make(chan []byte)
@@ -131,15 +127,22 @@ func (c *Conn) handshake(conn net.Conn) (err error) {
131127

132128
c.pending.store(seq, call)
133129

134-
if _, err = conn.Write(buf.Bytes()); err != nil {
130+
defer close(call)
131+
132+
if _, err := conn.Write(buf.Bytes()); err != nil {
135133
c.pending.delete(seq)
136-
} else {
137-
<-call
134+
return err
138135
}
139136

140-
close(call)
137+
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
138+
defer cancel()
141139

142-
return
140+
select {
141+
case <-ctx.Done():
142+
return ctx.Err()
143+
case <-call:
144+
return nil
145+
}
143146
}
144147

145148
// 读取数据
@@ -188,6 +191,7 @@ func (c *Conn) write(conn net.Conn) {
188191
deadline := t.Add(-2 * def.HeartbeatInterval).Unix()
189192

190193
if c.lastHeartbeatTime.Load() < deadline {
194+
log.Warn("connection heartbeat timeout")
191195
c.retry(conn)
192196
return
193197
} else {
@@ -246,7 +250,7 @@ func (c *Conn) retry(conn net.Conn) {
246250
return
247251
}
248252

249-
conn.Close()
253+
_ = conn.Close()
250254

251255
if c.cancel != nil {
252256
c.cancel()
@@ -259,7 +263,9 @@ func (c *Conn) retry(conn net.Conn) {
259263

260264
// 关闭连接
261265
func (c *Conn) close() {
262-
c.state.Store(def.ConnClosed)
266+
if c.state.Swap(def.ConnClosed) == def.ConnClosed {
267+
return
268+
}
263269

264270
c.cli.done()
265271

internal/transporter/internal/server/conn.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,9 @@ func (c *Conn) write() {
128128
return
129129
case <-ticker.C:
130130
deadline := xtime.Now().Add(-2 * def.HeartbeatInterval).Unix()
131+
131132
if atomic.LoadInt64(&c.lastHeartbeatTime) < deadline {
133+
log.Warn("connection heartbeat timeout")
132134
_ = c.close(true)
133135
return
134136
}

internal/transporter/internal/server/def.go

Lines changed: 0 additions & 9 deletions
This file was deleted.

internal/transporter/internal/server/server.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import (
1515

1616
const scheme = "drpc"
1717

18+
type RouteHandler func(conn *Conn, data []byte) error
19+
1820
type Server struct {
1921
listener net.Listener // 监听器
2022
listenAddr string // 监听地址

0 commit comments

Comments
 (0)