Skip to content

Commit f91e455

Browse files
authored
Merge pull request #805 add node info to error
2 parents 3bb616f + 7656182 commit f91e455

File tree

5 files changed

+128
-22
lines changed

5 files changed

+128
-22
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added node info to grpc errors
2+
13
## v3.50.0
24
* Add method to QueryStats interface for add new stat fields
35
* Added check if commit order is bad in sync mode

internal/conn/conn.go

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,10 @@ func (c *conn) Address() string {
6666
func (c *conn) Ping(ctx context.Context) error {
6767
cc, err := c.realConn(ctx)
6868
if err != nil {
69-
return xerrors.WithStackTrace(err)
69+
return c.wrapError(err)
7070
}
7171
if !isAvailable(cc) {
72-
return xerrors.WithStackTrace(errUnavailableConnection)
72+
return c.wrapError(errUnavailableConnection)
7373
}
7474
return nil
7575
}
@@ -115,7 +115,7 @@ func (c *conn) park(ctx context.Context) (err error) {
115115
err = c.close()
116116

117117
if err != nil {
118-
return xerrors.WithStackTrace(err)
118+
return c.wrapError(err)
119119
}
120120

121121
return nil
@@ -169,7 +169,7 @@ func (c *conn) GetState() (s State) {
169169

170170
func (c *conn) realConn(ctx context.Context) (cc *grpc.ClientConn, err error) {
171171
if c.isClosed() {
172-
return nil, xerrors.WithStackTrace(errClosedConnection)
172+
return nil, c.wrapError(errClosedConnection)
173173
}
174174

175175
c.mtx.Lock()
@@ -213,7 +213,7 @@ func (c *conn) realConn(ctx context.Context) (cc *grpc.ClientConn, err error) {
213213
xerrors.WithAddress(address),
214214
)
215215

216-
return nil, xerrors.WithStackTrace(
216+
return nil, c.wrapError(
217217
xerrors.Retryable(err,
218218
xerrors.WithName("realConn"),
219219
),
@@ -250,7 +250,7 @@ func (c *conn) close() (err error) {
250250
err = c.cc.Close()
251251
c.cc = nil
252252
c.setState(Offline)
253-
return xerrors.WithStackTrace(err)
253+
return c.wrapError(err)
254254
}
255255

256256
func (c *conn) isClosed() bool {
@@ -286,7 +286,7 @@ func (c *conn) Close(ctx context.Context) (err error) {
286286
onClose(c)
287287
}
288288

289-
return xerrors.WithStackTrace(err)
289+
return c.wrapError(err)
290290
}
291291

292292
func (c *conn) Invoke(
@@ -320,7 +320,7 @@ func (c *conn) Invoke(
320320

321321
cc, err = c.realConn(ctx)
322322
if err != nil {
323-
return xerrors.WithStackTrace(err)
323+
return c.wrapError(err)
324324
}
325325

326326
c.touchLastUsage()
@@ -339,9 +339,9 @@ func (c *conn) Invoke(
339339
xerrors.WithAddress(c.Address()),
340340
)
341341
if sentMark.canRetry() {
342-
return xerrors.WithStackTrace(xerrors.Retryable(err, xerrors.WithName("Invoke")))
342+
return c.wrapError(xerrors.Retryable(err, xerrors.WithName("Invoke")))
343343
}
344-
return xerrors.WithStackTrace(err)
344+
return c.wrapError(err)
345345
}
346346

347347
return err
@@ -355,10 +355,10 @@ func (c *conn) Invoke(
355355
if useWrapping {
356356
switch {
357357
case !o.GetOperation().GetReady():
358-
return xerrors.WithStackTrace(errOperationNotReady)
358+
return c.wrapError(errOperationNotReady)
359359

360360
case o.GetOperation().GetStatus() != Ydb.StatusIds_SUCCESS:
361-
return xerrors.WithStackTrace(
361+
return c.wrapError(
362362
xerrors.Operation(
363363
xerrors.FromOperation(o.GetOperation()),
364364
xerrors.WithNodeAddress(c.Address()),
@@ -406,7 +406,7 @@ func (c *conn) NewStream(
406406

407407
cc, err = c.realConn(ctx)
408408
if err != nil {
409-
return nil, xerrors.WithStackTrace(err)
409+
return nil, c.wrapError(err)
410410
}
411411

412412
c.touchLastUsage()
@@ -425,9 +425,9 @@ func (c *conn) NewStream(
425425
xerrors.WithAddress(c.Address()),
426426
)
427427
if sentMark.canRetry() {
428-
return s, xerrors.WithStackTrace(xerrors.Retryable(err, xerrors.WithName("NewStream")))
428+
return s, c.wrapError(xerrors.Retryable(err, xerrors.WithName("NewStream")))
429429
}
430-
return s, xerrors.WithStackTrace(err)
430+
return s, c.wrapError(err)
431431
}
432432

433433
return s, err
@@ -446,6 +446,14 @@ func (c *conn) NewStream(
446446
}, nil
447447
}
448448

449+
func (c *conn) wrapError(err error) error {
450+
if err == nil {
451+
return nil
452+
}
453+
nodeErr := newNodeError(c.endpoint.NodeID(), c.endpoint.Address(), err)
454+
return xerrors.WithStackTrace(nodeErr, xerrors.WithSkipDepth(1))
455+
}
456+
449457
type option func(c *conn)
450458

451459
func withOnClose(onClose func(*conn)) option {

internal/conn/error.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package conn
2+
3+
import "fmt"
4+
5+
type nodeError struct {
6+
id uint32
7+
endpoint string
8+
err error
9+
}
10+
11+
func newNodeError(id uint32, endpoint string, err error) nodeError {
12+
return nodeError{
13+
id: id,
14+
endpoint: endpoint,
15+
err: err,
16+
}
17+
}
18+
19+
func (n nodeError) Error() string {
20+
return fmt.Sprintf("on node %v (%v): %v", n.id, n.endpoint, n.err)
21+
}
22+
23+
func (n nodeError) Unwrap() error {
24+
return n.err
25+
}

internal/conn/error_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package conn
2+
3+
import (
4+
"errors"
5+
"testing"
6+
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
func TestNodeErrorError(t *testing.T) {
11+
testErr := errors.New("test")
12+
nodeErr := newNodeError(1, "localhost:1234", testErr)
13+
message := nodeErr.Error()
14+
15+
require.Equal(t, "on node 1 (localhost:1234): test", message)
16+
}
17+
18+
func TestNodeErrorUnwrap(t *testing.T) {
19+
testErr := errors.New("test")
20+
nodeErr := newNodeError(1, "asd", testErr)
21+
22+
unwrapped := errors.Unwrap(nodeErr)
23+
require.Equal(t, testErr, unwrapped)
24+
}
25+
26+
func TestNodeErrorIs(t *testing.T) {
27+
testErr := errors.New("test")
28+
testErr2 := errors.New("test2")
29+
nodeErr := newNodeError(1, "localhost:1234", testErr)
30+
31+
require.True(t, errors.Is(nodeErr, testErr))
32+
require.False(t, errors.Is(nodeErr, testErr2))
33+
}
34+
35+
type testErrorType1 struct {
36+
msg string
37+
}
38+
39+
func (t testErrorType1) Error() string {
40+
return "1 - " + t.msg
41+
}
42+
43+
type testErrorType2 struct {
44+
msg string
45+
}
46+
47+
func (t testErrorType2) Error() string {
48+
return "2 - " + t.msg
49+
}
50+
51+
func TestNodeErrorAs(t *testing.T) {
52+
testErr := testErrorType1{msg: "test"}
53+
nodeErr := newNodeError(1, "localhost:1234", testErr)
54+
55+
target := testErrorType1{}
56+
require.True(t, errors.As(nodeErr, &target))
57+
require.ErrorAs(t, nodeErr, &target)
58+
require.Equal(t, testErr, target)
59+
60+
target2 := testErrorType2{}
61+
require.False(t, errors.As(nodeErr, &target2))
62+
}

internal/conn/grpc_client_stream.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,14 @@ func (s *grpcClientStream) CloseSend() (err error) {
2929

3030
if err != nil {
3131
if s.wrapping {
32-
return xerrors.WithStackTrace(
32+
return s.wrapError(
3333
xerrors.Transport(
3434
err,
3535
xerrors.WithAddress(s.c.Address()),
3636
),
3737
)
3838
}
39-
return xerrors.WithStackTrace(err)
39+
return s.wrapError(err)
4040
}
4141

4242
return nil
@@ -58,11 +58,11 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) {
5858
xerrors.WithAddress(s.c.Address()),
5959
)
6060
if s.sentMark.canRetry() {
61-
return xerrors.WithStackTrace(xerrors.Retryable(err,
61+
return s.wrapError(xerrors.Retryable(err,
6262
xerrors.WithName("SendMsg"),
6363
))
6464
}
65-
return xerrors.WithStackTrace(err)
65+
return s.wrapError(err)
6666
}
6767

6868
return err
@@ -98,11 +98,11 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {
9898
xerrors.WithAddress(s.c.Address()),
9999
)
100100
if s.sentMark.canRetry() {
101-
return xerrors.WithStackTrace(xerrors.Retryable(err,
101+
return s.wrapError(xerrors.Retryable(err,
102102
xerrors.WithName("RecvMsg"),
103103
))
104104
}
105-
return xerrors.WithStackTrace(err)
105+
return s.wrapError(err)
106106
}
107107

108108
return err
@@ -111,7 +111,7 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {
111111
if s.wrapping {
112112
if operation, ok := m.(wrap.StreamOperationResponse); ok {
113113
if status := operation.GetStatus(); status != Ydb.StatusIds_SUCCESS {
114-
return xerrors.WithStackTrace(
114+
return s.wrapError(
115115
xerrors.Operation(
116116
xerrors.FromOperation(operation),
117117
xerrors.WithNodeAddress(s.c.Address()),
@@ -124,6 +124,15 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {
124124
return nil
125125
}
126126

127+
func (s *grpcClientStream) wrapError(err error) error {
128+
if err == nil {
129+
return nil
130+
}
131+
132+
nodeErr := newNodeError(s.c.endpoint.NodeID(), s.c.endpoint.Address(), err)
133+
return xerrors.WithStackTrace(nodeErr, xerrors.WithSkipDepth(1))
134+
}
135+
127136
func createPinger(c *conn) context.CancelFunc {
128137
c.touchLastUsage()
129138
ctx, cancel := xcontext.WithCancel(context.Background())

0 commit comments

Comments
 (0)