Skip to content

Commit 90a1a6d

Browse files
committed
fix: add proactive check in connpool for gonet conn
1 parent 14199d5 commit 90a1a6d

File tree

11 files changed

+461
-34
lines changed

11 files changed

+461
-34
lines changed

client/option_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ import (
4444
"github.com/cloudwego/kitex/pkg/http"
4545
"github.com/cloudwego/kitex/pkg/loadbalance"
4646
"github.com/cloudwego/kitex/pkg/proxy"
47+
connpool2 "github.com/cloudwego/kitex/pkg/remote/connpool"
48+
"github.com/cloudwego/kitex/pkg/remote/trans/gonet"
49+
"github.com/cloudwego/kitex/pkg/remote/trans/netpoll"
4750
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/grpc"
4851
"github.com/cloudwego/kitex/pkg/retry"
4952
"github.com/cloudwego/kitex/pkg/rpcinfo"
@@ -764,6 +767,20 @@ func TestTailOption(t *testing.T) {
764767
test.Assert(t, opts.RemoteOpt.Dialer != nil)
765768
}
766769

770+
func TestGonetOption(t *testing.T) {
771+
// gonet
772+
opt := client.NewOptions([]Option{WithDialer(gonet.NewDialer()), WithLongConnection(connpool.IdleConfig{MaxIdlePerAddress: 10})})
773+
d := opt.RemoteOpt.ConnPool.(*connpool2.LongPool)
774+
pcfg := d.Config()
775+
test.Assert(t, pcfg.Enable)
776+
777+
// netpoll
778+
opt = client.NewOptions([]Option{WithDialer(netpoll.NewDialer()), WithLongConnection(connpool.IdleConfig{MaxIdlePerAddress: 10})})
779+
d = opt.RemoteOpt.ConnPool.(*connpool2.LongPool)
780+
pcfg = d.Config()
781+
test.Assert(t, !pcfg.Enable)
782+
}
783+
767784
func checkOneOptionDebugInfo(t *testing.T, opt Option, expectStr string) error {
768785
o := &Options{}
769786
o.Apply([]Option{opt})

internal/client/option.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/cloudwego/localsession/backup"
2525

2626
"github.com/cloudwego/kitex/internal/configutil"
27+
internalRemote "github.com/cloudwego/kitex/internal/remote"
2728
"github.com/cloudwego/kitex/internal/stream"
2829
"github.com/cloudwego/kitex/pkg/acl"
2930
"github.com/cloudwego/kitex/pkg/circuitbreak"
@@ -292,22 +293,27 @@ func (o *Options) initRemoteOpt() {
292293
}
293294
o.RemoteOpt.TTHeaderStreamingProvider = ttstream.NewClientProvider(o.TTHeaderStreamingOptions.TransportOptions...)
294295
}
296+
297+
_, setConnPoolProactiveCheck := o.RemoteOpt.Dialer.(internalRemote.IsGonetDialer)
295298
if o.RemoteOpt.ConnPool == nil {
296299
if o.PoolCfg != nil {
297300
if *o.PoolCfg == zero {
298301
o.RemoteOpt.ConnPool = connpool.NewShortPool(o.Svr.ServiceName)
299302
} else {
300-
o.RemoteOpt.ConnPool = connpool.NewLongPool(o.Svr.ServiceName, *o.PoolCfg)
303+
cfg := newDefaultLongPoolCfg(o.Svr.ServiceName, *o.PoolCfg, setConnPoolProactiveCheck)
304+
o.RemoteOpt.ConnPool = connpool.NewLongPoolWithConfig(cfg)
301305
}
302306
} else {
303-
o.RemoteOpt.ConnPool = connpool.NewLongPool(
307+
cfg := newDefaultLongPoolCfg(
304308
o.Svr.ServiceName,
305309
connpool2.IdleConfig{
306310
MaxIdlePerAddress: 10,
307311
MaxIdleGlobal: 100,
308312
MaxIdleTimeout: time.Minute,
309313
},
314+
setConnPoolProactiveCheck,
310315
)
316+
o.RemoteOpt.ConnPool = connpool.NewLongPoolWithConfig(cfg)
311317
}
312318
}
313319
}
@@ -319,3 +325,15 @@ func (o *Options) InitRetryContainer() {
319325
o.CloseCallbacks = append(o.CloseCallbacks, o.UnaryOptions.RetryContainer.Close)
320326
}
321327
}
328+
329+
func newDefaultLongPoolCfg(serviceName string, idleCfg connpool2.IdleConfig, enableProactiveCheck bool) connpool.LongPoolConfig {
330+
return connpool.LongPoolConfig{
331+
ServiceName: serviceName,
332+
IdleConfig: idleCfg,
333+
ProactiveCheckConfig: connpool.ProactiveCheckConfig{
334+
Enable: enableProactiveCheck,
335+
CheckFunc: internalRemote.ConnectionStateCheck,
336+
Interval: connpool.DefaultProactiveConnCheckInterval,
337+
},
338+
}
339+
}

internal/remote/conn_check.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
//go:build !windows
2+
3+
/*
4+
* Copyright 2025 CloudWeGo Authors
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package remote
20+
21+
import (
22+
"fmt"
23+
"net"
24+
"syscall"
25+
26+
"golang.org/x/sys/unix"
27+
)
28+
29+
// ConnectionStateCheck uses unix.Poll to detect the connection state
30+
// Since the connections are stored in the pool, we treat any POLLIN event as connection close and set the connection state to closed.
31+
func ConnectionStateCheck(conns ...net.Conn) error {
32+
pollFds := make([]unix.PollFd, 0, len(conns))
33+
34+
for _, conn := range conns {
35+
sysConn, ok := conn.(syscall.Conn)
36+
if !ok {
37+
return fmt.Errorf("conn is not a syscall.Conn, got %T", conn)
38+
}
39+
rawConn, err := sysConn.SyscallConn()
40+
if err != nil {
41+
return err
42+
}
43+
var fd int
44+
err = rawConn.Control(func(fileDescriptor uintptr) {
45+
fd = int(fileDescriptor)
46+
})
47+
if err != nil {
48+
return err
49+
}
50+
pollFds = append(pollFds, unix.PollFd{Fd: int32(fd), Events: unix.POLLIN})
51+
}
52+
53+
n, err := unix.Poll(pollFds, 0)
54+
if err != nil {
55+
return err
56+
}
57+
if n == 0 {
58+
return nil
59+
}
60+
for i := 0; i < len(pollFds); i++ {
61+
if pollFds[i].Revents&unix.POLLIN != 0 {
62+
// the connection should not receive any data, POLLIN means FIN or RST
63+
// set the state
64+
if s, ok := conns[i].(SetConnState); ok {
65+
s.SetConnState(true)
66+
}
67+
}
68+
}
69+
return nil
70+
}

internal/remote/conn_check_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
//go:build !windows
2+
3+
/*
4+
* Copyright 2025 CloudWeGo Authors
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package remote
20+
21+
import (
22+
"errors"
23+
"net"
24+
"strings"
25+
"sync/atomic"
26+
"syscall"
27+
"testing"
28+
"time"
29+
30+
"github.com/cloudwego/kitex/internal/test"
31+
)
32+
33+
var _ SetConnState = &mockConn{}
34+
35+
type mockConn struct {
36+
net.Conn
37+
closed atomic.Bool
38+
}
39+
40+
func (m *mockConn) SetConnState(c bool) {
41+
m.closed.Store(c)
42+
}
43+
44+
func (m *mockConn) SyscallConn() (syscall.RawConn, error) {
45+
if sc, ok := m.Conn.(syscall.Conn); ok {
46+
return sc.SyscallConn()
47+
}
48+
return nil, errors.New("not syscall.Conn")
49+
}
50+
51+
func TestConnectionStateCheck(t *testing.T) {
52+
// wrong connection type
53+
err := ConnectionStateCheck(net.Pipe())
54+
test.Assert(t, err != nil)
55+
test.Assert(t, strings.Contains(err.Error(), "conn is not a syscall.Conn"))
56+
57+
ln, err := net.Listen("tcp", "127.0.0.1:0") // 本地端口自动分配
58+
test.Assert(t, err == nil, err)
59+
defer ln.Close()
60+
61+
done := make(chan net.Conn)
62+
go func() {
63+
conn, e := ln.Accept()
64+
test.Assert(t, e == nil)
65+
done <- conn
66+
}()
67+
68+
clientConn, err := net.Dial("tcp", ln.Addr().String())
69+
test.Assert(t, err == nil, err)
70+
71+
serverConn := <-done
72+
serverConnWithState := &mockConn{Conn: serverConn}
73+
// check, not closed
74+
err = ConnectionStateCheck(serverConnWithState)
75+
test.Assert(t, err == nil, err)
76+
test.Assert(t, !serverConnWithState.closed.Load())
77+
78+
// close conn
79+
clientConn.Close()
80+
time.Sleep(100 * time.Millisecond)
81+
82+
// check, closed
83+
err = ConnectionStateCheck(serverConnWithState)
84+
test.Assert(t, err == nil, err)
85+
test.Assert(t, serverConnWithState.closed.Load())
86+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
//go:build windows
2+
// +build windows
3+
4+
/*
5+
* Copyright 2025 CloudWeGo Authors
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package remote
21+
22+
import (
23+
"net"
24+
)
25+
26+
// FIXME: windows not supported
27+
func ConnectionStateCheck(conns ...net.Conn) error {
28+
return nil
29+
}

internal/remote/gonet.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2025 CloudWeGo Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package remote
18+
19+
// SetConnState only used to set the state to connection in gonet
20+
type SetConnState interface {
21+
SetConnState(inactive bool)
22+
}
23+
24+
// IsGonetDialer returns if the dialer is gonet dialer
25+
type IsGonetDialer interface {
26+
IsGonetDialer() bool
27+
}

0 commit comments

Comments
 (0)