Skip to content

Commit 8b6a75b

Browse files
committed
add node info to error
1 parent 3bb616f commit 8b6a75b

File tree

4 files changed

+80
-22
lines changed

4 files changed

+80
-22
lines changed

internal/conn/conn.go

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,10 @@ func (c *conn) Address() string {
6666
func (c *conn) Ping(ctx context.Context) error {
6767
cc, err := c.realConn(ctx)
6868
if err != nil {
69-
return xerrors.WithStackTrace(err)
69+
return c.wrapError(err)
7070
}
7171
if !isAvailable(cc) {
72-
return xerrors.WithStackTrace(errUnavailableConnection)
72+
return c.wrapError(errUnavailableConnection)
7373
}
7474
return nil
7575
}
@@ -115,7 +115,7 @@ func (c *conn) park(ctx context.Context) (err error) {
115115
err = c.close()
116116

117117
if err != nil {
118-
return xerrors.WithStackTrace(err)
118+
return c.wrapError(err)
119119
}
120120

121121
return nil
@@ -169,7 +169,7 @@ func (c *conn) GetState() (s State) {
169169

170170
func (c *conn) realConn(ctx context.Context) (cc *grpc.ClientConn, err error) {
171171
if c.isClosed() {
172-
return nil, xerrors.WithStackTrace(errClosedConnection)
172+
return nil, c.wrapError(errClosedConnection)
173173
}
174174

175175
c.mtx.Lock()
@@ -213,7 +213,7 @@ func (c *conn) realConn(ctx context.Context) (cc *grpc.ClientConn, err error) {
213213
xerrors.WithAddress(address),
214214
)
215215

216-
return nil, xerrors.WithStackTrace(
216+
return nil, c.wrapError(
217217
xerrors.Retryable(err,
218218
xerrors.WithName("realConn"),
219219
),
@@ -250,7 +250,7 @@ func (c *conn) close() (err error) {
250250
err = c.cc.Close()
251251
c.cc = nil
252252
c.setState(Offline)
253-
return xerrors.WithStackTrace(err)
253+
return c.wrapError(err)
254254
}
255255

256256
func (c *conn) isClosed() bool {
@@ -286,7 +286,7 @@ func (c *conn) Close(ctx context.Context) (err error) {
286286
onClose(c)
287287
}
288288

289-
return xerrors.WithStackTrace(err)
289+
return c.wrapError(err)
290290
}
291291

292292
func (c *conn) Invoke(
@@ -320,7 +320,7 @@ func (c *conn) Invoke(
320320

321321
cc, err = c.realConn(ctx)
322322
if err != nil {
323-
return xerrors.WithStackTrace(err)
323+
return c.wrapError(err)
324324
}
325325

326326
c.touchLastUsage()
@@ -339,9 +339,9 @@ func (c *conn) Invoke(
339339
xerrors.WithAddress(c.Address()),
340340
)
341341
if sentMark.canRetry() {
342-
return xerrors.WithStackTrace(xerrors.Retryable(err, xerrors.WithName("Invoke")))
342+
return c.wrapError(xerrors.Retryable(err, xerrors.WithName("Invoke")))
343343
}
344-
return xerrors.WithStackTrace(err)
344+
return c.wrapError(err)
345345
}
346346

347347
return err
@@ -355,10 +355,10 @@ func (c *conn) Invoke(
355355
if useWrapping {
356356
switch {
357357
case !o.GetOperation().GetReady():
358-
return xerrors.WithStackTrace(errOperationNotReady)
358+
return c.wrapError(errOperationNotReady)
359359

360360
case o.GetOperation().GetStatus() != Ydb.StatusIds_SUCCESS:
361-
return xerrors.WithStackTrace(
361+
return c.wrapError(
362362
xerrors.Operation(
363363
xerrors.FromOperation(o.GetOperation()),
364364
xerrors.WithNodeAddress(c.Address()),
@@ -406,7 +406,7 @@ func (c *conn) NewStream(
406406

407407
cc, err = c.realConn(ctx)
408408
if err != nil {
409-
return nil, xerrors.WithStackTrace(err)
409+
return nil, c.wrapError(err)
410410
}
411411

412412
c.touchLastUsage()
@@ -425,9 +425,9 @@ func (c *conn) NewStream(
425425
xerrors.WithAddress(c.Address()),
426426
)
427427
if sentMark.canRetry() {
428-
return s, xerrors.WithStackTrace(xerrors.Retryable(err, xerrors.WithName("NewStream")))
428+
return s, c.wrapError(xerrors.Retryable(err, xerrors.WithName("NewStream")))
429429
}
430-
return s, xerrors.WithStackTrace(err)
430+
return s, c.wrapError(err)
431431
}
432432

433433
return s, err
@@ -446,6 +446,11 @@ func (c *conn) NewStream(
446446
}, nil
447447
}
448448

449+
func (c *conn) wrapError(err error) error {
450+
nodeErr := newNodeError(c.endpoint.NodeID(), c.endpoint.Address(), err)
451+
return xerrors.WithStackTrace(nodeErr, xerrors.WithSkipDepth(1))
452+
}
453+
449454
type option func(c *conn)
450455

451456
func withOnClose(onClose func(*conn)) option {

internal/conn/error.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package conn
2+
3+
import "fmt"
4+
5+
type nodeError struct {
6+
id uint32
7+
endpoint string
8+
err error
9+
}
10+
11+
func newNodeError(id uint32, endpoint string, err error) nodeError {
12+
return nodeError{
13+
id: id,
14+
endpoint: endpoint,
15+
err: err,
16+
}
17+
}
18+
19+
func (n nodeError) Error() string {
20+
return fmt.Sprintf("on node %v (%v): %v", n.id, n.endpoint, n.err)
21+
}
22+
23+
func (n nodeError) Unwrap() error {
24+
return n.err
25+
}

internal/conn/error_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package conn
2+
3+
import (
4+
"errors"
5+
"github.com/stretchr/testify/require"
6+
"testing"
7+
)
8+
9+
func TestNodeErrorError(t *testing.T) {
10+
testErr := errors.New("test")
11+
nodeErr := newNodeError(1, "localhost:1234", testErr)
12+
message := nodeErr.Error()
13+
14+
require.Equal(t, "on node 1 (localhost:1234): test", message)
15+
}
16+
17+
func TestNodeErrorUnwrap(t *testing.T) {
18+
testErr := errors.New("test")
19+
nodeErr := newNodeError(1, "asd", testErr)
20+
21+
unwrapped := errors.Unwrap(nodeErr)
22+
require.Equal(t, testErr, unwrapped)
23+
}

internal/conn/grpc_client_stream.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,14 @@ func (s *grpcClientStream) CloseSend() (err error) {
2929

3030
if err != nil {
3131
if s.wrapping {
32-
return xerrors.WithStackTrace(
32+
return s.wrapError(
3333
xerrors.Transport(
3434
err,
3535
xerrors.WithAddress(s.c.Address()),
3636
),
3737
)
3838
}
39-
return xerrors.WithStackTrace(err)
39+
return s.wrapError(err)
4040
}
4141

4242
return nil
@@ -58,11 +58,11 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) {
5858
xerrors.WithAddress(s.c.Address()),
5959
)
6060
if s.sentMark.canRetry() {
61-
return xerrors.WithStackTrace(xerrors.Retryable(err,
61+
return s.wrapError(xerrors.Retryable(err,
6262
xerrors.WithName("SendMsg"),
6363
))
6464
}
65-
return xerrors.WithStackTrace(err)
65+
return s.wrapError(err)
6666
}
6767

6868
return err
@@ -98,11 +98,11 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {
9898
xerrors.WithAddress(s.c.Address()),
9999
)
100100
if s.sentMark.canRetry() {
101-
return xerrors.WithStackTrace(xerrors.Retryable(err,
101+
return s.wrapError(xerrors.Retryable(err,
102102
xerrors.WithName("RecvMsg"),
103103
))
104104
}
105-
return xerrors.WithStackTrace(err)
105+
return s.wrapError(err)
106106
}
107107

108108
return err
@@ -111,7 +111,7 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {
111111
if s.wrapping {
112112
if operation, ok := m.(wrap.StreamOperationResponse); ok {
113113
if status := operation.GetStatus(); status != Ydb.StatusIds_SUCCESS {
114-
return xerrors.WithStackTrace(
114+
return s.wrapError(
115115
xerrors.Operation(
116116
xerrors.FromOperation(operation),
117117
xerrors.WithNodeAddress(s.c.Address()),
@@ -124,6 +124,11 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {
124124
return nil
125125
}
126126

127+
func (s *grpcClientStream) wrapError(err error) error {
128+
nodeErr := newNodeError(s.c.endpoint.NodeID(), s.c.endpoint.Address(), err)
129+
return xerrors.WithStackTrace(nodeErr, xerrors.WithSkipDepth(1))
130+
}
131+
127132
func createPinger(c *conn) context.CancelFunc {
128133
c.touchLastUsage()
129134
ctx, cancel := xcontext.WithCancel(context.Background())

0 commit comments

Comments
 (0)