Skip to content

Commit 37db19a

Browse files
committed
reproduce error in mock test
1 parent 85caddf commit 37db19a

File tree

3 files changed

+261
-81
lines changed

3 files changed

+261
-81
lines changed
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package topicwriterinternal_test
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"strings"
7+
"testing"
8+
9+
"github.com/rekby/fixenv"
10+
"github.com/rekby/fixenv/sf"
11+
"github.com/stretchr/testify/require"
12+
"github.com/ydb-platform/ydb-go-genproto/Ydb_Topic_V1"
13+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
14+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue"
15+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic"
16+
17+
"github.com/ydb-platform/ydb-go-sdk/v3"
18+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
19+
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
20+
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicwriter"
21+
)
22+
23+
func TestRegressionOperationUnavailableIssue1007(t *testing.T) {
24+
xtest.TestManyTimes(t, func(t testing.TB) {
25+
e := fixenv.New(t)
26+
27+
mock := newTopicWriterOperationUnavailable()
28+
connString := xtest.GrpcMockTopicConnString(e, mock)
29+
30+
db, err := ydb.Open(sf.Context(e), connString)
31+
require.NoError(t, err)
32+
33+
writer, err := db.Topic().StartWriter("test", topicoptions.WithWriterWaitServerAck(true))
34+
require.NoError(t, err)
35+
36+
err = writer.Write(sf.Context(e), topicwriter.Message{
37+
Data: strings.NewReader("asd"),
38+
})
39+
require.NoError(t, err)
40+
require.True(t, mock.UnavailableResponsed)
41+
})
42+
}
43+
44+
type topicWriterOperationUnavailable struct {
45+
Ydb_Topic_V1.UnimplementedTopicServiceServer
46+
47+
UnavailableResponsed bool
48+
}
49+
50+
func newTopicWriterOperationUnavailable() *topicWriterOperationUnavailable {
51+
return &topicWriterOperationUnavailable{}
52+
}
53+
54+
func (t *topicWriterOperationUnavailable) StreamWrite(server Ydb_Topic_V1.TopicService_StreamWriteServer) error {
55+
initMsg, err := server.Recv()
56+
if err != nil {
57+
return fmt.Errorf("failed read init message: %w", err)
58+
}
59+
60+
if initMsg.GetInitRequest() == nil {
61+
return errors.New("first message must be init message")
62+
}
63+
64+
err = server.Send(&Ydb_Topic.StreamWriteMessage_FromServer{
65+
Status: Ydb.StatusIds_SUCCESS,
66+
ServerMessage: &Ydb_Topic.StreamWriteMessage_FromServer_InitResponse{InitResponse: &Ydb_Topic.StreamWriteMessage_InitResponse{
67+
LastSeqNo: 0,
68+
SessionId: "test",
69+
PartitionId: 0,
70+
SupportedCodecs: nil,
71+
}}})
72+
if err != nil {
73+
return fmt.Errorf("failed to send init response: %w", err)
74+
}
75+
76+
if !t.UnavailableResponsed {
77+
t.UnavailableResponsed = true
78+
79+
err = server.Send(&Ydb_Topic.StreamWriteMessage_FromServer{
80+
Status: Ydb.StatusIds_UNAVAILABLE,
81+
Issues: []*Ydb_Issue.IssueMessage{
82+
{
83+
Message: "Test status unavailable",
84+
},
85+
},
86+
})
87+
88+
if err != nil {
89+
return fmt.Errorf("failed to send error response: %w", err)
90+
}
91+
92+
return nil
93+
}
94+
95+
// wait message block
96+
messagesMsg, err := server.Recv()
97+
if err != nil {
98+
return errors.New("failed to read messages block")
99+
}
100+
101+
if len(messagesMsg.GetClientMessage().(*Ydb_Topic.StreamWriteMessage_FromClient_WriteRequest).WriteRequest.GetMessages()) == 0 {
102+
return errors.New("received zero messages block")
103+
}
104+
105+
err = server.Send(&Ydb_Topic.StreamWriteMessage_FromServer{
106+
Status: Ydb.StatusIds_SUCCESS,
107+
ServerMessage: &Ydb_Topic.StreamWriteMessage_FromServer_WriteResponse{
108+
WriteResponse: &Ydb_Topic.StreamWriteMessage_WriteResponse{
109+
Acks: []*Ydb_Topic.StreamWriteMessage_WriteResponse_WriteAck{
110+
{
111+
SeqNo: 1,
112+
MessageWriteStatus: &Ydb_Topic.StreamWriteMessage_WriteResponse_WriteAck_Written_{
113+
Written: &Ydb_Topic.StreamWriteMessage_WriteResponse_WriteAck_Written{
114+
Offset: 1,
115+
},
116+
},
117+
},
118+
},
119+
PartitionId: 0,
120+
WriteStatistics: &Ydb_Topic.StreamWriteMessage_WriteResponse_WriteStatistics{},
121+
},
122+
}})
123+
124+
if err != nil {
125+
return fmt.Errorf("failed to sent write ack: %w", err)
126+
}
127+
128+
return nil
129+
}

internal/xtest/ydb_grpc_mocks.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package xtest
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
"reflect"
8+
"strconv"
9+
"time"
10+
11+
"github.com/rekby/fixenv"
12+
"github.com/rekby/fixenv/sf"
13+
"github.com/ydb-platform/ydb-go-genproto/Ydb_Discovery_V1"
14+
"github.com/ydb-platform/ydb-go-genproto/Ydb_Topic_V1"
15+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
16+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Discovery"
17+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations"
18+
"google.golang.org/grpc"
19+
"google.golang.org/protobuf/types/known/anypb"
20+
)
21+
22+
func GrpcMockTopicConnString(e fixenv.Env, topicServiceImpl Ydb_Topic_V1.TopicServiceServer) string {
23+
v := reflect.ValueOf(topicServiceImpl)
24+
addr := v.Pointer()
25+
26+
var f fixenv.GenericFixtureFunction[string] = func() (*fixenv.GenericResult[string], error) {
27+
listener := sf.LocalTCPListenerNamed(e, fmt.Sprintf("ydb-grpc-mock-topic-%v", addr))
28+
connString := fmt.Sprintf("grpc://%s/local", listener.Addr())
29+
30+
mock, err := newGrpcMock(listener, topicServiceImpl)
31+
if err != nil {
32+
return nil, fmt.Errorf("failed to create grpc mock: %w", err)
33+
}
34+
35+
clean := func() {
36+
_ = mock.Close()
37+
}
38+
39+
return fixenv.NewGenericResultWithCleanup(connString, clean), nil
40+
}
41+
42+
return fixenv.CacheResult(e, f, fixenv.CacheOptions{CacheKey: addr})
43+
}
44+
45+
type grpcMock struct {
46+
listener net.Listener
47+
grpcServer *grpc.Server
48+
stopChan chan error
49+
}
50+
51+
func (m *grpcMock) Close() error {
52+
m.grpcServer.Stop()
53+
return m.listener.Close()
54+
}
55+
56+
func newGrpcMock(listener net.Listener, topicServiceImpl Ydb_Topic_V1.TopicServiceServer) (*grpcMock, error) {
57+
res := &grpcMock{
58+
listener: listener,
59+
grpcServer: grpc.NewServer(),
60+
stopChan: make(chan error, 1),
61+
}
62+
63+
host, portS, err := net.SplitHostPort(listener.Addr().String())
64+
if err != nil {
65+
return nil, fmt.Errorf("failed to split host port addresses: %w", err)
66+
}
67+
68+
port, err := strconv.ParseUint(portS, 10, 32)
69+
if err != nil {
70+
return nil, fmt.Errorf("failed convert port to int: %w", err)
71+
}
72+
discoveryService := newMockDiscoveryService(host, uint32(port))
73+
74+
Ydb_Discovery_V1.RegisterDiscoveryServiceServer(res.grpcServer, discoveryService)
75+
Ydb_Topic_V1.RegisterTopicServiceServer(res.grpcServer, topicServiceImpl)
76+
77+
go func() {
78+
res.stopChan <- res.grpcServer.Serve(res.listener)
79+
}()
80+
81+
select {
82+
case <-res.stopChan:
83+
return nil, err
84+
case <-time.After(time.Millisecond):
85+
return res, nil
86+
}
87+
}
88+
89+
type mockDiscoveryService struct {
90+
Ydb_Discovery_V1.UnimplementedDiscoveryServiceServer
91+
host string
92+
port uint32
93+
}
94+
95+
func newMockDiscoveryService(host string, port uint32) *mockDiscoveryService {
96+
return &mockDiscoveryService{
97+
host: host,
98+
port: port,
99+
}
100+
}
101+
102+
func (m mockDiscoveryService) ListEndpoints(ctx context.Context, request *Ydb_Discovery.ListEndpointsRequest) (*Ydb_Discovery.ListEndpointsResponse, error) {
103+
res := &Ydb_Discovery.ListEndpointsResult{
104+
Endpoints: []*Ydb_Discovery.EndpointInfo{
105+
{
106+
Address: m.host,
107+
Port: m.port,
108+
LoadFactor: 0,
109+
Ssl: false,
110+
Service: nil,
111+
Location: "",
112+
NodeId: 1,
113+
IpV4: []string{"127.0.0.1"},
114+
},
115+
},
116+
SelfLocation: "",
117+
}
118+
resp := &Ydb_Discovery.ListEndpointsResponse{
119+
Operation: &Ydb_Operations.Operation{
120+
Id: "test-list-operation",
121+
Ready: true,
122+
Status: Ydb.StatusIds_SUCCESS,
123+
Result: &anypb.Any{},
124+
}}
125+
err := resp.GetOperation().GetResult().MarshalFrom(res)
126+
return resp, err
127+
}
128+
129+
func (m mockDiscoveryService) WhoAmI(ctx context.Context, request *Ydb_Discovery.WhoAmIRequest) (*Ydb_Discovery.WhoAmIResponse, error) {
130+
//TODO implement me
131+
panic("implement me")
132+
}

tests/mocked/scope.go

Lines changed: 0 additions & 81 deletions
This file was deleted.

0 commit comments

Comments
 (0)