Skip to content

Commit d8bc9a3

Browse files
authored
Merge pull request #171 from candiduslynx/feature/fill-in-meta-in-conn
move meta call to conn exclusively
2 parents 382956c + c364b93 commit d8bc9a3

File tree

12 files changed

+52
-63
lines changed

12 files changed

+52
-63
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
.idea
22
.DS_Store
3+
vendor

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
* Removed `WithMeta()` discovery config option
2+
* Moved `meta.Meta` call to conn exclusively
3+
14
## v3.16.3
25
* Replaced panic on cluster close to error issues
36

connection.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,6 @@ func New(ctx context.Context, opts ...Option) (_ Connection, err error) {
282282
discoveryConfig.WithEndpoint(c.Endpoint()),
283283
discoveryConfig.WithDatabase(c.Name()),
284284
discoveryConfig.WithSecure(c.Secure()),
285-
discoveryConfig.WithMeta(c.config.Meta()),
286285
discoveryConfig.WithOperationTimeout(c.config.OperationTimeout()),
287286
discoveryConfig.WithOperationCancelAfter(c.config.OperationCancelAfter()),
288287
},

discovery/config/config.go

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package config
33
import (
44
"time"
55

6-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
76
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
87
)
98

@@ -42,16 +41,12 @@ type Config interface {
4241
// If Interval is zero then the DefaultInterval is used.
4342
// If Interval is negative, then no background discovery prepared.
4443
Interval() time.Duration
45-
46-
// Meta is an option which contains meta information about database connection
47-
Meta() meta.Meta
4844
}
4945

5046
type config struct {
5147
endpoint string
5248
database string
5349
secure bool
54-
meta meta.Meta
5550

5651
operationTimeout time.Duration
5752
operationCancelAfter time.Duration
@@ -60,10 +55,6 @@ type config struct {
6055
trace trace.Discovery
6156
}
6257

63-
func (c *config) Meta() meta.Meta {
64-
return c.meta
65-
}
66-
6758
func (c *config) OperationTimeout() time.Duration {
6859
return c.operationTimeout
6960
}
@@ -100,12 +91,6 @@ func WithEndpoint(endpoint string) Option {
10091
}
10192
}
10293

103-
func WithMeta(meta meta.Meta) Option {
104-
return func(c *config) {
105-
c.meta = meta
106-
}
107-
}
108-
10994
func WithDatabase(database string) Option {
11095
return func(c *config) {
11196
c.database = database

internal/cluster/cluster.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -151,22 +151,17 @@ type Explorer interface {
151151
Force()
152152
}
153153

154-
type Locker interface {
155-
Lock()
156-
Unlock()
157-
}
158-
159154
type CRUDExplorerLocker interface {
160155
CRUD
161156
Explorer
162-
Locker
157+
sync.Locker
163158
}
164159

165160
type Cluster interface {
166161
closer.Closer
167162
CRUD
168163
Explorer
169-
Locker
164+
sync.Locker
170165
conn.Pessimizer
171166
}
172167

@@ -260,8 +255,11 @@ func (c *cluster) Get(ctx context.Context, opts ...crudOption) (cc conn.Conn, er
260255
ctx, cancel = context.WithTimeout(ctx, MaxGetConnTimeout)
261256
defer cancel()
262257

263-
c.mu.RLock()
264-
defer c.mu.RUnlock()
258+
options := parseOptions(opts...)
259+
if options.withLock {
260+
c.mu.Lock()
261+
defer c.mu.Unlock()
262+
}
265263

266264
if c.closed {
267265
return nil, errors.WithStackTrace(ErrClusterClosed)
@@ -318,7 +316,7 @@ func (c *cluster) Insert(ctx context.Context, e endpoint.Endpoint, opts ...crudO
318316
return nil
319317
}
320318

321-
cc = c.pool.Get(ctx, e)
319+
cc = c.pool.Get(e)
322320

323321
_, has := c.index[e.Address()]
324322
if has {

internal/conn/conn.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ func (c *conn) changeUsages(delta int32) int32 {
236236
usages := atomic.AddInt32(&c.usages, delta)
237237

238238
if usages < 0 {
239-
panic("negative usages" + strconv.Itoa(int(usages)))
239+
panic("negative usages: " + strconv.Itoa(int(usages)))
240240
}
241241

242242
trace.DriverOnConnUsagesChange(
@@ -254,7 +254,7 @@ func (c *conn) changeStreamUsages(delta int32) {
254254
usages := atomic.AddInt32(&c.streamUsages, delta)
255255

256256
if usages < 0 {
257-
panic("negative stream usages" + strconv.Itoa(int(usages)))
257+
panic("negative stream usages: " + strconv.Itoa(int(usages)))
258258
}
259259

260260
trace.DriverOnConnStreamUsagesChange(
@@ -433,6 +433,14 @@ func (c *conn) newStream(
433433
)
434434
}
435435

436+
ctx, err = c.config.Meta().Meta(ctx)
437+
if err != nil {
438+
return nil, errors.NewGrpcError(
439+
errors.WithStatus(grpcStatus.New(codes.Unavailable, "ydb driver conn apply meta failed")),
440+
errors.WithErr(err),
441+
)
442+
}
443+
436444
c.changeStreamUsages(1)
437445
defer c.changeStreamUsages(-1)
438446

internal/conn/pool.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,19 @@ import (
1515
)
1616

1717
type Pool interface {
18+
Creator
1819
Getter
1920
Taker
2021
Releaser
2122
Pessimizer
2223
}
2324

25+
type Creator interface {
26+
Create(endpoint endpoint.Endpoint) Conn // creates new detached conn applying pool config
27+
}
28+
2429
type Getter interface {
25-
Get(ctx context.Context, endpoint endpoint.Endpoint) Conn
30+
Get(endpoint endpoint.Endpoint) Conn
2631
}
2732

2833
type Taker interface {
@@ -51,7 +56,11 @@ type pool struct {
5156
done chan struct{}
5257
}
5358

54-
func (p *pool) Get(ctx context.Context, endpoint endpoint.Endpoint) Conn {
59+
func (p *pool) Create(endpoint endpoint.Endpoint) Conn {
60+
return newConn(endpoint, p.config)
61+
}
62+
63+
func (p *pool) Get(endpoint endpoint.Endpoint) Conn {
5564
p.mtx.Lock()
5665
defer p.mtx.Unlock()
5766

internal/db/database.go

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func New(
7474
}
7575
defer cancel()
7676

77-
cc := pool.Get(ctx, endpoint.New(c.Endpoint(), endpoint.WithLocalDC(true)))
77+
cc := pool.Create(endpoint.New(c.Endpoint(), endpoint.WithLocalDC(true)))
7878

7979
db.discovery, err = builder.New(
8080
ctx,
@@ -115,11 +115,6 @@ func (db *database) Invoke(
115115
return errors.WithStackTrace(err)
116116
}
117117

118-
ctx, err = db.config.Meta().Meta(ctx)
119-
if err != nil {
120-
return errors.WithStackTrace(err)
121-
}
122-
123118
defer func() {
124119
if err != nil && errors.MustPessimizeEndpoint(err) {
125120
db.cluster.Pessimize(ctx, cc, err)
@@ -145,11 +140,6 @@ func (db *database) NewStream(
145140
return nil, errors.WithStackTrace(err)
146141
}
147142

148-
ctx, err = db.config.Meta().Meta(ctx)
149-
if err != nil {
150-
return nil, errors.WithStackTrace(err)
151-
}
152-
153143
defer func() {
154144
if err != nil && errors.MustPessimizeEndpoint(err) {
155145
db.cluster.Pessimize(ctx, cc, err)

internal/discovery/discovery.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -137,11 +137,6 @@ func (c *client) Discover(ctx context.Context) (endpoints []endpoint.Endpoint, e
137137
onDone(location, nodes, err)
138138
}()
139139

140-
ctx, err = c.config.Meta().Meta(ctx)
141-
if err != nil {
142-
return nil, errors.WithStackTrace(err)
143-
}
144-
145140
response, err = c.service.ListEndpoints(ctx, &request)
146141
if err != nil {
147142
return nil, errors.WithStackTrace(err)
@@ -185,11 +180,6 @@ func (c *client) WhoAmI(ctx context.Context) (whoAmI *discovery.WhoAmI, err erro
185180
}
186181
}()
187182

188-
ctx, err = c.config.Meta().Meta(ctx)
189-
if err != nil {
190-
return nil, errors.WithStackTrace(err)
191-
}
192-
193183
response, err = c.service.WhoAmI(ctx, &request)
194184
if err != nil {
195185
return nil, errors.WithStackTrace(err)

options.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,15 +178,17 @@ func WithAnonymousCredentials() Option {
178178

179179
func WithCreateCredentialsFunc(createCredentials func(ctx context.Context) (credentials.Credentials, error)) Option {
180180
return func(ctx context.Context, c *connection) error {
181-
credentials, err := createCredentials(ctx)
181+
creds, err := createCredentials(ctx)
182182
if err != nil {
183183
return errors.WithStackTrace(err)
184184
}
185-
c.options = append(c.options, config.WithCredentials(credentials))
185+
c.options = append(c.options, config.WithCredentials(creds))
186186
return nil
187187
}
188188
}
189189

190+
// WithCredentials in conjunction with Connection.With function prohibit reuse of conn pool.
191+
// Thus, Connection.With will effectively create totally separate Connection.
190192
func WithCredentials(c credentials.Credentials) Option {
191193
return WithCreateCredentialsFunc(func(context.Context) (credentials.Credentials, error) {
192194
return c, nil

0 commit comments

Comments
 (0)