Skip to content

Commit 8801638

Browse files
committed
move grpc message size option to driver level
1 parent 97145c6 commit 8801638

File tree

11 files changed

+92
-49
lines changed

11 files changed

+92
-49
lines changed

config/config.go

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,19 @@ import (
2020
type Config struct {
2121
config.Common
2222

23-
trace *trace.Driver
24-
dialTimeout time.Duration
25-
connectionTTL time.Duration
26-
balancerConfig *balancerConfig.Config
27-
secure bool
28-
endpoint string
29-
database string
30-
metaOptions []meta.Option
31-
grpcOptions []grpc.DialOption
32-
credentials credentials.Credentials
33-
tlsConfig *tls.Config
34-
meta *meta.Meta
23+
trace *trace.Driver
24+
dialTimeout time.Duration
25+
connectionTTL time.Duration
26+
balancerConfig *balancerConfig.Config
27+
secure bool
28+
endpoint string
29+
database string
30+
metaOptions []meta.Option
31+
grpcOptions []grpc.DialOption
32+
grpcMaxMessageSize int
33+
credentials credentials.Credentials
34+
tlsConfig *tls.Config
35+
meta *meta.Meta
3536

3637
excludeGRPCCodesForPessimization []grpcCodes.Code
3738
}
@@ -53,6 +54,11 @@ func (c *Config) GrpcDialOptions() []grpc.DialOption {
5354
)
5455
}
5556

57+
// GrpcMaxMessageSize return client settings for max grpc message size
58+
func (c *Config) GrpcMaxMessageSize() int {
59+
return c.grpcMaxMessageSize
60+
}
61+
5662
// Meta reports meta information about database connection
5763
func (c *Config) Meta() *meta.Meta {
5864
return c.meta
@@ -245,6 +251,19 @@ func WithDialTimeout(timeout time.Duration) Option {
245251
}
246252
}
247253

254+
func WithGrpcMaxMessageSize(sizeBytes int) Option {
255+
return func(c *Config) {
256+
c.grpcMaxMessageSize = sizeBytes
257+
c.grpcOptions = append(c.grpcOptions,
258+
// limit size of outgoing and incoming packages
259+
grpc.WithDefaultCallOptions(
260+
grpc.MaxCallRecvMsgSize(c.grpcMaxMessageSize),
261+
grpc.MaxCallSendMsgSize(c.grpcMaxMessageSize),
262+
),
263+
)
264+
}
265+
}
266+
248267
func WithBalancer(balancer *balancerConfig.Config) Option {
249268
return func(c *Config) {
250269
c.balancerConfig = balancer

config/defaults.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,10 @@ func defaultConfig() (c *Config) {
107107
credentials: credentials.NewAnonymousCredentials(
108108
credentials.WithSourceInfo(stack.Record(0)),
109109
),
110-
balancerConfig: balancers.Default(),
111-
tlsConfig: defaultTLSConfig(),
112-
dialTimeout: DefaultDialTimeout,
113-
trace: &trace.Driver{},
110+
balancerConfig: balancers.Default(),
111+
tlsConfig: defaultTLSConfig(),
112+
dialTimeout: DefaultDialTimeout,
113+
trace: &trace.Driver{},
114+
grpcMaxMessageSize: DefaultGRPCMsgSize,
114115
}
115116
}

driver.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
3535
internalTable "github.com/ydb-platform/ydb-go-sdk/v3/internal/table"
3636
tableConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config"
37+
internalTopic "github.com/ydb-platform/ydb-go-sdk/v3/internal/topic"
3738
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicclientinternal"
3839
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
3940
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
@@ -588,6 +589,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {
588589
[]topicoptions.TopicOption{
589590
topicoptions.WithOperationTimeout(d.config.OperationTimeout()),
590591
topicoptions.WithOperationCancelAfter(d.config.OperationCancelAfter()),
592+
internalTopic.WithGrpcMessageSize(d.config.GrpcMaxMessageSize()),
591593
},
592594
d.topicOptions...,
593595
)...,

internal/config/config.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,6 @@ import (
77
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
88
)
99

10-
// InitDefaultGRPCMessageSize used for init value only
11-
// don't use it in any place, other than initial get values for addr and config
12-
var InitDefaultGRPCMessageSize = 64 * 1024 * 1024 // 64MB
13-
14-
// DefaultGRPCMessageSizeAddr pointer to public DefaultGRPCMessageSize value
15-
// the scheme needs for prevent import public package from internal (and prevent import cycle)
16-
var DefaultGRPCMessageSizeAddr = &InitDefaultGRPCMessageSize
17-
1810
var defaultRetryBudget = budget.Limited(-1)
1911

2012
type Common struct {

internal/topic/configs.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,40 @@
11
package topic
22

33
import (
4+
"time"
5+
46
"github.com/ydb-platform/ydb-go-sdk/v3/internal/config"
57
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
68
)
79

810
type Config struct {
911
config.Common
10-
Trace *trace.Topic
12+
Trace *trace.Topic
13+
MaxGrpcMessageSize int
14+
}
15+
16+
type Option func(c *Config)
17+
18+
func PublicWithTrace(trace trace.Topic, opts ...trace.TopicComposeOption) Option { //nolint:gocritic
19+
return func(c *Config) {
20+
c.Trace = c.Trace.Compose(&trace, opts...)
21+
}
22+
}
23+
24+
func PublicWithOperationTimeout(operationTimeout time.Duration) Option {
25+
return func(c *Config) {
26+
config.SetOperationTimeout(&c.Common, operationTimeout)
27+
}
28+
}
29+
30+
func PublicWithOperationCancelAfter(operationCancelAfter time.Duration) Option {
31+
return func(c *Config) {
32+
config.SetOperationCancelAfter(&c.Common, operationCancelAfter)
33+
}
34+
}
35+
36+
func WithGrpcMessageSize(sizeBytes int) Option {
37+
return func(c *Config) {
38+
c.MaxGrpcMessageSize = sizeBytes
39+
}
1140
}

internal/topic/topicclientinternal/client.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"google.golang.org/grpc"
99

1010
"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
11+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/config"
1112
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic"
1213
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb"
1314
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic"
@@ -58,8 +59,10 @@ func New(
5859

5960
func newTopicConfig(opts ...topicoptions.TopicOption) topic.Config {
6061
c := topic.Config{
61-
Trace: &trace.Topic{},
62+
Trace: &trace.Topic{},
63+
MaxGrpcMessageSize: config.DefaultGRPCMsgSize,
6264
}
65+
6366
for _, opt := range opts {
6467
if opt != nil {
6568
opt(&c)
@@ -322,7 +325,9 @@ func (c *Client) StartReader(
322325

323326
// StartWriter create new topic writer wrapper
324327
func (c *Client) StartWriter(topicPath string, opts ...topicoptions.WriterOption) (*topicwriter.Writer, error) {
325-
cfg := c.createWriterConfig(topicPath, opts)
328+
cfg := c.createWriterConfig(topicPath, append(opts, topicwriterinternal.WithMaxGrpcMessageBytes(
329+
c.cfg.MaxGrpcMessageSize,
330+
)))
326331
writer, err := topicwriterinternal.NewWriterReconnector(cfg)
327332
if err != nil {
328333
return nil, err

internal/topic/topicwriterinternal/writer_reconnector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func NewWriterReconnectorConfig(options ...PublicWriterOption) WriterReconnector
8383
clock: clockwork.NewRealClock(),
8484
compressorCount: runtime.NumCPU(),
8585
Tracer: &trace.Topic{},
86-
maxBytesPerMessage: *config.DefaultGRPCMessageSizeAddr,
86+
maxBytesPerMessage: config.DefaultGRPCMsgSize,
8787
},
8888
AutoSetSeqNo: true,
8989
AutoSetCreatedTime: true,

options.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,17 @@ func WithDialTimeout(timeout time.Duration) Option {
354354
}
355355
}
356356

357+
// WithGrpcMaxMessageSize set max size of message on grpc level
358+
// use the option for the driver known custom limit.
359+
// Driver can't read the limit from direct grpc option
360+
func WithGrpcMaxMessageSize(sizeBytes int) Option {
361+
return func(ctx context.Context, d *Driver) error {
362+
d.options = append(d.options, config.WithGrpcMaxMessageSize(sizeBytes))
363+
364+
return nil
365+
}
366+
}
367+
357368
// With collects additional configuration options.
358369
//
359370
// This option does not replace collected option, instead it will append provided options.

tests/integration/topic_read_writer_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -532,11 +532,10 @@ func TestSendMessagesLargerThenGRPCLimit(t *testing.T) {
532532
const maxGrpcMsgSize = 10000 // bytes
533533
const topicMessageSize = maxGrpcMsgSize / 3
534534

535-
scope.Driver(ydb.With(config.WithGrpcOptions(grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxGrpcMsgSize)))))
535+
scope.Driver(ydb.WithGrpcMaxMessageSize(maxGrpcMsgSize))
536536
writer, err := scope.Driver().Topic().StartWriter(
537537
scope.TopicPath(),
538538
topicoptions.WithWriterCodec(topictypes.CodecRaw),
539-
topicoptions.WithWriterMaxSizeOfDataGrpcMessageBytes(maxGrpcMsgSize),
540539
)
541540
scope.Require.NoError(err)
542541
defer writer.Close(scope.Ctx)

topic/topicoptions/topicoptions_topic.go

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,20 @@ package topicoptions
33
import (
44
"time"
55

6-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/config"
76
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic"
87
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
98
)
109

1110
// TopicOption
1211
//
1312
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
14-
type TopicOption func(c *topic.Config)
13+
type TopicOption = topic.Option // func(c *topic.Config)
1514

1615
// WithTrace defines trace over persqueue client calls
1716
//
1817
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
1918
func WithTrace(trace trace.Topic, opts ...trace.TopicComposeOption) TopicOption { //nolint:gocritic
20-
return func(c *topic.Config) {
21-
c.Trace = c.Trace.Compose(&trace, opts...)
22-
}
19+
return topic.PublicWithTrace(trace, opts...)
2320
}
2421

2522
// WithOperationTimeout set the maximum amount of time a YDB server will process
@@ -30,9 +27,7 @@ func WithTrace(trace trace.Topic, opts ...trace.TopicComposeOption) TopicOption
3027
//
3128
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
3229
func WithOperationTimeout(operationTimeout time.Duration) TopicOption {
33-
return func(c *topic.Config) {
34-
config.SetOperationTimeout(&c.Common, operationTimeout)
35-
}
30+
return topic.PublicWithOperationTimeout(operationTimeout)
3631
}
3732

3833
// WithOperationCancelAfter set the maximum amount of time a YDB server will process an
@@ -43,7 +38,5 @@ func WithOperationTimeout(operationTimeout time.Duration) TopicOption {
4338
//
4439
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
4540
func WithOperationCancelAfter(operationCancelAfter time.Duration) TopicOption {
46-
return func(c *topic.Config) {
47-
config.SetOperationCancelAfter(&c.Common, operationCancelAfter)
48-
}
41+
return topic.PublicWithOperationCancelAfter(operationCancelAfter)
4942
}

0 commit comments

Comments
 (0)