Skip to content

Commit deaca20

Browse files
authored
Merge pull request #1676 from ydb-platform/1660-big-topic-message
Fixed hangup when try to send batch of messages with size more, then …
2 parents 7957a94 + 5ccb9ce commit deaca20

File tree

18 files changed

+310
-212
lines changed

18 files changed

+310
-212
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Fixed hangup when try to send batch of messages with size more, then grpc limits from topic writer internals
2+
13
## v3.101.2
24
* Added a new metric `ydb_go_sdk_ydb_info` with the current version of the SDK
35

config/config.go

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,19 @@ import (
2020
type Config struct {
2121
config.Common
2222

23-
trace *trace.Driver
24-
dialTimeout time.Duration
25-
connectionTTL time.Duration
26-
balancerConfig *balancerConfig.Config
27-
secure bool
28-
endpoint string
29-
database string
30-
metaOptions []meta.Option
31-
grpcOptions []grpc.DialOption
32-
credentials credentials.Credentials
33-
tlsConfig *tls.Config
34-
meta *meta.Meta
23+
trace *trace.Driver
24+
dialTimeout time.Duration
25+
connectionTTL time.Duration
26+
balancerConfig *balancerConfig.Config
27+
secure bool
28+
endpoint string
29+
database string
30+
metaOptions []meta.Option
31+
grpcOptions []grpc.DialOption
32+
grpcMaxMessageSize int
33+
credentials credentials.Credentials
34+
tlsConfig *tls.Config
35+
meta *meta.Meta
3536

3637
excludeGRPCCodesForPessimization []grpcCodes.Code
3738
}
@@ -53,6 +54,11 @@ func (c *Config) GrpcDialOptions() []grpc.DialOption {
5354
)
5455
}
5556

57+
// GrpcMaxMessageSize return client settings for max grpc message size
58+
func (c *Config) GrpcMaxMessageSize() int {
59+
return c.grpcMaxMessageSize
60+
}
61+
5662
// Meta reports meta information about database connection
5763
func (c *Config) Meta() *meta.Meta {
5864
return c.meta
@@ -245,6 +251,19 @@ func WithDialTimeout(timeout time.Duration) Option {
245251
}
246252
}
247253

254+
func WithGrpcMaxMessageSize(sizeBytes int) Option {
255+
return func(c *Config) {
256+
c.grpcMaxMessageSize = sizeBytes
257+
c.grpcOptions = append(c.grpcOptions,
258+
// limit size of outgoing and incoming packages
259+
grpc.WithDefaultCallOptions(
260+
grpc.MaxCallRecvMsgSize(c.grpcMaxMessageSize),
261+
grpc.MaxCallSendMsgSize(c.grpcMaxMessageSize),
262+
),
263+
)
264+
}
265+
}
266+
248267
func WithBalancer(balancer *balancerConfig.Config) Option {
249268
return func(c *Config) {
250269
c.balancerConfig = balancer

config/defaults.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,10 @@ func defaultConfig() (c *Config) {
107107
credentials: credentials.NewAnonymousCredentials(
108108
credentials.WithSourceInfo(stack.Record(0)),
109109
),
110-
balancerConfig: balancers.Default(),
111-
tlsConfig: defaultTLSConfig(),
112-
dialTimeout: DefaultDialTimeout,
113-
trace: &trace.Driver{},
110+
balancerConfig: balancers.Default(),
111+
tlsConfig: defaultTLSConfig(),
112+
dialTimeout: DefaultDialTimeout,
113+
trace: &trace.Driver{},
114+
grpcMaxMessageSize: DefaultGRPCMsgSize,
114115
}
115116
}

driver.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
3535
internalTable "github.com/ydb-platform/ydb-go-sdk/v3/internal/table"
3636
tableConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config"
37+
internalTopic "github.com/ydb-platform/ydb-go-sdk/v3/internal/topic"
3738
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicclientinternal"
3839
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
3940
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
@@ -588,6 +589,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {
588589
[]topicoptions.TopicOption{
589590
topicoptions.WithOperationTimeout(d.config.OperationTimeout()),
590591
topicoptions.WithOperationCancelAfter(d.config.OperationCancelAfter()),
592+
internalTopic.WithGrpcMessageSize(d.config.GrpcMaxMessageSize()),
591593
},
592594
d.topicOptions...,
593595
)...,

internal/grpcwrapper/rawtopic/rawtopicwriter/messages.go

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"time"
99

1010
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic"
11+
"google.golang.org/protobuf/proto"
1112
"google.golang.org/protobuf/types/known/timestamppb"
1213

1314
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
@@ -169,16 +170,55 @@ func (r *WriteRequest) toProto() (p *Ydb_Topic.StreamWriteMessage_FromClient_Wri
169170
return res, nil
170171
}
171172

173+
var writeRequestClientMessageSize = proto.Size(&Ydb_Topic.StreamWriteMessage_FromClient{
174+
ClientMessage: &Ydb_Topic.StreamWriteMessage_FromClient_WriteRequest{},
175+
})
176+
177+
func (r *WriteRequest) Size() int {
178+
if mess, err := r.toProto(); err == nil {
179+
size := proto.Size(mess.WriteRequest) + writeRequestClientMessageSize
180+
181+
return size
182+
}
183+
184+
return 0
185+
}
186+
187+
func (r *WriteRequest) FillCache() *WriteRequest {
188+
r.Size()
189+
190+
return r
191+
}
192+
193+
func (r *WriteRequest) Cut(count int) (head *WriteRequest, rest *WriteRequest) {
194+
if count >= len(r.Messages) {
195+
return r, nil
196+
}
197+
198+
rest = &WriteRequest{}
199+
*rest = *r
200+
r.Messages, rest.Messages = r.Messages[:count], r.Messages[count:]
201+
202+
return r, rest
203+
}
204+
172205
type MessageData struct {
173206
SeqNo int64
174207
CreatedAt time.Time
175208
UncompressedSize int64
176209
Partitioning Partitioning
177210
MetadataItems []rawtopiccommon.MetadataItem
178211
Data []byte
212+
213+
size int
214+
proto *Ydb_Topic.StreamWriteMessage_WriteRequest_MessageData
179215
}
180216

181217
func (d *MessageData) ToProto() (*Ydb_Topic.StreamWriteMessage_WriteRequest_MessageData, error) {
218+
if d.proto != nil {
219+
return d.proto, nil
220+
}
221+
182222
res := &Ydb_Topic.StreamWriteMessage_WriteRequest_MessageData{
183223
SeqNo: d.SeqNo,
184224
CreatedAt: timestamppb.New(d.CreatedAt),
@@ -197,7 +237,19 @@ func (d *MessageData) ToProto() (*Ydb_Topic.StreamWriteMessage_WriteRequest_Mess
197237
})
198238
}
199239

200-
return res, nil
240+
d.proto = res
241+
242+
return d.proto, nil
243+
}
244+
245+
func (d *MessageData) ProtoWireSizeBytes() int {
246+
if d.size == 0 {
247+
if p, err := d.ToProto(); err == nil {
248+
d.size = proto.Size(p)
249+
}
250+
}
251+
252+
return d.size
201253
}
202254

203255
type WriteResult struct {

internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go

Lines changed: 1 addition & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ import (
88
"sync/atomic"
99

1010
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic"
11-
"google.golang.org/grpc/codes"
12-
grpcStatus "google.golang.org/grpc/status"
1311

1412
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
1513
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb"
@@ -127,7 +125,7 @@ func (w *StreamWriter) Send(rawMsg ClientMessage) (err error) {
127125
return writeErr
128126
}
129127

130-
return sendWriteRequest(w.Stream.Send, writeReqProto)
128+
return w.Stream.Send(&Ydb_Topic.StreamWriteMessage_FromClient{ClientMessage: writeReqProto})
131129
case *UpdateTokenRequest:
132130
protoMsg.ClientMessage = &Ydb_Topic.StreamWriteMessage_FromClient_UpdateTokenRequest{
133131
UpdateTokenRequest: v.ToProto(),
@@ -149,8 +147,6 @@ func (w *StreamWriter) Send(rawMsg ClientMessage) (err error) {
149147
return nil
150148
}
151149

152-
type sendFunc func(req *Ydb_Topic.StreamWriteMessage_FromClient) error
153-
154150
func (w *StreamWriter) CloseSend() error {
155151
w.sendCloseMtx.Lock()
156152
defer w.sendCloseMtx.Unlock()
@@ -175,40 +171,3 @@ type ServerMessage interface {
175171
type serverMessageImpl struct{}
176172

177173
func (*serverMessageImpl) isServerMessage() {}
178-
179-
func sendWriteRequest(send sendFunc, req *Ydb_Topic.StreamWriteMessage_FromClient_WriteRequest) error {
180-
sendErr := send(&Ydb_Topic.StreamWriteMessage_FromClient{
181-
ClientMessage: req,
182-
})
183-
184-
if sendErr == nil {
185-
return nil
186-
}
187-
188-
grpcStatus, ok := grpcStatus.FromError(sendErr)
189-
if !ok {
190-
return sendErr
191-
}
192-
193-
grpcMessages := req.WriteRequest.GetMessages()
194-
if grpcStatus.Code() != codes.ResourceExhausted || len(grpcMessages) < 2 {
195-
return sendErr
196-
}
197-
198-
//nolint:gomnd
199-
splitIndex := len(grpcMessages) / 2
200-
firstMessages, lastMessages := grpcMessages[:splitIndex], grpcMessages[splitIndex:]
201-
defer func() {
202-
req.WriteRequest.Messages = grpcMessages
203-
}()
204-
205-
req.WriteRequest.Messages = firstMessages
206-
err := sendWriteRequest(send, req)
207-
if err != nil {
208-
return err
209-
}
210-
211-
req.WriteRequest.Messages = lastMessages
212-
213-
return sendWriteRequest(send, req)
214-
}

internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter_test.go

Lines changed: 0 additions & 102 deletions
This file was deleted.

internal/topic/configs.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,40 @@
11
package topic
22

33
import (
4+
"time"
5+
46
"github.com/ydb-platform/ydb-go-sdk/v3/internal/config"
57
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
68
)
79

810
type Config struct {
911
config.Common
10-
Trace *trace.Topic
12+
Trace *trace.Topic
13+
MaxGrpcMessageSize int
14+
}
15+
16+
type Option func(c *Config)
17+
18+
func PublicWithTrace(trace trace.Topic, opts ...trace.TopicComposeOption) Option { //nolint:gocritic
19+
return func(c *Config) {
20+
c.Trace = c.Trace.Compose(&trace, opts...)
21+
}
22+
}
23+
24+
func PublicWithOperationTimeout(operationTimeout time.Duration) Option {
25+
return func(c *Config) {
26+
config.SetOperationTimeout(&c.Common, operationTimeout)
27+
}
28+
}
29+
30+
func PublicWithOperationCancelAfter(operationCancelAfter time.Duration) Option {
31+
return func(c *Config) {
32+
config.SetOperationCancelAfter(&c.Common, operationCancelAfter)
33+
}
34+
}
35+
36+
func WithGrpcMessageSize(sizeBytes int) Option {
37+
return func(c *Config) {
38+
c.MaxGrpcMessageSize = sizeBytes
39+
}
1140
}

0 commit comments

Comments
 (0)