Skip to content

Commit 4e94195

Browse files
committed
revert meta apply in internal/db/database.{Invoke,NewStream}
1 parent 8a8ec21 commit 4e94195

24 files changed

+338
-277
lines changed

CHANGELOG.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* Refactored traces (start and done struct names have prefix about trace)
44
* Replaced `errors.Error`, `errors.Errorf` and `errors.ErrorfSkip` to single `errors.WithStackTrace`
55
* Refactored table client options
6-
* Added `IsTraceError` function to table retry options struct for check error before submit intermediate/done events of retry operation
6+
* Declared and implemented interface `errors.isYdbError` for checking ybd/non-ydb errors
77
* Fixed double tracing table do events
88
* Added `retry.WithFastBackoff` and `retry.WithFastBackoff` options
99
* Refactored `table.CreateSession` as retry operation with options
@@ -12,7 +12,6 @@
1212
* Fixed `recursive` param in `ratelimiter.ListResource`
1313
* Added counting stream usages for exclude park connection if it in use
1414
* Added `trace.Driver` events about change stream usage and `conn.Release()` call
15-
* Fixed bug with non-applying meta headers on direct call `db.Discovery.Discover`
1615

1716
## 3.14.4
1817
* Implemented auto-removing `conn.Conn` from `conn.Pool` with counting usages of `conn.Conn`

errors.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
)
1010

1111
func IterateByIssues(err error, it func(message string, code uint32, severity uint32)) {
12-
var o *errors.OpError
12+
var o *errors.OperationError
1313
if !errors.As(err, &o) {
1414
return
1515
}
@@ -80,7 +80,7 @@ func IsOperationError(err error, codes ...int32) bool {
8080
}
8181

8282
func OperationErrorDescription(err error) Error {
83-
var o *errors.OpError
83+
var o *errors.OperationError
8484
if errors.As(err, &o) {
8585
return o
8686
}

internal/conn/conn.go

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"google.golang.org/grpc"
1212
"google.golang.org/grpc/codes"
1313
"google.golang.org/grpc/connectivity"
14+
grpcStatus "google.golang.org/grpc/status"
1415

1516
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
1617

@@ -186,8 +187,8 @@ func (c *conn) take(ctx context.Context) (cc *grpc.ClientConn, err error) {
186187

187188
if c.isClosed() {
188189
return nil, errors.NewGrpcError(
189-
codes.Unavailable,
190-
errors.WithMsg("ydb driver conn closed early"),
190+
errors.WithStatus(grpcStatus.New(codes.Unavailable, "ydb driver conn closed early")),
191+
errors.WithErr(err),
191192
)
192193
}
193194

@@ -312,6 +313,7 @@ func (c *conn) Close(ctx context.Context) (err error) {
312313
return err
313314
}
314315

316+
// invoke have behavior like grpc call
315317
func (c *conn) invoke(
316318
ctx context.Context,
317319
method string,
@@ -323,25 +325,34 @@ func (c *conn) invoke(
323325
cc, err = c.take(ctx)
324326
if err != nil {
325327
return errors.NewGrpcError(
326-
codes.Unavailable,
327-
errors.WithMsg("ydb driver conn take failed"),
328+
errors.WithStatus(grpcStatus.New(codes.Unavailable, "ydb driver conn take failed")),
328329
errors.WithErr(err),
329330
)
330331
}
331332

332333
ctx, err = c.config.Meta().Meta(ctx)
333334
if err != nil {
334335
return errors.NewGrpcError(
335-
codes.Unavailable,
336-
errors.WithMsg("ydb driver conn apply meta failed"),
336+
errors.WithStatus(grpcStatus.New(codes.Unavailable, "ydb driver conn apply meta failed")),
337337
errors.WithErr(err),
338338
)
339339
}
340340

341341
c.changeUsages(1)
342342
defer c.changeUsages(-1)
343343

344-
return cc.Invoke(ctx, method, req, res, opts...)
344+
err = cc.Invoke(ctx, method, req, res, opts...)
345+
346+
if err != nil {
347+
if s, ok := grpcStatus.FromError(err); ok {
348+
return errors.NewGrpcError(
349+
errors.WithStatus(s),
350+
)
351+
}
352+
return errors.WithStackTrace(err)
353+
}
354+
355+
return nil
345356
}
346357

347358
func (c *conn) Invoke(
@@ -400,6 +411,7 @@ func (c *conn) Invoke(
400411
return err
401412
}
402413

414+
// newStream have behavior like grpc call
403415
func (c *conn) newStream(
404416
ctx context.Context,
405417
desc *grpc.StreamDesc,
@@ -410,25 +422,27 @@ func (c *conn) newStream(
410422
cc, err = c.take(ctx)
411423
if err != nil {
412424
return nil, errors.NewGrpcError(
413-
codes.Unavailable,
414-
errors.WithMsg("ydb driver conn take failed"),
415-
errors.WithErr(err),
416-
)
417-
}
418-
419-
ctx, err = c.config.Meta().Meta(ctx)
420-
if err != nil {
421-
return nil, errors.NewGrpcError(
422-
codes.Unavailable,
423-
errors.WithMsg("ydb driver conn apply meta failed"),
425+
errors.WithStatus(grpcStatus.New(codes.Unavailable, "ydb driver conn take failed")),
424426
errors.WithErr(err),
425427
)
426428
}
427429

428430
c.changeStreamUsages(1)
429431
defer c.changeStreamUsages(-1)
430432

431-
return cc.NewStream(ctx, desc, method, opts...)
433+
var client grpc.ClientStream
434+
client, err = cc.NewStream(ctx, desc, method, opts...)
435+
436+
if err != nil {
437+
if s, ok := grpcStatus.FromError(err); ok {
438+
return nil, errors.NewGrpcError(
439+
errors.WithStatus(s),
440+
)
441+
}
442+
return nil, errors.WithStackTrace(err)
443+
}
444+
445+
return client, nil
432446
}
433447

434448
func (c *conn) NewStream(

internal/db/database.go

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55

66
"google.golang.org/grpc"
7-
"google.golang.org/grpc/codes"
87

98
"github.com/ydb-platform/ydb-go-sdk/v3/config"
109
"github.com/ydb-platform/ydb-go-sdk/v3/discovery"
@@ -113,18 +112,26 @@ func (db *database) Invoke(
113112
) error {
114113
cc, err := db.cluster.Get(ctx)
115114
if err != nil {
116-
return errors.NewGrpcError(
117-
codes.Unavailable,
118-
errors.WithMsg("ydb driver cluster get failed"),
119-
errors.WithErr(err),
120-
)
115+
return errors.WithStackTrace(err)
121116
}
117+
118+
ctx, err = db.config.Meta().Meta(ctx)
119+
if err != nil {
120+
return errors.WithStackTrace(err)
121+
}
122+
122123
defer func() {
123124
if err != nil && errors.MustPessimizeEndpoint(err) {
124125
db.cluster.Pessimize(ctx, cc, err)
125126
}
126127
}()
127-
return cc.Invoke(ctx, method, args, reply, opts...)
128+
129+
err = cc.Invoke(ctx, method, args, reply, opts...)
130+
if err != nil {
131+
return errors.WithStackTrace(err)
132+
}
133+
134+
return nil
128135
}
129136

130137
func (db *database) NewStream(
@@ -135,16 +142,25 @@ func (db *database) NewStream(
135142
) (grpc.ClientStream, error) {
136143
cc, err := db.cluster.Get(ctx)
137144
if err != nil {
138-
return nil, errors.NewGrpcError(
139-
codes.Unavailable,
140-
errors.WithMsg("ydb driver cluster get failed"),
141-
errors.WithErr(err),
142-
)
145+
return nil, errors.WithStackTrace(err)
143146
}
147+
148+
ctx, err = db.config.Meta().Meta(ctx)
149+
if err != nil {
150+
return nil, errors.WithStackTrace(err)
151+
}
152+
144153
defer func() {
145154
if err != nil && errors.MustPessimizeEndpoint(err) {
146155
db.cluster.Pessimize(ctx, cc, err)
147156
}
148157
}()
149-
return cc.NewStream(ctx, desc, method, opts...)
158+
159+
var client grpc.ClientStream
160+
client, err = cc.NewStream(ctx, desc, method, opts...)
161+
if err != nil {
162+
return nil, errors.WithStackTrace(err)
163+
}
164+
165+
return client, nil
150166
}

internal/errors/errors.go

Lines changed: 0 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
11
package errors
22

33
import (
4-
"bytes"
54
"context"
65
"errors"
76
"fmt"
87
"io"
9-
"runtime"
10-
"strconv"
11-
"strings"
128
)
139

1410
func IsTimeoutError(err error) bool {
@@ -78,81 +74,3 @@ func Is(err error, targets ...error) bool {
7874
func New(text string) error {
7975
return WithStackTrace(fmt.Errorf("%w", errors.New(text)), WithSkipDepth(1))
8076
}
81-
82-
// NewWithIssues returns error which contains child issues
83-
func NewWithIssues(text string, issues ...error) error {
84-
return &errorWithIssues{
85-
reason: text,
86-
issues: issues,
87-
}
88-
}
89-
90-
type errorWithIssues struct {
91-
reason string
92-
issues []error
93-
}
94-
95-
func (e *errorWithIssues) Error() string {
96-
var b bytes.Buffer
97-
b.WriteString(e.reason)
98-
b.WriteString(", issues: [")
99-
for i, issue := range e.issues {
100-
if i != 0 {
101-
b.WriteString(", ")
102-
}
103-
b.WriteString(issue.Error())
104-
}
105-
b.WriteString("]")
106-
return b.String()
107-
}
108-
109-
type withStackTraceOptions struct {
110-
skipDepth int
111-
}
112-
113-
type withStackTraceOption func(o *withStackTraceOptions)
114-
115-
func WithSkipDepth(skipDepth int) withStackTraceOption {
116-
return func(o *withStackTraceOptions) {
117-
o.skipDepth = skipDepth
118-
}
119-
}
120-
121-
// WithStackTrace is a wrapper over original err with file:line identification
122-
func WithStackTrace(err error, opts ...withStackTraceOption) error {
123-
options := withStackTraceOptions{}
124-
for _, o := range opts {
125-
o(&options)
126-
}
127-
return &stackError{
128-
stackRecord: stackRecord(options.skipDepth + 1),
129-
err: err,
130-
}
131-
}
132-
133-
func stackRecord(depth int) string {
134-
function, file, line, _ := runtime.Caller(depth + 1)
135-
name := runtime.FuncForPC(function).Name()
136-
return name + "(" + fileName(file) + ":" + strconv.Itoa(line) + ")"
137-
}
138-
139-
func fileName(original string) string {
140-
i := strings.LastIndex(original, "/")
141-
if i == -1 {
142-
return original
143-
}
144-
return original[i+1:]
145-
}
146-
147-
type stackError struct {
148-
stackRecord string
149-
err error
150-
}
151-
152-
func (e *stackError) Error() string {
153-
return e.err.Error() + " at `" + e.stackRecord + "`"
154-
}
155-
156-
func (e *stackError) Unwrap() error {
157-
return e.err
158-
}

internal/errors/grpc.go

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package errors
33
import (
44
"fmt"
55

6-
"google.golang.org/grpc/codes"
76
"google.golang.org/grpc/status"
87
)
98

@@ -13,9 +12,11 @@ type grpcError struct {
1312
err error
1413
}
1514

15+
func (e *grpcError) isYdbError() {}
16+
1617
func (e *grpcError) Error() string {
1718
if e.err != nil {
18-
return fmt.Sprintf("%s: %v", e.status.String(), e.err)
19+
return fmt.Sprintf("%s: %v", e.err, e.status.String())
1920
}
2021
return e.status.String()
2122
}
@@ -28,32 +29,24 @@ func (e *grpcError) GRPCStatus() *status.Status {
2829
return e.status
2930
}
3031

31-
type grpcErrorOptionsHolder struct {
32-
msg string
33-
err error
34-
}
35-
36-
type grpcErrorOption func(h *grpcErrorOptionsHolder)
32+
type grpcErrorOption func(e *grpcError)
3733

38-
func WithMsg(msg string) grpcErrorOption {
39-
return func(h *grpcErrorOptionsHolder) {
40-
h.msg = msg
34+
func WithStatus(s *status.Status) grpcErrorOption {
35+
return func(e *grpcError) {
36+
e.status = s
4137
}
4238
}
4339

4440
func WithErr(err error) grpcErrorOption {
45-
return func(h *grpcErrorOptionsHolder) {
46-
h.err = err
41+
return func(e *grpcError) {
42+
e.err = err
4743
}
4844
}
4945

50-
func NewGrpcError(code codes.Code, opts ...grpcErrorOption) error {
51-
h := &grpcErrorOptionsHolder{}
46+
func NewGrpcError(opts ...grpcErrorOption) error {
47+
e := &grpcError{}
5248
for _, o := range opts {
53-
o(h)
54-
}
55-
return &grpcError{
56-
status: status.New(code, h.msg),
57-
err: h.err,
49+
o(e)
5850
}
51+
return e
5952
}

0 commit comments

Comments
 (0)