Skip to content

Commit a760212

Browse files
authored
Merge branch 'master' into allow-skip-sql-fields
2 parents 000e031 + eb7d205 commit a760212

File tree

12 files changed

+91
-59
lines changed

12 files changed

+91
-59
lines changed

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
11
* Allowed skip column for `ScanStruct` by tag `-`
22

3+
## v3.81.4
4+
* Returned `topicwriter.ErrQueueLimitExceed`, accidental removed at `v3.81.0`
5+
6+
## v3.81.3
7+
* Fixed tracing details check for some metrics
8+
9+
## v3.81.2
10+
* Removed `experimantal` comment for query service client
11+
312
## v3.81.1
413
* Fixed nil pointer dereference panic on failed `ydb.Open`
514
* Added ip discovery. Server can show own ip address and target hostname in the ListEndpoint message. These fields are used to bypass DNS resolving.

driver.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,8 +194,6 @@ func (d *Driver) Table() table.Client {
194194
}
195195

196196
// Query returns query client
197-
//
198-
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
199197
func (d *Driver) Query() *internalQuery.Client {
200198
return d.query.Must()
201199
}

internal/table/client.go

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -100,51 +100,58 @@ func (c *Client) CreateSession(ctx context.Context, opts ...table.Option) (_ tab
100100
if c.isClosed() {
101101
return nil, xerrors.WithStackTrace(errClosedClient)
102102
}
103-
var s *session
104103
createSession := func(ctx context.Context) (*session, error) {
105-
s, err = c.build(ctx)
104+
s, err := c.build(ctx)
106105
if err != nil {
107106
return nil, xerrors.WithStackTrace(err)
108107
}
109108

110109
return s, nil
111110
}
112111
if !c.config.AutoRetry() {
113-
s, err = createSession(ctx)
112+
s, err := createSession(ctx)
114113
if err != nil {
115114
return nil, xerrors.WithStackTrace(err)
116115
}
117116

118117
return s, nil
119118
}
120-
err = retry.Retry(ctx,
121-
func(ctx context.Context) (err error) {
122-
s, err = createSession(ctx)
123-
if err != nil {
124-
return xerrors.WithStackTrace(err)
125-
}
126119

127-
return nil
128-
},
120+
var (
121+
onDone = trace.TableOnCreateSession(c.config.Trace(), &ctx,
122+
stack.FunctionID(
123+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*Client).CreateSession"),
124+
)
125+
attempts = 0
126+
s *session
127+
)
128+
defer func() {
129+
if s != nil {
130+
onDone(s, attempts, err)
131+
} else {
132+
onDone(nil, attempts, err)
133+
}
134+
}()
135+
136+
s, err = retry.RetryWithResult(ctx, createSession,
129137
append(
130138
[]retry.Option{
131139
retry.WithIdempotent(true),
132140
retry.WithTrace(&trace.Retry{
133141
OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopDoneInfo) {
134-
onDone := trace.TableOnCreateSession(c.config.Trace(), info.Context,
135-
stack.FunctionID(
136-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*Client).CreateSession"))
137-
138142
return func(info trace.RetryLoopDoneInfo) {
139-
onDone(s, info.Attempts, info.Error)
143+
attempts = info.Attempts
140144
}
141145
},
142146
}),
143147
}, c.retryOptions(opts...).RetryOptions...,
144148
)...,
145149
)
150+
if err != nil {
151+
return nil, xerrors.WithStackTrace(err)
152+
}
146153

147-
return s, xerrors.WithStackTrace(err)
154+
return s, nil
148155
}
149156

150157
func (c *Client) isClosed() bool {

internal/topic/topicwriterinternal/writer_reconnector.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ var (
3636
errNonZeroSeqNo = xerrors.Wrap(errors.New("ydb: non zero seqno for auto set seqno mode")) //nolint:lll
3737
errNonZeroCreatedAt = xerrors.Wrap(errors.New("ydb: non zero Message.CreatedAt and set auto fill created at option")) //nolint:lll
3838
errNoAllowedCodecs = xerrors.Wrap(errors.New("ydb: no allowed codecs for write to topic"))
39-
errLargeMessage = xerrors.Wrap(errors.New("ydb: message uncompressed size more, then limit")) //nolint:lll
39+
errLargeMessage = xerrors.Wrap(errors.New("ydb: message uncompressed size more, then limit")) //nolint:lll
40+
PublicErrQueueIsFull = xerrors.Wrap(errors.New("ydb: queue is full"))
4041
PublicErrMessagesPutToInternalQueueBeforeError = xerrors.Wrap(errors.New("ydb: the messages was put to internal buffer before the error happened. It mean about the messages can be delivered to the server")) //nolint:lll
4142
errDiffetentTransactions = xerrors.Wrap(errors.New("ydb: internal writer has messages from different trasactions. It is internal logic error, write issue please: https://github.com/ydb-platform/ydb-go-sdk/issues/new?assignees=&labels=bug&projects=&template=01_BUG_REPORT.md&title=bug%3A+")) //nolint:lll
4243

@@ -223,16 +224,18 @@ func (w *WriterReconnector) Write(ctx context.Context, messages []PublicMessage)
223224
semaphoreWeight := int64(len(messages))
224225
if semaphoreWeight > int64(w.cfg.MaxQueueLen) {
225226
return xerrors.WithStackTrace(fmt.Errorf(
226-
"ydb: add more messages, then max queue limit. max queue: %v, try to add: %v",
227+
"ydb: add more messages, then max queue limit. max queue: %v, try to add: %v: %w",
227228
w.cfg.MaxQueueLen,
228229
semaphoreWeight,
230+
PublicErrQueueIsFull,
229231
))
230232
}
231233
if err := w.semaphore.Acquire(ctx, semaphoreWeight); err != nil {
232234
return xerrors.WithStackTrace(
233-
fmt.Errorf("ydb: add new messages exceed max queue size limit. Add count: %v, max size: %v",
235+
fmt.Errorf("ydb: add new messages exceed max queue size limit. Add count: %v, max size: %v: %w",
234236
semaphoreWeight,
235237
w.cfg.MaxQueueLen,
238+
PublicErrQueueIsFull,
236239
))
237240
}
238241
defer func() {

internal/version/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package version
33
const (
44
Major = "3"
55
Minor = "81"
6-
Patch = "1"
6+
Patch = "4"
77

88
Package = "ydb-go-sdk"
99
)

metrics/driver.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,12 @@ func driver(config Config) (t trace.Driver) {
9595
eventType := repeater.EventType(*info.Context)
9696

9797
return func(info trace.DriverBalancerClusterDiscoveryAttemptDoneInfo) {
98-
balancersDiscoveries.With(map[string]string{
99-
"status": errorBrief(info.Error),
100-
"cause": eventType,
101-
}).Inc()
98+
if config.Details()&trace.DriverBalancerEvents != 0 {
99+
balancersDiscoveries.With(map[string]string{
100+
"status": errorBrief(info.Error),
101+
"cause": eventType,
102+
}).Inc()
103+
}
102104
}
103105
}
104106
t.OnBalancerUpdate = func(info trace.DriverBalancerUpdateStartInfo) func(trace.DriverBalancerUpdateDoneInfo) {

metrics/table.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ func table(config Config) (t trace.Table) {
2222
with := config.GaugeVec("with")
2323
t.OnInit = func(info trace.TableInitStartInfo) func(trace.TableInitDoneInfo) {
2424
return func(info trace.TableInitDoneInfo) {
25-
limit.With(nil).Set(float64(info.Limit))
25+
if config.Details()&trace.TableEvents != 0 {
26+
limit.With(nil).Set(float64(info.Limit))
27+
}
2628
}
2729
}
2830
t.OnSessionNew = func(info trace.TableSessionNewStartInfo) func(trace.TableSessionNewDoneInfo) {
@@ -69,11 +71,13 @@ func table(config Config) (t trace.Table) {
6971
return nil
7072
}
7173
t.OnPoolStateChange = func(info trace.TablePoolStateChangeInfo) {
72-
limit.With(nil).Set(float64(info.Limit))
73-
index.With(nil).Set(float64(info.Index))
74-
idle.With(nil).Set(float64(info.Idle))
75-
wait.With(nil).Set(float64(info.Wait))
76-
createInProgress.With(nil).Set(float64(info.CreateInProgress))
74+
if config.Details()&trace.TablePoolEvents != 0 {
75+
limit.With(nil).Set(float64(info.Limit))
76+
index.With(nil).Set(float64(info.Index))
77+
idle.With(nil).Set(float64(info.Idle))
78+
wait.With(nil).Set(float64(info.Wait))
79+
createInProgress.With(nil).Set(float64(info.CreateInProgress))
80+
}
7781
}
7882
{
7983
latency := session.WithSystem("query").TimerVec("latency")

query/client.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@ import (
1212

1313
type (
1414
// Executor is an interface for execute queries
15-
//
16-
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
1715
Executor interface {
1816
// Exec execute query without result
1917
//
@@ -40,8 +38,6 @@ type (
4038
QueryRow(ctx context.Context, query string, opts ...options.Execute) (Row, error)
4139
}
4240
// Client defines API of query client
43-
//
44-
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
4541
Client interface {
4642
Executor
4743

@@ -85,15 +81,11 @@ type (
8581
// QueryResultSet is a helper which read all rows from first result set in result
8682
//
8783
// Warning: the large result set from query will be materialized and can happened to "OOM killed" problem
88-
//
89-
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
9084
QueryResultSet(ctx context.Context, query string, opts ...options.Execute) (ClosableResultSet, error)
9185

9286
// QueryRow is a helper which read only one row from first result set in result
9387
//
9488
// ReadRow returns error if result contains more than one result set or more than one row
95-
//
96-
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
9789
QueryRow(ctx context.Context, query string, opts ...options.Execute) (Row, error)
9890

9991
// ExecuteScript starts long executing script with polling results later
@@ -157,8 +149,6 @@ func WithLabel(lbl string) options.RetryOptionsOption {
157149
}
158150

159151
// WithRetryBudget creates option with external budget
160-
//
161-
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
162152
func WithRetryBudget(b budget.Budget) options.RetryOptionsOption {
163153
return options.WithRetryBudget(b)
164154
}

tests/slo/internal/workers/read.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,24 @@ import (
1414
func (w *Workers) Read(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limiter) {
1515
defer wg.Done()
1616
for {
17-
err := rl.Wait(ctx)
18-
if err != nil {
17+
select {
18+
case <-ctx.Done():
1919
return
20+
default:
21+
err := rl.Wait(ctx)
22+
if err != nil {
23+
return
24+
}
25+
26+
err = w.read(ctx)
27+
if err != nil {
28+
if ctxErr := ctx.Err(); ctxErr == nil {
29+
log.Printf("read failed: %v", err)
30+
}
31+
32+
return
33+
}
2034
}
21-
22-
_ = w.read(ctx)
2335
}
2436
}
2537

@@ -29,9 +41,6 @@ func (w *Workers) read(ctx context.Context) error {
2941
m := w.m.Start(metrics.JobRead)
3042

3143
_, attempts, err := w.s.Read(ctx, id)
32-
if err != nil {
33-
log.Printf("read failed with %d attempts: %v", attempts, err)
34-
}
3544

3645
m.Finish(err, attempts)
3746

tests/slo/internal/workers/write.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,22 @@ import (
1414
func (w *Workers) Write(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limiter, gen *generator.Generator) {
1515
defer wg.Done()
1616
for {
17-
err := rl.Wait(ctx)
18-
if err != nil {
17+
select {
18+
case <-ctx.Done():
1919
return
20+
default:
21+
err := rl.Wait(ctx)
22+
if err != nil {
23+
return
24+
}
25+
26+
err = w.write(ctx, gen)
27+
if err != nil {
28+
if ctxErr := ctx.Err(); ctxErr == nil {
29+
log.Printf("write failed: %v", err)
30+
}
31+
}
2032
}
21-
22-
_ = w.write(ctx, gen)
2333
}
2434
}
2535

@@ -34,9 +44,6 @@ func (w *Workers) write(ctx context.Context, gen *generator.Generator) error {
3444
m := w.m.Start(metrics.JobWrite)
3545

3646
attempts, err := w.s.Write(ctx, row)
37-
if err != nil {
38-
log.Printf("write failed with %d attempts: %v", attempts, err)
39-
}
4047

4148
m.Finish(err, attempts)
4249

0 commit comments

Comments
 (0)