Skip to content

Commit 0d8d632

Browse files
committed
Fixed hangup when try to send batch of messages with size more, then grpc limits from topic writer internals
1 parent baa9c2a commit 0d8d632

File tree

12 files changed

+222
-173
lines changed

12 files changed

+222
-173
lines changed

CHANGELOG.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
## v3.101.0
2-
* Added `table.Client.ReadRows` method with internal retries
1+
* Fixed hangup when try to send batch of messages with size more, then grpc limits from topic writer internals
32

43
## v3.100.3
54
* Fixed bug with concurrent rewrites source slice of `grpc.DialOption` on dial step

internal/config/config.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,14 @@ import (
77
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
88
)
99

10+
// InitDefaultGRPCMessageSize used for init value only
11+
// don't use it in any place, other than initial get values for addr and config
12+
var InitDefaultGRPCMessageSize = 64 * 1024 * 1024 // 64MB
13+
14+
// DefaultGRPCMessageSizeAddr pointer to public DefaultGRPCMessageSize value
15+
// the scheme needs for prevent import public package from internal (and prevent import cycle)
16+
var DefaultGRPCMessageSizeAddr = &InitDefaultGRPCMessageSize
17+
1018
var defaultRetryBudget = budget.Limited(-1)
1119

1220
type Common struct {

internal/grpcwrapper/rawtopic/rawtopicwriter/messages.go

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"time"
99

1010
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic"
11+
"google.golang.org/protobuf/proto"
1112
"google.golang.org/protobuf/types/known/timestamppb"
1213

1314
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
@@ -169,16 +170,49 @@ func (r *WriteRequest) toProto() (p *Ydb_Topic.StreamWriteMessage_FromClient_Wri
169170
return res, nil
170171
}
171172

173+
var writeRequestClientMessageSize = proto.Size(&Ydb_Topic.StreamWriteMessage_FromClient{
174+
ClientMessage: &Ydb_Topic.StreamWriteMessage_FromClient_WriteRequest{},
175+
})
176+
177+
func (r *WriteRequest) Size() int {
178+
if mess, err := r.toProto(); err == nil {
179+
size := proto.Size(mess.WriteRequest) + writeRequestClientMessageSize
180+
181+
return size
182+
}
183+
184+
return 0
185+
}
186+
187+
func (r *WriteRequest) Cut(count int) (head *WriteRequest, rest *WriteRequest) {
188+
if count >= len(r.Messages) {
189+
return r, nil
190+
}
191+
192+
rest = &WriteRequest{}
193+
*rest = *r
194+
r.Messages, rest.Messages = r.Messages[:count], r.Messages[count:]
195+
196+
return r, rest
197+
}
198+
172199
type MessageData struct {
173200
SeqNo int64
174201
CreatedAt time.Time
175202
UncompressedSize int64
176203
Partitioning Partitioning
177204
MetadataItems []rawtopiccommon.MetadataItem
178205
Data []byte
206+
207+
size int
208+
proto *Ydb_Topic.StreamWriteMessage_WriteRequest_MessageData
179209
}
180210

181211
func (d *MessageData) ToProto() (*Ydb_Topic.StreamWriteMessage_WriteRequest_MessageData, error) {
212+
if d.proto != nil {
213+
return d.proto, nil
214+
}
215+
182216
res := &Ydb_Topic.StreamWriteMessage_WriteRequest_MessageData{
183217
SeqNo: d.SeqNo,
184218
CreatedAt: timestamppb.New(d.CreatedAt),
@@ -197,7 +231,19 @@ func (d *MessageData) ToProto() (*Ydb_Topic.StreamWriteMessage_WriteRequest_Mess
197231
})
198232
}
199233

200-
return res, nil
234+
d.proto = res
235+
236+
return d.proto, nil
237+
}
238+
239+
func (d *MessageData) ProtoWireSizeBytes() int {
240+
if d.size == 0 {
241+
if p, err := d.ToProto(); err == nil {
242+
d.size = proto.Size(p)
243+
}
244+
}
245+
246+
return d.size
201247
}
202248

203249
type WriteResult struct {

internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go

Lines changed: 1 addition & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ import (
88
"sync/atomic"
99

1010
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic"
11-
"google.golang.org/grpc/codes"
12-
grpcStatus "google.golang.org/grpc/status"
1311

1412
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
1513
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb"
@@ -127,7 +125,7 @@ func (w *StreamWriter) Send(rawMsg ClientMessage) (err error) {
127125
return writeErr
128126
}
129127

130-
return sendWriteRequest(w.Stream.Send, writeReqProto)
128+
return w.Stream.Send(&Ydb_Topic.StreamWriteMessage_FromClient{ClientMessage: writeReqProto})
131129
case *UpdateTokenRequest:
132130
protoMsg.ClientMessage = &Ydb_Topic.StreamWriteMessage_FromClient_UpdateTokenRequest{
133131
UpdateTokenRequest: v.ToProto(),
@@ -149,8 +147,6 @@ func (w *StreamWriter) Send(rawMsg ClientMessage) (err error) {
149147
return nil
150148
}
151149

152-
type sendFunc func(req *Ydb_Topic.StreamWriteMessage_FromClient) error
153-
154150
func (w *StreamWriter) CloseSend() error {
155151
w.sendCloseMtx.Lock()
156152
defer w.sendCloseMtx.Unlock()
@@ -175,40 +171,3 @@ type ServerMessage interface {
175171
type serverMessageImpl struct{}
176172

177173
func (*serverMessageImpl) isServerMessage() {}
178-
179-
func sendWriteRequest(send sendFunc, req *Ydb_Topic.StreamWriteMessage_FromClient_WriteRequest) error {
180-
sendErr := send(&Ydb_Topic.StreamWriteMessage_FromClient{
181-
ClientMessage: req,
182-
})
183-
184-
if sendErr == nil {
185-
return nil
186-
}
187-
188-
grpcStatus, ok := grpcStatus.FromError(sendErr)
189-
if !ok {
190-
return sendErr
191-
}
192-
193-
grpcMessages := req.WriteRequest.GetMessages()
194-
if grpcStatus.Code() != codes.ResourceExhausted || len(grpcMessages) < 2 {
195-
return sendErr
196-
}
197-
198-
//nolint:gomnd
199-
splitIndex := len(grpcMessages) / 2
200-
firstMessages, lastMessages := grpcMessages[:splitIndex], grpcMessages[splitIndex:]
201-
defer func() {
202-
req.WriteRequest.Messages = grpcMessages
203-
}()
204-
205-
req.WriteRequest.Messages = firstMessages
206-
err := sendWriteRequest(send, req)
207-
if err != nil {
208-
return err
209-
}
210-
211-
req.WriteRequest.Messages = lastMessages
212-
213-
return sendWriteRequest(send, req)
214-
}

internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter_test.go

Lines changed: 0 additions & 102 deletions
This file was deleted.

internal/topic/topicwriterinternal/writer_config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type WritersCommonConfig struct {
1717
writerMeta map[string]string
1818
defaultPartitioning rawtopicwriter.Partitioning
1919
compressorCount int
20+
maxBytesPerMessage int
2021

2122
Tracer *trace.Topic
2223
cred credentials.Credentials

internal/topic/topicwriterinternal/writer_options.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,12 @@ func WithCompressorCount(num int) PublicWriterOption {
4545
}
4646
}
4747

48+
func WithMaxGrpcMessageBytes(num int) PublicWriterOption {
49+
return func(cfg *WriterReconnectorConfig) {
50+
cfg.maxBytesPerMessage = num
51+
}
52+
}
53+
4854
func WithTokenUpdateInterval(interval time.Duration) PublicWriterOption {
4955
return func(cfg *WriterReconnectorConfig) {
5056
cfg.credUpdateInterval = interval

internal/topic/topicwriterinternal/writer_reconnector.go

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ func NewWriterReconnectorConfig(options ...PublicWriterOption) WriterReconnector
8383
clock: clockwork.NewRealClock(),
8484
compressorCount: runtime.NumCPU(),
8585
Tracer: &trace.Topic{},
86+
maxBytesPerMessage: *config.DefaultGRPCMessageSizeAddr,
8687
},
8788
AutoSetSeqNo: true,
8889
AutoSetCreatedTime: true,
@@ -207,7 +208,7 @@ func (w *WriterReconnector) fillFields(messages []messageWithDataContent) error
207208

208209
func (w *WriterReconnector) start() {
209210
name := fmt.Sprintf("writer %q", w.cfg.topic)
210-
w.background.Start(name+", sendloop", w.connectionLoop)
211+
w.background.Start(name+", connectionLoop", w.connectionLoop)
211212
}
212213

213214
func (w *WriterReconnector) Write(ctx context.Context, messages []PublicMessage) (resErr error) {
@@ -643,27 +644,6 @@ func (w *WriterReconnector) GetSessionID() (sessionID string) {
643644
return sessionID
644645
}
645646

646-
func sendMessagesToStream(
647-
stream RawTopicWriterStream,
648-
targetCodec rawtopiccommon.Codec,
649-
messages []messageWithDataContent,
650-
) error {
651-
if len(messages) == 0 {
652-
return nil
653-
}
654-
655-
request, err := createWriteRequest(messages, targetCodec)
656-
if err != nil {
657-
return err
658-
}
659-
err = stream.Send(&request)
660-
if err != nil {
661-
return xerrors.WithStackTrace(fmt.Errorf("ydb: failed send write request: %w", err))
662-
}
663-
664-
return nil
665-
}
666-
667647
func allMessagesHasSameBufCodec(messages []messageWithDataContent) bool {
668648
if len(messages) <= 1 {
669649
return true
@@ -699,15 +679,17 @@ func splitMessagesByBufCodec(messages []messageWithDataContent) (res [][]message
699679
}
700680

701681
func createWriteRequest(messages []messageWithDataContent, targetCodec rawtopiccommon.Codec) (
702-
res rawtopicwriter.WriteRequest,
682+
res *rawtopicwriter.WriteRequest,
703683
err error,
704684
) {
705685
for i := 1; i < len(messages); i++ {
706686
if messages[i-1].tx != messages[i].tx {
707-
return res, xerrors.WithStackTrace(errDiffetentTransactions)
687+
return nil, xerrors.WithStackTrace(errDiffetentTransactions)
708688
}
709689
}
710690

691+
res = &rawtopicwriter.WriteRequest{}
692+
711693
if len(messages) > 0 && messages[0].tx != nil {
712694
res.Tx.ID = messages[0].tx.ID()
713695
res.Tx.Session = messages[0].tx.SessionID()
@@ -718,7 +700,7 @@ func createWriteRequest(messages []messageWithDataContent, targetCodec rawtopicc
718700
for i := range messages {
719701
res.Messages[i], err = createRawMessageData(res.Codec, &messages[i])
720702
if err != nil {
721-
return res, err
703+
return nil, err
722704
}
723705
}
724706

0 commit comments

Comments
 (0)