Skip to content

Commit 3649e04

Browse files
committed
Reduce client stream map lock contention and trim connection validity hot paths
1 parent 929ca57 commit 3649e04

File tree

1 file changed

+19
-9
lines changed

1 file changed

+19
-9
lines changed

internal/client/client.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ type Client struct {
5858
exchangeQueryFn func(Connection, []byte, time.Duration) ([]byte, error)
5959
fragmentLimits sync.Map
6060
stream0Runtime *stream0Runtime
61-
streamsMu sync.Mutex
61+
streamsMu sync.RWMutex
6262
streams map[uint16]*clientStream
6363
streamTXWindow int
6464
streamTXQueueLimit int
@@ -350,7 +350,7 @@ func (c *Client) BuildConnectionMap() {
350350
}
351351

352352
func (c *Client) GetConnectionByKey(serverKey string) (Connection, bool) {
353-
idx, ok := c.connectionsByKey[strings.TrimSpace(serverKey)]
353+
idx, ok := c.connectionIndexByKey(serverKey)
354354
if !ok || idx < 0 || idx >= len(c.connections) {
355355
return Connection{}, false
356356
}
@@ -359,14 +359,16 @@ func (c *Client) GetConnectionByKey(serverKey string) (Connection, bool) {
359359

360360
func (c *Client) SetConnectionValidity(serverKey string, valid bool) bool {
361361
key := strings.TrimSpace(serverKey)
362-
idx, ok := c.connectionsByKey[key]
362+
idx, ok := c.connectionIndexByKey(key)
363363
if !ok || idx < 0 || idx >= len(c.connections) {
364364
return false
365365
}
366+
if c.connections[idx].IsValid == valid {
367+
return true
368+
}
366369
if !c.balancer.SetConnectionValidity(key, valid) {
367370
return false
368371
}
369-
c.connections[idx].IsValid = valid
370372
return true
371373
}
372374

@@ -410,8 +412,8 @@ func (c *Client) getStream(streamID uint16) (*clientStream, bool) {
410412
if c == nil || streamID == 0 {
411413
return nil, false
412414
}
413-
c.streamsMu.Lock()
414-
defer c.streamsMu.Unlock()
415+
c.streamsMu.RLock()
416+
defer c.streamsMu.RUnlock()
415417
stream, ok := c.streams[streamID]
416418
return stream, ok
417419
}
@@ -436,22 +438,30 @@ func (c *Client) activeStreamCount() int {
436438
if c == nil {
437439
return 0
438440
}
439-
c.streamsMu.Lock()
440-
defer c.streamsMu.Unlock()
441+
c.streamsMu.RLock()
442+
defer c.streamsMu.RUnlock()
441443
return len(c.streams)
442444
}
443445

444446
func (c *Client) connectionPtrByKey(serverKey string) *Connection {
445447
if c == nil {
446448
return nil
447449
}
448-
idx, ok := c.connectionsByKey[strings.TrimSpace(serverKey)]
450+
idx, ok := c.connectionIndexByKey(serverKey)
449451
if !ok || idx < 0 || idx >= len(c.connections) {
450452
return nil
451453
}
452454
return &c.connections[idx]
453455
}
454456

457+
func (c *Client) connectionIndexByKey(serverKey string) (int, bool) {
458+
if c == nil {
459+
return 0, false
460+
}
461+
idx, ok := c.connectionsByKey[strings.TrimSpace(serverKey)]
462+
return idx, ok
463+
}
464+
455465
func (c *Client) startResolverHealthRuntime(ctx context.Context) {
456466
if c == nil {
457467
return

0 commit comments

Comments
 (0)