Skip to content

Commit 870256f

Browse files
committed
fix processing errors on stream calls
1 parent 8d373d8 commit 870256f

File tree

1 file changed

+69
-49
lines changed

1 file changed

+69
-49
lines changed

internal/conn/grpc_client_stream.go

Lines changed: 69 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ type grpcClientStream struct {
2424
onDone func(ctx context.Context, md metadata.MD)
2525
}
2626

27-
func (s *grpcClientStream) CloseSend() (err error) {
27+
func (s *grpcClientStream) CloseSend() (finalErr error) {
2828
onDone := trace.DriverOnConnStreamCloseSend(s.c.config.Trace(), &s.ctx,
2929
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).CloseSend"),
3030
)
3131
defer func() {
32-
onDone(err)
32+
onDone(finalErr)
3333
}()
3434

3535
locked, unlock := s.c.inUse.TryLock()
@@ -41,35 +41,47 @@ func (s *grpcClientStream) CloseSend() (err error) {
4141
stop := s.c.lastUsage.Start()
4242
defer stop()
4343

44-
err = s.ClientStream.CloseSend()
45-
44+
err := s.ClientStream.CloseSend()
4645
if err != nil {
47-
if xerrors.IsContextError(err) {
46+
if !s.wrapping {
47+
return err
48+
}
49+
50+
if !xerrors.IsTransportError(err) {
4851
return xerrors.WithStackTrace(err)
4952
}
5053

51-
if s.wrapping {
54+
defer func() {
55+
s.c.onTransportError(s.Context(), finalErr)
56+
}()
57+
58+
if s.sentMark.canRetry() {
5259
return s.wrapError(
53-
xerrors.Transport(
54-
err,
55-
xerrors.WithAddress(s.c.Address()),
56-
xerrors.WithTraceID(s.traceID),
60+
xerrors.Retryable(
61+
xerrors.Transport(err,
62+
xerrors.WithAddress(s.c.Address()),
63+
xerrors.WithTraceID(s.traceID),
64+
),
65+
xerrors.WithName("CloseSend"),
5766
),
5867
)
5968
}
6069

61-
return s.wrapError(err)
70+
return s.wrapError(xerrors.Transport(err,
71+
xerrors.WithAddress(s.c.Address()),
72+
xerrors.WithTraceID(s.traceID),
73+
))
6274
}
6375

6476
return nil
6577
}
6678

67-
func (s *grpcClientStream) SendMsg(m interface{}) (err error) {
79+
func (s *grpcClientStream) SendMsg(m interface{}) (finalErr error) {
6880
onDone := trace.DriverOnConnStreamSendMsg(s.c.config.Trace(), &s.ctx,
6981
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).SendMsg"),
7082
)
7183
defer func() {
72-
onDone(err)
84+
onDone(finalErr)
7385
}()
7486

7587
locked, unlock := s.c.inUse.TryLock()
@@ -81,43 +93,47 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) {
8193
stop := s.c.lastUsage.Start()
8294
defer stop()
8395

84-
err = s.ClientStream.SendMsg(m)
85-
96+
err := s.ClientStream.SendMsg(m)
8697
if err != nil {
87-
if xerrors.IsContextError(err) {
98+
if !s.wrapping {
99+
return err
100+
}
101+
102+
if !xerrors.IsTransportError(err) {
88103
return xerrors.WithStackTrace(err)
89104
}
90105

91106
defer func() {
92-
s.c.onTransportError(s.Context(), err)
107+
s.c.onTransportError(s.Context(), finalErr)
93108
}()
94109

95-
if s.wrapping {
96-
err = xerrors.Transport(err,
97-
xerrors.WithAddress(s.c.Address()),
98-
xerrors.WithTraceID(s.traceID),
99-
)
100-
if s.sentMark.canRetry() {
101-
return s.wrapError(xerrors.Retryable(err,
110+
if s.sentMark.canRetry() {
111+
return s.wrapError(
112+
xerrors.Retryable(
113+
xerrors.Transport(err,
114+
xerrors.WithAddress(s.c.Address()),
115+
xerrors.WithTraceID(s.traceID),
116+
),
102117
xerrors.WithName("SendMsg"),
103-
))
104-
}
105-
106-
return s.wrapError(err)
118+
),
119+
)
107120
}
108121

109-
return err
122+
return s.wrapError(xerrors.Transport(err,
123+
xerrors.WithAddress(s.c.Address()),
124+
xerrors.WithTraceID(s.traceID),
125+
))
110126
}
111127

112128
return nil
113129
}
114130

115-
func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {
131+
func (s *grpcClientStream) RecvMsg(m interface{}) (finalErr error) {
116132
onDone := trace.DriverOnConnStreamRecvMsg(s.c.config.Trace(), &s.ctx,
117133
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*grpcClientStream).RecvMsg"),
118134
)
119135
defer func() {
120-
onDone(err)
136+
onDone(finalErr)
121137
}()
122138

123139
locked, unlock := s.c.inUse.TryLock()
@@ -130,39 +146,42 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {
130146
defer stop()
131147

132148
defer func() {
133-
if err != nil {
149+
if finalErr != nil {
134150
md := s.ClientStream.Trailer()
135151
s.onDone(s.ctx, md)
136152
}
137153
}()
138154

139-
err = s.ClientStream.RecvMsg(m)
155+
err := s.ClientStream.RecvMsg(m)
156+
if err != nil {
157+
if xerrors.Is(err, io.EOF) || !s.wrapping {
158+
return io.EOF
159+
}
140160

141-
if err != nil { //nolint:nestif
142-
if xerrors.IsContextError(err) {
161+
if !xerrors.IsTransportError(err) {
143162
return xerrors.WithStackTrace(err)
144163
}
145164

146165
defer func() {
147-
if !xerrors.Is(err, io.EOF) {
148-
s.c.onTransportError(s.Context(), err)
149-
}
166+
s.c.onTransportError(s.Context(), finalErr)
150167
}()
151168

152-
if s.wrapping {
153-
err = xerrors.Transport(err,
154-
xerrors.WithAddress(s.c.Address()),
155-
)
156-
if s.sentMark.canRetry() {
157-
return s.wrapError(xerrors.Retryable(err,
169+
if s.sentMark.canRetry() {
170+
return s.wrapError(
171+
xerrors.Retryable(
172+
xerrors.Transport(err,
173+
xerrors.WithAddress(s.c.Address()),
174+
xerrors.WithTraceID(s.traceID),
175+
),
158176
xerrors.WithName("RecvMsg"),
159-
))
160-
}
161-
162-
return s.wrapError(err)
177+
),
178+
)
163179
}
164180

165-
return err
181+
return s.wrapError(xerrors.Transport(err,
182+
xerrors.WithAddress(s.c.Address()),
183+
xerrors.WithTraceID(s.traceID),
184+
))
166185
}
167186

168187
if s.wrapping {
@@ -172,6 +191,7 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {
172191
xerrors.Operation(
173192
xerrors.FromOperation(operation),
174193
xerrors.WithAddress(s.c.Address()),
194+
xerrors.WithTraceID(s.traceID),
175195
),
176196
)
177197
}

0 commit comments

Comments
 (0)