Skip to content

Commit c9efa08

Browse files
committed
* Dropped intermediate callbacks from trace.{Table,Retry,Query} events
* Wrapped errors from `internal/pool.Pool.getItem` as retryable
1 parent f131241 commit c9efa08

File tree

18 files changed

+304
-739
lines changed

18 files changed

+304
-739
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
* Disabled the logic of background grpc-connection parking
77
* Removed `ydb.WithSessionPoolSizeLimit()` option
88
* Added async put session into pool if external context is done
9+
* Dropped intermediate callbacks from `trace.{Table,Retry,Query}` events
10+
* Wrapped errors from `internal/pool.Pool.getItem` as retryable
911

1012
## v3.58.2
1113
* Added `trace.Query.OnSessionBegin` event

internal/pool/errors.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,9 @@ package pool
22

33
import (
44
"errors"
5-
6-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
75
)
86

97
var (
108
errClosedPool = errors.New("closed pool")
11-
errPoolOverflow = xerrors.Retryable(errors.New("pool overflow"))
129
errItemIsNotAlive = errors.New("item is not alive")
1310
)

internal/pool/pool.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ type (
3232
trace *Trace
3333
limit int
3434

35-
create func(ctx context.Context) (PT, error)
35+
createItem func(ctx context.Context) (PT, error)
3636
createTimeout time.Duration
3737
closeTimeout time.Duration
3838

@@ -97,7 +97,7 @@ func (s *safeStats) InUse() statsItemAddr {
9797

9898
func WithCreateFunc[PT Item[T], T any](f func(ctx context.Context) (PT, error)) option[PT, T] {
9999
return func(p *Pool[PT, T]) {
100-
p.create = f
100+
p.createItem = f
101101
}
102102
}
103103

@@ -132,7 +132,7 @@ func New[PT Item[T], T any](
132132
p := &Pool[PT, T]{
133133
trace: defaultTrace,
134134
limit: DefaultLimit,
135-
create: func(ctx context.Context) (PT, error) {
135+
createItem: func(ctx context.Context) (PT, error) {
136136
var item T
137137

138138
return &item, nil
@@ -157,9 +157,9 @@ func New[PT Item[T], T any](
157157
})
158158
}()
159159

160-
create := p.create
160+
createItem := p.createItem
161161

162-
p.create = func(ctx context.Context) (PT, error) {
162+
p.createItem = func(ctx context.Context) (PT, error) {
163163
var (
164164
ch = make(chan PT)
165165
createErr error
@@ -178,7 +178,7 @@ func New[PT Item[T], T any](
178178
}
179179
defer cancelCreate()
180180

181-
newItem, err := create(createCtx)
181+
newItem, err := createItem(createCtx)
182182
if err != nil {
183183
return xerrors.WithStackTrace(err)
184184
}
@@ -222,6 +222,10 @@ func New[PT Item[T], T any](
222222
return nil, xerrors.WithStackTrace(ctx.Err())
223223
case item, has := <-ch:
224224
if !has {
225+
if ctxErr := ctx.Err(); ctxErr == nil && xerrors.IsContextError(createErr) {
226+
return nil, xerrors.WithStackTrace(xerrors.Retryable(createErr))
227+
}
228+
225229
return nil, xerrors.WithStackTrace(createErr)
226230
}
227231

@@ -282,7 +286,7 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (_ PT, finalErr error) {
282286
p.stats.Index().Dec()
283287
}
284288

285-
item, err := p.create(ctx)
289+
item, err := p.createItem(ctx)
286290
if err != nil {
287291
return nil, xerrors.WithStackTrace(err)
288292
}
@@ -417,11 +421,9 @@ func (p *Pool[PT, T]) With(
417421

418422
return nil
419423
}, append(opts, retry.WithTrace(&trace.Retry{
420-
OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) {
421-
return func(info trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) {
422-
return func(info trace.RetryLoopDoneInfo) {
423-
attempts = info.Attempts
424-
}
424+
OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopDoneInfo) {
425+
return func(info trace.RetryLoopDoneInfo) {
426+
attempts = info.Attempts
425427
}
426428
},
427429
}))...)

internal/pool/pool_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,6 @@ func TestPool(t *testing.T) {
142142
})
143143
require.NoError(t, err)
144144
require.GreaterOrEqual(t, atomic.LoadInt64(&counter), int64(10))
145-
146145
})
147146
t.Run("OnOperationError", func(t *testing.T) {
148147
var counter int64

internal/query/client.go

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -80,17 +80,9 @@ func do(
8080

8181
return nil
8282
}, append(doOpts.RetryOpts(), retry.WithTrace(&trace.Retry{
83-
OnRetry: func(
84-
info trace.RetryLoopStartInfo,
85-
) func(
86-
trace.RetryLoopIntermediateInfo,
87-
) func(
88-
trace.RetryLoopDoneInfo,
89-
) {
90-
return func(info trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) {
91-
return func(info trace.RetryLoopDoneInfo) {
92-
attempts = info.Attempts
93-
}
83+
OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopDoneInfo) {
84+
return func(info trace.RetryLoopDoneInfo) {
85+
attempts = info.Attempts
9486
}
9587
},
9688
}))...)

internal/table/client.go

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -261,15 +261,11 @@ func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ tab
261261
[]retry.Option{
262262
retry.WithIdempotent(true),
263263
retry.WithTrace(&trace.Retry{
264-
OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) {
265-
onIntermediate := trace.TableOnCreateSession(c.config.Trace(), info.Context, stack.FunctionID(""))
264+
OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopDoneInfo) {
265+
onDone := trace.TableOnCreateSession(c.config.Trace(), info.Context, stack.FunctionID(""))
266266

267-
return func(info trace.RetryLoopIntermediateInfo) func(trace.RetryLoopDoneInfo) {
268-
onDone := onIntermediate(info.Error)
269-
270-
return func(info trace.RetryLoopDoneInfo) {
271-
onDone(s, info.Attempts, info.Error)
272-
}
267+
return func(info trace.RetryLoopDoneInfo) {
268+
onDone(s, info.Attempts, info.Error)
273269
}
274270
},
275271
}),
@@ -643,17 +639,16 @@ func (c *Client) Do(ctx context.Context, op table.Operation, opts ...table.Optio
643639

644640
config := c.retryOptions(opts...)
645641

646-
attempts, onIntermediate := 0, trace.TableOnDo(config.Trace, &ctx,
642+
attempts, onDone := 0, trace.TableOnDo(config.Trace, &ctx,
647643
stack.FunctionID(""),
648644
config.Label, config.Idempotent, xcontext.IsNestedCall(ctx),
649645
)
650646
defer func() {
651-
onIntermediate(finalErr)(attempts, finalErr)
647+
onDone(attempts, finalErr)
652648
}()
653649

654650
err := do(ctx, c, c.config, op, func(err error) {
655651
attempts++
656-
onIntermediate(err)
657652
}, config.RetryOptions...)
658653
if err != nil {
659654
return xerrors.WithStackTrace(err)
@@ -673,22 +668,18 @@ func (c *Client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.O
673668

674669
config := c.retryOptions(opts...)
675670

676-
attempts, onIntermediate := 0, trace.TableOnDoTx(config.Trace, &ctx,
671+
attempts, onDone := 0, trace.TableOnDoTx(config.Trace, &ctx,
677672
stack.FunctionID(""),
678673
config.Label, config.Idempotent, xcontext.IsNestedCall(ctx),
679674
)
680675
defer func() {
681-
onIntermediate(finalErr)(attempts, finalErr)
676+
onDone(attempts, finalErr)
682677
}()
683678

684679
return retryBackoff(ctx, c,
685680
func(ctx context.Context, s table.Session) (err error) {
686681
attempts++
687682

688-
defer func() {
689-
onIntermediate(err)
690-
}()
691-
692683
tx, err := s.BeginTransaction(ctx, config.TxSettings)
693684
if err != nil {
694685
return xerrors.WithStackTrace(err)

internal/table/session.go

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -984,7 +984,7 @@ func (s *session) StreamReadTable(
984984
opts ...options.ReadTableOption,
985985
) (_ result.StreamResult, err error) {
986986
var (
987-
onIntermediate = trace.TableOnSessionQueryStreamRead(s.config.Trace(), &ctx,
987+
onDone = trace.TableOnSessionQueryStreamRead(s.config.Trace(), &ctx,
988988
stack.FunctionID(""),
989989
s,
990990
)
@@ -997,9 +997,7 @@ func (s *session) StreamReadTable(
997997
)
998998
defer func() {
999999
a.Free()
1000-
if err != nil {
1001-
onIntermediate(xerrors.HideEOF(err))(xerrors.HideEOF(err))
1002-
}
1000+
onDone(xerrors.HideEOF(err))
10031001
}()
10041002

10051003
for _, opt := range opts {
@@ -1023,9 +1021,6 @@ func (s *session) StreamReadTable(
10231021
stats *Ydb_TableStats.QueryStats,
10241022
err error,
10251023
) {
1026-
defer func() {
1027-
onIntermediate(xerrors.HideEOF(err))
1028-
}()
10291024
select {
10301025
case <-ctx.Done():
10311026
return nil, nil, xerrors.WithStackTrace(ctx.Err())
@@ -1042,7 +1037,7 @@ func (s *session) StreamReadTable(
10421037
},
10431038
func(err error) error {
10441039
cancel()
1045-
onIntermediate(xerrors.HideEOF(err))(xerrors.HideEOF(err))
1040+
onDone(xerrors.HideEOF(err))
10461041

10471042
return err
10481043
},
@@ -1105,9 +1100,9 @@ func (s *session) StreamExecuteScanQuery(
11051100
opts ...options.ExecuteScanQueryOption,
11061101
) (_ result.StreamResult, err error) {
11071102
var (
1108-
a = allocator.New()
1109-
q = queryFromText(query)
1110-
onIntermediate = trace.TableOnSessionQueryStreamExecute(
1103+
a = allocator.New()
1104+
q = queryFromText(query)
1105+
onDone = trace.TableOnSessionQueryStreamExecute(
11111106
s.config.Trace(), &ctx,
11121107
stack.FunctionID(""),
11131108
s, q, parameters,
@@ -1122,9 +1117,7 @@ func (s *session) StreamExecuteScanQuery(
11221117
)
11231118
defer func() {
11241119
a.Free()
1125-
if err != nil {
1126-
onIntermediate(xerrors.HideEOF(err))(xerrors.HideEOF(err))
1127-
}
1120+
onDone(xerrors.HideEOF(err))
11281121
}()
11291122

11301123
for _, opt := range opts {
@@ -1148,9 +1141,6 @@ func (s *session) StreamExecuteScanQuery(
11481141
stats *Ydb_TableStats.QueryStats,
11491142
err error,
11501143
) {
1151-
defer func() {
1152-
onIntermediate(xerrors.HideEOF(err))
1153-
}()
11541144
select {
11551145
case <-ctx.Done():
11561146
return nil, nil, xerrors.WithStackTrace(ctx.Err())
@@ -1167,7 +1157,7 @@ func (s *session) StreamExecuteScanQuery(
11671157
},
11681158
func(err error) error {
11691159
cancel()
1170-
onIntermediate(xerrors.HideEOF(err))(xerrors.HideEOF(err))
1160+
onDone(xerrors.HideEOF(err))
11711161

11721162
return err
11731163
},
@@ -1229,7 +1219,7 @@ func (s *session) BeginTransaction(
12291219
var (
12301220
result Ydb_Table.BeginTransactionResult
12311221
response *Ydb_Table.BeginTransactionResponse
1232-
onDone = trace.TableOnSessionTransactionBegin(
1222+
onDone = trace.TableOnTxBegin(
12331223
s.config.Trace(), &ctx,
12341224
stack.FunctionID(""),
12351225
s,

internal/table/transaction.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func (tx *transaction) Execute(
6161
query string, parameters *params.Parameters,
6262
opts ...options.ExecuteDataQueryOption,
6363
) (r result.Result, err error) {
64-
onDone := trace.TableOnSessionTransactionExecute(
64+
onDone := trace.TableOnTxExecute(
6565
tx.s.config.Trace(), &ctx,
6666
stack.FunctionID(""),
6767
tx.s, tx, queryFromText(query), parameters,
@@ -98,7 +98,7 @@ func (tx *transaction) ExecuteStatement(
9898
a := allocator.New()
9999
defer a.Free()
100100

101-
onDone := trace.TableOnSessionTransactionExecuteStatement(
101+
onDone := trace.TableOnTxExecuteStatement(
102102
tx.s.config.Trace(), &ctx,
103103
stack.FunctionID(""),
104104
tx.s, tx, stmt.(*statement).query, parameters,
@@ -131,7 +131,7 @@ func (tx *transaction) CommitTx(
131131
ctx context.Context,
132132
opts ...options.CommitTransactionOption,
133133
) (r result.Result, err error) {
134-
onDone := trace.TableOnSessionTransactionCommit(
134+
onDone := trace.TableOnTxCommit(
135135
tx.s.config.Trace(), &ctx,
136136
stack.FunctionID(""),
137137
tx.s, tx,
@@ -189,7 +189,7 @@ func (tx *transaction) CommitTx(
189189

190190
// Rollback performs a rollback of the specified active transaction.
191191
func (tx *transaction) Rollback(ctx context.Context) (err error) {
192-
onDone := trace.TableOnSessionTransactionRollback(
192+
onDone := trace.TableOnTxRollback(
193193
tx.s.config.Trace(), &ctx,
194194
stack.FunctionID(""),
195195
tx.s, tx,

0 commit comments

Comments
 (0)