@@ -41,9 +41,13 @@ type (
4141 With (ctx context.Context , f func (ctx context.Context , s * Session ) error , opts ... retry.Option ) error
4242 }
4343 Client struct {
44- config * config.Config
45- client Ydb_Query_V1.QueryServiceClient
46- pool sessionPool
44+ config * config.Config
45+ client Ydb_Query_V1.QueryServiceClient
46+ explicitSessionPool sessionPool
47+
48+ // implicitSessionPool is a pool of implicit sessions,
49+ // i.e. fake sessions created without CreateSession/AttachSession requests.
50+ implicitSessionPool sessionPool
4751
4852 done chan struct {}
4953 }
@@ -194,7 +198,11 @@ func (c *Client) Close(ctx context.Context) error {
194198
195199 close (c .done )
196200
197- if err := c .pool .Close (ctx ); err != nil {
201+ if err := c .explicitSessionPool .Close (ctx ); err != nil {
202+ return xerrors .WithStackTrace (err )
203+ }
204+
205+ if err := c .implicitSessionPool .Close (ctx ); err != nil {
198206 return xerrors .WithStackTrace (err )
199207 }
200208
@@ -242,7 +250,7 @@ func (c *Client) Do(ctx context.Context, op query.Operation, opts ...options.DoO
242250 onDone (attempts , finalErr )
243251 }()
244252
245- err := do (ctx , c .pool ,
253+ err := do (ctx , c .explicitSessionPool ,
246254 func (ctx context.Context , s * Session ) error {
247255 return op (ctx , s )
248256 },
@@ -329,7 +337,7 @@ func (c *Client) QueryRow(ctx context.Context, q string, opts ...options.Execute
329337 onDone (finalErr )
330338 }()
331339
332- row , err := clientQueryRow (ctx , c .pool , q , settings , withTrace (c .config .Trace ()))
340+ row , err := clientQueryRow (ctx , c .pool () , q , settings , withTrace (c .config .Trace ()))
333341 if err != nil {
334342 return nil , xerrors .WithStackTrace (err )
335343 }
@@ -376,7 +384,7 @@ func (c *Client) Exec(ctx context.Context, q string, opts ...options.Execute) (f
376384 onDone (finalErr )
377385 }()
378386
379- err := clientExec (ctx , c .pool , q , opts ... )
387+ err := clientExec (ctx , c .pool () , q , opts ... )
380388 if err != nil {
381389 return xerrors .WithStackTrace (err )
382390 }
@@ -424,7 +432,7 @@ func (c *Client) Query(ctx context.Context, q string, opts ...options.Execute) (
424432 onDone (err )
425433 }()
426434
427- r , err = clientQuery (ctx , c .pool , q , opts ... )
435+ r , err = clientQuery (ctx , c .pool () , q , opts ... )
428436 if err != nil {
429437 return nil , xerrors .WithStackTrace (err )
430438 }
@@ -479,14 +487,25 @@ func (c *Client) QueryResultSet(
479487 onDone (finalErr , rowsCount )
480488 }()
481489
482- rs , rowsCount , err = clientQueryResultSet (ctx , c .pool , q , settings , withTrace (c .config .Trace ()))
490+ rs , rowsCount , err = clientQueryResultSet (ctx , c .pool () , q , settings , withTrace (c .config .Trace ()))
483491 if err != nil {
484492 return nil , xerrors .WithStackTrace (err )
485493 }
486494
487495 return rs , nil
488496}
489497
498+ // pool returns the appropriate session pool based on the client configuration.
499+ // If implicit sessions are enabled, it returns the implicit session pool;
500+ // otherwise, it returns the explicit session pool.
501+ func (c * Client ) pool () sessionPool {
502+ if c .config .AllowImplicitSessions () {
503+ return c .implicitSessionPool
504+ }
505+
506+ return c .explicitSessionPool
507+ }
508+
490509func (c * Client ) DoTx (ctx context.Context , op query.TxOperation , opts ... options.DoTxOption ) (finalErr error ) {
491510 ctx , cancel := xcontext .WithDone (ctx , c .done )
492511 defer cancel ()
@@ -503,7 +522,7 @@ func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options
503522 onDone (attempts , finalErr )
504523 }()
505524
506- err := doTx (ctx , c .pool , op ,
525+ err := doTx (ctx , c .explicitSessionPool , op ,
507526 settings .TxSettings (),
508527 append (
509528 []retry.Option {
@@ -565,25 +584,34 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, cfg *config.Config) *
565584
566585 client := Ydb_Query_V1 .NewQueryServiceClient (cc )
567586
587+ return newWithQueryServiceClient (ctx , client , cc , cfg )
588+ }
589+
590+ func newWithQueryServiceClient (ctx context.Context ,
591+ client Ydb_Query_V1.QueryServiceClient ,
592+ cc grpc.ClientConnInterface ,
593+ cfg * config.Config ,
594+ ) * Client {
568595 return & Client {
569- config : cfg ,
570- client : client ,
571- done : make (chan struct {}),
572- pool : pool .New (ctx ,
573- pool.WithLimit [* Session , Session ](cfg .PoolLimit ()),
574- pool.WithItemUsageLimit [* Session , Session ](cfg .PoolSessionUsageLimit ()),
575- pool.WithItemUsageTTL [* Session , Session ](cfg .PoolSessionUsageTTL ()),
576- pool.WithTrace [* Session , Session ](poolTrace (cfg .Trace ())),
577- pool.WithCreateItemTimeout [* Session , Session ](cfg .SessionCreateTimeout ()),
578- pool.WithCloseItemTimeout [* Session , Session ](cfg .SessionDeleteTimeout ()),
579- pool.WithMustDeleteItemFunc [* Session , Session ](func (s * Session , err error ) bool {
596+ config : cfg ,
597+ client : client ,
598+ done : make (chan struct {}),
599+ implicitSessionPool : createImplicitSessionPool (ctx , cfg , client , cc ),
600+ explicitSessionPool : pool .New (ctx ,
601+ pool.WithLimit [* Session ](cfg .PoolLimit ()),
602+ pool.WithItemUsageLimit [* Session ](cfg .PoolSessionUsageLimit ()),
603+ pool.WithItemUsageTTL [* Session ](cfg .PoolSessionUsageTTL ()),
604+ pool.WithTrace [* Session ](poolTrace (cfg .Trace ())),
605+ pool.WithCreateItemTimeout [* Session ](cfg .SessionCreateTimeout ()),
606+ pool.WithCloseItemTimeout [* Session ](cfg .SessionDeleteTimeout ()),
607+ pool .WithMustDeleteItemFunc (func (s * Session , err error ) bool {
580608 if ! s .IsAlive () {
581609 return true
582610 }
583611
584612 return err != nil && xerrors .MustDeleteTableOrQuerySession (err )
585613 }),
586- pool.WithIdleTimeToLive [* Session , Session ](cfg .SessionIdleTimeToLive ()),
614+ pool.WithIdleTimeToLive [* Session ](cfg .SessionIdleTimeToLive ()),
587615 pool .WithCreateItemFunc (func (ctx context.Context ) (_ * Session , err error ) {
588616 var (
589617 createCtx context.Context
@@ -666,3 +694,28 @@ func poolTrace(t *trace.Query) *pool.Trace {
666694 },
667695 }
668696}
697+
698+ func createImplicitSessionPool (ctx context.Context ,
699+ cfg * config.Config ,
700+ c Ydb_Query_V1.QueryServiceClient ,
701+ cc grpc.ClientConnInterface ,
702+ ) sessionPool {
703+ return pool .New (ctx ,
704+ pool.WithLimit [* Session ](cfg .PoolLimit ()),
705+ pool.WithTrace [* Session ](poolTrace (cfg .Trace ())),
706+ pool .WithCreateItemFunc (func (ctx context.Context ) (_ * Session , err error ) {
707+ core := & sessionCore {
708+ cc : cc ,
709+ Client : c ,
710+ Trace : cfg .Trace (),
711+ done : make (chan struct {}),
712+ }
713+
714+ return & Session {
715+ Core : core ,
716+ trace : cfg .Trace (),
717+ client : c ,
718+ }, nil
719+ }),
720+ )
721+ }
0 commit comments