Skip to content

Commit 16f994e

Browse files
committed
* Added config.SharedPool() setting and config.WithSharedPool() option
* Added management of shared pool flag on change dial timeout and credentials * Removed explicit checks of conditions for use (or not) shared pool in `ydb.With()` * Renamed `internal/db` interfaces * Changed signature of `conn.Conn.Release` (added error as result)
1 parent 9aeface commit 16f994e

File tree

10 files changed

+68
-41
lines changed

10 files changed

+68
-41
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
* Added `config.SharedPool()` setting and `config.WithSharedPool()` option
2+
* Added management of shared pool flag on change dial timeout and credentials
3+
* Removed explicit checks of conditions for use (or not) shared pool in `ydb.With()`
4+
* Renamed `internal/db` interfaces
5+
* Changed signature of `conn.Conn.Release` (added error as result)
6+
17
## v3.16.4
28
* Removed `WithMeta()` discovery config option
39
* Moved `meta.Meta` call to conn exclusively

config/config.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ type Config interface {
7676

7777
// UseDNSResolver is a flag about using dns-resolving or not
7878
UseDNSResolver() bool
79+
80+
// SharedPool is a flag about using shared connection pool or detached
81+
SharedPool() bool
7982
}
8083

8184
// Config contains driver configuration options.
@@ -88,6 +91,7 @@ type config struct {
8891
balancer balancer.Balancer
8992
secure bool
9093
dnsResolver bool
94+
sharedPool bool
9195
endpoint string
9296
database string
9397
requestsType string
@@ -98,6 +102,10 @@ type config struct {
98102
meta meta.Meta
99103
}
100104

105+
func (c *config) SharedPool() bool {
106+
return c.sharedPool
107+
}
108+
101109
func (c *config) UseDNSResolver() bool {
102110
return c.dnsResolver
103111
}
@@ -211,9 +219,16 @@ func WithConnectionTTL(ttl time.Duration) Option {
211219
}
212220
}
213221

222+
func WithSharedPool() Option {
223+
return func(c *config) {
224+
c.sharedPool = true
225+
}
226+
}
227+
214228
func WithCredentials(credentials credentials.Credentials) Option {
215229
return func(c *config) {
216230
c.credentials = credentials
231+
c.sharedPool = false // use separated pool for another credentials
217232
}
218233
}
219234

@@ -231,6 +246,9 @@ func WithOperationCancelAfter(operationCancelAfter time.Duration) Option {
231246

232247
func WithDialTimeout(timeout time.Duration) Option {
233248
return func(c *config) {
249+
if c.dialTimeout != timeout {
250+
c.sharedPool = false // use separated pool for another dial timeout
251+
}
234252
c.dialTimeout = timeout
235253
}
236254
}
@@ -312,8 +330,9 @@ func certPool() (certPool *x509.CertPool) {
312330

313331
func defaultConfig() (c *config) {
314332
return &config{
315-
balancer: balancers.Default(),
316-
secure: true,
333+
balancer: balancers.Default(),
334+
secure: true,
335+
sharedPool: true,
317336
tlsConfig: &tls.Config{
318337
MinVersion: tls.VersionTLS12,
319338
RootCAs: certPool(),

connection.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import (
3434
// Interface and list of clients may be changed in the future
3535
type Connection interface {
3636
closer.Closer
37-
db.ConnectionInfo
37+
db.Info
3838
grpc.ClientConnInterface
3939

4040
// Table returns table client
@@ -265,7 +265,7 @@ func New(ctx context.Context, opts ...Option) (_ Connection, err error) {
265265
)
266266
}
267267

268-
if c.pool == nil {
268+
if c.pool == nil || !c.config.SharedPool() {
269269
c.pool = conn.NewPool(
270270
ctx,
271271
c.config,

internal/cluster/cluster.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,9 @@ func (c *cluster) Remove(ctx context.Context, e endpoint.Endpoint, opts ...crudO
417417
panic("ydb: can't remove not-existing endpoint")
418418
}
419419

420-
defer entry.Conn.Release(ctx)
420+
defer func() {
421+
_ = entry.Conn.Release(ctx)
422+
}()
421423

422424
removed = entry.RemoveFrom(c.balancer, &c.balancerMtx)
423425

internal/conn/conn.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ type Conn interface {
3939
GetState() State
4040
SetState(State) State
4141

42-
Release(ctx context.Context)
42+
Release(ctx context.Context) error
4343
}
4444

4545
func (c *conn) Address() string {
@@ -60,19 +60,19 @@ type conn struct {
6060
onClose []func(*conn)
6161
}
6262

63-
func (c *conn) Release(ctx context.Context) {
63+
func (c *conn) Release(ctx context.Context) (err error) {
6464
var (
6565
onDone = trace.DriverOnConnRelease(
6666
c.config.Trace(),
6767
&ctx,
6868
c.endpoint.Copy(),
6969
)
70-
err error
70+
issues []error
7171
)
7272
defer func() {
7373
onDone(err)
7474
}()
75-
var issues []error
75+
7676
if c.changeUsages(-1) == 0 {
7777
if usages := atomic.LoadInt32(&c.streamUsages); usages > 0 {
7878
issues = append(issues, fmt.Errorf("conn in stream use: usages=%d", usages))
@@ -81,9 +81,12 @@ func (c *conn) Release(ctx context.Context) {
8181
issues = append(issues, closeErr)
8282
}
8383
}
84+
8485
if len(issues) > 0 {
85-
err = errors.NewWithIssues("conn released with issues", issues...)
86+
return errors.NewWithIssues("conn released with issues", issues...)
8687
}
88+
89+
return nil
8790
}
8891

8992
func (c *conn) LastUsage() time.Time {

internal/db/connection.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@ type Cluster interface {
1616
closer.Closer
1717
}
1818

19-
type ConnectionDiscovery interface {
19+
type Discoverer interface {
2020
Discovery() discovery.Client
2121
}
2222

23-
type ConnectionInfo interface {
23+
type Info interface {
2424
// Endpoint returns initial endpoint
2525
Endpoint() string
2626

@@ -33,6 +33,6 @@ type ConnectionInfo interface {
3333

3434
type Connection interface {
3535
Cluster
36-
ConnectionInfo
37-
ConnectionDiscovery
36+
Info
37+
Discoverer
3838
}

internal/db/database.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type database struct {
2020
config config.Config
2121
cluster cluster.Cluster
2222
discovery discovery.Client
23+
cc conn.Conn
2324
}
2425

2526
func (db *database) Discovery() discovery.Client {
@@ -64,6 +65,7 @@ func New(
6465
db := &database{
6566
config: c,
6667
cluster: cluster.New(ctx, c, pool, c.Balancer()),
68+
cc: pool.Create(endpoint.New(c.Endpoint(), endpoint.WithLocalDC(true))),
6769
}
6870

6971
var cancel context.CancelFunc
@@ -74,17 +76,14 @@ func New(
7476
}
7577
defer cancel()
7678

77-
cc := pool.Create(endpoint.New(c.Endpoint(), endpoint.WithLocalDC(true)))
78-
7979
db.discovery, err = builder.New(
8080
ctx,
81-
cc,
81+
db.cc,
8282
db.cluster,
8383
db.config.Trace(),
8484
opts...,
8585
)
8686
if err != nil {
87-
cc.Release(ctx)
8887
return nil, err
8988
}
9089

internal/discovery/discovery.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,5 @@ func (c *client) WhoAmI(ctx context.Context) (whoAmI *discovery.WhoAmI, err erro
197197
}
198198

199199
func (c *client) Close(ctx context.Context) error {
200-
c.cc.Release(ctx)
201-
return nil
200+
return c.cc.Release(ctx)
202201
}

test/connection_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ func TestConnection(t *testing.T) {
8383
ydb.WithUserAgent(userAgent),
8484
ydb.WithRequestsType(requestType),
8585
ydb.With(
86+
config.WithInternalDNSResolver(),
8687
config.WithGrpcOptions(
8788
grpc.WithUnaryInterceptor(func(
8889
ctx context.Context,
@@ -236,7 +237,7 @@ func TestConnection(t *testing.T) {
236237
var childDB ydb.Connection
237238
childDB, err = db.With(
238239
ctx,
239-
ydb.WithAccessTokenCredentials(""),
240+
ydb.WithDialTimeout(time.Second*5),
240241
)
241242
if err != nil {
242243
t.Fatalf("failed to open sub-connection: %v", err)

with.go

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,43 +10,41 @@ import (
1010
var nextID = uint64(0)
1111

1212
func (c *connection) With(ctx context.Context, opts ...Option) (Connection, error) {
13-
if len(opts) == 0 {
14-
return c, nil
15-
}
13+
id := atomic.AddUint64(&nextID, 1)
1614

1715
opts = append(
1816
opts,
1917
WithBalancer(
2018
c.config.Balancer().Create(),
2119
),
22-
)
23-
24-
id := atomic.AddUint64(&nextID, 1)
25-
26-
opts = append(
27-
opts,
2820
withOnClose(func(child *connection) {
2921
c.childrenMtx.Lock()
3022
defer c.childrenMtx.Unlock()
3123

3224
delete(c.children, id)
3325
}),
26+
withConnPool(c.pool),
3427
)
35-
// check if credentials have been overridden
36-
tmp := new(connection)
37-
for _, o := range opts {
38-
_ = o(ctx, tmp)
39-
}
40-
tmpCfg := config.New(tmp.options...)
41-
if tmpCfg.Credentials() == nil {
42-
// use previous credentials, so we can share conn pool
43-
opts = append(opts, withConnPool(c.pool))
44-
}
4528

4629
child, err := New(
4730
ctx,
4831
append(
49-
c.opts,
32+
append(
33+
c.opts,
34+
With(
35+
config.WithSharedPool(), // force set to true prefer shared pool
36+
config.WithBalancer(
37+
c.config.Balancer().Create(),
38+
),
39+
),
40+
withOnClose(func(child *connection) {
41+
c.childrenMtx.Lock()
42+
defer c.childrenMtx.Unlock()
43+
44+
delete(c.children, id)
45+
}),
46+
withConnPool(c.pool),
47+
),
5048
opts...,
5149
)...,
5250
)

0 commit comments

Comments
 (0)