Skip to content

Commit 734c6a6

Browse files
authored
Merge pull request #155 from ydb-platform/auto-remove-conn-from-pool
v3.14.4: auto-removing `conn.Conn` from `conn.Pool`
2 parents ac66dda + 3140f39 commit 734c6a6

File tree

18 files changed

+163
-126
lines changed

18 files changed

+163
-126
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## 3.14.4
2+
* Implemented auto-removing `conn.Conn` from `conn.Pool` with counting usages of `conn.Conn`
3+
* Refactored naming of source files which declares service client interfaces
4+
15
## 3.14.3
26
* Fixed bug with update balancer element with nil handle
37

File renamed without changes.

discovery/client.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"strings"
77

8+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
89
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
910
)
1011

@@ -18,7 +19,8 @@ func (w WhoAmI) String() string {
1819
}
1920

2021
type Client interface {
22+
closer.Closer
23+
2124
Discover(ctx context.Context) ([]endpoint.Endpoint, error)
2225
WhoAmI(ctx context.Context) (*WhoAmI, error)
23-
Close(ctx context.Context) error
2426
}

internal/cluster/cluster.go

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -102,10 +102,6 @@ func (c *cluster) Unlock() {
102102
c.mu.Unlock()
103103
}
104104

105-
func (c *cluster) GetConn(endpoint endpoint.Endpoint) conn.Conn {
106-
return c.pool.GetConn(endpoint)
107-
}
108-
109105
func (c *cluster) Force() {
110106
c.explorer.Force()
111107
}
@@ -171,7 +167,6 @@ type Cluster interface {
171167
CRUD
172168
Explorer
173169
Locker
174-
conn.PoolGetter
175170
conn.Pessimizer
176171
}
177172

@@ -211,10 +206,40 @@ func (c *cluster) Close(ctx context.Context) (err error) {
211206
if c.explorer != nil {
212207
c.explorer.Stop()
213208
}
209+
210+
for _, entry := range c.index {
211+
c.Remove(
212+
ctx,
213+
entry.Conn.Endpoint(),
214+
WithoutLock(),
215+
)
216+
}
217+
214218
c.closed = true
215219

216-
c.index = nil
217-
c.endpoints = nil
220+
if len(c.index) > 0 {
221+
panic(fmt.Sprintf(
222+
"non empty index after remove all entries: %v",
223+
func() (endpoints []string) {
224+
for e := range c.index {
225+
endpoints = append(endpoints, e)
226+
}
227+
return endpoints
228+
}(),
229+
))
230+
}
231+
232+
if len(c.endpoints) > 0 {
233+
panic(fmt.Sprintf(
234+
"non empty nodes after remove all entries: %v",
235+
func() (nodes []uint32) {
236+
for e := range c.endpoints {
237+
nodes = append(nodes, e)
238+
}
239+
return nodes
240+
}(),
241+
))
242+
}
218243

219244
return c.pool.Release(ctx)
220245
}
@@ -284,7 +309,7 @@ func (c *cluster) Insert(ctx context.Context, e endpoint.Endpoint, opts ...crudO
284309
return nil
285310
}
286311

287-
cc = c.pool.GetConn(e)
312+
cc = c.pool.Get(ctx, e)
288313

289314
_, has := c.index[e.Address()]
290315
if has {
@@ -335,7 +360,12 @@ func (c *cluster) Update(ctx context.Context, e endpoint.Endpoint, opts ...crudO
335360
panic("ydb: cluster entry with nil conn")
336361
}
337362

338-
entry.Conn.Endpoint().Touch()
363+
entry.Conn.Endpoint().Touch(
364+
endpoint.WithLocation(e.Location()),
365+
endpoint.WithID(e.NodeID()),
366+
endpoint.WithLoadFactor(e.LoadFactor()),
367+
endpoint.WithLocalDC(e.LocalDC()),
368+
)
339369

340370
delete(c.endpoints, e.NodeID())
341371
c.index[e.Address()] = entry
@@ -380,6 +410,8 @@ func (c *cluster) Remove(ctx context.Context, e endpoint.Endpoint, opts ...crudO
380410
panic("ydb: can't remove not-existing endpoint")
381411
}
382412

413+
defer entry.Conn.Release(ctx)
414+
383415
removed = entry.RemoveFrom(c.balancer, &c.balancerMtx)
384416

385417
delete(c.index, e.Address())

internal/conn/conn.go

Lines changed: 44 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package conn
33
import (
44
"context"
55
"fmt"
6+
"strconv"
67
"sync"
78
"sync/atomic"
89
"time"
@@ -16,7 +17,6 @@ import (
1617
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
1718
"github.com/ydb-platform/ydb-go-sdk/v3/internal/errors"
1819
"github.com/ydb-platform/ydb-go-sdk/v3/internal/response"
19-
"github.com/ydb-platform/ydb-go-sdk/v3/testutil/timeutil"
2020
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
2121
)
2222

@@ -32,11 +32,13 @@ type Conn interface {
3232

3333
Endpoint() endpoint.Endpoint
3434

35-
TTL() <-chan time.Time
35+
LastUsage() time.Time
3636

3737
IsState(states ...State) bool
3838
GetState() State
3939
SetState(State) State
40+
41+
Release(ctx context.Context)
4042
}
4143

4244
func (c *conn) Address() string {
@@ -45,15 +47,28 @@ func (c *conn) Address() string {
4547

4648
type conn struct {
4749
sync.RWMutex
48-
config Config // ro access
49-
cc *grpc.ClientConn
50-
done chan struct{}
51-
endpoint endpoint.Endpoint // ro access
52-
closed bool
53-
state State
54-
usages int32
55-
ttl timeutil.Timer
56-
onClose []func(*conn)
50+
config Config // ro access
51+
cc *grpc.ClientConn
52+
done chan struct{}
53+
endpoint endpoint.Endpoint // ro access
54+
closed bool
55+
state State
56+
usages int32
57+
lastUsage time.Time
58+
onClose []func(*conn)
59+
}
60+
61+
func (c *conn) Release(ctx context.Context) {
62+
if c.decUsages() == 0 {
63+
_ = c.Close(ctx)
64+
}
65+
}
66+
67+
func (c *conn) LastUsage() time.Time {
68+
c.RLock()
69+
defer c.RUnlock()
70+
71+
return c.lastUsage
5772
}
5873

5974
func (c *conn) IsState(states ...State) bool {
@@ -69,7 +84,7 @@ func (c *conn) IsState(states ...State) bool {
6984
return false
7085
}
7186

72-
func (c *conn) Park(ctx context.Context) (err error) {
87+
func (c *conn) park(ctx context.Context) (err error) {
7388
c.Lock()
7489
defer c.Unlock()
7590

@@ -135,19 +150,6 @@ func (c *conn) GetState() (s State) {
135150
return c.state
136151
}
137152

138-
func (c *conn) TTL() <-chan time.Time {
139-
if c.config.ConnectionTTL() == 0 {
140-
return nil
141-
}
142-
if c.isClosed() {
143-
return nil
144-
}
145-
if atomic.LoadInt32(&c.usages) > 0 {
146-
return nil
147-
}
148-
return c.ttl.C()
149-
}
150-
151153
func (c *conn) take(ctx context.Context) (cc *grpc.ClientConn, err error) {
152154
onDone := trace.DriverOnConnTake(
153155
trace.ContextDriver(ctx).Compose(c.config.Trace()),
@@ -192,25 +194,30 @@ func (c *conn) take(ctx context.Context) (cc *grpc.ClientConn, err error) {
192194
return c.cc, nil
193195
}
194196

195-
func (c *conn) changeUsages(delta int32) {
196-
trace.DriverOnConnUsagesChange(
197-
c.config.Trace(),
198-
c.endpoint.Copy(),
199-
int(atomic.AddInt32(&c.usages, delta)),
200-
)
197+
func (c *conn) changeUsages(delta int32) int32 {
198+
if usages := atomic.AddInt32(&c.usages, delta); usages < 0 {
199+
panic("negative usages" + strconv.Itoa(int(usages)))
200+
} else {
201+
trace.DriverOnConnUsagesChange(
202+
c.config.Trace(),
203+
c.endpoint.Copy(),
204+
int(usages),
205+
)
206+
return usages
207+
}
201208
}
202209

203210
func (c *conn) incUsages() {
211+
c.Lock()
212+
defer c.Unlock()
204213
c.changeUsages(1)
205214
}
206215

207-
func (c *conn) decUsages() {
216+
func (c *conn) decUsages() int32 {
208217
c.Lock()
209218
defer c.Unlock()
210-
if ttl := c.config.ConnectionTTL(); ttl > 0 {
211-
c.ttl.Reset(ttl)
212-
}
213-
c.changeUsages(-1)
219+
c.lastUsage = time.Now()
220+
return c.changeUsages(-1)
214221
}
215222

216223
func isBroken(raw *grpc.ClientConn) bool {
@@ -433,6 +440,7 @@ func withOnClose(onClose func(*conn)) option {
433440

434441
func newConn(e endpoint.Endpoint, config Config, opts ...option) *conn {
435442
c := &conn{
443+
usages: 1,
436444
state: Created,
437445
endpoint: e,
438446
config: config,
@@ -442,9 +450,6 @@ func newConn(e endpoint.Endpoint, config Config, opts ...option) *conn {
442450
for _, o := range opts {
443451
o(c)
444452
}
445-
if ttl := config.ConnectionTTL(); ttl > 0 {
446-
c.ttl = timeutil.NewTimer(ttl)
447-
}
448453
return c
449454
}
450455

internal/conn/grpc_client_stream.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,12 @@ func (s *grpcClientStream) Context() context.Context {
4444
func (s *grpcClientStream) SendMsg(m interface{}) (err error) {
4545
s.c.incUsages()
4646
defer s.c.decUsages()
47+
4748
err = s.s.SendMsg(m)
4849
if err != nil && s.wrapping {
4950
err = errors.Error(errors.MapGRPCError(err))
5051
}
52+
5153
return
5254
}
5355

0 commit comments

Comments
 (0)