@@ -12,17 +12,17 @@ import (
1212)
1313
1414type Conn interface {
15- ReadOnly () (* grpc. ClientConn , error )
16- WriteOnly () (* grpc. ClientConn , error )
15+ ReadOnly () (* GrpcPoolConn , error )
16+ WriteOnly () (* GrpcPoolConn , error )
1717 Close () error
1818}
1919
2020type clientConn struct {
2121 state atomic.Bool
2222 mu sync.Mutex
2323 ctx context.Context
24- writeOnly * grpc. ClientConn
25- readOnly map [string ]* grpc. ClientConn
24+ writeOnly * GrpcPool
25+ readOnly map [string ]* GrpcPool
2626
2727 endpoints []string
2828}
@@ -49,7 +49,8 @@ func (c *clientConn) listenLeader() {
4949 wg := sync.WaitGroup {}
5050 wg .Add (len (c .readOnly ))
5151
52- finalTry := func (conn * grpc.ClientConn ) {
52+ finalTry := func (conn * GrpcPoolConn ) {
53+ defer conn .Release ()
5354 var (
5455 err error
5556 monitor serverpb.RedQueen_LeaderMonitorClient
@@ -87,13 +88,17 @@ func (c *clientConn) listenLeader() {
8788 }
8889
8990 for _ , conn := range c .readOnly {
90- go finalTry (conn )
91+ cc , err := conn .Alloc ()
92+ if err != nil {
93+ panic (err )
94+ }
95+ go finalTry (cc )
9196 }
9297
9398 wg .Wait ()
9499}
95100
96- func (c * clientConn ) ReadOnly () (* grpc. ClientConn , error ) {
101+ func (c * clientConn ) ReadOnly () (* GrpcPoolConn , error ) {
97102 size := len (c .readOnly )
98103 if size == 0 {
99104 return nil , errors .New ("read-only not maintained" )
@@ -104,21 +109,31 @@ func (c *clientConn) ReadOnly() (*grpc.ClientConn, error) {
104109 round , _ = rand .Int (rand .Reader , big .NewInt (int64 (size )))
105110 )
106111
107- for _ , conn := range c .readOnly {
112+ for _ , pool := range c .readOnly {
108113 step ++
109- if step == round .Int64 () {
110- return conn , nil
114+ if step != round .Int64 () {
115+ continue
116+ }
117+
118+ cc , err := pool .Alloc ()
119+ if err != nil {
120+ return nil , err
111121 }
122+ return cc , nil
112123 }
113124
114125 return nil , errors .New ("unexpected" )
115126}
116127
117- func (c * clientConn ) WriteOnly () (* grpc. ClientConn , error ) {
128+ func (c * clientConn ) WriteOnly () (* GrpcPoolConn , error ) {
118129 if c .writeOnly == nil {
119130 return nil , errors .New ("write-only not maintained" )
120131 }
121- return c .writeOnly , nil
132+ cc , err := c .writeOnly .Alloc ()
133+ if err != nil {
134+ return nil , err
135+ }
136+ return cc , nil
122137}
123138
124139func (c * clientConn ) Close () error {
@@ -127,8 +142,10 @@ func (c *clientConn) Close() error {
127142 }
128143 c .state .Store (false )
129144
130- c .writeOnly .Close ()
131- c .writeOnly = nil
145+ if c .writeOnly != nil {
146+ c .writeOnly .Close ()
147+ c .writeOnly = nil
148+ }
132149
133150 for key , conn := range c .readOnly {
134151 conn .Close ()
@@ -143,22 +160,22 @@ func NewClientConn(ctx context.Context, endpoints []string, opts ...grpc.DialOpt
143160 state : atomic.Bool {},
144161 ctx : ctx ,
145162 writeOnly : nil ,
146- readOnly : make (map [string ]* grpc. ClientConn ),
163+ readOnly : make (map [string ]* GrpcPool ),
147164 endpoints : endpoints ,
148165 }
149166 cc .state .Store (true )
150167
151168 var (
152169 err error
153- conn * grpc. ClientConn
170+ pool * GrpcPool
154171 )
155172
156173 // init
157174 for _ , endpoint := range endpoints {
158- if conn , err = grpc . DialContext (ctx , endpoint , opts ... ); err != nil {
175+ if pool , err = NewGrpcPool (ctx , endpoint , 16 , opts ... ); err != nil {
159176 return nil , err
160177 }
161- cc .readOnly [endpoint ] = conn
178+ cc .readOnly [endpoint ] = pool
162179 }
163180
164181 // start listen
0 commit comments