Skip to content

Commit 8a2684f

Browse files
authored
Merge pull request #1290 from ydb-platform/query-without-pool
* Replaced internal query session pool by default to stub for exclude impact from internal/pool
2 parents df32151 + d9cb39e commit 8a2684f

File tree

4 files changed

+104
-24
lines changed

4 files changed

+104
-24
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
* Refactored the `balancers.PreferLocations()` function - it is a clean/pure function
33
* Added experimental `balancers.WithNodeID()` context modifier for define per request the YDB endpoint by NodeID
44
* Reverted the allowing the casts from signed YDB types to unsigned destination types if source value is not negative
5+
* Replaced internal query session pool by default to stub for exclude impact from internal/pool
56

67
## v3.74.2
78
* Added description to scan errors with use query service client scanner

internal/query/client.go

Lines changed: 97 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ package query
22

33
import (
44
"context"
5+
"sync/atomic"
56

67
"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
78
"google.golang.org/grpc"
89

10+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
911
"github.com/ydb-platform/ydb-go-sdk/v3/internal/pool"
1012
"github.com/ydb-platform/ydb-go-sdk/v3/internal/pool/stats"
1113
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config"
@@ -20,14 +22,74 @@ import (
2022

2123
//go:generate mockgen -destination grpc_client_mock_test.go -package query -write_package_comment=false github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1 QueryServiceClient,QueryService_AttachSessionClient,QueryService_ExecuteQueryClient
2224

23-
var _ query.Client = (*Client)(nil)
25+
var (
26+
_ query.Client = (*Client)(nil)
27+
_ sessionPool = (*poolStub)(nil)
28+
_ sessionPool = (*pool.Pool[*Session, Session])(nil)
29+
)
30+
31+
type (
32+
sessionPool interface {
33+
closer.Closer
34+
35+
Stats() stats.Stats
36+
With(ctx context.Context, f func(ctx context.Context, s *Session) error, opts ...retry.Option) error
37+
}
38+
poolStub struct {
39+
createSession func(ctx context.Context) (*Session, error)
40+
InUse atomic.Int32
41+
}
42+
Client struct {
43+
config *config.Config
44+
client Ydb_Query_V1.QueryServiceClient
45+
pool sessionPool
46+
47+
done chan struct{}
48+
}
49+
)
50+
51+
func (pool *poolStub) Close(ctx context.Context) error {
52+
return nil
53+
}
54+
55+
func (pool *poolStub) Stats() stats.Stats {
56+
return stats.Stats{
57+
Limit: -1,
58+
Index: 0,
59+
Idle: 0,
60+
InUse: int(pool.InUse.Load()),
61+
}
62+
}
63+
64+
func (pool *poolStub) With(
65+
ctx context.Context, f func(ctx context.Context, s *Session) error, opts ...retry.Option,
66+
) error {
67+
pool.InUse.Add(1)
68+
defer func() {
69+
pool.InUse.Add(-1)
70+
}()
71+
72+
err := retry.Retry(ctx, func(ctx context.Context) (err error) {
73+
s, err := pool.createSession(ctx)
74+
if err != nil {
75+
return xerrors.WithStackTrace(err)
76+
}
77+
defer func() {
78+
_ = s.Close(ctx)
79+
}()
2480

25-
type Client struct {
26-
config *config.Config
27-
grpcClient Ydb_Query_V1.QueryServiceClient
28-
pool *pool.Pool[*Session, Session]
81+
err = f(ctx, s)
82+
if err != nil {
83+
return xerrors.WithStackTrace(err)
84+
}
2985

30-
done chan struct{}
86+
return nil
87+
}, opts...)
88+
if err != nil {
89+
return xerrors.WithStackTrace(err)
90+
}
91+
92+
return nil
3193
}
3294

3395
func (c *Client) Stats() *stats.Stats {
@@ -48,7 +110,7 @@ func (c *Client) Close(ctx context.Context) error {
48110

49111
func do(
50112
ctx context.Context,
51-
pool *pool.Pool[*Session, Session],
113+
pool sessionPool,
52114
op query.Operation,
53115
opts ...retry.Option,
54116
) (finalErr error) {
@@ -100,7 +162,7 @@ func (c *Client) Do(ctx context.Context, op query.Operation, opts ...options.DoO
100162

101163
func doTx(
102164
ctx context.Context,
103-
pool *pool.Pool[*Session, Session],
165+
pool sessionPool,
104166
op query.TxOperation,
105167
t *trace.Query,
106168
opts ...options.DoTxOption,
@@ -169,7 +231,7 @@ func (c *Client) ReadRow(ctx context.Context, q string, opts ...options.ExecuteO
169231
}
170232

171233
func clientExecute(ctx context.Context,
172-
pool *pool.Pool[*Session, Session],
234+
pool sessionPool,
173235
q string, opts ...options.ExecuteOption,
174236
) (r query.Result, err error) {
175237
err = do(ctx, pool, func(ctx context.Context, s query.Session) (err error) {
@@ -275,24 +337,37 @@ func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options
275337
return nil
276338
}
277339

340+
func newPool(
341+
ctx context.Context, cfg *config.Config, createSession func(ctx context.Context) (*Session, error),
342+
) sessionPool {
343+
if cfg.UseSessionPool() {
344+
return pool.New(ctx,
345+
pool.WithLimit[*Session, Session](cfg.PoolLimit()),
346+
pool.WithTrace[*Session, Session](poolTrace(cfg.Trace())),
347+
pool.WithCreateItemTimeout[*Session, Session](cfg.SessionCreateTimeout()),
348+
pool.WithCloseItemTimeout[*Session, Session](cfg.SessionDeleteTimeout()),
349+
pool.WithCreateFunc(createSession),
350+
)
351+
}
352+
353+
return &poolStub{
354+
createSession: createSession,
355+
}
356+
}
357+
278358
func New(ctx context.Context, balancer grpc.ClientConnInterface, cfg *config.Config) *Client {
279359
onDone := trace.QueryOnNew(cfg.Trace(), &ctx,
280360
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.New"),
281361
)
282362
defer onDone()
283363

284-
client := &Client{
285-
config: cfg,
286-
grpcClient: Ydb_Query_V1.NewQueryServiceClient(balancer),
287-
done: make(chan struct{}),
288-
}
364+
grpcClient := Ydb_Query_V1.NewQueryServiceClient(balancer)
289365

290-
client.pool = pool.New(ctx,
291-
pool.WithLimit[*Session, Session](cfg.PoolLimit()),
292-
pool.WithTrace[*Session, Session](poolTrace(cfg.Trace())),
293-
pool.WithCreateItemTimeout[*Session, Session](cfg.SessionCreateTimeout()),
294-
pool.WithCloseItemTimeout[*Session, Session](cfg.SessionDeleteTimeout()),
295-
pool.WithCreateFunc(func(ctx context.Context) (_ *Session, err error) {
366+
client := &Client{
367+
config: cfg,
368+
client: grpcClient,
369+
done: make(chan struct{}),
370+
pool: newPool(ctx, cfg, func(ctx context.Context) (_ *Session, err error) {
296371
var (
297372
createCtx context.Context
298373
cancelCreate context.CancelFunc
@@ -304,14 +379,14 @@ func New(ctx context.Context, balancer grpc.ClientConnInterface, cfg *config.Con
304379
}
305380
defer cancelCreate()
306381

307-
s, err := createSession(createCtx, client.grpcClient, cfg)
382+
s, err := createSession(createCtx, grpcClient, cfg)
308383
if err != nil {
309384
return nil, xerrors.WithStackTrace(err)
310385
}
311386

312387
return s, nil
313388
}),
314-
)
389+
}
315390

316391
return client
317392
}

internal/query/config/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ type Config struct {
1919

2020
poolLimit int
2121

22+
useSessionPool bool
2223
sessionCreateTimeout time.Duration
2324
sessionDeleteTimeout time.Duration
2425

@@ -68,3 +69,7 @@ func (c *Config) SessionCreateTimeout() time.Duration {
6869
func (c *Config) SessionDeleteTimeout() time.Duration {
6970
return c.sessionDeleteTimeout
7071
}
72+
73+
func (c *Config) UseSessionPool() bool {
74+
return c.useSessionPool
75+
}

tests/integration/query_execute_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ func TestQueryExecute(t *testing.T) {
3131
db, err := ydb.Open(ctx,
3232
os.Getenv("YDB_CONNECTION_STRING"),
3333
ydb.WithAccessTokenCredentials(os.Getenv("YDB_ACCESS_TOKEN_CREDENTIALS")),
34-
ydb.WithSessionPoolSizeLimit(10),
3534
ydb.WithTraceQuery(
3635
log.Query(
3736
log.Default(os.Stdout,
@@ -80,7 +79,7 @@ func TestQueryExecute(t *testing.T) {
8079
t.Run("Stats", func(t *testing.T) {
8180
s, err := query.Stats(db.Query())
8281
require.NoError(t, err)
83-
require.EqualValues(t, 10, s.Limit)
82+
require.EqualValues(t, -1, s.Limit)
8483
})
8584
t.Run("Scan", func(t *testing.T) {
8685
var (

0 commit comments

Comments
 (0)