Skip to content

Commit 62ab80c

Browse files
committed
reproduce error in mock test
1 parent 37db19a commit 62ab80c

File tree

6 files changed

+39
-15
lines changed

6 files changed

+39
-15
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Fixed handle of operational errors in topic streams
12
* Fixed topic writer infinite reconnections in some cases
23
* Refactored nil on err `internal/grpcwrapper/rawydb/issues.go`, when golangci-lint nilerr enabled
34
* Refactored nil on err `internal/grpcwrapper/rawtopic/describe_topic.go`, when golangci-lint nilerr enabled

internal/grpcwrapper/rawtopic/rawtopicreader/rawtopicreader.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ func (s StreamReader) CloseSend() error {
3131
func (s StreamReader) Recv() (ServerMessage, error) {
3232
grpcMess, err := s.Stream.Recv()
3333
if err != nil {
34-
err = xerrors.Transport(err)
34+
if !xerrors.IsErrorFromServer(err) {
35+
err = xerrors.Transport(err)
36+
}
3537

3638
return nil, err
3739
}

internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ func (w *StreamWriter) Recv() (ServerMessage, error) {
4141

4242
grpcMsg, err := w.Stream.Recv()
4343
if err != nil {
44-
err = xerrors.Transport(err)
44+
if !xerrors.IsErrorFromServer(err) {
45+
err = xerrors.Transport(err)
46+
}
4547

4648
return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
4749
"ydb: failed to read grpc message from writer stream: %w",

internal/topic/topicwriterinternal/writer_grpc_mock_test.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,15 @@ func (t *topicWriterOperationUnavailable) StreamWrite(server Ydb_Topic_V1.TopicS
6363

6464
err = server.Send(&Ydb_Topic.StreamWriteMessage_FromServer{
6565
Status: Ydb.StatusIds_SUCCESS,
66-
ServerMessage: &Ydb_Topic.StreamWriteMessage_FromServer_InitResponse{InitResponse: &Ydb_Topic.StreamWriteMessage_InitResponse{
67-
LastSeqNo: 0,
68-
SessionId: "test",
69-
PartitionId: 0,
70-
SupportedCodecs: nil,
71-
}}})
66+
ServerMessage: &Ydb_Topic.StreamWriteMessage_FromServer_InitResponse{
67+
InitResponse: &Ydb_Topic.StreamWriteMessage_InitResponse{
68+
LastSeqNo: 0,
69+
SessionId: "test",
70+
PartitionId: 0,
71+
SupportedCodecs: nil,
72+
},
73+
},
74+
})
7275
if err != nil {
7376
return fmt.Errorf("failed to send init response: %w", err)
7477
}
@@ -98,7 +101,8 @@ func (t *topicWriterOperationUnavailable) StreamWrite(server Ydb_Topic_V1.TopicS
98101
return errors.New("failed to read messages block")
99102
}
100103

101-
if len(messagesMsg.GetClientMessage().(*Ydb_Topic.StreamWriteMessage_FromClient_WriteRequest).WriteRequest.GetMessages()) == 0 {
104+
if len(messagesMsg.GetClientMessage().(*Ydb_Topic.StreamWriteMessage_FromClient_WriteRequest).
105+
WriteRequest.GetMessages()) == 0 {
102106
return errors.New("received zero messages block")
103107
}
104108

@@ -119,7 +123,8 @@ func (t *topicWriterOperationUnavailable) StreamWrite(server Ydb_Topic_V1.TopicS
119123
PartitionId: 0,
120124
WriteStatistics: &Ydb_Topic.StreamWriteMessage_WriteResponse_WriteStatistics{},
121125
},
122-
}})
126+
},
127+
})
123128

124129
if err != nil {
125130
return fmt.Errorf("failed to sent write ack: %w", err)

internal/xerrors/xerrors.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,12 @@ func As(err error, targets ...interface{}) bool {
7979
return false
8080
}
8181

82+
// IsErrorFromServer return true if err returned from server
83+
// (opposite to raised internally in sdk)
84+
func IsErrorFromServer(err error) bool {
85+
return IsTransportError(err) || IsOperationError(err)
86+
}
87+
8288
// Is is a improved proxy to errors.Is
8389
// This need to single import errors
8490
func Is(err error, targets ...error) bool {

internal/xtest/ydb_grpc_mocks.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ type grpcMock struct {
5050

5151
func (m *grpcMock) Close() error {
5252
m.grpcServer.Stop()
53+
5354
return m.listener.Close()
5455
}
5556

@@ -99,7 +100,10 @@ func newMockDiscoveryService(host string, port uint32) *mockDiscoveryService {
99100
}
100101
}
101102

102-
func (m mockDiscoveryService) ListEndpoints(ctx context.Context, request *Ydb_Discovery.ListEndpointsRequest) (*Ydb_Discovery.ListEndpointsResponse, error) {
103+
func (m mockDiscoveryService) ListEndpoints(
104+
ctx context.Context,
105+
request *Ydb_Discovery.ListEndpointsRequest,
106+
) (*Ydb_Discovery.ListEndpointsResponse, error) {
103107
res := &Ydb_Discovery.ListEndpointsResult{
104108
Endpoints: []*Ydb_Discovery.EndpointInfo{
105109
{
@@ -121,12 +125,16 @@ func (m mockDiscoveryService) ListEndpoints(ctx context.Context, request *Ydb_Di
121125
Ready: true,
122126
Status: Ydb.StatusIds_SUCCESS,
123127
Result: &anypb.Any{},
124-
}}
128+
},
129+
}
125130
err := resp.GetOperation().GetResult().MarshalFrom(res)
131+
126132
return resp, err
127133
}
128134

129-
func (m mockDiscoveryService) WhoAmI(ctx context.Context, request *Ydb_Discovery.WhoAmIRequest) (*Ydb_Discovery.WhoAmIResponse, error) {
130-
//TODO implement me
131-
panic("implement me")
135+
func (m mockDiscoveryService) WhoAmI(
136+
ctx context.Context,
137+
request *Ydb_Discovery.WhoAmIRequest,
138+
) (*Ydb_Discovery.WhoAmIResponse, error) {
139+
panic("unimplemented")
132140
}

0 commit comments

Comments
 (0)