@@ -57,6 +57,14 @@ type cluster struct {
5757 closed bool
5858}
5959
60+ func (c * cluster ) Lock () {
61+ c .mu .Lock ()
62+ }
63+
64+ func (c * cluster ) Unlock () {
65+ c .mu .Unlock ()
66+ }
67+
6068func (c * cluster ) GetConn (endpoint endpoint.Endpoint ) conn.Conn {
6169 return c .pool .GetConn (endpoint )
6270}
@@ -69,11 +77,38 @@ func (c *cluster) SetExplorer(repeater repeater.Repeater) {
6977 c .explorer = repeater
7078}
7179
80+ type crudOptionsHolder struct {
81+ locked bool
82+ }
83+
84+ type crudOption func (h * crudOptionsHolder )
85+
86+ func WithoutLock () crudOption {
87+ return func (h * crudOptionsHolder ) {
88+ h .locked = true
89+ }
90+ }
91+
92+ func parseOptions (opts ... crudOption ) * crudOptionsHolder {
93+ h := & crudOptionsHolder {}
94+ for _ , o := range opts {
95+ o (h )
96+ }
97+ return h
98+ }
99+
72100type CRUD interface {
73- Insert (ctx context.Context , endpoint endpoint.Endpoint ) conn.Conn
74- Update (ctx context.Context , endpoint endpoint.Endpoint ) conn.Conn
75- Remove (ctx context.Context , endpoint endpoint.Endpoint ) conn.Conn
76- Get (ctx context.Context ) (cc conn.Conn , err error )
101+ // Insert inserts endpoint to cluster
102+ Insert (ctx context.Context , endpoint endpoint.Endpoint , opts ... crudOption ) conn.Conn
103+
104+ // Update updates endpoint in cluster
105+ Update (ctx context.Context , endpoint endpoint.Endpoint , opts ... crudOption ) conn.Conn
106+
107+ // Remove removes endpoint from cluster
108+ Remove (ctx context.Context , endpoint endpoint.Endpoint , opts ... crudOption ) conn.Conn
109+
110+ // Get gets conn from cluster
111+ Get (ctx context.Context , opts ... crudOption ) (cc conn.Conn , err error )
77112}
78113
79114type Pessimizer interface {
@@ -85,16 +120,23 @@ type Explorer interface {
85120 Force ()
86121}
87122
88- type CRUDExplorer interface {
123+ type Locker interface {
124+ Lock ()
125+ Unlock ()
126+ }
127+
128+ type CRUDExplorerLocker interface {
89129 CRUD
90130 Explorer
131+ Locker
91132}
92133
93134type Cluster interface {
94135 closer.Closer
95136 CRUD
96137 Pessimizer
97138 Explorer
139+ Locker
98140 conn.PoolGetter
99141}
100142
@@ -151,7 +193,7 @@ func (c *cluster) Close(ctx context.Context) (err error) {
151193
152194// Get returns next available connection.
153195// It returns error on given deadline cancellation or when cluster become closed.
154- func (c * cluster ) Get (ctx context.Context ) (cc conn.Conn , err error ) {
196+ func (c * cluster ) Get (ctx context.Context , opts ... crudOption ) (cc conn.Conn , err error ) {
155197 var cancel context.CancelFunc
156198 ctx , cancel = context .WithTimeout (ctx , MaxGetConnTimeout )
157199 defer cancel ()
@@ -192,7 +234,7 @@ func (c *cluster) Get(ctx context.Context) (cc conn.Conn, err error) {
192234}
193235
194236// Insert inserts new connection into the cluster.
195- func (c * cluster ) Insert (ctx context.Context , e endpoint.Endpoint ) (cc conn.Conn ) {
237+ func (c * cluster ) Insert (ctx context.Context , e endpoint.Endpoint , opts ... crudOption ) (cc conn.Conn ) {
196238 onDone := trace .DriverOnClusterInsert (c .config .Trace (), & ctx , e )
197239 defer func () {
198240 if cc != nil {
@@ -202,8 +244,11 @@ func (c *cluster) Insert(ctx context.Context, e endpoint.Endpoint) (cc conn.Conn
202244 }
203245 }()
204246
205- c .mu .Lock ()
206- defer c .mu .Unlock ()
247+ options := parseOptions (opts ... )
248+ if ! options .locked {
249+ c .mu .Lock ()
250+ defer c .mu .Unlock ()
251+ }
207252
208253 if c .closed {
209254 return nil
@@ -234,7 +279,7 @@ func (c *cluster) Insert(ctx context.Context, e endpoint.Endpoint) (cc conn.Conn
234279}
235280
236281// Update updates existing connection's runtime stats such that load factor and others.
237- func (c * cluster ) Update (ctx context.Context , e endpoint.Endpoint ) (cc conn.Conn ) {
282+ func (c * cluster ) Update (ctx context.Context , e endpoint.Endpoint , opts ... crudOption ) (cc conn.Conn ) {
238283 onDone := trace .DriverOnClusterUpdate (c .config .Trace (), & ctx , e )
239284 defer func () {
240285 if cc != nil {
@@ -244,8 +289,12 @@ func (c *cluster) Update(ctx context.Context, e endpoint.Endpoint) (cc conn.Conn
244289 }
245290 }()
246291
247- c .mu .Lock ()
248- defer c .mu .Unlock ()
292+ options := parseOptions (opts ... )
293+ if ! options .locked {
294+ c .mu .Lock ()
295+ defer c .mu .Unlock ()
296+ }
297+
249298 if c .closed {
250299 return
251300 }
@@ -260,9 +309,11 @@ func (c *cluster) Update(ctx context.Context, e endpoint.Endpoint) (cc conn.Conn
260309
261310 delete (c .endpoints , e .NodeID ())
262311 c .index [e .Address ()] = entry
312+
263313 if e .NodeID () > 0 {
264314 c .endpoints [e .NodeID ()] = entry .Conn
265315 }
316+
266317 if entry .Handle != nil {
267318 // entry.Handle may be nil when connection is being tracked.
268319 c .balancer .Update (entry .Handle , info.Info {})
@@ -272,7 +323,7 @@ func (c *cluster) Update(ctx context.Context, e endpoint.Endpoint) (cc conn.Conn
272323}
273324
274325// Remove removes and closes previously inserted connection.
275- func (c * cluster ) Remove (ctx context.Context , e endpoint.Endpoint ) (cc conn.Conn ) {
326+ func (c * cluster ) Remove (ctx context.Context , e endpoint.Endpoint , opts ... crudOption ) (cc conn.Conn ) {
276327 onDone := trace .DriverOnClusterRemove (c .config .Trace (), & ctx , e )
277328 defer func () {
278329 if cc != nil {
@@ -282,9 +333,15 @@ func (c *cluster) Remove(ctx context.Context, e endpoint.Endpoint) (cc conn.Conn
282333 }
283334 }()
284335
285- c .mu .Lock ()
336+ options := parseOptions (opts ... )
337+ if ! options .locked {
338+ c .mu .Lock ()
339+ }
340+
286341 if c .closed {
287- c .mu .Unlock ()
342+ if ! options .locked {
343+ c .mu .Unlock ()
344+ }
288345 return
289346 }
290347
@@ -298,7 +355,9 @@ func (c *cluster) Remove(ctx context.Context, e endpoint.Endpoint) (cc conn.Conn
298355 delete (c .index , e .Address ())
299356 delete (c .endpoints , e .NodeID ())
300357
301- c .mu .Unlock ()
358+ if ! options .locked {
359+ c .mu .Unlock ()
360+ }
302361
303362 if entry .Conn != nil {
304363 // entry.Conn may be nil when connection is being tracked after unsuccessful dial().
0 commit comments