@@ -79,19 +79,21 @@ func (c *cluster) SetExplorer(repeater repeater.Repeater) {
7979}
8080
8181type crudOptionsHolder struct {
82- locked bool
82+ withLock bool
8383}
8484
8585type crudOption func (h * crudOptionsHolder )
8686
8787func WithoutLock () crudOption {
8888 return func (h * crudOptionsHolder ) {
89- h .locked = true
89+ h .withLock = false
9090 }
9191}
9292
9393func parseOptions (opts ... crudOption ) * crudOptionsHolder {
94- h := & crudOptionsHolder {}
94+ h := & crudOptionsHolder {
95+ withLock : true ,
96+ }
9597 for _ , o := range opts {
9698 o (h )
9799 }
@@ -112,10 +114,6 @@ type CRUD interface {
112114 Get (ctx context.Context , opts ... crudOption ) (cc conn.Conn , err error )
113115}
114116
115- type Pessimizer interface {
116- Pessimize (ctx context.Context , endpoint endpoint.Endpoint ) error
117- }
118-
119117type Explorer interface {
120118 SetExplorer (repeater repeater.Repeater )
121119 Force ()
@@ -135,7 +133,6 @@ type CRUDExplorerLocker interface {
135133type Cluster interface {
136134 closer.Closer
137135 CRUD
138- Pessimizer
139136 Explorer
140137 Locker
141138 conn.PoolGetter
@@ -151,13 +148,57 @@ func New(
151148 onDone ()
152149 }()
153150
154- return & cluster {
151+ c := & cluster {
155152 config : config ,
156- pool : conn .NewPool (ctx , config ),
157153 index : make (map [string ]entry.Entry ),
158154 endpoints : make (map [uint32 ]conn.Conn ),
159155 balancer : balancer ,
160156 }
157+
158+ c .pool = conn .NewPool (
159+ ctx ,
160+ config ,
161+ func (e endpoint.Endpoint ) {
162+ c .mu .RLock ()
163+ defer c .mu .RUnlock ()
164+
165+ if c .closed {
166+ return
167+ }
168+
169+ entry , has := c .index [e .Address ()]
170+ if ! has {
171+ return
172+ }
173+
174+ if entry .Handle == nil {
175+ return
176+ }
177+
178+ if ! c .balancer .Contains (entry .Handle ) {
179+ return
180+ }
181+
182+ if c .explorer == nil {
183+ return
184+ }
185+
186+ // count ratio (banned/all)
187+ online := 0
188+ for _ , entry = range c .index {
189+ if entry .Conn != nil && entry .Conn .GetState () == conn .Online {
190+ online ++
191+ }
192+ }
193+
194+ // more than half connections banned - re-discover now
195+ if online * 2 < len (c .index ) {
196+ c .explorer .Force ()
197+ }
198+ },
199+ )
200+
201+ return c
161202}
162203
163204func (c * cluster ) Close (ctx context.Context ) (err error ) {
@@ -245,7 +286,7 @@ func (c *cluster) Insert(ctx context.Context, e endpoint.Endpoint, opts ...crudO
245286 }()
246287
247288 options := parseOptions (opts ... )
248- if ! options .locked {
289+ if options .withLock {
249290 c .mu .Lock ()
250291 defer c .mu .Unlock ()
251292 }
@@ -288,7 +329,7 @@ func (c *cluster) Update(ctx context.Context, e endpoint.Endpoint, opts ...crudO
288329 }()
289330
290331 options := parseOptions (opts ... )
291- if ! options .locked {
332+ if options .withLock {
292333 c .mu .Lock ()
293334 defer c .mu .Unlock ()
294335 }
@@ -333,20 +374,17 @@ func (c *cluster) Remove(ctx context.Context, e endpoint.Endpoint, opts ...crudO
333374 }()
334375
335376 options := parseOptions (opts ... )
336- if ! options .locked {
377+ if options .withLock {
337378 c .mu .Lock ()
379+ defer c .mu .Unlock ()
338380 }
339381
340382 if c .closed {
341- if ! options .locked {
342- c .mu .Unlock ()
343- }
344383 return
345384 }
346385
347386 entry , has := c .index [e .Address ()]
348387 if ! has {
349- c .mu .Unlock ()
350388 panic ("ydb: can't remove not-existing endpoint" )
351389 }
352390
@@ -355,10 +393,6 @@ func (c *cluster) Remove(ctx context.Context, e endpoint.Endpoint, opts ...crudO
355393 delete (c .index , e .Address ())
356394 delete (c .endpoints , e .NodeID ())
357395
358- if ! options .locked {
359- c .mu .Unlock ()
360- }
361-
362396 if entry .Conn != nil {
363397 // entry.Conn may be nil when connection is being tracked after unsuccessful dial().
364398 _ = entry .Conn .Close (ctx )
@@ -367,40 +401,6 @@ func (c *cluster) Remove(ctx context.Context, e endpoint.Endpoint, opts ...crudO
367401 return entry .Conn
368402}
369403
370- func (c * cluster ) Pessimize (ctx context.Context , e endpoint.Endpoint ) (err error ) {
371- c .mu .RLock ()
372- defer c .mu .RUnlock ()
373- if c .closed {
374- return errors .Errorf (0 , "cluster: pessimize failed: %w" , ErrClusterClosed )
375- }
376-
377- entry , has := c .index [e .Address ()]
378- if ! has {
379- return errors .Errorf (0 , "cluster: pessimize failed: %w" , ErrUnknownEndpoint )
380- }
381- if entry .Handle == nil {
382- return errors .Errorf (0 , "cluster: pessimize failed: %w" , ErrNilBalancerElement )
383- }
384- if ! c .balancer .Contains (entry .Handle ) {
385- return errors .Errorf (0 , "cluster: pessimize failed: %w" , ErrUnknownBalancerElement )
386- }
387- entry .Conn .SetState (conn .Banned )
388- if c .explorer != nil {
389- // count ratio (banned/all)
390- online := 0
391- for _ , entry = range c .index {
392- if entry .Conn != nil && entry .Conn .GetState () == conn .Online {
393- online ++
394- }
395- }
396- // more than half connections banned - re-discover now
397- if online * 2 < len (c .index ) {
398- c .explorer .Force ()
399- }
400- }
401- return err
402- }
403-
404404func compareEndpoints (a , b endpoint.Endpoint ) int {
405405 return strings .Compare (a .Address (), b .Address ())
406406}
0 commit comments