Skip to content

Commit 516f319

Browse files
authored
Merge pull request #344 from ydb-platform/retry-do-tx
* Improved the `xsql` errors mapping to `driver.ErrBadConn`
2 parents 213232b + 673f30a commit 516f319

36 files changed

+2917
-897
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
* Improved the `xsql` errors mapping to `driver.ErrBadConn`
2+
* Extended `retry.DoTx` test for to achieve equivalence with `retry.Retry` behaviour
3+
* Added `database/sql` events for tracing `database/sql` driver events
4+
* Added internal logging for `database/sql` events
5+
* Supports `YDB_LOG_DETAILS` environment variable for specify scope of log messages
6+
17
## v3.33.0
28
* Added `retry.DoTx` helper for retrying `database/sql` transactions
39
* Implemented `database/sql` driver over `ydb-go-sdk`

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ Next packages provide debug tooling:
126126
|----------------------------------|-----------|---------|--------------------------------------------------------------------------------------------------------------------------|
127127
| `YDB_SSL_ROOT_CERTIFICATES_FILE` | `string` | | path to certificates file |
128128
| `YDB_LOG_SEVERITY_LEVEL` | `string` | `quiet` | severity logging level of internal driver logger. Supported: `trace`, `debug`, `info`, `warn`, `error`, `fatal`, `quiet` |
129+
| `YDB_LOG_DETAILS` | `string` | `.*` | pattern for lookup internal logger logs |
129130
| `YDB_LOG_NO_COLOR` | `bool` | `false` | set any non empty value to disable colouring logs with internal driver logger |
130131
| `GRPC_GO_LOG_VERBOSITY_LEVEL` | `integer` | | set to `99` to see grpc logs |
131132
| `GRPC_GO_LOG_SEVERITY_LEVEL` | `string` | | set to `info` to see grpc logs |

connection.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
tableConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config"
2727
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicclientinternal"
2828
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
29+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql"
2930
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
3031
"github.com/ydb-platform/ydb-go-sdk/v3/log"
3132
"github.com/ydb-platform/ydb-go-sdk/v3/ratelimiter"
@@ -113,6 +114,8 @@ type connection struct {
113114
topic *topicclientinternal.Client
114115
topicOptions []topicoptions.TopicOption
115116

117+
databaseSQLOptions []xsql.ConnectorOption
118+
116119
pool *conn.Pool
117120

118121
mtx sync.Mutex
@@ -367,7 +370,10 @@ func open(ctx context.Context, opts ...Option) (_ Connection, err error) {
367370
opts = append(
368371
opts,
369372
WithLogger(
370-
trace.DetailsAll,
373+
trace.MatchDetails(
374+
os.Getenv("YDB_LOG_DETAILS"),
375+
trace.WithDefaultDetails(trace.DetailsAll),
376+
),
371377
WithNamespace("ydb"),
372378
WithMinLevel(log.FromString(logLevel)),
373379
WithNoColor(os.Getenv("YDB_LOG_NO_COLOR") != ""),

internal/retry/check.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,26 @@ func Check(err error) (
2525
backoff.TypeNoBackoff,
2626
false
2727
}
28+
29+
func MustDeleteSession(err error) bool {
30+
var e xerrors.Error
31+
if xerrors.As(err, &e) {
32+
return e.MustDeleteSession()
33+
}
34+
return false
35+
}
36+
37+
func MustRetry(err error, isOperationIdempotent bool) bool {
38+
var e xerrors.Error
39+
if xerrors.As(err, &e) {
40+
switch e.OperationStatus() {
41+
case operation.Finished:
42+
return false
43+
case operation.Undefined:
44+
return isOperationIdempotent
45+
default:
46+
return true
47+
}
48+
}
49+
return false
50+
}

internal/retry/context.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package retry
2+
3+
import "context"
4+
5+
type ctxIdempotentKey struct{}
6+
7+
func WithIdempotent(ctx context.Context, idempotent bool) context.Context {
8+
return context.WithValue(ctx,
9+
ctxIdempotentKey{},
10+
idempotent,
11+
)
12+
}
13+
14+
func IsIdempotent(ctx context.Context) bool {
15+
if idempotent, ok := ctx.Value(ctxIdempotentKey{}).(bool); ok {
16+
return idempotent
17+
}
18+
return false
19+
}

internal/table/client.go

Lines changed: 37 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,9 @@ type Client struct {
8181
index map[*session]sessionInfo
8282
createInProgress int // KIKIMR-9163: in-create-process counter
8383
limit int // Upper bound for Client size.
84-
idle *list.List // list<table.session>
85-
waitq *list.List // list<*chan table.session>
86-
keeperWake chan struct{} // Set by internalPoolKeeper.
84+
idle *list.List // list<*session>
85+
waitq *list.List // list<*chan *session>
86+
keeperWake chan struct{} // Set by keeper.
8787
keeperStop chan struct{}
8888
keeperDone chan struct{}
8989
touchingDone chan struct{}
@@ -199,32 +199,9 @@ func (c *Client) isClosed() bool {
199199

200200
// c.mu must NOT be held.
201201
func (c *Client) internalPoolCreateSession(ctx context.Context) (s *session, err error) {
202-
defer func() {
203-
if s != nil {
204-
s.onClose = append(s.onClose, func(s *session) {
205-
c.spawnedGoroutines.Start("onClose", func(ctx context.Context) {
206-
c.mu.WithLock(func() {
207-
info, has := c.index[s]
208-
if !has {
209-
panic("session removed from pool early")
210-
}
211-
212-
delete(c.index, s)
213-
214-
trace.TableOnPoolSessionRemove(c.config.Trace(), s)
215-
trace.TableOnPoolStateChange(c.config.Trace(), len(c.index), "remove")
216-
217-
c.internalPoolNotify(nil)
218-
219-
if info.idle != nil {
220-
c.idle.Remove(info.idle)
221-
}
222-
})
223-
})
224-
})
225-
}
226-
}()
227-
202+
if c.isClosed() {
203+
return nil, errClosedClient
204+
}
228205
// pre-check the Client size
229206
var enoughSpace bool
230207
c.mu.WithLock(func() {
@@ -272,8 +249,29 @@ func (c *Client) internalPoolCreateSession(ctx context.Context) (s *session, err
272249
c.createInProgress--
273250
if s != nil {
274251
c.index[s] = sessionInfo{}
252+
275253
trace.TableOnPoolSessionAdd(c.config.Trace(), s)
276254
trace.TableOnPoolStateChange(c.config.Trace(), len(c.index), "append")
255+
256+
s.onClose = append(s.onClose, func(s *session) {
257+
c.mu.WithLock(func() {
258+
info, has := c.index[s]
259+
if !has {
260+
return
261+
}
262+
263+
delete(c.index, s)
264+
265+
trace.TableOnPoolSessionRemove(c.config.Trace(), s)
266+
trace.TableOnPoolStateChange(c.config.Trace(), len(c.index), "remove")
267+
268+
c.internalPoolNotify(nil)
269+
270+
if info.idle != nil {
271+
c.idle.Remove(info.idle)
272+
}
273+
})
274+
})
277275
}
278276
})
279277

@@ -507,6 +505,7 @@ func (c *Client) Close(ctx context.Context) (err error) {
507505
var issues []error
508506
if atomic.CompareAndSwapUint32(&c.closed, 0, 1) {
509507
close(c.done)
508+
var toClose []*session
510509
c.mu.WithLock(func() {
511510
keeperDone := c.keeperDone
512511
if ch := c.keeperStop; ch != nil {
@@ -519,13 +518,17 @@ func (c *Client) Close(ctx context.Context) (err error) {
519518

520519
c.limit = 0
521520

522-
issues = make([]error, 0, len(c.index))
523-
for e := c.idle.Front(); e != nil; e = e.Next() {
524-
if err = c.internalPoolCloseSession(ctx, e.Value.(*session)); err != nil {
525-
issues = append(issues, err)
526-
}
521+
toClose = make([]*session, 0, len(c.index))
522+
for s := range c.index {
523+
toClose = append(toClose, s)
527524
}
528525
})
526+
issues = make([]error, 0, len(toClose))
527+
for _, s := range toClose {
528+
if err = c.internalPoolCloseSession(ctx, s); err != nil {
529+
issues = append(issues, err)
530+
}
531+
}
529532
}
530533

531534
_ = c.spawnedGoroutines.Close(ctx, errClosedClient)

internal/table/client_test.go

Lines changed: 47 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -243,85 +243,54 @@ func TestSessionPoolCloseWhenWaiting(t *testing.T) {
243243
}
244244

245245
func TestSessionPoolClose(t *testing.T) {
246-
wg := sync.WaitGroup{}
247-
p := newClientWithStubBuilder(
248-
t,
249-
testutil.NewBalancer(testutil.WithInvokeHandlers(testutil.InvokeHandlers{
250-
testutil.TableCreateSession: func(interface{}) (proto.Message, error) {
251-
return &Ydb_Table.CreateSessionResult{
252-
SessionId: testutil.SessionID(),
253-
}, nil
254-
},
255-
testutil.TableDeleteSession: func(interface{}) (proto.Message, error) {
256-
return &Ydb_Table.DeleteSessionResponse{}, nil
257-
},
258-
})),
259-
3,
260-
config.WithSizeLimit(3),
261-
config.WithIdleThreshold(time.Hour),
262-
config.WithTrace(
263-
trace.Table{
264-
OnPoolPut: func(info trace.TablePoolPutStartInfo) func(trace.TablePoolPutDoneInfo) {
265-
wg.Add(1)
266-
return func(info trace.TablePoolPutDoneInfo) {
267-
wg.Done()
268-
}
246+
xtest.TestManyTimes(t, func(t testing.TB) {
247+
p := newClientWithStubBuilder(
248+
t,
249+
testutil.NewBalancer(testutil.WithInvokeHandlers(testutil.InvokeHandlers{
250+
testutil.TableCreateSession: func(interface{}) (proto.Message, error) {
251+
return &Ydb_Table.CreateSessionResult{
252+
SessionId: testutil.SessionID(),
253+
}, nil
269254
},
270-
OnPoolSessionClose: func(
271-
info trace.TablePoolSessionCloseStartInfo,
272-
) func(
273-
doneInfo trace.TablePoolSessionCloseDoneInfo,
274-
) {
275-
wg.Add(1)
276-
return func(info trace.TablePoolSessionCloseDoneInfo) {
277-
wg.Done()
278-
}
255+
testutil.TableDeleteSession: func(interface{}) (proto.Message, error) {
256+
return &Ydb_Table.DeleteSessionResponse{}, nil
279257
},
280-
},
281-
),
282-
)
283-
defer func() {
284-
_ = p.Close(context.Background())
285-
}()
286-
287-
var (
288-
s1 = mustGetSession(t, p)
289-
s2 = mustGetSession(t, p)
290-
s3 = mustGetSession(t, p)
291-
closed1 = false
292-
closed2 = false
293-
closed3 = false
294-
)
295-
296-
s1.onClose = append(s1.onClose, func(s *session) { closed1 = true })
297-
s2.onClose = append(s2.onClose, func(s *session) { closed2 = true })
298-
s3.onClose = append(s3.onClose, func(s *session) { closed3 = true })
258+
})),
259+
3,
260+
config.WithSizeLimit(3),
261+
config.WithIdleThreshold(time.Hour),
262+
)
263+
defer func() {
264+
_ = p.Close(context.Background())
265+
}()
299266

300-
mustPutSession(t, p, s1)
301-
mustPutSession(t, p, s2)
302-
mustClose(t, p)
267+
var (
268+
s1 = mustGetSession(t, p)
269+
s2 = mustGetSession(t, p)
270+
s3 = mustGetSession(t, p)
271+
closed1 = false
272+
closed2 = false
273+
closed3 = false
274+
)
303275

304-
if !closed1 {
305-
t.Fatalf("session1 was not closed")
306-
}
307-
if !closed2 {
308-
t.Fatalf("session2 was not closed")
309-
}
310-
if closed3 {
311-
t.Fatalf("unexpected session close")
312-
}
276+
s1.onClose = append(s1.onClose, func(s *session) { closed1 = true })
277+
s2.onClose = append(s2.onClose, func(s *session) { closed2 = true })
278+
s3.onClose = append(s3.onClose, func(s *session) { closed3 = true })
313279

314-
if err := p.Put(context.Background(), s3); !xerrors.Is(err, errClosedClient) {
315-
t.Errorf(
316-
"unexpected Put() error: %v; want %v",
317-
err, errClosedClient,
318-
)
319-
}
320-
wg.Wait()
280+
mustPutSession(t, p, s1)
281+
mustPutSession(t, p, s2)
282+
mustClose(t, p)
321283

322-
if !closed3 {
323-
t.Fatalf("session was not closed")
324-
}
284+
if !closed1 {
285+
t.Errorf("session1 was not closed")
286+
}
287+
if !closed2 {
288+
t.Errorf("session2 was not closed")
289+
}
290+
if !closed3 {
291+
t.Errorf("session3 was not closed")
292+
}
293+
}, xtest.StopAfter(42*time.Second))
325294
}
326295

327296
func TestRaceWgClosed(t *testing.T) {
@@ -380,7 +349,7 @@ func TestRaceWgClosed(t *testing.T) {
380349
}
381350
_ = p.Close(context.Background())
382351
wg.Wait()
383-
}, xtest.StopAfter(17*time.Second))
352+
}, xtest.StopAfter(37*time.Second))
384353
}
385354

386355
func TestSessionPoolDeleteReleaseWait(t *testing.T) {
@@ -1216,7 +1185,7 @@ func mustResetTimer(t *testing.T, ch <-chan time.Duration, exp time.Duration) {
12161185
}
12171186
}
12181187

1219-
func mustCreateSession(t *testing.T, p *Client) *session {
1188+
func mustCreateSession(t testing.TB, p *Client) *session {
12201189
wg := sync.WaitGroup{}
12211190
defer wg.Wait()
12221191
s, err := p.internalPoolCreateSession(context.Background())
@@ -1227,7 +1196,7 @@ func mustCreateSession(t *testing.T, p *Client) *session {
12271196
return s
12281197
}
12291198

1230-
func mustGetSession(t *testing.T, p *Client) *session {
1199+
func mustGetSession(t testing.TB, p *Client) *session {
12311200
wg := sync.WaitGroup{}
12321201
defer wg.Wait()
12331202
s, err := p.Get(context.Background())
@@ -1238,7 +1207,7 @@ func mustGetSession(t *testing.T, p *Client) *session {
12381207
return s
12391208
}
12401209

1241-
func mustPutSession(t *testing.T, p *Client, s *session) {
1210+
func mustPutSession(t testing.TB, p *Client, s *session) {
12421211
wg := sync.WaitGroup{}
12431212
defer wg.Wait()
12441213
if err := p.Put(
@@ -1250,7 +1219,7 @@ func mustPutSession(t *testing.T, p *Client, s *session) {
12501219
}
12511220
}
12521221

1253-
func mustClose(t *testing.T, p *Client) {
1222+
func mustClose(t testing.TB, p *Client) {
12541223
wg := sync.WaitGroup{}
12551224
defer wg.Wait()
12561225
if err := p.Close(context.Background()); err != nil {

internal/xerrors/stacktrace.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,12 @@ func WithStackTrace(err error, opts ...withStackTraceOption) error {
2828
o(&options)
2929
}
3030
return &stackError{
31-
stackRecord: stackRecord(options.skipDepth + 1),
31+
stackRecord: StackRecord(options.skipDepth + 1),
3232
err: err,
3333
}
3434
}
3535

36-
func stackRecord(depth int) string {
36+
func StackRecord(depth int) string {
3737
function, file, line, _ := runtime.Caller(depth + 1)
3838
name := runtime.FuncForPC(function).Name()
3939
return name + "(" + fileName(file) + ":" + strconv.Itoa(line) + ")"

0 commit comments

Comments
 (0)