@@ -2,10 +2,12 @@ package query
22
33import (
44 "context"
5+ "sync/atomic"
56
67 "github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
78 "google.golang.org/grpc"
89
10+ "github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
911 "github.com/ydb-platform/ydb-go-sdk/v3/internal/pool"
1012 "github.com/ydb-platform/ydb-go-sdk/v3/internal/pool/stats"
1113 "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config"
@@ -20,14 +22,74 @@ import (
2022
2123//go:generate mockgen -destination grpc_client_mock_test.go -package query -write_package_comment=false github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1 QueryServiceClient,QueryService_AttachSessionClient,QueryService_ExecuteQueryClient
2224
23- var _ query.Client = (* Client )(nil )
25+ var (
26+ _ query.Client = (* Client )(nil )
27+ _ sessionPool = (* poolStub )(nil )
28+ _ sessionPool = (* pool.Pool [* Session , Session ])(nil )
29+ )
30+
31+ type (
32+ sessionPool interface {
33+ closer.Closer
34+
35+ Stats () stats.Stats
36+ With (ctx context.Context , f func (ctx context.Context , s * Session ) error , opts ... retry.Option ) error
37+ }
38+ poolStub struct {
39+ createSession func (ctx context.Context ) (* Session , error )
40+ InUse atomic.Int32
41+ }
42+ Client struct {
43+ config * config.Config
44+ client Ydb_Query_V1.QueryServiceClient
45+ pool sessionPool
46+
47+ done chan struct {}
48+ }
49+ )
50+
51+ func (pool * poolStub ) Close (ctx context.Context ) error {
52+ return nil
53+ }
54+
55+ func (pool * poolStub ) Stats () stats.Stats {
56+ return stats.Stats {
57+ Limit : - 1 ,
58+ Index : 0 ,
59+ Idle : 0 ,
60+ InUse : int (pool .InUse .Load ()),
61+ }
62+ }
63+
64+ func (pool * poolStub ) With (
65+ ctx context.Context , f func (ctx context.Context , s * Session ) error , opts ... retry.Option ,
66+ ) error {
67+ pool .InUse .Add (1 )
68+ defer func () {
69+ pool .InUse .Add (- 1 )
70+ }()
71+
72+ err := retry .Retry (ctx , func (ctx context.Context ) (err error ) {
73+ s , err := pool .createSession (ctx )
74+ if err != nil {
75+ return xerrors .WithStackTrace (err )
76+ }
77+ defer func () {
78+ _ = s .Close (ctx )
79+ }()
2480
25- type Client struct {
26- config * config. Config
27- grpcClient Ydb_Query_V1. QueryServiceClient
28- pool * pool. Pool [ * Session , Session ]
81+ err = f ( ctx , s )
82+ if err != nil {
83+ return xerrors . WithStackTrace ( err )
84+ }
2985
30- done chan struct {}
86+ return nil
87+ }, opts ... )
88+ if err != nil {
89+ return xerrors .WithStackTrace (err )
90+ }
91+
92+ return nil
3193}
3294
3395func (c * Client ) Stats () * stats.Stats {
@@ -48,7 +110,7 @@ func (c *Client) Close(ctx context.Context) error {
48110
49111func do (
50112 ctx context.Context ,
51- pool * pool. Pool [ * Session , Session ] ,
113+ pool sessionPool ,
52114 op query.Operation ,
53115 opts ... retry.Option ,
54116) (finalErr error ) {
@@ -100,7 +162,7 @@ func (c *Client) Do(ctx context.Context, op query.Operation, opts ...options.DoO
100162
101163func doTx (
102164 ctx context.Context ,
103- pool * pool. Pool [ * Session , Session ] ,
165+ pool sessionPool ,
104166 op query.TxOperation ,
105167 t * trace.Query ,
106168 opts ... options.DoTxOption ,
@@ -169,7 +231,7 @@ func (c *Client) ReadRow(ctx context.Context, q string, opts ...options.ExecuteO
169231}
170232
171233func clientExecute (ctx context.Context ,
172- pool * pool. Pool [ * Session , Session ] ,
234+ pool sessionPool ,
173235 q string , opts ... options.ExecuteOption ,
174236) (r query.Result , err error ) {
175237 err = do (ctx , pool , func (ctx context.Context , s query.Session ) (err error ) {
@@ -275,24 +337,37 @@ func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options
275337 return nil
276338}
277339
340+ func newPool (
341+ ctx context.Context , cfg * config.Config , createSession func (ctx context.Context ) (* Session , error ),
342+ ) sessionPool {
343+ if cfg .UseSessionPool () {
344+ return pool .New (ctx ,
345+ pool.WithLimit [* Session , Session ](cfg .PoolLimit ()),
346+ pool.WithTrace [* Session , Session ](poolTrace (cfg .Trace ())),
347+ pool.WithCreateItemTimeout [* Session , Session ](cfg .SessionCreateTimeout ()),
348+ pool.WithCloseItemTimeout [* Session , Session ](cfg .SessionDeleteTimeout ()),
349+ pool .WithCreateFunc (createSession ),
350+ )
351+ }
352+
353+ return & poolStub {
354+ createSession : createSession ,
355+ }
356+ }
357+
278358func New (ctx context.Context , balancer grpc.ClientConnInterface , cfg * config.Config ) * Client {
279359 onDone := trace .QueryOnNew (cfg .Trace (), & ctx ,
280360 stack .FunctionID ("github.com/ydb-platform/ydb-go-sdk/3/internal/query.New" ),
281361 )
282362 defer onDone ()
283363
284- client := & Client {
285- config : cfg ,
286- grpcClient : Ydb_Query_V1 .NewQueryServiceClient (balancer ),
287- done : make (chan struct {}),
288- }
364+ grpcClient := Ydb_Query_V1 .NewQueryServiceClient (balancer )
289365
290- client .pool = pool .New (ctx ,
291- pool.WithLimit [* Session , Session ](cfg .PoolLimit ()),
292- pool.WithTrace [* Session , Session ](poolTrace (cfg .Trace ())),
293- pool.WithCreateItemTimeout [* Session , Session ](cfg .SessionCreateTimeout ()),
294- pool.WithCloseItemTimeout [* Session , Session ](cfg .SessionDeleteTimeout ()),
295- pool .WithCreateFunc (func (ctx context.Context ) (_ * Session , err error ) {
366+ client := & Client {
367+ config : cfg ,
368+ client : grpcClient ,
369+ done : make (chan struct {}),
370+ pool : newPool (ctx , cfg , func (ctx context.Context ) (_ * Session , err error ) {
296371 var (
297372 createCtx context.Context
298373 cancelCreate context.CancelFunc
@@ -304,14 +379,14 @@ func New(ctx context.Context, balancer grpc.ClientConnInterface, cfg *config.Con
304379 }
305380 defer cancelCreate ()
306381
307- s , err := createSession (createCtx , client . grpcClient , cfg )
382+ s , err := createSession (createCtx , grpcClient , cfg )
308383 if err != nil {
309384 return nil , xerrors .WithStackTrace (err )
310385 }
311386
312387 return s , nil
313388 }),
314- )
389+ }
315390
316391 return client
317392}
0 commit comments