Skip to content

Commit 764d62b

Browse files
committed
* Implemented goroutine for closing idle connection in database/sql driver
1 parent 8286726 commit 764d62b

File tree

5 files changed

+78
-7
lines changed

5 files changed

+78
-7
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
* Fixed closing of `database/sql` connection (aka `YDB` session)
22
* Made `session.Close()` as `nop` for idled session
3+
* Implemented goroutine for closing idle connection in `database/sql` driver
34

45
## v3.42.4
56
* Added `ydb.WithDisableServerBalancer()` database/sql connector option

internal/xsql/conn.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ func newConn(c *Connector, s table.ClosableSession, opts ...connOption) *conn {
9898
for _, o := range opts {
9999
o(cc)
100100
}
101+
c.attach(cc)
101102
return cc
102103
}
103104

@@ -124,6 +125,10 @@ func (c *conn) PrepareContext(ctx context.Context, query string) (_ driver.Stmt,
124125
}, nil
125126
}
126127

128+
func (c *conn) sinceLastUsage() time.Duration {
129+
return time.Since(time.Unix(atomic.LoadInt64(&c.lastUsage), 0))
130+
}
131+
127132
func (c *conn) execContext(ctx context.Context, query string, args []driver.NamedValue) (_ driver.Result, err error) {
128133
m := queryModeFromContext(ctx, c.defaultQueryMode)
129134
onDone := trace.DatabaseSQLOnConnExec(
@@ -132,7 +137,7 @@ func (c *conn) execContext(ctx context.Context, query string, args []driver.Name
132137
query,
133138
m.String(),
134139
xcontext.IsIdempotent(ctx),
135-
time.Since(time.Unix(atomic.LoadInt64(&c.lastUsage), 0)),
140+
c.sinceLastUsage(),
136141
)
137142
defer func() {
138143
atomic.StoreInt64(&c.lastUsage, time.Now().Unix())
@@ -215,7 +220,7 @@ func (c *conn) queryContext(ctx context.Context, query string, args []driver.Nam
215220
query,
216221
m.String(),
217222
xcontext.IsIdempotent(ctx),
218-
time.Since(time.Unix(atomic.LoadInt64(&c.lastUsage), 0)),
223+
c.sinceLastUsage(),
219224
)
220225
defer func() {
221226
atomic.StoreInt64(&c.lastUsage, time.Now().Unix())
@@ -303,6 +308,7 @@ func (c *conn) Ping(ctx context.Context) (err error) {
303308

304309
func (c *conn) Close() (err error) {
305310
if atomic.CompareAndSwapUint32(&c.closed, 0, 1) {
311+
c.connector.detach(c)
306312
onDone := trace.DatabaseSQLOnConnClose(c.trace)
307313
defer func() {
308314
onDone(err)
@@ -313,7 +319,7 @@ func (c *conn) Close() (err error) {
313319
}
314320
return nil
315321
}
316-
return badconn.Map(xerrors.WithStackTrace(errClosedConn))
322+
return badconn.Map(xerrors.WithStackTrace(errConnClosedEarly))
317323
}
318324

319325
func (c *conn) Prepare(string) (driver.Stmt, error) {

internal/xsql/connector.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"database/sql/driver"
66
"io"
7+
"sync"
8+
"time"
79

810
metaHeaders "github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
911
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
@@ -58,10 +60,18 @@ func WithDisableServerBalancer() ConnectorOption {
5860
}
5961
}
6062

63+
func WithIdleThreshold(idleThreshold time.Duration) ConnectorOption {
64+
return func(c *Connector) error {
65+
c.idleThreshold = idleThreshold
66+
return nil
67+
}
68+
}
69+
6170
func Open(d Driver, connection connection, opts ...ConnectorOption) (_ *Connector, err error) {
6271
c := &Connector{
6372
driver: d,
6473
connection: connection,
74+
conns: make(map[*conn]struct{}),
6575
defaultTxControl: table.DefaultTxControl(),
6676
defaultQueryMode: DefaultQueryMode,
6777
}
@@ -70,6 +80,9 @@ func Open(d Driver, connection connection, opts ...ConnectorOption) (_ *Connecto
7080
return nil, err
7181
}
7282
}
83+
if c.idleThreshold > 0 {
84+
c.idleStopper = c.idleCloser()
85+
}
7386
d.Attach(c)
7487
return c, nil
7588
}
@@ -97,11 +110,17 @@ type Connector struct {
97110
driver Driver
98111
connection connection
99112

113+
conns map[*conn]struct{}
114+
connsMtx sync.RWMutex
115+
116+
idleStopper func()
117+
100118
defaultTxControl *table.TransactionControl
101119
defaultQueryMode QueryMode
102120
defaultDataQueryOpts []options.ExecuteDataQueryOption
103121
defaultScanQueryOpts []options.ExecuteScanQueryOption
104122
disableServerBalancer bool
123+
idleThreshold time.Duration
105124

106125
trace trace.DatabaseSQL
107126
}
@@ -111,15 +130,56 @@ var (
111130
_ io.Closer = &Connector{}
112131
)
113132

133+
func (c *Connector) idleCloser() (idleStopper func()) {
134+
var ctx context.Context
135+
ctx, idleStopper = context.WithCancel(context.Background())
136+
go func() {
137+
for {
138+
select {
139+
case <-ctx.Done():
140+
return
141+
case <-time.After(c.idleThreshold):
142+
c.connsMtx.RLock()
143+
conns := make([]*conn, 0, len(c.conns))
144+
for cc := range c.conns {
145+
conns = append(conns, cc)
146+
}
147+
c.connsMtx.RUnlock()
148+
for _, cc := range conns {
149+
if cc.sinceLastUsage() > c.idleThreshold {
150+
cc.session.Close(context.Background())
151+
}
152+
}
153+
}
154+
}
155+
}()
156+
return idleStopper
157+
}
158+
114159
func (c *Connector) Close() (err error) {
115160
defer c.driver.Detach(c)
161+
if c.idleStopper != nil {
162+
c.idleStopper()
163+
}
116164
return nil
117165
}
118166

119167
func (c *Connector) Connection() connection {
120168
return c.connection
121169
}
122170

171+
func (c *Connector) attach(cc *conn) {
172+
c.connsMtx.Lock()
173+
defer c.connsMtx.Unlock()
174+
c.conns[cc] = struct{}{}
175+
}
176+
177+
func (c *Connector) detach(cc *conn) {
178+
c.connsMtx.Lock()
179+
defer c.connsMtx.Unlock()
180+
delete(c.conns, cc)
181+
}
182+
123183
func (c *Connector) Connect(ctx context.Context) (_ driver.Conn, err error) {
124184
onDone := trace.DatabaseSQLOnConnectorConnect(c.trace, &ctx)
125185
defer func() {

internal/xsql/errors.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ import (
88
)
99

1010
var (
11-
ErrUnsupported = driver.ErrSkip
12-
errDeprecated = driver.ErrSkip
13-
errClosedConn = xerrors.Retryable(errors.New("conn closed early"), xerrors.WithDeleteSession())
14-
errNotReadyConn = xerrors.Retryable(errors.New("conn not ready"), xerrors.WithDeleteSession())
11+
ErrUnsupported = driver.ErrSkip
12+
errDeprecated = driver.ErrSkip
13+
errConnClosedEarly = xerrors.Retryable(errors.New("conn closed early"), xerrors.WithDeleteSession())
14+
errNotReadyConn = xerrors.Retryable(errors.New("conn not ready"), xerrors.WithDeleteSession())
1515
)

options.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,10 @@ func WithSessionPoolKeepAliveMinSize(keepAliveMinSize int) Option {
360360
func WithSessionPoolIdleThreshold(idleThreshold time.Duration) Option {
361361
return func(ctx context.Context, c *connection) error {
362362
c.tableOptions = append(c.tableOptions, tableConfig.WithIdleThreshold(idleThreshold))
363+
c.databaseSQLOptions = append(
364+
c.databaseSQLOptions,
365+
xsql.WithIdleThreshold(idleThreshold),
366+
)
363367
return nil
364368
}
365369
}

0 commit comments

Comments
 (0)