Skip to content

Commit e3df416

Browse files
committed
lock cluster for update endpoints
1 parent 402b64e commit e3df416

File tree

6 files changed

+106
-28
lines changed

6 files changed

+106
-28
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 3.11.4
2+
* Refactored `internal/cluster.Cluster` (add option for notify about external lock, lock cluster for update cluster endpoints)
3+
14
## 3.11.3
25
* Replaced in `table/types/compare_test.go` checking error by error message to checking with `errors.Is()`
36

internal/cluster/cluster.go

Lines changed: 75 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,14 @@ type cluster struct {
5757
closed bool
5858
}
5959

60+
func (c *cluster) Lock() {
61+
c.mu.Lock()
62+
}
63+
64+
func (c *cluster) Unlock() {
65+
c.mu.Unlock()
66+
}
67+
6068
func (c *cluster) GetConn(endpoint endpoint.Endpoint) conn.Conn {
6169
return c.pool.GetConn(endpoint)
6270
}
@@ -69,11 +77,38 @@ func (c *cluster) SetExplorer(repeater repeater.Repeater) {
6977
c.explorer = repeater
7078
}
7179

80+
type crudOptionsHolder struct {
81+
locked bool
82+
}
83+
84+
type crudOption func(h *crudOptionsHolder)
85+
86+
func WithoutLock() crudOption {
87+
return func(h *crudOptionsHolder) {
88+
h.locked = true
89+
}
90+
}
91+
92+
func parseOptions(opts ...crudOption) *crudOptionsHolder {
93+
h := &crudOptionsHolder{}
94+
for _, o := range opts {
95+
o(h)
96+
}
97+
return h
98+
}
99+
72100
type CRUD interface {
73-
Insert(ctx context.Context, endpoint endpoint.Endpoint) conn.Conn
74-
Update(ctx context.Context, endpoint endpoint.Endpoint) conn.Conn
75-
Remove(ctx context.Context, endpoint endpoint.Endpoint) conn.Conn
76-
Get(ctx context.Context) (cc conn.Conn, err error)
101+
// Insert inserts endpoint to cluster
102+
Insert(ctx context.Context, endpoint endpoint.Endpoint, opts ...crudOption) conn.Conn
103+
104+
// Update updates endpoint in cluster
105+
Update(ctx context.Context, endpoint endpoint.Endpoint, opts ...crudOption) conn.Conn
106+
107+
// Remove removes endpoint from cluster
108+
Remove(ctx context.Context, endpoint endpoint.Endpoint, opts ...crudOption) conn.Conn
109+
110+
// Get gets conn from cluster
111+
Get(ctx context.Context, opts ...crudOption) (cc conn.Conn, err error)
77112
}
78113

79114
type Pessimizer interface {
@@ -85,16 +120,23 @@ type Explorer interface {
85120
Force()
86121
}
87122

88-
type CRUDExplorer interface {
123+
type Locker interface {
124+
Lock()
125+
Unlock()
126+
}
127+
128+
type CRUDExplorerLocker interface {
89129
CRUD
90130
Explorer
131+
Locker
91132
}
92133

93134
type Cluster interface {
94135
closer.Closer
95136
CRUD
96137
Pessimizer
97138
Explorer
139+
Locker
98140
conn.PoolGetter
99141
}
100142

@@ -151,7 +193,7 @@ func (c *cluster) Close(ctx context.Context) (err error) {
151193

152194
// Get returns next available connection.
153195
// It returns error on given deadline cancellation or when cluster become closed.
154-
func (c *cluster) Get(ctx context.Context) (cc conn.Conn, err error) {
196+
func (c *cluster) Get(ctx context.Context, opts ...crudOption) (cc conn.Conn, err error) {
155197
var cancel context.CancelFunc
156198
ctx, cancel = context.WithTimeout(ctx, MaxGetConnTimeout)
157199
defer cancel()
@@ -192,7 +234,7 @@ func (c *cluster) Get(ctx context.Context) (cc conn.Conn, err error) {
192234
}
193235

194236
// Insert inserts new connection into the cluster.
195-
func (c *cluster) Insert(ctx context.Context, e endpoint.Endpoint) (cc conn.Conn) {
237+
func (c *cluster) Insert(ctx context.Context, e endpoint.Endpoint, opts ...crudOption) (cc conn.Conn) {
196238
onDone := trace.DriverOnClusterInsert(c.config.Trace(), &ctx, e)
197239
defer func() {
198240
if cc != nil {
@@ -202,8 +244,11 @@ func (c *cluster) Insert(ctx context.Context, e endpoint.Endpoint) (cc conn.Conn
202244
}
203245
}()
204246

205-
c.mu.Lock()
206-
defer c.mu.Unlock()
247+
options := parseOptions(opts...)
248+
if !options.locked {
249+
c.mu.Lock()
250+
defer c.mu.Unlock()
251+
}
207252

208253
if c.closed {
209254
return nil
@@ -234,7 +279,7 @@ func (c *cluster) Insert(ctx context.Context, e endpoint.Endpoint) (cc conn.Conn
234279
}
235280

236281
// Update updates existing connection's runtime stats such that load factor and others.
237-
func (c *cluster) Update(ctx context.Context, e endpoint.Endpoint) (cc conn.Conn) {
282+
func (c *cluster) Update(ctx context.Context, e endpoint.Endpoint, opts ...crudOption) (cc conn.Conn) {
238283
onDone := trace.DriverOnClusterUpdate(c.config.Trace(), &ctx, e)
239284
defer func() {
240285
if cc != nil {
@@ -244,8 +289,12 @@ func (c *cluster) Update(ctx context.Context, e endpoint.Endpoint) (cc conn.Conn
244289
}
245290
}()
246291

247-
c.mu.Lock()
248-
defer c.mu.Unlock()
292+
options := parseOptions(opts...)
293+
if !options.locked {
294+
c.mu.Lock()
295+
defer c.mu.Unlock()
296+
}
297+
249298
if c.closed {
250299
return
251300
}
@@ -260,9 +309,11 @@ func (c *cluster) Update(ctx context.Context, e endpoint.Endpoint) (cc conn.Conn
260309

261310
delete(c.endpoints, e.NodeID())
262311
c.index[e.Address()] = entry
312+
263313
if e.NodeID() > 0 {
264314
c.endpoints[e.NodeID()] = entry.Conn
265315
}
316+
266317
if entry.Handle != nil {
267318
// entry.Handle may be nil when connection is being tracked.
268319
c.balancer.Update(entry.Handle, info.Info{})
@@ -272,7 +323,7 @@ func (c *cluster) Update(ctx context.Context, e endpoint.Endpoint) (cc conn.Conn
272323
}
273324

274325
// Remove removes and closes previously inserted connection.
275-
func (c *cluster) Remove(ctx context.Context, e endpoint.Endpoint) (cc conn.Conn) {
326+
func (c *cluster) Remove(ctx context.Context, e endpoint.Endpoint, opts ...crudOption) (cc conn.Conn) {
276327
onDone := trace.DriverOnClusterRemove(c.config.Trace(), &ctx, e)
277328
defer func() {
278329
if cc != nil {
@@ -282,9 +333,15 @@ func (c *cluster) Remove(ctx context.Context, e endpoint.Endpoint) (cc conn.Conn
282333
}
283334
}()
284335

285-
c.mu.Lock()
336+
options := parseOptions(opts...)
337+
if !options.locked {
338+
c.mu.Lock()
339+
}
340+
286341
if c.closed {
287-
c.mu.Unlock()
342+
if !options.locked {
343+
c.mu.Unlock()
344+
}
288345
return
289346
}
290347

@@ -298,7 +355,9 @@ func (c *cluster) Remove(ctx context.Context, e endpoint.Endpoint) (cc conn.Conn
298355
delete(c.index, e.Address())
299356
delete(c.endpoints, e.NodeID())
300357

301-
c.mu.Unlock()
358+
if !options.locked {
359+
c.mu.Unlock()
360+
}
302361

303362
if entry.Conn != nil {
304363
// entry.Conn may be nil when connection is being tracked after unsuccessful dial().

internal/conn/pool.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,19 +68,23 @@ func (p *pool) GetConn(e endpoint.Endpoint) Conn {
6868

6969
func (p *pool) Close(ctx context.Context) error {
7070
close(p.done)
71-
var issues []error
71+
7272
p.mtx.Lock()
7373
defer p.mtx.Unlock()
74+
75+
var issues []error
7476
for a, c := range p.conns {
7577
if err := c.Close(ctx); err != nil {
7678
issues = append(issues, err)
7779
}
7880
delete(p.conns, a)
7981
}
80-
if len(issues) == 0 {
81-
return nil
82+
83+
if len(issues) > 0 {
84+
return errors.NewWithIssues("connection pool close failed", issues...)
8285
}
83-
return errors.NewWithIssues("connection pool close failed", issues...)
86+
87+
return nil
8488
}
8589

8690
func (p *pool) connCloser(ctx context.Context, interval time.Duration) {

internal/discovery/discovery.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,19 @@ import (
2323
func New(
2424
ctx context.Context,
2525
cc conn.Conn,
26-
crudExplorer cluster.CRUDExplorer,
26+
crudExplorer cluster.CRUDExplorerLocker,
2727
opts ...config.Option,
2828
) (_ discovery.Client, err error) {
2929
c := &client{
3030
config: config.New(opts...),
3131
service: Ydb_Discovery_V1.NewDiscoveryServiceClient(cc),
3232
}
3333

34+
crudExplorer.Lock()
35+
defer crudExplorer.Unlock()
36+
3437
if c.config.Interval() <= 0 {
35-
_ = crudExplorer.Insert(ctx, cc.Endpoint())
38+
_ = crudExplorer.Insert(ctx, cc.Endpoint(), cluster.WithoutLock())
3639
return c, nil
3740
}
3841

@@ -49,6 +52,7 @@ func New(
4952
crudExplorer.Insert(
5053
ctx,
5154
e,
55+
cluster.WithoutLock(),
5256
)
5357
}
5458

@@ -70,25 +74,31 @@ func New(
7074
// NOTE: curr endpoints must be sorted here.
7175
cluster.SortEndpoints(next)
7276

77+
crudExplorer.Lock()
78+
defer crudExplorer.Unlock()
79+
7380
cluster.DiffEndpoints(curr, next,
7481
func(i, j int) {
7582
// Endpoints are equal, but we still need to update meta
7683
// data such that load factor and others.
7784
crudExplorer.Update(
7885
ctx,
7986
next[j],
87+
cluster.WithoutLock(),
8088
)
8189
},
8290
func(i, j int) {
8391
crudExplorer.Insert(
8492
ctx,
8593
next[j],
94+
cluster.WithoutLock(),
8695
)
8796
},
8897
func(i, j int) {
8998
crudExplorer.Remove(
9099
ctx,
91100
curr[i],
101+
cluster.WithoutLock(),
92102
)
93103
},
94104
)

internal/meta/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
package meta
22

33
const (
4-
Version = "ydb-go-sdk/3.11.3"
4+
Version = "ydb-go-sdk/3.11.4"
55
)

test/table_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@ const (
3838
folder = "table_test"
3939
)
4040

41+
func init() {
42+
log.SetFlags(0)
43+
if os.Getenv("HIDE_APPLICATION_OUTPUT") == "1" {
44+
log.SetOutput(ioutil.Discard)
45+
}
46+
}
47+
4148
type stats struct {
4249
sync.Mutex
4350

@@ -1018,11 +1025,6 @@ func days(date string) time.Time {
10181025
return t
10191026
}
10201027

1021-
func init() {
1022-
log.SetFlags(0)
1023-
log.SetOutput(ioutil.Discard)
1024-
}
1025-
10261028
type templateConfig struct {
10271029
TablePathPrefix string
10281030
}

0 commit comments

Comments
 (0)