Skip to content

Commit 302f738

Browse files
committed
ban only selected grpc codes (ResourceExhausted and Unavailable)
1 parent 5e1396a commit 302f738

File tree

9 files changed

+93
-10
lines changed

9 files changed

+93
-10
lines changed

internal/conn/conn.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,10 @@ func (c *conn) realConn(ctx context.Context) (cc *grpc.ClientConn, err error) {
183183
}, c.config.GrpcDialOptions()...,
184184
)...)
185185
if err != nil {
186+
if xerrors.IsContextError(err) {
187+
return nil, xerrors.WithStackTrace(err)
188+
}
189+
186190
defer func() {
187191
c.onTransportError(ctx, err)
188192
}()
@@ -310,6 +314,10 @@ func (c *conn) Invoke(
310314

311315
err = cc.Invoke(ctx, method, req, res, append(opts, grpc.Trailer(&md))...)
312316
if err != nil {
317+
if xerrors.IsContextError(err) {
318+
return xerrors.WithStackTrace(err)
319+
}
320+
313321
defer func() {
314322
c.onTransportError(ctx, err)
315323
}()
@@ -392,6 +400,10 @@ func (c *conn) NewStream(
392400

393401
s, err = cc.NewStream(ctx, desc, method, opts...)
394402
if err != nil {
403+
if xerrors.IsContextError(err) {
404+
return nil, xerrors.WithStackTrace(err)
405+
}
406+
395407
defer func() {
396408
c.onTransportError(ctx, err)
397409
}()

internal/conn/grpc_client_stream.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ func (s *grpcClientStream) CloseSend() (err error) {
3333
err = s.ClientStream.CloseSend()
3434

3535
if err != nil {
36+
if xerrors.IsContextError(err) {
37+
return xerrors.WithStackTrace(err)
38+
}
39+
3640
if s.wrapping {
3741
return s.wrapError(
3842
xerrors.Transport(
@@ -58,6 +62,10 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) {
5862
err = s.ClientStream.SendMsg(m)
5963

6064
if err != nil {
65+
if xerrors.IsContextError(err) {
66+
return xerrors.WithStackTrace(err)
67+
}
68+
6169
defer func() {
6270
s.c.onTransportError(s.Context(), err)
6371
}()
@@ -98,6 +106,10 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) {
98106
err = s.ClientStream.RecvMsg(m)
99107

100108
if err != nil {
109+
if xerrors.IsContextError(err) {
110+
return xerrors.WithStackTrace(err)
111+
}
112+
101113
defer func() {
102114
if !xerrors.Is(err, io.EOF) {
103115
s.c.onTransportError(s.Context(), err)

internal/conn/pool.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,24 @@ func (p *Pool) Ban(ctx context.Context, cc Conn, cause error) {
7878
return
7979
}
8080

81-
if xerrors.IsTransportError(cause,
82-
grpcCodes.OK,
83-
grpcCodes.Canceled,
81+
if !xerrors.IsTransportError(cause,
8482
grpcCodes.ResourceExhausted,
85-
grpcCodes.OutOfRange,
83+
grpcCodes.Unavailable,
84+
//grpcCodes.OK,
85+
//grpcCodes.Canceled,
86+
//grpcCodes.Unknown,
87+
//grpcCodes.InvalidArgument,
88+
//grpcCodes.DeadlineExceeded,
89+
//grpcCodes.NotFound,
90+
//grpcCodes.AlreadyExists,
91+
//grpcCodes.PermissionDenied,
92+
//grpcCodes.FailedPrecondition,
93+
//grpcCodes.Aborted,
94+
//grpcCodes.OutOfRange,
95+
//grpcCodes.Unimplemented,
96+
//grpcCodes.Internal,
97+
//grpcCodes.DataLoss,
98+
//grpcCodes.Unauthenticated,
8699
) {
87100
return
88101
}

internal/query/execute_query.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,13 @@ func execute(ctx context.Context, s *Session, c Ydb_Query_V1.QueryServiceClient,
6464

6565
stream, err := c.ExecuteQuery(executeCtx, request, callOptions...)
6666
if err != nil {
67-
return nil, nil, xerrors.WithStackTrace(err)
67+
if xerrors.IsContextError(err) {
68+
return nil, nil, xerrors.WithStackTrace(err)
69+
}
70+
71+
return nil, nil, xerrors.WithStackTrace(
72+
xerrors.Transport(err),
73+
)
6874
}
6975

7076
r, txID, err := newResult(ctx, stream, s.trace, cancelExecute)

internal/query/result.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,13 @@ func nextPart(
9393

9494
part, err := stream.Recv()
9595
if err != nil {
96-
return nil, xerrors.WithStackTrace(err)
96+
if xerrors.IsContextError(err) {
97+
return nil, xerrors.WithStackTrace(err)
98+
}
99+
100+
return nil, xerrors.WithStackTrace(
101+
xerrors.Transport(err),
102+
)
97103
}
98104

99105
return part, nil

internal/query/session.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ func createSession(
8181

8282
response, err := client.CreateSession(ctx, &Ydb_Query.CreateSessionRequest{})
8383
if err != nil {
84+
if xerrors.IsContextError(err) {
85+
return nil, xerrors.WithStackTrace(err)
86+
}
87+
8488
return nil, xerrors.WithStackTrace(
8589
xerrors.Transport(err),
8690
)
@@ -125,6 +129,10 @@ func (s *Session) attach(ctx context.Context) (finalErr error) {
125129
SessionId: s.id,
126130
})
127131
if err != nil {
132+
if xerrors.IsContextError(err) {
133+
return xerrors.WithStackTrace(err)
134+
}
135+
128136
return xerrors.WithStackTrace(
129137
xerrors.Transport(err),
130138
)
@@ -193,6 +201,10 @@ func deleteSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient,
193201
},
194202
)
195203
if err != nil {
204+
if xerrors.IsContextError(err) {
205+
return xerrors.WithStackTrace(err)
206+
}
207+
196208
return xerrors.WithStackTrace(xerrors.Transport(err))
197209
}
198210
if response.GetStatus() != Ydb.StatusIds_SUCCESS {
@@ -240,7 +252,13 @@ func begin(
240252
},
241253
)
242254
if err != nil {
243-
return nil, xerrors.WithStackTrace(xerrors.Transport(err))
255+
if xerrors.IsContextError(err) {
256+
return nil, xerrors.WithStackTrace(err)
257+
}
258+
259+
return nil, xerrors.WithStackTrace(
260+
xerrors.Transport(err),
261+
)
244262
}
245263
if response.GetStatus() != Ydb.StatusIds_SUCCESS {
246264
return nil, xerrors.WithStackTrace(xerrors.FromOperation(response))

internal/query/transaction.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,13 @@ func commitTx(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessi
6060
TxId: txID,
6161
})
6262
if err != nil {
63-
return xerrors.WithStackTrace(xerrors.Transport(err))
63+
if xerrors.IsContextError(err) {
64+
return xerrors.WithStackTrace(err)
65+
}
66+
67+
return xerrors.WithStackTrace(
68+
xerrors.Transport(err),
69+
)
6470
}
6571
if response.GetStatus() != Ydb.StatusIds_SUCCESS {
6672
return xerrors.WithStackTrace(xerrors.FromOperation(response))
@@ -79,7 +85,13 @@ func rollback(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessi
7985
TxId: txID,
8086
})
8187
if err != nil {
82-
return xerrors.WithStackTrace(xerrors.Transport(err))
88+
if xerrors.IsContextError(err) {
89+
return xerrors.WithStackTrace(err)
90+
}
91+
92+
return xerrors.WithStackTrace(
93+
xerrors.Transport(err),
94+
)
8395
}
8496
if response.GetStatus() != Ydb.StatusIds_SUCCESS {
8597
return xerrors.WithStackTrace(xerrors.FromOperation(response))

internal/xerrors/transport.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func Transport(err error, opts ...teOpt) error {
141141
}
142142
var te *transportError
143143
if errors.As(err, &te) {
144-
return err
144+
return te
145145
}
146146
if s, ok := grpcStatus.FromError(err); ok {
147147
te = &transportError{

internal/xerrors/xerrors.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,3 +99,7 @@ func Is(err error, targets ...error) bool {
9999

100100
return false
101101
}
102+
103+
func IsContextError(err error) bool {
104+
return Is(err, context.Canceled, context.DeadlineExceeded)
105+
}

0 commit comments

Comments
 (0)