Skip to content

Commit f541a53

Browse files
committed
Extracted logic of checking error for delete session from retry.Retry
1 parent a627a2d commit f541a53

File tree

24 files changed

+369
-239
lines changed

24 files changed

+369
-239
lines changed

internal/coordination/client_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func TestCreateNode(t *testing.T) {
4747
)
4848
err := createNode(ctx, client, &Ydb_Coordination.CreateNodeRequest{})
4949
require.True(t, xerrors.IsTransportError(err, grpcCodes.ResourceExhausted))
50-
require.True(t, xerrors.IsRetryObjectValid(err))
50+
require.False(t, mustDeleteSession(err))
5151
})
5252
t.Run("OperationError", func(t *testing.T) {
5353
ctx := xtest.Context(t)
@@ -58,7 +58,7 @@ func TestCreateNode(t *testing.T) {
5858
)
5959
err := createNode(ctx, client, &Ydb_Coordination.CreateNodeRequest{})
6060
require.True(t, xerrors.IsOperationError(err, Ydb.StatusIds_UNAVAILABLE))
61-
require.True(t, xerrors.IsRetryObjectValid(err))
61+
require.False(t, mustDeleteSession(err))
6262
})
6363
}
6464

@@ -306,7 +306,7 @@ func TestAlterNode(t *testing.T) {
306306
)
307307
err := alterNode(ctx, client, &Ydb_Coordination.AlterNodeRequest{})
308308
require.True(t, xerrors.IsTransportError(err, grpcCodes.ResourceExhausted))
309-
require.True(t, xerrors.IsRetryObjectValid(err))
309+
require.False(t, mustDeleteSession(err))
310310
})
311311
t.Run("OperationError", func(t *testing.T) {
312312
ctx := xtest.Context(t)
@@ -317,7 +317,7 @@ func TestAlterNode(t *testing.T) {
317317
)
318318
err := alterNode(ctx, client, &Ydb_Coordination.AlterNodeRequest{})
319319
require.True(t, xerrors.IsOperationError(err, Ydb.StatusIds_UNAVAILABLE))
320-
require.True(t, xerrors.IsRetryObjectValid(err))
320+
require.False(t, mustDeleteSession(err))
321321
})
322322
}
323323

@@ -372,7 +372,7 @@ func TestDropNode(t *testing.T) {
372372
)
373373
err := dropNode(ctx, client, &Ydb_Coordination.DropNodeRequest{})
374374
require.True(t, xerrors.IsTransportError(err, grpcCodes.ResourceExhausted))
375-
require.True(t, xerrors.IsRetryObjectValid(err))
375+
require.False(t, mustDeleteSession(err))
376376
})
377377
t.Run("OperationError", func(t *testing.T) {
378378
ctx := xtest.Context(t)
@@ -383,6 +383,6 @@ func TestDropNode(t *testing.T) {
383383
)
384384
err := dropNode(ctx, client, &Ydb_Coordination.DropNodeRequest{})
385385
require.True(t, xerrors.IsOperationError(err, Ydb.StatusIds_UNAVAILABLE))
386-
require.True(t, xerrors.IsRetryObjectValid(err))
386+
require.False(t, mustDeleteSession(err))
387387
})
388388
}

internal/coordination/errors.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package coordination
2+
3+
import (
4+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
5+
grpcCodes "google.golang.org/grpc/codes"
6+
7+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
8+
)
9+
10+
func mustDeleteSession(err error) bool {
11+
if xerrors.IsOperationError(err,
12+
Ydb.StatusIds_BAD_SESSION,
13+
Ydb.StatusIds_SESSION_BUSY,
14+
Ydb.StatusIds_SESSION_EXPIRED,
15+
) {
16+
return true
17+
}
18+
19+
if xerrors.IsTransportError(err,
20+
grpcCodes.Canceled,
21+
grpcCodes.Unknown,
22+
grpcCodes.InvalidArgument,
23+
grpcCodes.DeadlineExceeded,
24+
grpcCodes.NotFound,
25+
grpcCodes.AlreadyExists,
26+
grpcCodes.PermissionDenied,
27+
grpcCodes.FailedPrecondition,
28+
grpcCodes.Aborted,
29+
grpcCodes.Unimplemented,
30+
grpcCodes.Internal,
31+
grpcCodes.Unavailable,
32+
grpcCodes.DataLoss,
33+
grpcCodes.Unauthenticated,
34+
) {
35+
return true
36+
}
37+
38+
return false
39+
}

internal/pool/pool.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func New[PT ItemConstraint[T], T any](
151151
createTimeout: defaultCreateTimeout,
152152
closeTimeout: defaultCloseTimeout,
153153
mustDeleteItemFunc: func(item PT, err error) bool {
154-
return !xerrors.IsRetryObjectValid(err)
154+
return !item.IsAlive()
155155
},
156156
},
157157
index: make(map[PT]itemInfo[PT, T]),
@@ -417,14 +417,14 @@ func (p *Pool[PT, T]) try(ctx context.Context, f func(ctx context.Context, item
417417
}
418418

419419
defer func() {
420-
if finalErr == nil || !p.config.mustDeleteItemFunc(item, finalErr) {
421-
_ = p.putItem(ctx, item)
422-
} else {
420+
if !item.IsAlive() || (finalErr != nil && p.config.mustDeleteItemFunc(item, finalErr)) {
423421
p.closeItem(ctx, item,
424422
closeItemWithLock(),
425423
closeItemNotifyStats(),
426424
closeItemWithDeleteFromPool(),
427425
)
426+
} else {
427+
_ = p.putItem(ctx, item)
428428
}
429429
}()
430430

internal/query/client.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,13 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, cfg *config.Config) *
561561
pool.WithTrace[*Session, Session](poolTrace(cfg.Trace())),
562562
pool.WithCreateItemTimeout[*Session, Session](cfg.SessionCreateTimeout()),
563563
pool.WithCloseItemTimeout[*Session, Session](cfg.SessionDeleteTimeout()),
564+
pool.WithMustDeleteItemFunc[*Session, Session](func(s *Session, err error) bool {
565+
if !s.IsAlive() {
566+
return true
567+
}
568+
569+
return err != nil && xerrors.MustDeleteTableOrQuerySession(err)
570+
}),
564571
pool.WithIdleTimeToLive[*Session, Session](cfg.SessionIdleTimeToLive()),
565572
pool.WithCreateItemFunc(func(ctx context.Context) (_ *Session, err error) {
566573
var (

internal/table/client.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
// sessionBuilder is the interface that holds logic of creating sessions.
2222
type sessionBuilder func(ctx context.Context) (*Session, error)
2323

24-
func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config) *Client {
24+
func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config) *Client { //nolint:funlen
2525
onDone := trace.TableOnInit(config.Trace(), &ctx,
2626
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.New"),
2727
)
@@ -39,6 +39,13 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config
3939
pool.WithIdleTimeToLive[*Session, Session](config.IdleThreshold()),
4040
pool.WithCreateItemTimeout[*Session, Session](config.CreateSessionTimeout()),
4141
pool.WithCloseItemTimeout[*Session, Session](config.DeleteTimeout()),
42+
pool.WithMustDeleteItemFunc[*Session, Session](func(s *Session, err error) bool {
43+
if !s.IsAlive() {
44+
return true
45+
}
46+
47+
return err != nil && xerrors.MustDeleteTableOrQuerySession(err)
48+
}),
4249
pool.WithClock[*Session, Session](config.Clock()),
4350
pool.WithCreateItemFunc[*Session, Session](func(ctx context.Context) (*Session, error) {
4451
return newSession(ctx, cc, config)
@@ -210,7 +217,9 @@ func (c *Client) Do(ctx context.Context, op table.Operation, opts ...table.Optio
210217
onDone(attempts, finalErr)
211218
}()
212219

213-
err := do(ctx, c.pool, c.config, op, func(err error) {
220+
err := do(ctx, c.pool, c.config, func(ctx context.Context, s *Session) error {
221+
return op(ctx, s)
222+
}, func(err error) {
214223
attempts++
215224
}, config.RetryOptions...)
216225
if err != nil {
@@ -239,7 +248,7 @@ func (c *Client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.O
239248
onDone(attempts, finalErr)
240249
}()
241250

242-
return retryBackoff(ctx, c.pool, func(ctx context.Context, s table.Session) (err error) {
251+
return retryBackoff(ctx, c.pool, func(ctx context.Context, s *Session) (err error) {
243252
attempts++
244253

245254
tx, err := s.BeginTransaction(ctx, config.TxSettings)
@@ -248,9 +257,7 @@ func (c *Client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.O
248257
}
249258

250259
defer func() {
251-
if err != nil && !xerrors.IsOperationError(err) {
252-
_ = tx.Rollback(ctx)
253-
}
260+
_ = tx.Rollback(ctx)
254261
}()
255262

256263
if err = executeTxOperation(ctx, c, op, tx); err != nil {

internal/table/retry.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@ func do(
2525
ctx context.Context,
2626
pool sessionPool,
2727
config *config.Config,
28-
op table.Operation,
28+
op func(ctx context.Context, s *Session) error,
2929
onAttempt func(err error),
3030
opts ...retry.Option,
3131
) (err error) {
3232
return retryBackoff(ctx, pool,
33-
func(ctx context.Context, s table.Session) (err error) {
33+
func(ctx context.Context, s *Session) (err error) {
3434
defer func() {
3535
if onAttempt != nil {
3636
onAttempt(err)
@@ -61,13 +61,11 @@ func do(
6161
func retryBackoff(
6262
ctx context.Context,
6363
pool sessionPool,
64-
op table.Operation,
64+
op func(ctx context.Context, s *Session) error,
6565
opts ...retry.Option,
6666
) error {
67-
return pool.With(ctx, func(ctx context.Context, s *Session) error {
67+
return pool.With(ctx, func(ctx context.Context, s *Session) (err error) {
6868
if err := op(ctx, s); err != nil {
69-
s.checkError(err)
70-
7169
return xerrors.WithStackTrace(err)
7270
}
7371

internal/table/retry_test.go

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func TestDoBackoffRetryCancelation(t *testing.T) {
4545
go func() {
4646
err := do(ctx, p,
4747
config.New(),
48-
func(ctx context.Context, _ table.Session) error {
48+
func(ctx context.Context, _ *Session) error {
4949
return testErr
5050
},
5151
nil,
@@ -86,6 +86,13 @@ func TestDoBadSession(t *testing.T) {
8686
xtest.TestManyTimes(t, func(t testing.TB) {
8787
closed := make(map[table.Session]bool)
8888
p := pool.New[*Session, Session](ctx,
89+
pool.WithMustDeleteItemFunc[*Session, Session](func(session *Session, err error) bool {
90+
if !session.IsAlive() {
91+
return true
92+
}
93+
94+
return xerrors.MustDeleteTableOrQuerySession(err)
95+
}),
8996
pool.WithCreateItemFunc[*Session, Session](func(ctx context.Context) (*Session, error) {
9097
s := simpleSession(t)
9198
s.closeOnce = func(_ context.Context) error {
@@ -104,7 +111,7 @@ func TestDoBadSession(t *testing.T) {
104111
)
105112
ctx, cancel := xcontext.WithCancel(context.Background())
106113
err := do(ctx, p, config.New(),
107-
func(ctx context.Context, s table.Session) error {
114+
func(ctx context.Context, s *Session) error {
108115
sessions = append(sessions, s)
109116
i++
110117
if i > maxRetryes {
@@ -143,7 +150,7 @@ func TestDoCreateSessionError(t *testing.T) {
143150
}),
144151
)
145152
err := do(ctx, p, config.New(),
146-
func(ctx context.Context, s table.Session) error {
153+
func(ctx context.Context, s *Session) error {
147154
return nil
148155
},
149156
nil,
@@ -186,7 +193,7 @@ func TestDoImmediateReturn(t *testing.T) {
186193
context.Background(),
187194
p,
188195
config.New(),
189-
func(ctx context.Context, _ table.Session) error {
196+
func(ctx context.Context, _ *Session) error {
190197
return testErr
191198
},
192199
nil,
@@ -322,7 +329,7 @@ func TestDoContextDeadline(t *testing.T) {
322329
ctx,
323330
p,
324331
config.New(),
325-
func(ctx context.Context, _ table.Session) error {
332+
func(ctx context.Context, _ *Session) error {
326333
select {
327334
case <-ctx.Done():
328335
return ctx.Err()
@@ -357,6 +364,13 @@ func TestDoWithCustomErrors(t *testing.T) {
357364
pool.WithCreateItemFunc[*Session, Session](func(ctx context.Context) (*Session, error) {
358365
return simpleSession(t), nil
359366
}),
367+
pool.WithMustDeleteItemFunc[*Session, Session](func(session *Session, err error) bool {
368+
if !session.IsAlive() {
369+
return true
370+
}
371+
372+
return xerrors.MustDeleteTableOrQuerySession(err)
373+
}),
360374
pool.WithLimit[*Session, Session](limit),
361375
)
362376
)
@@ -369,11 +383,10 @@ func TestDoWithCustomErrors(t *testing.T) {
369383
error: &CustomError{
370384
Err: retry.RetryableError(
371385
fmt.Errorf("custom error"),
372-
retry.WithDeleteSession(),
373386
),
374387
},
375388
retriable: true,
376-
deleteSession: true,
389+
deleteSession: false,
377390
},
378391
{
379392
error: &CustomError{
@@ -388,8 +401,7 @@ func TestDoWithCustomErrors(t *testing.T) {
388401
},
389402
{
390403
error: &CustomError{
391-
Err: fmt.Errorf(
392-
"wrapped error: %w",
404+
Err: fmt.Errorf("wrapped error: %w",
393405
xerrors.Operation(
394406
xerrors.WithStatusCode(
395407
Ydb.StatusIds_BAD_SESSION,
@@ -402,8 +414,7 @@ func TestDoWithCustomErrors(t *testing.T) {
402414
},
403415
{
404416
error: &CustomError{
405-
Err: fmt.Errorf(
406-
"wrapped error: %w",
417+
Err: fmt.Errorf("wrapped error: %w",
407418
xerrors.Operation(
408419
xerrors.WithStatusCode(
409420
Ydb.StatusIds_UNAUTHORIZED,
@@ -420,11 +431,8 @@ func TestDoWithCustomErrors(t *testing.T) {
420431
i = 0
421432
sessions = make(map[table.Session]int)
422433
)
423-
err := do(
424-
ctx,
425-
p,
426-
config.New(),
427-
func(ctx context.Context, s table.Session) (err error) {
434+
err := do(ctx, p, config.New(),
435+
func(ctx context.Context, s *Session) (err error) {
428436
sessions[s]++
429437
i++
430438
if i < limit {
@@ -442,7 +450,7 @@ func TestDoWithCustomErrors(t *testing.T) {
442450
}
443451
if test.deleteSession {
444452
if len(sessions) != limit {
445-
t.Fatalf("unexpected len(sessions): %d, err: %v", len(sessions), err)
453+
t.Fatalf("unexpected len(sessions): %d, exp: %d, err: %v", len(sessions), limit, err)
446454
}
447455
for s, n := range sessions {
448456
if n != 1 {
@@ -489,9 +497,3 @@ func (s *singleSession) With(ctx context.Context,
489497
return f(ctx, s.s)
490498
}, opts...)
491499
}
492-
493-
var (
494-
errNoSession = xerrors.Wrap(fmt.Errorf("no session"))
495-
errUnexpectedSession = xerrors.Wrap(fmt.Errorf("unexpected session"))
496-
errSessionOverflow = xerrors.Wrap(fmt.Errorf("session overflow"))
497-
)

internal/table/session.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ import (
3636
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
3737
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
3838
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
39-
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
4039
"github.com/ydb-platform/ydb-go-sdk/v3/table"
4140
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
4241
"github.com/ydb-platform/ydb-go-sdk/v3/table/result"
@@ -762,16 +761,6 @@ func (s *Session) DropTable(
762761
return xerrors.WithStackTrace(err)
763762
}
764763

765-
func (s *Session) checkError(err error) {
766-
if err == nil {
767-
return
768-
}
769-
m := retry.Check(err)
770-
if m.MustDeleteSession() {
771-
s.SetStatus(table.SessionClosing)
772-
}
773-
}
774-
775764
// AlterTable modifies schema of table at given path with given options.
776765
func (s *Session) AlterTable(
777766
ctx context.Context,

0 commit comments

Comments
 (0)