Skip to content

Commit 9fc1a5e

Browse files
committed
Revert commit of conns parking
1 parent 229d834 commit 9fc1a5e

File tree

8 files changed

+162
-4
lines changed

8 files changed

+162
-4
lines changed

CHANGELOG.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
* Added query pool metrics
44
* Fixed logic of query session pool
55
* Changed initialization of internal driver clients to lazy
6-
* Disabled the logic of background grpc-connection parking
76
* Removed `ydb.WithSessionPoolSizeLimit()` option
87
* Added async put session into pool if external context is done
98
* Dropped intermediate callbacks from `trace.{Table,Retry,Query}` events

config/config.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type Config struct {
2121

2222
trace *trace.Driver
2323
dialTimeout time.Duration
24+
connectionTTL time.Duration
2425
balancerConfig *balancerConfig.Config
2526
secure bool
2627
endpoint string
@@ -56,6 +57,13 @@ func (c *Config) Meta() *meta.Meta {
5657
return c.meta
5758
}
5859

60+
// ConnectionTTL defines interval for parking grpc connections.
61+
//
62+
// If ConnectionTTL is zero - connections are not park.
63+
func (c *Config) ConnectionTTL() time.Duration {
64+
return c.connectionTTL
65+
}
66+
5967
// Secure is a flag for secure connection
6068
func (c *Config) Secure() bool {
6169
return c.secure
@@ -169,6 +177,12 @@ func WithUserAgent(userAgent string) Option {
169177
}
170178
}
171179

180+
func WithConnectionTTL(ttl time.Duration) Option {
181+
return func(c *Config) {
182+
c.connectionTTL = ttl
183+
}
184+
}
185+
172186
func WithCredentials(credentials credentials.Credentials) Option {
173187
return func(c *Config) {
174188
c.credentials = credentials

internal/conn/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
type Config interface {
1212
DialTimeout() time.Duration
13+
ConnectionTTL() time.Duration
1314
Trace() *trace.Driver
1415
GrpcDialOptions() []grpc.DialOption
1516
}

internal/conn/conn.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,36 @@ func (c *conn) NodeID() uint32 {
102102
return 0
103103
}
104104

105+
func (c *conn) park(ctx context.Context) (err error) {
106+
onDone := trace.DriverOnConnPark(
107+
c.config.Trace(), &ctx,
108+
stack.FunctionID(""),
109+
c.Endpoint(),
110+
)
111+
defer func() {
112+
onDone(err)
113+
}()
114+
115+
c.mtx.Lock()
116+
defer c.mtx.Unlock()
117+
118+
if c.closed {
119+
return nil
120+
}
121+
122+
if c.cc == nil {
123+
return nil
124+
}
125+
126+
err = c.close(ctx)
127+
128+
if err != nil {
129+
return c.wrapError(err)
130+
}
131+
132+
return nil
133+
}
134+
105135
func (c *conn) Endpoint() endpoint.Endpoint {
106136
if c != nil {
107137
return c.endpoint

internal/conn/pool.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ package conn
22

33
import (
44
"context"
5+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
56
"sync"
67
"sync/atomic"
8+
"time"
79

810
"google.golang.org/grpc"
911
grpcCodes "google.golang.org/grpc/codes"
@@ -194,6 +196,39 @@ func (p *Pool) Release(ctx context.Context) (finalErr error) {
194196
return nil
195197
}
196198

199+
func (p *Pool) connParker(ctx context.Context, ttl, interval time.Duration) {
200+
ticker := time.NewTicker(interval)
201+
defer ticker.Stop()
202+
for {
203+
select {
204+
case <-p.done:
205+
return
206+
case <-ticker.C:
207+
for _, c := range p.collectConns() {
208+
if time.Since(c.LastUsage()) > ttl {
209+
switch c.GetState() {
210+
case Online, Banned:
211+
_ = c.park(ctx)
212+
default:
213+
// nop
214+
}
215+
}
216+
}
217+
}
218+
}
219+
}
220+
221+
func (p *Pool) collectConns() []*conn {
222+
p.mtx.RLock()
223+
defer p.mtx.RUnlock()
224+
conns := make([]*conn, 0, len(p.conns))
225+
for _, c := range p.conns {
226+
conns = append(conns, c)
227+
}
228+
229+
return conns
230+
}
231+
197232
func NewPool(ctx context.Context, config Config) *Pool {
198233
onDone := trace.DriverOnPoolNew(config.Trace(), &ctx, stack.FunctionID(""))
199234
defer onDone()
@@ -206,5 +241,9 @@ func NewPool(ctx context.Context, config Config) *Pool {
206241
done: make(chan struct{}),
207242
}
208243

244+
if ttl := config.ConnectionTTL(); ttl > 0 {
245+
go p.connParker(xcontext.ValueOnly(ctx), ttl, ttl/2)
246+
}
247+
209248
return p
210249
}

options.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,10 @@ func WithConnectionString(connectionString string) Option {
108108
}
109109

110110
// WithConnectionTTL defines duration for parking idle connections
111-
//
112-
// Deprecated: background connection parking not available
113-
func WithConnectionTTL(time.Duration) Option {
111+
func WithConnectionTTL(ttl time.Duration) Option {
114112
return func(ctx context.Context, c *Driver) error {
113+
c.options = append(c.options, config.WithConnectionTTL(ttl))
114+
115115
return nil
116116
}
117117
}

trace/driver.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type (
3737
OnConnDial func(DriverConnDialStartInfo) func(DriverConnDialDoneInfo)
3838
OnConnBan func(DriverConnBanStartInfo) func(DriverConnBanDoneInfo)
3939
OnConnAllow func(DriverConnAllowStartInfo) func(DriverConnAllowDoneInfo)
40+
OnConnPark func(DriverConnParkStartInfo) func(DriverConnParkDoneInfo)
4041
OnConnClose func(DriverConnCloseStartInfo) func(DriverConnCloseDoneInfo)
4142

4243
// Repeater events
@@ -225,6 +226,18 @@ type (
225226
DriverConnDialDoneInfo struct {
226227
Error error
227228
}
229+
DriverConnParkStartInfo struct {
230+
// Context make available context in trace callback function.
231+
// Pointer to context provide replacement of context in trace callback function.
232+
// Warning: concurrent access to pointer on client side must be excluded.
233+
// Safe replacement of context are provided only inside callback function
234+
Context *context.Context
235+
Call call
236+
Endpoint EndpointInfo
237+
}
238+
DriverConnParkDoneInfo struct {
239+
Error error
240+
}
228241
DriverConnCloseStartInfo struct {
229242
// Context make available context in trace callback function.
230243
// Pointer to context provide replacement of context in trace callback function.

trace/driver_gtrace.go

Lines changed: 62 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)