Skip to content

Commit a48e0ce

Browse files
authored
Merge pull request #192 from ydb-platform/recover-panic-from-retry-operation
* Added `WithPanicCallback` option to all service configs (discovery,…
2 parents a258f03 + 136f59a commit a48e0ce

File tree

13 files changed

+202
-32
lines changed

13 files changed

+202
-32
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
* Added `WithPanicCallback` option to all service configs (discovery, coordination, ratelimiter, scheme, scripting, table) and auto-applying from `ydb.WithPanicCallback`
2+
* Added panic recovering (if defined `ydb.WithPanicCallback` option) which thrown from retry operation
3+
14
## v3.18.2
25
* Refactored balancers (makes concurrent-safe)
36
* Excluded separate balancers lock from cluster

coordination/config/config.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,23 @@ type Config interface {
2323

2424
// Trace defines trace over coordination client calls
2525
Trace() trace.Coordination
26+
27+
// PanicCallback returns user-defined panic callback
28+
// If nil - panic callback not defined
29+
PanicCallback() func(e interface{})
2630
}
2731

2832
type config struct {
2933
trace trace.Coordination
3034

3135
operationTimeout time.Duration
3236
operationCancelAfter time.Duration
37+
38+
panicCallback func(e interface{})
39+
}
40+
41+
func (c *config) PanicCallback() func(e interface{}) {
42+
return c.panicCallback
3343
}
3444

3545
func (c *config) Trace() trace.Coordination {
@@ -64,6 +74,12 @@ func WithOperationCancelAfter(operationCancelAfter time.Duration) Option {
6474
}
6575
}
6676

77+
func WithPanicCallback(cb func(e interface{})) Option {
78+
return func(c *config) {
79+
c.panicCallback = cb
80+
}
81+
}
82+
6783
func New(opts ...Option) Config {
6884
c := &config{}
6985
for _, o := range opts {

discovery/config/config.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ type Config interface {
4545

4646
// Meta is an option which contains meta information about database connection
4747
Meta() meta.Meta
48+
49+
// PanicCallback returns user-defined panic callback
50+
// If nil - panic callback not defined
51+
PanicCallback() func(e interface{})
4852
}
4953

5054
type config struct {
@@ -58,6 +62,12 @@ type config struct {
5862

5963
interval time.Duration
6064
trace trace.Discovery
65+
66+
panicCallback func(e interface{})
67+
}
68+
69+
func (c *config) PanicCallback() func(e interface{}) {
70+
return c.panicCallback
6171
}
6272

6373
func (c *config) Meta() meta.Meta {
@@ -146,6 +156,12 @@ func WithInterval(interval time.Duration) Option {
146156
}
147157
}
148158

159+
func WithPanicCallback(cb func(e interface{})) Option {
160+
return func(c *config) {
161+
c.panicCallback = cb
162+
}
163+
}
164+
149165
func New(opts ...Option) Config {
150166
c := &config{
151167
interval: DefaultInterval,

internal/table/client.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,7 @@ func (c *client) Do(ctx context.Context, op table.Operation, opts ...table.Optio
545545
return do(
546546
ctx,
547547
c,
548+
c.config,
548549
op,
549550
retryOptions(c.config.Trace(), opts...),
550551
)
@@ -557,6 +558,7 @@ func (c *client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.O
557558
return doTx(
558559
ctx,
559560
c,
561+
c.config,
560562
op,
561563
retryOptions(c.config.Trace(), opts...),
562564
)

internal/table/retry.go

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/ydb-platform/ydb-go-sdk/v3/internal/errors"
88
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
99
"github.com/ydb-platform/ydb-go-sdk/v3/table"
10+
"github.com/ydb-platform/ydb-go-sdk/v3/table/config"
1011
"github.com/ydb-platform/ydb-go-sdk/v3/testutil"
1112
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
1213
)
@@ -27,7 +28,13 @@ type SessionProvider interface {
2728
CloseSession(ctx context.Context, s Session) error
2829
}
2930

30-
func doTx(ctx context.Context, c SessionProvider, op table.TxOperation, opts table.Options) (err error) {
31+
func doTx(
32+
ctx context.Context,
33+
c SessionProvider,
34+
config config.Config,
35+
op table.TxOperation,
36+
opts table.Options,
37+
) (err error) {
3138
attempts, onIntermediate := 0, trace.TableOnDoTx(
3239
opts.Trace,
3340
&ctx,
@@ -59,7 +66,16 @@ func doTx(ctx context.Context, c SessionProvider, op table.TxOperation, opts tab
5966
}
6067
}()
6168

62-
err = op(ctx, tx)
69+
err = func() error {
70+
if panicCallback := config.PanicCallback(); panicCallback != nil {
71+
defer func() {
72+
if e := recover(); e != nil {
73+
panicCallback(e)
74+
}
75+
}()
76+
}
77+
return op(ctx, tx)
78+
}()
6379

6480
if err != nil {
6581
return errors.WithStackTrace(err)
@@ -75,7 +91,13 @@ func doTx(ctx context.Context, c SessionProvider, op table.TxOperation, opts tab
7591
)
7692
}
7793

78-
func do(ctx context.Context, c SessionProvider, op table.Operation, opts table.Options) (err error) {
94+
func do(
95+
ctx context.Context,
96+
c SessionProvider,
97+
config config.Config,
98+
op table.Operation,
99+
opts table.Options,
100+
) (err error) {
79101
attempts, onIntermediate := 0, trace.TableOnDo(
80102
opts.Trace,
81103
&ctx,
@@ -96,7 +118,16 @@ func do(ctx context.Context, c SessionProvider, op table.Operation, opts table.O
96118
attempts++
97119
}()
98120

99-
err = op(ctx, s)
121+
err = func() error {
122+
if panicCallback := config.PanicCallback(); panicCallback != nil {
123+
defer func() {
124+
if e := recover(); e != nil {
125+
panicCallback(e)
126+
}
127+
}()
128+
}
129+
return op(ctx, s)
130+
}()
100131

101132
if err != nil {
102133
return errors.WithStackTrace(err)

internal/table/retry_test.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/ydb-platform/ydb-go-sdk/v3/internal/rand"
1616
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
1717
"github.com/ydb-platform/ydb-go-sdk/v3/table"
18+
"github.com/ydb-platform/ydb-go-sdk/v3/table/config"
1819
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
1920
"github.com/ydb-platform/ydb-go-sdk/v3/testutil"
2021
)
@@ -43,6 +44,7 @@ func TestRetryerBackoffRetryCancelation(t *testing.T) {
4344
err := do(
4445
ctx,
4546
p,
47+
config.New(),
4648
func(ctx context.Context, _ table.Session) error {
4749
return testErr
4850
},
@@ -93,6 +95,7 @@ func TestRetryerBadSession(t *testing.T) {
9395
err := do(
9496
ctx,
9597
p,
98+
config.New(),
9699
func(ctx context.Context, s table.Session) error {
97100
sessions = append(sessions, s)
98101
i++
@@ -137,6 +140,7 @@ func TestRetryerSessionClosing(t *testing.T) {
137140
err := do(
138141
context.Background(),
139142
p,
143+
config.New(),
140144
func(ctx context.Context, s table.Session) error {
141145
sessions = append(sessions, s)
142146
s.(*session).SetStatus(options.SessionClosing)
@@ -189,6 +193,7 @@ func TestRetryerImmediateReturn(t *testing.T) {
189193
err := do(
190194
context.Background(),
191195
p,
196+
config.New(),
192197
func(ctx context.Context, _ table.Session) error {
193198
return testErr
194199
},
@@ -319,6 +324,7 @@ func TestRetryContextDeadline(t *testing.T) {
319324
_ = do(
320325
ctx,
321326
p,
327+
config.New(),
322328
func(ctx context.Context, _ table.Session) error {
323329
select {
324330
case <-ctx.Done():
@@ -416,14 +422,20 @@ func TestRetryWithCustomErrors(t *testing.T) {
416422
i = 0
417423
sessions = make(map[table.Session]int)
418424
)
419-
err := do(ctx, p, func(ctx context.Context, s table.Session) (err error) {
420-
sessions[s]++
421-
i++
422-
if i < limit {
423-
return test.error
424-
}
425-
return nil
426-
}, table.Options{})
425+
err := do(
426+
ctx,
427+
p,
428+
config.New(),
429+
func(ctx context.Context, s table.Session) (err error) {
430+
sessions[s]++
431+
i++
432+
if i < limit {
433+
return test.error
434+
}
435+
return nil
436+
},
437+
table.Options{},
438+
)
427439
// nolint:nestif
428440
if test.retriable {
429441
if i != limit {

options.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -355,9 +355,16 @@ func WithSessionPoolDeleteTimeout(deleteTimeout time.Duration) Option {
355355
// WithPanicCallback specified behavior on panic
356356
// Warning: WithPanicCallback must be defined on start of all options
357357
// (before `WithTrace{Driver,Table,Scheme,Scripting,Coordination,Ratelimiter}` and other options)
358-
func WithPanicCallback(cb func(e interface{})) Option {
359-
return func(ctx context.Context, c *connection) error {
360-
c.panicCallback = cb
358+
// If not defined - panic would not intercept with driver
359+
func WithPanicCallback(panicCallback func(e interface{})) Option {
360+
return func(ctx context.Context, c *connection) error {
361+
c.panicCallback = panicCallback
362+
c.tableOptions = append(c.tableOptions, tableConfig.WithPanicCallback(panicCallback))
363+
c.coordinationOptions = append(c.coordinationOptions, coordinationConfig.WithPanicCallback(panicCallback))
364+
c.schemeOptions = append(c.schemeOptions, schemeConfig.WithPanicCallback(panicCallback))
365+
c.ratelimiterOptions = append(c.ratelimiterOptions, ratelimiterConfig.WithPanicCallback(panicCallback))
366+
c.discoveryOptions = append(c.discoveryOptions, discoveryConfig.WithPanicCallback(panicCallback))
367+
c.tableOptions = append(c.tableOptions, tableConfig.WithPanicCallback(panicCallback))
361368
return nil
362369
}
363370
}

ratelimiter/config/config.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,23 @@ type Config interface {
2323

2424
// Trace defines trace over ratelimiter calls
2525
Trace() trace.Ratelimiter
26+
27+
// PanicCallback returns user-defined panic callback
28+
// If nil - panic callback not defined
29+
PanicCallback() func(e interface{})
2630
}
2731

2832
type config struct {
2933
trace trace.Ratelimiter
3034

3135
operationTimeout time.Duration
3236
operationCancelAfter time.Duration
37+
38+
panicCallback func(e interface{})
39+
}
40+
41+
func (c *config) PanicCallback() func(e interface{}) {
42+
return c.panicCallback
3343
}
3444

3545
func (c *config) Trace() trace.Ratelimiter {
@@ -64,6 +74,12 @@ func WithOperationCancelAfter(operationCancelAfter time.Duration) Option {
6474
}
6575
}
6676

77+
func WithPanicCallback(cb func(e interface{})) Option {
78+
return func(c *config) {
79+
c.panicCallback = cb
80+
}
81+
}
82+
6783
func New(opts ...Option) Config {
6884
c := &config{}
6985
for _, o := range opts {

0 commit comments

Comments
 (0)