Skip to content

Commit f3ad4e9

Browse files
feat: max send/receive message size settings (#422)
1 parent 0cb3eee commit f3ad4e9

File tree

7 files changed

+136
-43
lines changed

7 files changed

+136
-43
lines changed

src/Ydb.Sdk/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
- Fixed bug: Received message exceeds the maximum configured message size (#421).
2+
- Added `MaxSendMessageSize` \ `MaxReceiveMessageSize` grpc message size settings.
13
- Added `EnableMultipleHttp2Connections` setting to grpc channel.
24
- `Connection.State` is set to `Broken` when the session is deactivated.
35

src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs

Lines changed: 56 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,11 @@ private void InitDefaultValues()
2727
_database = "/local";
2828
_maxSessionPool = 100;
2929
_useTls = false;
30-
_keepAlivePingDelay = SocketHttpHandlerDefaults.DefaultKeepAlivePingSeconds;
31-
_keepAlivePingTimeout = SocketHttpHandlerDefaults.DefaultKeepAlivePingTimeoutSeconds;
30+
_keepAlivePingDelay = GrpcDefaultSettings.DefaultKeepAlivePingSeconds;
31+
_keepAlivePingTimeout = GrpcDefaultSettings.DefaultKeepAlivePingTimeoutSeconds;
3232
_enableMultipleHttp2Connections = false;
33+
_maxSendMessageSize = GrpcDefaultSettings.MaxSendMessageSize;
34+
_maxReceiveMessageSize = GrpcDefaultSettings.MaxReceiveMessageSize;
3335
}
3436

3537
public string Host
@@ -187,6 +189,30 @@ public bool EnableMultipleHttp2Connections
187189

188190
private bool _enableMultipleHttp2Connections;
189191

192+
public int MaxSendMessageSize
193+
{
194+
get => _maxSendMessageSize;
195+
set
196+
{
197+
_maxSendMessageSize = value;
198+
SaveValue(nameof(MaxSendMessageSize), value);
199+
}
200+
}
201+
202+
private int _maxSendMessageSize;
203+
204+
public int MaxReceiveMessageSize
205+
{
206+
get => _maxReceiveMessageSize;
207+
set
208+
{
209+
_maxReceiveMessageSize = value;
210+
SaveValue(nameof(MaxReceiveMessageSize), value);
211+
}
212+
}
213+
214+
private int _maxReceiveMessageSize;
215+
190216
public ILoggerFactory? LoggerFactory { get; init; }
191217

192218
public ICredentialsProvider? CredentialsProvider { get; init; }
@@ -235,24 +261,29 @@ internal Task<Driver> BuildDriver()
235261
{
236262
var cert = RootCertificate != null ? X509Certificate.CreateFromCertFile(RootCertificate) : null;
237263

238-
return Driver.CreateInitialized(new DriverConfig(
239-
endpoint: Endpoint,
240-
database: Database,
241-
credentials: CredentialsProvider,
242-
customServerCertificate: cert,
243-
customServerCertificates: ServerCertificates
244-
)
245-
{
246-
KeepAlivePingDelay = KeepAlivePingDelay == 0
247-
? Timeout.InfiniteTimeSpan
248-
: TimeSpan.FromSeconds(KeepAlivePingDelay),
249-
KeepAlivePingTimeout = KeepAlivePingTimeout == 0
250-
? Timeout.InfiniteTimeSpan
251-
: TimeSpan.FromSeconds(KeepAlivePingTimeout),
252-
User = User,
253-
Password = Password,
254-
EnableMultipleHttp2Connections = EnableMultipleHttp2Connections
255-
}, LoggerFactory);
264+
return Driver.CreateInitialized(
265+
new DriverConfig(
266+
endpoint: Endpoint,
267+
database: Database,
268+
credentials: CredentialsProvider,
269+
customServerCertificate: cert,
270+
customServerCertificates: ServerCertificates
271+
)
272+
{
273+
KeepAlivePingDelay = KeepAlivePingDelay == 0
274+
? Timeout.InfiniteTimeSpan
275+
: TimeSpan.FromSeconds(KeepAlivePingDelay),
276+
KeepAlivePingTimeout = KeepAlivePingTimeout == 0
277+
? Timeout.InfiniteTimeSpan
278+
: TimeSpan.FromSeconds(KeepAlivePingTimeout),
279+
User = User,
280+
Password = Password,
281+
EnableMultipleHttp2Connections = EnableMultipleHttp2Connections,
282+
MaxSendMessageSize = MaxSendMessageSize,
283+
MaxReceiveMessageSize = MaxReceiveMessageSize
284+
},
285+
LoggerFactory
286+
);
256287
}
257288

258289
public override void Clear()
@@ -333,6 +364,11 @@ static YdbConnectionOption()
333364
AddOption(new YdbConnectionOption<bool>(BoolExtractor, (builder, enableMultipleHttp2Connections) =>
334365
builder.EnableMultipleHttp2Connections = enableMultipleHttp2Connections),
335366
"EnableMultipleHttp2Connections", "Enable Multiple Http2 Connections");
367+
AddOption(new YdbConnectionOption<int>(IntExtractor, (builder, maxSendMessageSize) =>
368+
builder.MaxSendMessageSize = maxSendMessageSize), "MaxSendMessageSize", "Max Send Message Size");
369+
AddOption(new YdbConnectionOption<int>(IntExtractor, (builder, maxReceiveMessageSize) =>
370+
builder.MaxReceiveMessageSize = maxReceiveMessageSize),
371+
"MaxReceiveMessageSize", "Max Receive Message Size");
336372
}
337373

338374
private static void AddOption(YdbConnectionOption option, params string[] keys)

src/Ydb.Sdk/src/DriverConfig.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,19 @@ public class DriverConfig
1111
public ICredentialsProvider? Credentials { get; }
1212

1313
public TimeSpan KeepAlivePingDelay { get; init; } =
14-
TimeSpan.FromSeconds(SocketHttpHandlerDefaults.DefaultKeepAlivePingSeconds);
14+
TimeSpan.FromSeconds(GrpcDefaultSettings.DefaultKeepAlivePingSeconds);
1515

1616
public TimeSpan KeepAlivePingTimeout { get; init; } =
17-
TimeSpan.FromSeconds(SocketHttpHandlerDefaults.DefaultKeepAlivePingTimeoutSeconds);
18-
19-
public bool EnableMultipleHttp2Connections { get; init; }
17+
TimeSpan.FromSeconds(GrpcDefaultSettings.DefaultKeepAlivePingTimeoutSeconds);
2018

2119
public string? User { get; init; }
2220
public string? Password { get; init; }
2321

22+
public bool EnableMultipleHttp2Connections { get; init; }
23+
24+
public int MaxSendMessageSize { get; init; } = GrpcDefaultSettings.MaxSendMessageSize;
25+
public int MaxReceiveMessageSize { get; init; } = GrpcDefaultSettings.MaxReceiveMessageSize;
26+
2427
internal X509Certificate2Collection CustomServerCertificates { get; } = new();
2528
internal TimeSpan EndpointDiscoveryInterval = TimeSpan.FromMinutes(1);
2629
internal TimeSpan EndpointDiscoveryTimeout = TimeSpan.FromSeconds(10);

src/Ydb.Sdk/src/SocketHttpHandlerDefaultSettings.cs renamed to src/Ydb.Sdk/src/GrpcDefaultSettings.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
namespace Ydb.Sdk;
22

3-
internal static class SocketHttpHandlerDefaults
3+
internal static class GrpcDefaultSettings
44
{
55
/// <summary>
66
/// Default interval (in seconds) for sending keep-alive ping messages.
@@ -11,4 +11,8 @@ internal static class SocketHttpHandlerDefaults
1111
/// Default timeout (in seconds) for receiving a response to a keep-alive ping.
1212
/// </summary>
1313
internal const int DefaultKeepAlivePingTimeoutSeconds = 10;
14+
15+
internal const int MaxSendMessageSize = 64 * 1024 * 1024; // 64 Mb
16+
17+
internal const int MaxReceiveMessageSize = 64 * 1024 * 1024; // 64 Mb
1418
}

src/Ydb.Sdk/src/Pool/ChannelPool.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,9 @@ public GrpcChannel CreateChannel(string endpoint)
9595
var channelOptions = new GrpcChannelOptions
9696
{
9797
LoggerFactory = _loggerFactory,
98-
DisposeHttpClient = true
98+
DisposeHttpClient = true,
99+
MaxSendMessageSize = _config.MaxSendMessageSize,
100+
MaxReceiveMessageSize = _config.MaxReceiveMessageSize
99101
};
100102

101103
var httpHandler = new SocketsHttpHandler

src/Ydb.Sdk/tests/Ado/YdbConnectionStringBuilderTests.cs

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,20 @@ public class YdbConnectionStringBuilderTests
88
[Fact]
99
public void InitDefaultValues_WhenEmptyConstructorInvoke_ReturnDefaultConnectionString()
1010
{
11-
var connectionString = new YdbConnectionStringBuilder();
11+
var ydbConnectionStringBuilder = new YdbConnectionStringBuilder();
1212

13-
Assert.Equal(2136, connectionString.Port);
14-
Assert.Equal("localhost", connectionString.Host);
15-
Assert.Equal("/local", connectionString.Database);
16-
Assert.Equal(100, connectionString.MaxSessionPool);
17-
Assert.Null(connectionString.User);
18-
Assert.Null(connectionString.Password);
19-
Assert.Equal(10, connectionString.KeepAlivePingDelay);
20-
Assert.Equal(10, connectionString.KeepAlivePingTimeout);
21-
Assert.Equal("", connectionString.ConnectionString);
22-
Assert.False(connectionString.EnableMultipleHttp2Connections);
13+
Assert.Equal(2136, ydbConnectionStringBuilder.Port);
14+
Assert.Equal("localhost", ydbConnectionStringBuilder.Host);
15+
Assert.Equal("/local", ydbConnectionStringBuilder.Database);
16+
Assert.Equal(100, ydbConnectionStringBuilder.MaxSessionPool);
17+
Assert.Null(ydbConnectionStringBuilder.User);
18+
Assert.Null(ydbConnectionStringBuilder.Password);
19+
Assert.Equal(10, ydbConnectionStringBuilder.KeepAlivePingDelay);
20+
Assert.Equal(10, ydbConnectionStringBuilder.KeepAlivePingTimeout);
21+
Assert.Equal("", ydbConnectionStringBuilder.ConnectionString);
22+
Assert.False(ydbConnectionStringBuilder.EnableMultipleHttp2Connections);
23+
Assert.Equal(64 * 1024 * 1024, ydbConnectionStringBuilder.MaxSendMessageSize);
24+
Assert.Equal(64 * 1024 * 1024, ydbConnectionStringBuilder.MaxReceiveMessageSize);
2325
}
2426

2527
[Fact]
@@ -38,7 +40,8 @@ public void InitConnectionStringBuilder_WhenExpectedKeys_ReturnUpdatedConnection
3840
var connectionString =
3941
new YdbConnectionStringBuilder("Host=server;Port=2135;Database=/my/path;User=Kirill;UseTls=true;" +
4042
"KeepAlivePingDelay=30;KeepAlivePingTimeout=60;" +
41-
"EnableMultipleHttp2Connections=true");
43+
"EnableMultipleHttp2Connections=true;" +
44+
"MaxSendMessageSize=1000000;MaxReceiveMessageSize=1000000");
4245

4346
Assert.Equal(2135, connectionString.Port);
4447
Assert.Equal("server", connectionString.Host);
@@ -49,9 +52,12 @@ public void InitConnectionStringBuilder_WhenExpectedKeys_ReturnUpdatedConnection
4952
Assert.Equal(60, connectionString.KeepAlivePingTimeout);
5053
Assert.Null(connectionString.Password);
5154
Assert.True(connectionString.EnableMultipleHttp2Connections);
55+
Assert.Equal(1000000, connectionString.MaxSendMessageSize);
56+
Assert.Equal(1000000, connectionString.MaxReceiveMessageSize);
5257
Assert.Equal("Host=server;Port=2135;Database=/my/path;User=Kirill;UseTls=True;" +
5358
"KeepAlivePingDelay=30;KeepAlivePingTimeout=60;" +
54-
"EnableMultipleHttp2Connections=True", connectionString.ConnectionString);
59+
"EnableMultipleHttp2Connections=True;" +
60+
"MaxSendMessageSize=1000000;MaxReceiveMessageSize=1000000", connectionString.ConnectionString);
5561
}
5662

5763
[Fact]

src/Ydb.Sdk/tests/Topic/ReaderIntegrationTests.cs

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,21 @@ public class ReaderIntegrationTests : IClassFixture<DriverFixture>
1010
{
1111
private readonly IDriver _driver;
1212
private readonly string _topicName;
13+
private readonly TopicClient _topicClient;
1314

1415
public ReaderIntegrationTests(DriverFixture driverFixture)
1516
{
1617
_driver = driverFixture.Driver;
17-
_topicName = "reader_topic_" + Utils.Net;
18+
_topicName = $"reader{Random.Shared.Next()}_topic_" + Utils.Net;
19+
_topicClient = new TopicClient(_driver);
1820
}
1921

2022
[Fact]
2123
public async Task StressTest_WhenReadingThenCommiting_ReturnMessages()
2224
{
23-
var topicClient = new TopicClient(_driver);
2425
var topicSettings = new CreateTopicSettings { Path = _topicName };
2526
topicSettings.Consumers.Add(new Consumer("Consumer"));
26-
await topicClient.CreateTopic(topicSettings);
27+
await _topicClient.CreateTopic(topicSettings);
2728

2829
await using var writer = new WriterBuilder<string>(_driver, _topicName)
2930
{ ProducerId = "producerId" }.Build();
@@ -61,6 +62,45 @@ public async Task StressTest_WhenReadingThenCommiting_ReturnMessages()
6162

6263
await readerNext.DisposeAsync();
6364

64-
await topicClient.DropTopic(_topicName);
65+
await _topicClient.DropTopic(_topicName);
66+
}
67+
68+
// Fixed error on receiving the biggest messages
69+
// Grpc.Core.RpcException: Status(StatusCode="ResourceExhausted", Detail="Received message exceeds the maximum configured message size.")
70+
[Fact]
71+
public async Task BigMessage_WhenClientSendingLargeMessage_ReturnReading()
72+
{
73+
const int messageSize = 100_000;
74+
const int payloadSize = 200;
75+
76+
var topicSettings = new CreateTopicSettings { Path = _topicName };
77+
topicSettings.Consumers.Add(new Consumer("Consumer"));
78+
await _topicClient.CreateTopic(topicSettings);
79+
await using var writer = new WriterBuilder<byte[]>(_driver, _topicName)
80+
{ ProducerId = "producerId" }.Build();
81+
82+
var payload = new byte[payloadSize];
83+
Random.Shared.NextBytes(payload);
84+
85+
// 20 Mb sending
86+
for (var i = 0; i < messageSize; i++)
87+
{
88+
await writer.WriteAsync(payload);
89+
}
90+
91+
await using var reader = new ReaderBuilder<byte[]>(_driver)
92+
{
93+
ConsumerName = "Consumer",
94+
SubscribeSettings = { new SubscribeSettings(_topicName) }
95+
}.Build();
96+
97+
98+
// 20 Mb reading
99+
for (var i = 0; i < messageSize; i++)
100+
{
101+
var message = await reader.ReadAsync();
102+
Assert.Equal(payload, message.Data);
103+
await message.CommitAsync();
104+
}
65105
}
66106
}

0 commit comments

Comments
 (0)