Skip to content

Commit e1c6de0

Browse files
authored
Merge pull request #1257 from ydb-platform/remove-attempts
refactoring of internal query retryers
2 parents e94c8c8 + a1b1046 commit e1c6de0

File tree

4 files changed

+59
-62
lines changed

4 files changed

+59
-62
lines changed

internal/query/client.go

Lines changed: 40 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -84,54 +84,39 @@ func do(
8484
return nil
8585
}
8686

87-
func doWithAttempts(
88-
ctx context.Context,
89-
pool *pool.Pool[*Session, Session],
90-
op query.Operation,
91-
t *trace.Query,
92-
opts ...options.DoOption,
93-
) (attempts int, finalErr error) {
94-
doOpts := options.ParseDoOpts(t, opts...)
95-
96-
err := do(ctx, pool, op,
97-
append(doOpts.RetryOpts(), retry.WithTrace(&trace.Retry{
98-
OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopDoneInfo) {
99-
return func(info trace.RetryLoopDoneInfo) {
100-
attempts = info.Attempts
101-
}
102-
},
103-
}))...,
104-
)
105-
if err != nil {
106-
return attempts, xerrors.WithStackTrace(err)
107-
}
108-
109-
return attempts, nil
110-
}
111-
112-
func (c *Client) Do(ctx context.Context, op query.Operation, opts ...options.DoOption) error {
87+
func (c *Client) Do(ctx context.Context, op query.Operation, opts ...options.DoOption) (finalErr error) {
11388
ctx, cancel := xcontext.WithDone(ctx, c.done)
11489
defer cancel()
11590

116-
onDone := trace.QueryOnDo(c.config.Trace(), &ctx,
117-
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.(*Client).Do"),
91+
var (
92+
onDone = trace.QueryOnDo(c.config.Trace(), &ctx,
93+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.(*Client).Do"),
94+
)
95+
attempts = 0
11896
)
119-
attempts, err := doWithAttempts(ctx, c.pool, op, c.config.Trace(), opts...)
120-
onDone(attempts, err)
97+
defer func() {
98+
onDone(attempts, finalErr)
99+
}()
100+
101+
err := do(ctx, c.pool, func(ctx context.Context, s query.Session) error {
102+
attempts++
103+
104+
return op(ctx, s)
105+
}, options.ParseDoOpts(c.config.Trace(), opts...).RetryOpts()...)
121106

122107
return err
123108
}
124109

125-
func doTxWithAttempts(
110+
func doTx(
126111
ctx context.Context,
127112
pool *pool.Pool[*Session, Session],
128113
op query.TxOperation,
129114
t *trace.Query,
130115
opts ...options.DoTxOption,
131-
) (attempts int, err error) {
116+
) (finalErr error) {
132117
doTxOpts := options.ParseDoTxOpts(t, opts...)
133118

134-
attempts, err = doWithAttempts(ctx, pool, func(ctx context.Context, s query.Session) (err error) {
119+
err := do(ctx, pool, func(ctx context.Context, s query.Session) (err error) {
135120
tx, err := s.Begin(ctx, doTxOpts.TxSettings())
136121
if err != nil {
137122
return xerrors.WithStackTrace(err)
@@ -156,12 +141,12 @@ func doTxWithAttempts(
156141
}
157142

158143
return nil
159-
}, t, doTxOpts.DoOpts()...)
144+
}, doTxOpts.RetryOpts()...)
160145
if err != nil {
161-
return attempts, xerrors.WithStackTrace(err)
146+
return xerrors.WithStackTrace(err)
162147
}
163148

164-
return attempts, nil
149+
return nil
165150
}
166151

167152
// ReadRow is a helper which read only one row from first result set in result
@@ -273,17 +258,30 @@ func (c *Client) ReadResultSet(
273258
return rs, nil
274259
}
275260

276-
func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options.DoTxOption) (err error) {
261+
func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options.DoTxOption) (finalErr error) {
277262
ctx, cancel := xcontext.WithDone(ctx, c.done)
278263
defer cancel()
279264

280-
onDone := trace.QueryOnDoTx(c.config.Trace(), &ctx,
281-
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.(*Client).DoTx"),
265+
var (
266+
onDone = trace.QueryOnDoTx(c.config.Trace(), &ctx,
267+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.(*Client).DoTx"),
268+
)
269+
attempts = 0
282270
)
283-
attempts, err := doTxWithAttempts(ctx, c.pool, op, c.config.Trace(), opts...)
284-
onDone(attempts, err)
271+
defer func() {
272+
onDone(attempts, finalErr)
273+
}()
285274

286-
return err
275+
err := doTx(ctx, c.pool, func(ctx context.Context, tx query.TxActor) error {
276+
attempts++
277+
278+
return op(ctx, tx)
279+
}, c.config.Trace(), opts...)
280+
if err != nil {
281+
return xerrors.WithStackTrace(err)
282+
}
283+
284+
return nil
287285
}
288286

289287
func New(ctx context.Context, balancer balancer, cfg *config.Config) *Client {

internal/query/client_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -151,17 +151,20 @@ func TestClient(t *testing.T) {
151151
})
152152
t.Run("Do", func(t *testing.T) {
153153
t.Run("HappyWay", func(t *testing.T) {
154-
attempts, err := doWithAttempts(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) {
154+
var visited bool
155+
err := do(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) {
155156
return newTestSession("123"), nil
156157
}), func(ctx context.Context, s query.Session) error {
158+
visited = true
159+
157160
return nil
158-
}, &trace.Query{})
161+
})
159162
require.NoError(t, err)
160-
require.EqualValues(t, 1, attempts)
163+
require.True(t, visited)
161164
})
162165
t.Run("RetryableError", func(t *testing.T) {
163166
counter := 0
164-
attempts, err := doWithAttempts(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) {
167+
err := do(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) {
165168
return newTestSession("123"), nil
166169
}), func(ctx context.Context, s query.Session) error {
167170
counter++
@@ -170,9 +173,8 @@ func TestClient(t *testing.T) {
170173
}
171174

172175
return nil
173-
}, &trace.Query{})
176+
})
174177
require.NoError(t, err)
175-
require.EqualValues(t, 10, attempts)
176178
require.Equal(t, 10, counter)
177179
})
178180
})
@@ -186,13 +188,12 @@ func TestClient(t *testing.T) {
186188
client.EXPECT().CommitTransaction(gomock.Any(), gomock.Any()).Return(&Ydb_Query.CommitTransactionResponse{
187189
Status: Ydb.StatusIds_SUCCESS,
188190
}, nil)
189-
attempts, err := doTxWithAttempts(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) {
191+
err := doTx(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) {
190192
return newTestSessionWithClient("123", client), nil
191193
}), func(ctx context.Context, tx query.TxActor) error {
192194
return nil
193195
}, &trace.Query{})
194196
require.NoError(t, err)
195-
require.EqualValues(t, 1, attempts)
196197
})
197198
t.Run("RetryableError", func(t *testing.T) {
198199
counter := 0
@@ -207,7 +208,7 @@ func TestClient(t *testing.T) {
207208
client.EXPECT().CommitTransaction(gomock.Any(), gomock.Any()).Return(&Ydb_Query.CommitTransactionResponse{
208209
Status: Ydb.StatusIds_SUCCESS,
209210
}, nil).AnyTimes()
210-
attempts, err := doTxWithAttempts(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) {
211+
err := doTx(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) {
211212
return newTestSessionWithClient("123", client), nil
212213
}), func(ctx context.Context, tx query.TxActor) error {
213214
counter++
@@ -218,7 +219,6 @@ func TestClient(t *testing.T) {
218219
return nil
219220
}, &trace.Query{})
220221
require.NoError(t, err)
221-
require.EqualValues(t, 10, attempts)
222222
require.Equal(t, 10, counter)
223223
})
224224
})

internal/query/options/retry.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ type (
3131
}
3232

3333
doTxSettings struct {
34-
doOpts []DoOption
34+
doSettings
3535
txSettings tx.Settings
3636
}
3737

@@ -52,10 +52,6 @@ func (s *doSettings) RetryOpts() []retry.Option {
5252
return s.retryOpts
5353
}
5454

55-
func (s *doTxSettings) DoOpts() []DoOption {
56-
return s.doOpts
57-
}
58-
5955
func (s *doTxSettings) TxSettings() tx.Settings {
6056
return s.txSettings
6157
}
@@ -65,15 +61,15 @@ func (opt traceOption) applyDoOption(s *doSettings) {
6561
}
6662

6763
func (opt traceOption) applyDoTxOption(s *doTxSettings) {
68-
s.doOpts = append(s.doOpts, opt)
64+
opt.applyDoOption(&s.doSettings)
6965
}
7066

7167
func (opts retryOptionsOption) applyDoOption(s *doSettings) {
7268
s.retryOpts = append(s.retryOpts, opts...)
7369
}
7470

7571
func (opts retryOptionsOption) applyDoTxOption(s *doTxSettings) {
76-
s.doOpts = append(s.doOpts, opts)
72+
opts.applyDoOption(&s.doSettings)
7773
}
7874

7975
func (opt doTxSettingsOption) applyDoTxOption(opts *doTxSettings) {
@@ -117,8 +113,8 @@ func ParseDoOpts(t *trace.Query, opts ...DoOption) (s *doSettings) {
117113
func ParseDoTxOpts(t *trace.Query, opts ...DoTxOption) (s *doTxSettings) {
118114
s = &doTxSettings{
119115
txSettings: tx.NewSettings(tx.WithDefaultTxMode()),
120-
doOpts: []DoOption{
121-
WithTrace(t),
116+
doSettings: doSettings{
117+
trace: t,
122118
},
123119
}
124120

internal/query/result_set.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ import (
1414
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
1515
)
1616

17-
var _ query.ResultSet = (*resultSet)(nil)
17+
var (
18+
_ query.ResultSet = (*resultSet)(nil)
19+
_ query.ResultSet = (*materializedResultSet)(nil)
20+
)
1821

1922
type (
2023
materializedResultSet struct {

0 commit comments

Comments
 (0)