Skip to content

Commit 185edfb

Browse files
authored
Merge pull request #1866 from ydb-platform/reuse-node-ip-fix
Fixed grpc connections leak on reused node IP
2 parents 48b9ce0 + 6619356 commit 185edfb

File tree

4 files changed

+37
-14
lines changed

4 files changed

+37
-14
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Fixed grpc connections leak on reused node IP
2+
13
## v3.116.1
24
* Fixed an issue where `topic.UpdateOffsetsInTransaction` was executed on a node different from the one where the transaction was running, which could lead to the error "Database coordinators are unavailable"
35

internal/conn/pool.go

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ type Pool struct {
2424
usages int64
2525
config Config
2626
dialOptions []grpc.DialOption
27-
conns xsync.Map[string, *conn]
27+
conns xsync.Map[endpoint.Key, *conn]
2828
done chan struct{}
2929
}
3030

@@ -42,12 +42,11 @@ func (p *Pool) GrpcDialOptions() []grpc.DialOption {
4242

4343
func (p *Pool) Get(endpoint endpoint.Endpoint) Conn {
4444
var (
45-
address = endpoint.Address()
46-
cc *conn
47-
has bool
45+
cc *conn
46+
has bool
4847
)
4948

50-
cc, has = p.conns.Get(address)
49+
cc, has = p.conns.Get(endpoint.Key())
5150
if has && cc.Endpoint().NodeID() == endpoint.NodeID() && cc.Endpoint().OverrideHost() == endpoint.OverrideHost() {
5251
return cc
5352
}
@@ -57,13 +56,13 @@ func (p *Pool) Get(endpoint endpoint.Endpoint) Conn {
5756
withOnTransportError(p.Ban),
5857
)
5958

60-
p.conns.Set(address, cc)
59+
p.conns.Set(endpoint.Key(), cc)
6160

6261
return cc
6362
}
6463

6564
func (p *Pool) remove(c *conn) {
66-
p.conns.Delete(c.Address())
65+
p.conns.Delete(c.endpoint.Key())
6766
}
6867

6968
func (p *Pool) isClosed() bool {
@@ -104,7 +103,7 @@ func (p *Pool) Ban(ctx context.Context, cc Conn, cause error) {
104103

105104
e := cc.Endpoint().Copy()
106105

107-
cc, ok := p.conns.Get(e.Address())
106+
cc, ok := p.conns.Get(e.Key())
108107
if !ok {
109108
return
110109
}
@@ -123,7 +122,7 @@ func (p *Pool) Allow(ctx context.Context, cc Conn) {
123122

124123
e := cc.Endpoint().Copy()
125124

126-
cc, ok := p.conns.Get(e.Address())
125+
cc, ok := p.conns.Get(e.Key())
127126
if !ok {
128127
return
129128
}
@@ -161,7 +160,7 @@ func (p *Pool) Release(ctx context.Context) (finalErr error) {
161160
)
162161

163162
wg.Add(cap(errCh))
164-
p.conns.Range(func(_ string, c *conn) bool {
163+
p.conns.Range(func(_ endpoint.Key, c *conn) bool {
165164
go func(c closer.Closer) {
166165
defer wg.Done()
167166
if err := c.Close(ctx); err != nil {
@@ -194,7 +193,7 @@ func (p *Pool) connParker(ctx context.Context, ttl, interval time.Duration) {
194193
case <-p.done:
195194
return
196195
case <-ticker.C:
197-
p.conns.Range(func(_ string, c *conn) bool {
196+
p.conns.Range(func(_ endpoint.Key, c *conn) bool {
198197
if time.Since(c.LastUsage()) > ttl {
199198
switch c.GetState() {
200199
case Online, Banned:
@@ -232,10 +231,10 @@ func NewPool(ctx context.Context, config Config) *Pool {
232231

233232
return func(info trace.DriverResolveDoneInfo) {
234233
if info.Error != nil || len(resolved) == 0 {
235-
p.conns.Range(func(address string, cc *conn) bool {
236-
if u, err := url.Parse(address); err == nil && u.Host == target && cc.grpcConn != nil {
234+
p.conns.Range(func(key endpoint.Key, cc *conn) bool {
235+
if u, err := url.Parse(key.Address); err == nil && u.Host == target && cc.grpcConn != nil {
237236
_ = cc.grpcConn.Close()
238-
_ = p.conns.Delete(address)
237+
_ = p.conns.Delete(key)
239238
}
240239

241240
return true

internal/endpoint/endpoint.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,18 @@ type (
2424
// Read about versioning policy: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#deprecated
2525
LocalDC() bool
2626
}
27+
Key struct {
28+
Address string
29+
NodeID uint32
30+
HostOverride string
31+
}
2732
Endpoint interface {
2833
Info
2934

3035
String() string
3136
Copy() Endpoint
3237
Touch(opts ...Option)
38+
Key() Key
3339
}
3440
)
3541

@@ -49,6 +55,14 @@ type endpoint struct {
4955
local bool
5056
}
5157

58+
func (e *endpoint) Key() Key {
59+
return Key{
60+
Address: e.address,
61+
NodeID: e.id,
62+
HostOverride: e.sslNameOverride,
63+
}
64+
}
65+
5266
func (e *endpoint) Copy() Endpoint {
5367
e.mu.RLock()
5468
defer e.mu.RUnlock()

internal/mock/conn.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,14 @@ type Endpoint struct {
8585
OverrideHostField string
8686
}
8787

88+
func (e *Endpoint) Key() endpoint.Key {
89+
return endpoint.Key{
90+
Address: e.AddrField,
91+
NodeID: e.NodeIDField,
92+
HostOverride: e.OverrideHostField,
93+
}
94+
}
95+
8896
func (e *Endpoint) Choose(bool) {
8997
}
9098

0 commit comments

Comments
 (0)