Skip to content

Commit b650d4d

Browse files
authored
Merge pull request #1040 Fixed topic writer hand up on server temporary error
2 parents e3bb72e + ac9f8b4 commit b650d4d

File tree

9 files changed

+300
-3
lines changed

9 files changed

+300
-3
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Fixed handle of operational errors in topic streams
12
* The minimum version of Go in `ydb-go-sdk` has been raised to `go1.21`
23
* Fixed topic writer infinite reconnections in some cases
34
* Refactored nil on err `internal/grpcwrapper/rawydb/issues.go`, when golangci-lint nilerr enabled

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ require (
1515

1616
// requires for tests only
1717
require (
18-
github.com/rekby/fixenv v0.3.2
18+
github.com/rekby/fixenv v0.6.1
1919
github.com/stretchr/testify v1.7.1
2020
go.uber.org/mock v0.3.1-0.20231011042131-892b665398ec // indirect
2121
)

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
6060
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
6161
github.com/rekby/fixenv v0.3.2 h1:6AOdQ9Boaa/lOQJTY8GDmQRIhg3S3SD0mIEPkuDSkoQ=
6262
github.com/rekby/fixenv v0.3.2/go.mod h1:/b5LRc06BYJtslRtHKxsPWFT/ySpHV+rWvzTg+XWk4c=
63+
github.com/rekby/fixenv v0.6.1 h1:jUFiSPpajT4WY2cYuc++7Y1zWrnCxnovGCIX72PZniM=
64+
github.com/rekby/fixenv v0.6.1/go.mod h1:/b5LRc06BYJtslRtHKxsPWFT/ySpHV+rWvzTg+XWk4c=
6365
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
6466
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
6567
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=

internal/grpcwrapper/rawtopic/rawtopicreader/rawtopicreader.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ func (s StreamReader) CloseSend() error {
3131
func (s StreamReader) Recv() (ServerMessage, error) {
3232
grpcMess, err := s.Stream.Recv()
3333
if err != nil {
34-
err = xerrors.Transport(err)
34+
if !xerrors.IsErrorFromServer(err) {
35+
err = xerrors.Transport(err)
36+
}
3537

3638
return nil, err
3739
}

internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ func (w *StreamWriter) Recv() (ServerMessage, error) {
4141

4242
grpcMsg, err := w.Stream.Recv()
4343
if err != nil {
44-
err = xerrors.Transport(err)
44+
if !xerrors.IsErrorFromServer(err) {
45+
err = xerrors.Transport(err)
46+
}
4547

4648
return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
4749
"ydb: failed to read grpc message from writer stream: %w",
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package topicwriterinternal_test
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"strings"
7+
"testing"
8+
9+
"github.com/rekby/fixenv"
10+
"github.com/rekby/fixenv/sf"
11+
"github.com/stretchr/testify/require"
12+
"github.com/ydb-platform/ydb-go-genproto/Ydb_Topic_V1"
13+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
14+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue"
15+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic"
16+
17+
"github.com/ydb-platform/ydb-go-sdk/v3"
18+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
19+
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
20+
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicwriter"
21+
)
22+
23+
func TestRegressionOperationUnavailableIssue1007(t *testing.T) {
24+
xtest.TestManyTimes(t, func(t testing.TB) {
25+
e := fixenv.New(t)
26+
27+
mock := newTopicWriterOperationUnavailable()
28+
connString := xtest.GrpcMockTopicConnString(e, mock)
29+
30+
db, err := ydb.Open(sf.Context(e), connString)
31+
require.NoError(t, err)
32+
33+
writer, err := db.Topic().StartWriter("test", topicoptions.WithWriterWaitServerAck(true))
34+
require.NoError(t, err)
35+
36+
err = writer.Write(sf.Context(e), topicwriter.Message{
37+
Data: strings.NewReader("asd"),
38+
})
39+
require.NoError(t, err)
40+
require.True(t, mock.UnavailableResponsed)
41+
})
42+
}
43+
44+
type topicWriterOperationUnavailable struct {
45+
Ydb_Topic_V1.UnimplementedTopicServiceServer
46+
47+
UnavailableResponsed bool
48+
}
49+
50+
func newTopicWriterOperationUnavailable() *topicWriterOperationUnavailable {
51+
return &topicWriterOperationUnavailable{}
52+
}
53+
54+
func (t *topicWriterOperationUnavailable) StreamWrite(server Ydb_Topic_V1.TopicService_StreamWriteServer) error {
55+
initMsg, err := server.Recv()
56+
if err != nil {
57+
return fmt.Errorf("failed read init message: %w", err)
58+
}
59+
60+
if initMsg.GetInitRequest() == nil {
61+
return errors.New("first message must be init message")
62+
}
63+
64+
err = server.Send(&Ydb_Topic.StreamWriteMessage_FromServer{
65+
Status: Ydb.StatusIds_SUCCESS,
66+
ServerMessage: &Ydb_Topic.StreamWriteMessage_FromServer_InitResponse{
67+
InitResponse: &Ydb_Topic.StreamWriteMessage_InitResponse{
68+
LastSeqNo: 0,
69+
SessionId: "test",
70+
PartitionId: 0,
71+
SupportedCodecs: nil,
72+
},
73+
},
74+
})
75+
if err != nil {
76+
return fmt.Errorf("failed to send init response: %w", err)
77+
}
78+
79+
if !t.UnavailableResponsed {
80+
t.UnavailableResponsed = true
81+
82+
err = server.Send(&Ydb_Topic.StreamWriteMessage_FromServer{
83+
Status: Ydb.StatusIds_UNAVAILABLE,
84+
Issues: []*Ydb_Issue.IssueMessage{
85+
{
86+
Message: "Test status unavailable",
87+
},
88+
},
89+
})
90+
91+
if err != nil {
92+
return fmt.Errorf("failed to send error response: %w", err)
93+
}
94+
95+
return nil
96+
}
97+
98+
// wait message block
99+
messagesMsg, err := server.Recv()
100+
if err != nil {
101+
return errors.New("failed to read messages block")
102+
}
103+
104+
if len(messagesMsg.GetClientMessage().(*Ydb_Topic.StreamWriteMessage_FromClient_WriteRequest).
105+
WriteRequest.GetMessages()) == 0 {
106+
return errors.New("received zero messages block")
107+
}
108+
109+
err = server.Send(&Ydb_Topic.StreamWriteMessage_FromServer{
110+
Status: Ydb.StatusIds_SUCCESS,
111+
ServerMessage: &Ydb_Topic.StreamWriteMessage_FromServer_WriteResponse{
112+
WriteResponse: &Ydb_Topic.StreamWriteMessage_WriteResponse{
113+
Acks: []*Ydb_Topic.StreamWriteMessage_WriteResponse_WriteAck{
114+
{
115+
SeqNo: 1,
116+
MessageWriteStatus: &Ydb_Topic.StreamWriteMessage_WriteResponse_WriteAck_Written_{
117+
Written: &Ydb_Topic.StreamWriteMessage_WriteResponse_WriteAck_Written{
118+
Offset: 1,
119+
},
120+
},
121+
},
122+
},
123+
PartitionId: 0,
124+
WriteStatistics: &Ydb_Topic.StreamWriteMessage_WriteResponse_WriteStatistics{},
125+
},
126+
},
127+
})
128+
129+
if err != nil {
130+
return fmt.Errorf("failed to sent write ack: %w", err)
131+
}
132+
133+
return nil
134+
}

internal/xerrors/xerrors.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,12 @@ func As(err error, targets ...interface{}) bool {
7979
return false
8080
}
8181

82+
// IsErrorFromServer return true if err returned from server
83+
// (opposite to raised internally in sdk)
84+
func IsErrorFromServer(err error) bool {
85+
return IsTransportError(err) || IsOperationError(err)
86+
}
87+
8288
// Is is a improved proxy to errors.Is
8389
// This need to single import errors
8490
func Is(err error, targets ...error) bool {

internal/xtest/ydb_grpc_mocks.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package xtest
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
"reflect"
8+
"strconv"
9+
"time"
10+
11+
"github.com/rekby/fixenv"
12+
"github.com/rekby/fixenv/sf"
13+
"github.com/ydb-platform/ydb-go-genproto/Ydb_Discovery_V1"
14+
"github.com/ydb-platform/ydb-go-genproto/Ydb_Topic_V1"
15+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
16+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Discovery"
17+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations"
18+
"google.golang.org/grpc"
19+
"google.golang.org/protobuf/types/known/anypb"
20+
)
21+
22+
func GrpcMockTopicConnString(e fixenv.Env, topicServiceImpl Ydb_Topic_V1.TopicServiceServer) string {
23+
v := reflect.ValueOf(topicServiceImpl)
24+
addr := v.Pointer()
25+
26+
var f fixenv.GenericFixtureFunction[string] = func() (*fixenv.GenericResult[string], error) {
27+
listener := sf.LocalTCPListenerNamed(e, fmt.Sprintf("ydb-grpc-mock-topic-%v", addr))
28+
connString := fmt.Sprintf("grpc://%s/local", listener.Addr())
29+
30+
mock, err := newGrpcMock(listener, topicServiceImpl)
31+
if err != nil {
32+
return nil, fmt.Errorf("failed to create grpc mock: %w", err)
33+
}
34+
35+
clean := func() {
36+
_ = mock.Close()
37+
}
38+
39+
return fixenv.NewGenericResultWithCleanup(connString, clean), nil
40+
}
41+
42+
return fixenv.CacheResult(e, f, fixenv.CacheOptions{CacheKey: addr})
43+
}
44+
45+
type grpcMock struct {
46+
listener net.Listener
47+
grpcServer *grpc.Server
48+
stopChan chan error
49+
}
50+
51+
func (m *grpcMock) Close() error {
52+
m.grpcServer.Stop()
53+
54+
return m.listener.Close()
55+
}
56+
57+
func newGrpcMock(listener net.Listener, topicServiceImpl Ydb_Topic_V1.TopicServiceServer) (*grpcMock, error) {
58+
res := &grpcMock{
59+
listener: listener,
60+
grpcServer: grpc.NewServer(),
61+
stopChan: make(chan error, 1),
62+
}
63+
64+
host, portS, err := net.SplitHostPort(listener.Addr().String())
65+
if err != nil {
66+
return nil, fmt.Errorf("failed to split host port addresses: %w", err)
67+
}
68+
69+
port, err := strconv.ParseUint(portS, 10, 32)
70+
if err != nil {
71+
return nil, fmt.Errorf("failed convert port to int: %w", err)
72+
}
73+
discoveryService := newMockDiscoveryService(host, uint32(port))
74+
75+
Ydb_Discovery_V1.RegisterDiscoveryServiceServer(res.grpcServer, discoveryService)
76+
Ydb_Topic_V1.RegisterTopicServiceServer(res.grpcServer, topicServiceImpl)
77+
78+
go func() {
79+
res.stopChan <- res.grpcServer.Serve(res.listener)
80+
}()
81+
82+
select {
83+
case <-res.stopChan:
84+
return nil, err
85+
case <-time.After(time.Millisecond):
86+
return res, nil
87+
}
88+
}
89+
90+
type mockDiscoveryService struct {
91+
Ydb_Discovery_V1.UnimplementedDiscoveryServiceServer
92+
host string
93+
port uint32
94+
}
95+
96+
func newMockDiscoveryService(host string, port uint32) *mockDiscoveryService {
97+
return &mockDiscoveryService{
98+
host: host,
99+
port: port,
100+
}
101+
}
102+
103+
func (m mockDiscoveryService) ListEndpoints(
104+
ctx context.Context,
105+
request *Ydb_Discovery.ListEndpointsRequest,
106+
) (*Ydb_Discovery.ListEndpointsResponse, error) {
107+
res := &Ydb_Discovery.ListEndpointsResult{
108+
Endpoints: []*Ydb_Discovery.EndpointInfo{
109+
{
110+
Address: m.host,
111+
Port: m.port,
112+
LoadFactor: 0,
113+
Ssl: false,
114+
Service: nil,
115+
Location: "",
116+
NodeId: 1,
117+
IpV4: []string{"127.0.0.1"},
118+
},
119+
},
120+
SelfLocation: "",
121+
}
122+
resp := &Ydb_Discovery.ListEndpointsResponse{
123+
Operation: &Ydb_Operations.Operation{
124+
Id: "test-list-operation",
125+
Ready: true,
126+
Status: Ydb.StatusIds_SUCCESS,
127+
Result: &anypb.Any{},
128+
},
129+
}
130+
err := resp.GetOperation().GetResult().MarshalFrom(res)
131+
132+
return resp, err
133+
}
134+
135+
func (m mockDiscoveryService) WhoAmI(
136+
ctx context.Context,
137+
request *Ydb_Discovery.WhoAmIRequest,
138+
) (*Ydb_Discovery.WhoAmIResponse, error) {
139+
panic("unimplemented")
140+
}

tests/integration/helpers_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,16 @@ func (scope *scopeT) Failed() bool {
6767
return scope.t.Failed()
6868
}
6969

70+
// CacheWithCleanup wrap new interface as deprecated - for prevent re-write a lot of code
71+
// in new code prefer to use generic version of cache: fixenv.CacheResult(env, ...)
72+
func (scope *scopeT) CacheWithCleanup(cacheKey interface{}, opt *fixenv.FixtureOptions, f fixenv.FixtureCallbackWithCleanupFunc) interface{} {
73+
fWrap := func() (*fixenv.Result, error) {
74+
res, cleanup, err := f()
75+
return fixenv.NewResultWithCleanup(res, cleanup), err
76+
}
77+
return scope.CacheResult(fWrap)
78+
}
79+
7080
func (scope *scopeT) ConnectionString() string {
7181
if envString := os.Getenv("YDB_CONNECTION_STRING"); envString != "" {
7282
return envString

0 commit comments

Comments
 (0)