Skip to content

Commit 29b3666

Browse files
committed
Fixed handle of operational errors in topic streams (backported fix only)
1 parent 0e967e0 commit 29b3666

File tree

4 files changed

+15
-2
lines changed

4 files changed

+15
-2
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## v3.55.3
2+
* Fixed handle of operational errors in topic streams (backported fix only)
3+
14
## v3.55.2
25
* Fixed init info in topic writer, when autoseq num turned off.
36

internal/grpcwrapper/rawtopic/rawtopicreader/rawtopicreader.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ func (s StreamReader) CloseSend() error {
3131
func (s StreamReader) Recv() (ServerMessage, error) {
3232
grpcMess, err := s.Stream.Recv()
3333
if err != nil {
34-
err = xerrors.Transport(err)
34+
if !xerrors.IsErrorFromServer(err) {
35+
err = xerrors.Transport(err)
36+
}
3537
return nil, err
3638
}
3739

internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ func (w *StreamWriter) Recv() (ServerMessage, error) {
4141

4242
grpcMsg, err := w.Stream.Recv()
4343
if err != nil {
44-
err = xerrors.Transport(err)
44+
if !xerrors.IsErrorFromServer(err) {
45+
err = xerrors.Transport(err)
46+
}
4547
return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
4648
"ydb: failed to read grpc message from writer stream: %w",
4749
err,

internal/xerrors/xerrors.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,12 @@ func As(err error, targets ...interface{}) bool {
7676
return false
7777
}
7878

79+
// IsErrorFromServer return true if err returned from server
80+
// (opposite to raised internally in sdk)
81+
func IsErrorFromServer(err error) bool {
82+
return IsTransportError(err) || IsOperationError(err)
83+
}
84+
7985
// Is is a improved proxy to errors.Is
8086
// This need to single import errors
8187
func Is(err error, targets ...error) bool {

0 commit comments

Comments
 (0)