@@ -2,6 +2,7 @@ package cluster
22
33import (
44 "context"
5+ "fmt"
56 "sort"
67 "strings"
78 "sync"
@@ -26,29 +27,20 @@ const (
2627
2728var (
2829 // ErrClusterClosed returned when requested on a closed cluster.
29- ErrClusterClosed = errors . New ("cluster closed" )
30+ ErrClusterClosed = fmt . Errorf ("cluster closed" )
3031
3132 // ErrClusterEmpty returned when no connections left in cluster.
32- ErrClusterEmpty = errors .New ("cluster empty" )
33-
34- // ErrUnknownEndpoint returned when no connections left in cluster.
35- ErrUnknownEndpoint = errors .New ("unknown endpoint" )
36-
37- // ErrNilBalancerElement returned when requested on a nil Balancer element.
38- ErrNilBalancerElement = errors .New ("nil balancer element" )
39-
40- // ErrUnknownBalancerElement returned when requested on a unknown Balancer element.
41- ErrUnknownBalancerElement = errors .New ("unknown balancer element" )
42-
43- // ErrUnknownTypeOfBalancerElement returned when requested on a unknown types of Balancer element.
44- ErrUnknownTypeOfBalancerElement = errors .New ("unknown types of balancer element" )
33+ ErrClusterEmpty = fmt .Errorf ("cluster empty" )
4534)
4635
4736type cluster struct {
48- config config.Config
49- pool conn.Pool
50- dial func (context.Context , string ) (* grpc.ClientConn , error )
51- balancer balancer.Balancer
37+ config config.Config
38+ pool conn.Pool
39+ dial func (context.Context , string ) (* grpc.ClientConn , error )
40+
41+ balancerMtx sync.RWMutex
42+ balancer balancer.Balancer
43+
5244 explorer repeater.Repeater
5345
5446 index map [string ]entry.Entry
@@ -77,6 +69,9 @@ func (c *cluster) Pessimize(ctx context.Context, cc conn.Conn, cause error) {
7769 return
7870 }
7971
72+ c .balancerMtx .Lock ()
73+ defer c .balancerMtx .Unlock ()
74+
8075 if ! c .balancer .Contains (entry .Handle ) {
8176 return
8277 }
@@ -235,7 +230,7 @@ func (c *cluster) Get(ctx context.Context, opts ...crudOption) (cc conn.Conn, er
235230 defer c .mu .RUnlock ()
236231
237232 if c .closed {
238- return nil , ErrClusterClosed
233+ return nil , errors . Error ( ErrClusterClosed )
239234 }
240235
241236 onDone := trace .DriverOnClusterGet (c .config .Trace (), & ctx )
@@ -258,9 +253,12 @@ func (c *cluster) Get(ctx context.Context, opts ...crudOption) (cc conn.Conn, er
258253 }
259254 }
260255
256+ c .balancerMtx .RLock ()
257+ defer c .balancerMtx .RUnlock ()
258+
261259 cc = c .balancer .Next ()
262260 if cc == nil {
263- return nil , ErrClusterEmpty
261+ return nil , errors . Error ( ErrClusterEmpty )
264262 }
265263
266264 return cc , nil
@@ -297,7 +295,7 @@ func (c *cluster) Insert(ctx context.Context, e endpoint.Endpoint, opts ...crudO
297295
298296 entry := entry.Entry {Conn : cc }
299297
300- inserted = entry .InsertInto (c .balancer )
298+ inserted = entry .InsertInto (c .balancer , & c . balancerMtx )
301299
302300 c .index [e .Address ()] = entry
303301
@@ -346,10 +344,10 @@ func (c *cluster) Update(ctx context.Context, e endpoint.Endpoint, opts ...crudO
346344 c .endpoints [e .NodeID ()] = entry .Conn
347345 }
348346
349- if entry . Handle != nil {
350- // entry.Handle may be nil when connection is being tracked.
351- c . balancer . Update ( entry . Handle , e . Info ())
352- }
347+ c . balancerMtx . Lock ()
348+ defer c . balancerMtx . Unlock ()
349+
350+ c . balancer . Update ( entry . Handle , e . Info ())
353351
354352 return entry .Conn
355353}
@@ -379,16 +377,11 @@ func (c *cluster) Remove(ctx context.Context, e endpoint.Endpoint, opts ...crudO
379377 panic ("ydb: can't remove not-existing endpoint" )
380378 }
381379
382- removed = entry .RemoveFrom (c .balancer )
380+ removed = entry .RemoveFrom (c .balancer , & c . balancerMtx )
383381
384382 delete (c .index , e .Address ())
385383 delete (c .endpoints , e .NodeID ())
386384
387- if entry .Conn != nil {
388- // entry.Conn may be nil when connection is being tracked after unsuccessful dial().
389- _ = entry .Conn .Close (ctx )
390- }
391-
392385 return entry .Conn
393386}
394387
0 commit comments