Skip to content

Commit 1252c72

Browse files
authored
Merge pull request #1151 from ydb-platform/revert-parking
Revert conns parking
2 parents 158bc6a + 168e57c commit 1252c72

File tree

13 files changed

+229
-28
lines changed

13 files changed

+229
-28
lines changed

.github/workflows/breaking.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ jobs:
2525
run: test -e ~/go/bin/gorelease || go install golang.org/x/exp/cmd/gorelease@latest
2626
- name: Check broken API changes
2727
run: gorelease -base=$GITHUB_BASE_REF 2>&1 > changes.txt | true
28+
- name: Print API changes
29+
run: cat changes.txt
2830
- name: Comment Report
2931
if: always()
3032
uses: marocchino/sticky-pull-request-comment@v2

CHANGELOG.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
* Added query pool metrics
55
* Fixed logic of query session pool
66
* Changed initialization of internal driver clients to lazy
7-
* Disabled the logic of background grpc-connection parking
87
* Removed `ydb.WithSessionPoolSizeLimit()` option
98
* Added async put session into pool if external context is done
109
* Dropped intermediate callbacks from `trace.{Table,Retry,Query}` events

README.md

Lines changed: 61 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,19 +31,20 @@ go get -u github.com/ydb-platform/ydb-go-sdk/v3
3131
## Example Usage <a name="example"></a>
3232

3333
* connect to YDB
34-
```golang
34+
```go
3535
db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
3636
if err != nil {
3737
log.Fatal(err)
3838
}
3939
```
40-
* execute `SELECT` query
41-
```golang
42-
const query = `SELECT 42 as id, "myStr" as myStr;`
43-
40+
* execute `SELECT` query over `Table` service client
41+
```go
4442
// Do retry operation on errors with best effort
45-
queryErr := db.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) {
46-
_, res, err := s.Execute(ctx, table.DefaultTxControl(), query, nil)
43+
err := db.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) {
44+
_, res, err := s.Execute(ctx, table.DefaultTxControl(),
45+
`SELECT 42 as id, "myStr" as myStr;`,
46+
nil, // empty parameters
47+
)
4748
if err != nil {
4849
return err
4950
}
@@ -62,12 +63,61 @@ queryErr := db.Table().Do(ctx, func(ctx context.Context, s table.Session) (err e
6263
}
6364
return res.Err() // for driver retry if not nil
6465
})
65-
if queryErr != nil {
66-
log.Fatal(queryErr)
66+
if err != nil {
67+
log.Fatal(err)
68+
}
69+
```
70+
* execute `SELECT` query over `Query` service client
71+
```go
72+
// Do retry operation on errors with best effort
73+
err := db.Query().Do( // Do retry operation on errors with best effort
74+
ctx, // context manage exiting from Do
75+
func(ctx context.Context, s query.Session) (err error) { // retry operation
76+
_, res, err := s.Execute(ctx, `SELECT 42 as id, "myStr" as myStr;`))
77+
if err != nil {
78+
return err // for auto-retry with driver
79+
}
80+
defer func() { _ = res.Close(ctx) }() // cleanup resources
81+
for { // iterate over result sets
82+
rs, err := res.NextResultSet(ctx)
83+
if err != nil {
84+
if errors.Is(err, io.EOF) {
85+
break
86+
}
87+
88+
return err
89+
}
90+
for { // iterate over rows
91+
row, err := rs.NextRow(ctx)
92+
if err != nil {
93+
if errors.Is(err, io.EOF) {
94+
break
95+
}
96+
97+
return err
98+
}
99+
type myStruct struct {
100+
id uint64 `sql:"id"`
101+
str string `sql:"myStr"`
102+
}
103+
var s myStruct
104+
if err = row.ScanStruct(&s); err != nil {
105+
return err // generally scan error not retryable, return it for driver check error
106+
}
107+
}
108+
}
109+
110+
return res.Err() // return finally result error for auto-retry with driver
111+
},
112+
query.WithIdempotent(),
113+
)
114+
if err != nil {
115+
log.Fatal(err)
67116
}
68117
```
118+
69119
* usage with `database/sql` (see additional docs in [SQL.md](SQL.md) )
70-
```golang
120+
```go
71121
import (
72122
"context"
73123
"database/sql"
@@ -96,7 +146,7 @@ log.Printf("id = %d, myStr = \"%s\"", id, myStr)
96146
```
97147

98148

99-
More examples of usage placed in [examples](https://github.com/ydb-platform/ydb-go-examples) repository.
149+
More examples of usage placed in [examples](./examples) directory.
100150

101151
## Credentials <a name="credentials"></a>
102152

config/config.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type Config struct {
2121

2222
trace *trace.Driver
2323
dialTimeout time.Duration
24+
connectionTTL time.Duration
2425
balancerConfig *balancerConfig.Config
2526
secure bool
2627
endpoint string
@@ -56,6 +57,13 @@ func (c *Config) Meta() *meta.Meta {
5657
return c.meta
5758
}
5859

60+
// ConnectionTTL defines interval for parking grpc connections.
61+
//
62+
// If ConnectionTTL is zero - connections are not park.
63+
func (c *Config) ConnectionTTL() time.Duration {
64+
return c.connectionTTL
65+
}
66+
5967
// Secure is a flag for secure connection
6068
func (c *Config) Secure() bool {
6169
return c.secure
@@ -169,6 +177,12 @@ func WithUserAgent(userAgent string) Option {
169177
}
170178
}
171179

180+
func WithConnectionTTL(ttl time.Duration) Option {
181+
return func(c *Config) {
182+
c.connectionTTL = ttl
183+
}
184+
}
185+
172186
func WithCredentials(credentials credentials.Credentials) Option {
173187
return func(c *Config) {
174188
c.credentials = credentials

internal/conn/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
type Config interface {
1212
DialTimeout() time.Duration
13+
ConnectionTTL() time.Duration
1314
Trace() *trace.Driver
1415
GrpcDialOptions() []grpc.DialOption
1516
}

internal/conn/conn.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,36 @@ func (c *conn) NodeID() uint32 {
102102
return 0
103103
}
104104

105+
func (c *conn) park(ctx context.Context) (err error) {
106+
onDone := trace.DriverOnConnPark(
107+
c.config.Trace(), &ctx,
108+
stack.FunctionID(""),
109+
c.Endpoint(),
110+
)
111+
defer func() {
112+
onDone(err)
113+
}()
114+
115+
c.mtx.Lock()
116+
defer c.mtx.Unlock()
117+
118+
if c.closed {
119+
return nil
120+
}
121+
122+
if c.cc == nil {
123+
return nil
124+
}
125+
126+
err = c.close(ctx)
127+
128+
if err != nil {
129+
return c.wrapError(err)
130+
}
131+
132+
return nil
133+
}
134+
105135
func (c *conn) Endpoint() endpoint.Endpoint {
106136
if c != nil {
107137
return c.endpoint

internal/conn/pool.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@ import (
44
"context"
55
"sync"
66
"sync/atomic"
7+
"time"
78

89
"google.golang.org/grpc"
910
grpcCodes "google.golang.org/grpc/codes"
1011

1112
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
1213
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
1314
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
15+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
1416
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1517
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
1618
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
@@ -194,6 +196,39 @@ func (p *Pool) Release(ctx context.Context) (finalErr error) {
194196
return nil
195197
}
196198

199+
func (p *Pool) connParker(ctx context.Context, ttl, interval time.Duration) {
200+
ticker := time.NewTicker(interval)
201+
defer ticker.Stop()
202+
for {
203+
select {
204+
case <-p.done:
205+
return
206+
case <-ticker.C:
207+
for _, c := range p.collectConns() {
208+
if time.Since(c.LastUsage()) > ttl {
209+
switch c.GetState() {
210+
case Online, Banned:
211+
_ = c.park(ctx)
212+
default:
213+
// nop
214+
}
215+
}
216+
}
217+
}
218+
}
219+
}
220+
221+
func (p *Pool) collectConns() []*conn {
222+
p.mtx.RLock()
223+
defer p.mtx.RUnlock()
224+
conns := make([]*conn, 0, len(p.conns))
225+
for _, c := range p.conns {
226+
conns = append(conns, c)
227+
}
228+
229+
return conns
230+
}
231+
197232
func NewPool(ctx context.Context, config Config) *Pool {
198233
onDone := trace.DriverOnPoolNew(config.Trace(), &ctx, stack.FunctionID(""))
199234
defer onDone()
@@ -206,5 +241,9 @@ func NewPool(ctx context.Context, config Config) *Pool {
206241
done: make(chan struct{}),
207242
}
208243

244+
if ttl := config.ConnectionTTL(); ttl > 0 {
245+
go p.connParker(xcontext.ValueOnly(ctx), ttl, ttl/2)
246+
}
247+
209248
return p
210249
}

internal/topic/topicreaderinternal/batcher.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ func (b *batcher) Pop(ctx context.Context, opts batcherGetOptions) (_ batcherMes
176176

177177
findRes = b.findNeedLock(opts)
178178
if findRes.Ok {
179-
b.applyNeedLock(findRes)
179+
b.applyNeedLock(&findRes)
180180

181181
return
182182
}
@@ -279,7 +279,7 @@ func (b *batcher) applyForceFlagToOptions(options batcherGetOptions) batcherGetO
279279
return res
280280
}
281281

282-
func (b *batcher) applyNeedLock(res batcherResultCandidate) {
282+
func (b *batcher) applyNeedLock(res *batcherResultCandidate) {
283283
if res.Rest.IsEmpty() && res.WaiterIndex >= 0 {
284284
delete(b.messages, res.Key)
285285
} else {

internal/topic/topicreaderinternal/batcher_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,7 @@ func TestBatcher_Apply(t *testing.T) {
407407
Key: session,
408408
Rest: batcherMessageOrderItems{newBatcherItemBatch(batch)},
409409
}
410-
b.applyNeedLock(foundRes)
410+
b.applyNeedLock(&foundRes)
411411

412412
expectedMap := batcherMessagesMap{session: batcherMessageOrderItems{newBatcherItemBatch(batch)}}
413413
require.Equal(t, expectedMap, b.messages)
@@ -426,7 +426,7 @@ func TestBatcher_Apply(t *testing.T) {
426426

427427
b.messages = batcherMessagesMap{session: batcherMessageOrderItems{newBatcherItemBatch(batch)}}
428428

429-
b.applyNeedLock(foundRes)
429+
b.applyNeedLock(&foundRes)
430430

431431
require.Empty(t, b.messages)
432432
})

options.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,10 @@ func WithConnectionString(connectionString string) Option {
108108
}
109109

110110
// WithConnectionTTL defines duration for parking idle connections
111-
//
112-
// Deprecated: background connection parking not available
113-
func WithConnectionTTL(time.Duration) Option {
111+
func WithConnectionTTL(ttl time.Duration) Option {
114112
return func(ctx context.Context, c *Driver) error {
113+
c.options = append(c.options, config.WithConnectionTTL(ttl))
114+
115115
return nil
116116
}
117117
}

0 commit comments

Comments
 (0)