@@ -380,6 +380,8 @@ func (c *Client) internalPoolGet(ctx context.Context, opts ...getOption) (s *ses
380380 return nil , xerrors .WithStackTrace (errClosedClient )
381381 }
382382
383+ const maxAttempts = 100
384+
383385 var (
384386 start = time .Now ()
385387 i = 0
@@ -398,17 +400,12 @@ func (c *Client) internalPoolGet(ctx context.Context, opts ...getOption) (s *ses
398400 onDone (s , i , err )
399401 }()
400402
401- const maxAttempts = 100
402403 for s == nil && err == nil && i < maxAttempts && ! c .isClosed () {
403404 i ++
404- // First, we try to internalPoolGet session from idle
405- c .mu .WithLock (func () {
406- s = c .internalPoolRemoveFirstIdle ()
407- })
408-
405+ s = tryGetIdleSession (c )
409406 if s != nil {
410- if c . nodeChecker != nil && ! c . nodeChecker . HasNode ( s . NodeID () ) {
411- _ = s . Close (ctx )
407+ if ! isValidNode ( c , s ) {
408+ closeInvalidSession (ctx , s )
412409 s = nil
413410
414411 continue
@@ -417,37 +414,59 @@ func (c *Client) internalPoolGet(ctx context.Context, opts ...getOption) (s *ses
417414 return s , nil
418415 }
419416
420- // Second, we try to create new session
421- s , err = c .internalPoolCreateSession (ctx )
422- if s == nil && err == nil {
423- if err = ctx .Err (); err != nil {
424- return nil , xerrors .WithStackTrace (err )
425- }
426- panic ("both of session and err are nil" )
427- }
428- // got session or err is not recoverable
417+ s , err = tryCreateNewSession (ctx , c )
429418 if s != nil || ! isCreateSessionErrorRetriable (err ) {
430419 return s , xerrors .WithStackTrace (err )
431420 }
432421
433- // Third, we try to wait for a touched session - Client is full.
434- //
435- // This should be done only if number of currently waiting goroutines
436- // are less than maximum amount of touched session. That is, we want to
437- // be fair here and not to lock more goroutines than we could ship
438- // session to.
439422 s , err = c .internalPoolWaitFromCh (ctx , o .t )
440423 if err != nil {
441424 err = xerrors .WithStackTrace (err )
442425 }
443426 }
427+
428+ return handleNoProgress (s , err , start , c , i )
429+ }
430+
431+ func tryGetIdleSession (c * Client ) * session {
432+ var s * session
433+ c .mu .WithLock (func () {
434+ s = c .internalPoolRemoveFirstIdle ()
435+ })
436+
437+ return s
438+ }
439+
440+ //nolint:interfacer
441+ func isValidNode (c * Client , s * session ) bool {
442+ return c .nodeChecker == nil || c .nodeChecker .HasNode (s .NodeID ())
443+ }
444+
445+ func closeInvalidSession (ctx context.Context , s * session ) {
446+ _ = s .Close (ctx )
447+ }
448+
449+ func tryCreateNewSession (ctx context.Context , c * Client ) (* session , error ) {
450+ s , err := c .internalPoolCreateSession (ctx )
451+ if s == nil && err == nil {
452+ if err = ctx .Err (); err != nil {
453+ return nil , xerrors .WithStackTrace (err )
454+ }
455+ panic ("both session and err are nil" )
456+ }
457+
458+ return s , err
459+ }
460+
461+ func handleNoProgress (s * session , err error , start time.Time , c * Client , attempts int ) (* session , error ) {
444462 if s == nil && err == nil {
445463 if c .isClosed () {
446464 err = xerrors .WithStackTrace (errClosedClient )
447465 } else {
448466 err = xerrors .WithStackTrace (errNoProgress )
449467 }
450468 }
469+
451470 if err != nil {
452471 var (
453472 index int
@@ -460,15 +479,15 @@ func (c *Client) internalPoolGet(ctx context.Context, opts ...getOption) (s *ses
460479 createInProgress = c .createInProgress
461480 })
462481
463- return s , xerrors .WithStackTrace (
482+ err = xerrors .WithStackTrace (
464483 fmt .Errorf ("failed to get session from pool (" +
465- "attempts: %d, latency: %v, pool have %d sessions (%d busy, %d idle, %d create_in_progress): %w" ,
466- i , time .Since (start ), index , index - idle , idle , createInProgress , err ,
484+ "attempts: %d, latency: %v, pool has %d sessions (%d busy, %d idle, %d create_in_progress): %w" ,
485+ attempts , time .Since (start ), index , index - idle , idle , createInProgress , err ,
467486 ),
468487 )
469488 }
470489
471- return s , nil
490+ return s , err
472491}
473492
474493// Get returns first idle session from the Client and removes it from
@@ -477,6 +496,7 @@ func (c *Client) Get(ctx context.Context) (s *session, err error) {
477496 return c .internalPoolGet (ctx )
478497}
479498
499+ //nolint:funlen
480500func (c * Client ) internalPoolWaitFromCh (ctx context.Context , t * trace.Table ) (s * session , err error ) {
481501 var (
482502 ch * chan * session
@@ -705,54 +725,57 @@ func (c *Client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.O
705725 onDone (attempts , finalErr )
706726 }()
707727
708- return retryBackoff (ctx , c ,
709- func (ctx context.Context , s table.Session ) (err error ) {
710- attempts ++
728+ return retryBackoff (ctx , c , func (ctx context.Context , s table.Session ) (err error ) {
729+ attempts ++
711730
712- tx , err := s .BeginTransaction (ctx , config .TxSettings )
713- if err != nil {
714- return xerrors .WithStackTrace (err )
715- }
731+ tx , err := s .BeginTransaction (ctx , config .TxSettings )
732+ if err != nil {
733+ return xerrors .WithStackTrace (err )
734+ }
716735
717- defer func () {
718- if err != nil {
719- errRollback := tx .Rollback (ctx )
720- if errRollback != nil {
721- err = xerrors .NewWithIssues ("" ,
722- xerrors .WithStackTrace (err ),
723- xerrors .WithStackTrace (errRollback ),
724- )
725- } else {
726- err = xerrors .WithStackTrace (err )
727- }
728- }
729- }()
736+ defer func () {
737+ err = handleTransactionError (ctx , tx , err )
738+ }()
730739
731- err = func () error {
732- if panicCallback := c .config .PanicCallback (); panicCallback != nil {
733- defer func () {
734- if e := recover (); e != nil {
735- panicCallback (e )
736- }
737- }()
738- }
740+ if err = executeTxOperation (ctx , c , op , tx ); err != nil {
741+ return xerrors .WithStackTrace (err )
742+ }
739743
740- return op (xcontext .MarkRetryCall (ctx ), tx )
741- }()
744+ _ , err = tx .CommitTx (ctx , config .TxCommitOptions ... )
745+ if err != nil {
746+ return xerrors .WithStackTrace (err )
747+ }
742748
743- if err != nil {
744- return xerrors . WithStackTrace ( err )
745- }
749+ return nil
750+ }, config . RetryOptions ... )
751+ }
746752
747- _ , err = tx .CommitTx (ctx , config .TxCommitOptions ... )
748- if err != nil {
749- return xerrors .WithStackTrace (err )
753+ func handleTransactionError (ctx context.Context , tx table.Transaction , err error ) error {
754+ if err != nil {
755+ errRollback := tx .Rollback (ctx )
756+ if errRollback != nil {
757+ return xerrors .NewWithIssues ("" ,
758+ xerrors .WithStackTrace (err ),
759+ xerrors .WithStackTrace (errRollback ),
760+ )
761+ }
762+
763+ return xerrors .WithStackTrace (err )
764+ }
765+
766+ return nil
767+ }
768+
769+ func executeTxOperation (ctx context.Context , c * Client , op table.TxOperation , tx table.Transaction ) (err error ) {
770+ if panicCallback := c .config .PanicCallback (); panicCallback != nil {
771+ defer func () {
772+ if e := recover (); e != nil {
773+ panicCallback (e )
750774 }
775+ }()
776+ }
751777
752- return nil
753- },
754- config .RetryOptions ... ,
755- )
778+ return op (xcontext .MarkRetryCall (ctx ), tx )
756779}
757780
758781func (c * Client ) internalPoolGCTick (ctx context.Context , idleThreshold time.Duration ) {
0 commit comments