Skip to content

Commit 118180c

Browse files
committed
shared conn.Pool
1 parent 64337c2 commit 118180c

File tree

14 files changed

+200
-183
lines changed

14 files changed

+200
-183
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
* Removed `CustomOption` and taking client with custom options
44
* Removed `proxy` package
55
* Improved `db.With()` helper for create child connections
6+
* Setted shared `conn.Pool` for all child `ydb.Connection`
67

78
## 3.12.1
89
* Added `trace.Driver.OnConnPark` event

connection.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ type connection struct {
8383
ratelimiter ratelimiter.Client
8484
ratelimiterOptions []ratelimiterConfig.Option
8585

86+
pool conn.Pool
87+
8688
mtx sync.Mutex
8789
db db.Connection
8890

@@ -95,6 +97,12 @@ func (c *connection) Close(ctx context.Context) error {
9597
c.mtx.Lock()
9698
defer c.mtx.Unlock()
9799

100+
defer func() {
101+
for _, f := range c.onClose {
102+
f(c)
103+
}
104+
}()
105+
98106
var (
99107
issues []error
100108
children = make([]Connection, 0, len(c.children))
@@ -136,12 +144,12 @@ func (c *connection) Close(ctx context.Context) error {
136144
issues = append(issues, err)
137145
}
138146

139-
if len(issues) > 0 {
140-
return errors.NewWithIssues("close failed", issues...)
147+
if err := c.pool.Release(ctx); err != nil {
148+
issues = append(issues, err)
141149
}
142150

143-
for _, f := range c.onClose {
144-
f(c)
151+
if len(issues) > 0 {
152+
return errors.NewWithIssues("close failed", issues...)
145153
}
146154

147155
return nil
@@ -257,9 +265,17 @@ func New(ctx context.Context, opts ...Option) (_ Connection, err error) {
257265
)
258266
}
259267

268+
if c.pool == nil {
269+
c.pool = conn.NewPool(
270+
ctx,
271+
c.config,
272+
)
273+
}
274+
260275
c.db, err = db.New(
261276
ctx,
262277
c.config,
278+
c.pool,
263279
append(
264280
// prepend config params from root config
265281
[]discoveryConfig.Option{

internal/cluster/cluster.go

Lines changed: 50 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ var (
4646

4747
type cluster struct {
4848
config config.Config
49-
pool conn.PoolGetterCloser
49+
pool conn.Pool
5050
dial func(context.Context, string) (*grpc.ClientConn, error)
5151
balancer balancer.Balancer
5252
explorer repeater.Repeater
@@ -58,6 +58,47 @@ type cluster struct {
5858
closed bool
5959
}
6060

61+
func (c *cluster) Pessimize(ctx context.Context, cc conn.Conn, cause error) {
62+
c.pool.Pessimize(ctx, cc, cause)
63+
64+
c.mu.RLock()
65+
defer c.mu.RUnlock()
66+
67+
if c.closed {
68+
return
69+
}
70+
71+
entry, has := c.index[cc.Endpoint().Address()]
72+
if !has {
73+
return
74+
}
75+
76+
if entry.Handle == nil {
77+
return
78+
}
79+
80+
if !c.balancer.Contains(entry.Handle) {
81+
return
82+
}
83+
84+
if c.explorer == nil {
85+
return
86+
}
87+
88+
// count ratio (banned/all)
89+
online := 0
90+
for _, entry = range c.index {
91+
if entry.Conn != nil && entry.Conn.GetState() == conn.Online {
92+
online++
93+
}
94+
}
95+
96+
// more than half connections banned - re-discover now
97+
if online*2 < len(c.index) {
98+
c.explorer.Force()
99+
}
100+
}
101+
61102
func (c *cluster) Lock() {
62103
c.mu.Lock()
63104
}
@@ -136,75 +177,34 @@ type Cluster interface {
136177
Explorer
137178
Locker
138179
conn.PoolGetter
180+
conn.Pessimizer
139181
}
140182

141183
func New(
142184
ctx context.Context,
143185
config config.Config,
186+
pool conn.Pool,
144187
balancer balancer.Balancer,
145188
) Cluster {
146189
onDone := trace.DriverOnClusterInit(config.Trace(), &ctx)
147190
defer func() {
148-
onDone()
191+
onDone(pool.Take(ctx))
149192
}()
150193

151-
c := &cluster{
194+
return &cluster{
152195
config: config,
153196
index: make(map[string]entry.Entry),
154197
endpoints: make(map[uint32]conn.Conn),
198+
pool: pool,
155199
balancer: balancer,
156200
}
157-
158-
c.pool = conn.NewPool(
159-
ctx,
160-
config,
161-
func(e endpoint.Endpoint) {
162-
c.mu.RLock()
163-
defer c.mu.RUnlock()
164-
165-
if c.closed {
166-
return
167-
}
168-
169-
entry, has := c.index[e.Address()]
170-
if !has {
171-
return
172-
}
173-
174-
if entry.Handle == nil {
175-
return
176-
}
177-
178-
if !c.balancer.Contains(entry.Handle) {
179-
return
180-
}
181-
182-
if c.explorer == nil {
183-
return
184-
}
185-
186-
// count ratio (banned/all)
187-
online := 0
188-
for _, entry = range c.index {
189-
if entry.Conn != nil && entry.Conn.GetState() == conn.Online {
190-
online++
191-
}
192-
}
193-
194-
// more than half connections banned - re-discover now
195-
if online*2 < len(c.index) {
196-
c.explorer.Force()
197-
}
198-
},
199-
)
200-
201-
return c
202201
}
203202

204203
func (c *cluster) Close(ctx context.Context) (err error) {
205204
c.mu.Lock()
205+
defer c.mu.Unlock()
206+
206207
if c.closed {
207-
c.mu.Unlock()
208208
return
209209
}
210210

@@ -218,19 +218,10 @@ func (c *cluster) Close(ctx context.Context) (err error) {
218218
}
219219
c.closed = true
220220

221-
index := c.index
222221
c.index = nil
223222
c.endpoints = nil
224223

225-
c.mu.Unlock()
226-
227-
for _, entry := range index {
228-
if entry.Conn != nil {
229-
_ = entry.Conn.Close(ctx)
230-
}
231-
}
232-
233-
return c.pool.Close(ctx)
224+
return c.pool.Release(ctx)
234225
}
235226

236227
// Get returns next available connection.

internal/cluster/cluster_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func TestClusterFastRedial(t *testing.T) {
3939
balancer: b,
4040
index: make(map[string]entry.Entry),
4141
endpoints: make(map[uint32]conn.Conn),
42-
pool: conn.NewPool(ctx, config.New(), nil),
42+
pool: conn.NewPool(ctx, config.New()),
4343
}
4444

4545
pingConnects := func(size int) chan struct{} {
@@ -96,7 +96,7 @@ func TestClusterMergeEndpoints(t *testing.T) {
9696
}(),
9797
index: make(map[string]entry.Entry),
9898
endpoints: make(map[uint32]conn.Conn),
99-
pool: conn.NewPool(ctx, config.New(), nil),
99+
pool: conn.NewPool(ctx, config.New()),
100100
}
101101

102102
assert := func(t *testing.T, exp []endpoint.Endpoint) {

internal/conn/conn.go

Lines changed: 17 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@ type Conn interface {
2424

2525
Endpoint() endpoint.Endpoint
2626

27+
TTL() <-chan time.Time
28+
2729
IsState(states ...State) bool
2830
GetState() State
2931
SetState(State) State
32+
3033
Close(ctx context.Context) error
3134
Park(ctx context.Context) error
32-
TTL() <-chan time.Time
3335
}
3436

3537
func (c *conn) Address() string {
@@ -38,16 +40,15 @@ func (c *conn) Address() string {
3840

3941
type conn struct {
4042
sync.RWMutex
41-
config Config // ro access
42-
cc *grpc.ClientConn
43-
done chan struct{}
44-
endpoint endpoint.Endpoint // ro access
45-
closed bool
46-
state State
47-
usages int32
48-
ttl timeutil.Timer
49-
onClose []func(Conn)
50-
onPessimize []func(e endpoint.Endpoint)
43+
config Config // ro access
44+
cc *grpc.ClientConn
45+
done chan struct{}
46+
endpoint endpoint.Endpoint // ro access
47+
closed bool
48+
state State
49+
usages int32
50+
ttl timeutil.Timer
51+
onClose []func(Conn)
5152
}
5253

5354
func (c *conn) IsState(states ...State) bool {
@@ -262,39 +263,13 @@ func (c *conn) Close(ctx context.Context) (err error) {
262263
return err
263264
}
264265

265-
func (c *conn) pessimize(ctx context.Context, err error) {
266-
if c.isClosed() {
267-
return
268-
}
269-
270-
defer func() {
271-
for _, f := range c.onPessimize {
272-
f(c.endpoint)
273-
}
274-
}()
275-
276-
trace.DriverOnPessimizeNode(
277-
trace.ContextDriver(ctx).Compose(c.config.Trace()),
278-
&ctx,
279-
c.endpoint.Copy(),
280-
c.GetState(),
281-
err,
282-
)(c.SetState(Banned))
283-
}
284-
285266
func (c *conn) invoke(
286267
ctx context.Context,
287268
method string,
288269
req interface{},
289270
res interface{},
290271
opts ...grpc.CallOption,
291272
) (err error) {
292-
defer func() {
293-
if err != nil && errors.MustPessimizeEndpoint(err) {
294-
c.pessimize(ctx, err)
295-
}
296-
}()
297-
298273
var cc *grpc.ClientConn
299274
cc, err = c.take(ctx)
300275
if err != nil {
@@ -368,12 +343,6 @@ func (c *conn) newStream(
368343
method string,
369344
opts ...grpc.CallOption,
370345
) (_ grpc.ClientStream, err error) {
371-
defer func() {
372-
if err != nil && errors.MustPessimizeEndpoint(err) {
373-
c.pessimize(ctx, err)
374-
}
375-
}()
376-
377346
var cc *grpc.ClientConn
378347
cc, err = c.take(ctx)
379348
if err != nil {
@@ -449,14 +418,6 @@ func (c *conn) NewStream(
449418

450419
type option func(c *conn)
451420

452-
func withOnPessimize(onPessimize func(e endpoint.Endpoint)) option {
453-
return func(c *conn) {
454-
if onPessimize != nil {
455-
c.onPessimize = append(c.onPessimize, onPessimize)
456-
}
457-
}
458-
}
459-
460421
func withOnClose(onClose func(Conn)) option {
461422
return func(c *conn) {
462423
if onClose != nil {
@@ -467,12 +428,11 @@ func withOnClose(onClose func(Conn)) option {
467428

468429
func New(e endpoint.Endpoint, config Config, opts ...option) Conn {
469430
c := &conn{
470-
state: Created,
471-
endpoint: e,
472-
config: config,
473-
done: make(chan struct{}),
474-
onClose: make([]func(Conn), 0),
475-
onPessimize: make([]func(e endpoint.Endpoint), 0),
431+
state: Created,
432+
endpoint: e,
433+
config: config,
434+
done: make(chan struct{}),
435+
onClose: make([]func(Conn), 0),
476436
}
477437
for _, o := range opts {
478438
o(c)

internal/conn/grpc_client_stream.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,6 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {
6666
err = s.s.RecvMsg(m)
6767

6868
if err != nil {
69-
if errors.MustPessimizeEndpoint(err) {
70-
s.c.pessimize(s.s.Context(), err)
71-
}
7269
if s.wrapping {
7370
return errors.Errorf(0, "receive message failed: %w", errors.MapGRPCError(err))
7471
}

0 commit comments

Comments
 (0)