Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Ydb.Sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
- Fixed bug: Received message exceeds the maximum configured message size (#421).
- Added `MaxSendMessageSize` \ `MaxReceiveMessageSize` grpc message size settings.
- Added `EnableMultipleHttp2Connections` setting to grpc channel.
- `Connection.State` is set to `Broken` when the session is deactivated.

Expand Down
76 changes: 56 additions & 20 deletions src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ private void InitDefaultValues()
_database = "/local";
_maxSessionPool = 100;
_useTls = false;
_keepAlivePingDelay = SocketHttpHandlerDefaults.DefaultKeepAlivePingSeconds;
_keepAlivePingTimeout = SocketHttpHandlerDefaults.DefaultKeepAlivePingTimeoutSeconds;
_keepAlivePingDelay = GrpcDefaultSettings.DefaultKeepAlivePingSeconds;
_keepAlivePingTimeout = GrpcDefaultSettings.DefaultKeepAlivePingTimeoutSeconds;
_enableMultipleHttp2Connections = false;
_maxSendMessageSize = GrpcDefaultSettings.MaxSendMessageSize;
_maxReceiveMessageSize = GrpcDefaultSettings.MaxReceiveMessageSize;
}

public string Host
Expand Down Expand Up @@ -187,6 +189,30 @@ public bool EnableMultipleHttp2Connections

private bool _enableMultipleHttp2Connections;

public int MaxSendMessageSize
{
get => _maxSendMessageSize;
set
{
_maxSendMessageSize = value;
SaveValue(nameof(MaxSendMessageSize), value);
}
}

private int _maxSendMessageSize;

public int MaxReceiveMessageSize
{
get => _maxReceiveMessageSize;
set
{
_maxReceiveMessageSize = value;
SaveValue(nameof(MaxReceiveMessageSize), value);
}
}

private int _maxReceiveMessageSize;

public ILoggerFactory? LoggerFactory { get; init; }

public ICredentialsProvider? CredentialsProvider { get; init; }
Expand Down Expand Up @@ -235,24 +261,29 @@ internal Task<Driver> BuildDriver()
{
var cert = RootCertificate != null ? X509Certificate.CreateFromCertFile(RootCertificate) : null;

return Driver.CreateInitialized(new DriverConfig(
endpoint: Endpoint,
database: Database,
credentials: CredentialsProvider,
customServerCertificate: cert,
customServerCertificates: ServerCertificates
)
{
KeepAlivePingDelay = KeepAlivePingDelay == 0
? Timeout.InfiniteTimeSpan
: TimeSpan.FromSeconds(KeepAlivePingDelay),
KeepAlivePingTimeout = KeepAlivePingTimeout == 0
? Timeout.InfiniteTimeSpan
: TimeSpan.FromSeconds(KeepAlivePingTimeout),
User = User,
Password = Password,
EnableMultipleHttp2Connections = EnableMultipleHttp2Connections
}, LoggerFactory);
return Driver.CreateInitialized(
new DriverConfig(
endpoint: Endpoint,
database: Database,
credentials: CredentialsProvider,
customServerCertificate: cert,
customServerCertificates: ServerCertificates
)
{
KeepAlivePingDelay = KeepAlivePingDelay == 0
? Timeout.InfiniteTimeSpan
: TimeSpan.FromSeconds(KeepAlivePingDelay),
KeepAlivePingTimeout = KeepAlivePingTimeout == 0
? Timeout.InfiniteTimeSpan
: TimeSpan.FromSeconds(KeepAlivePingTimeout),
User = User,
Password = Password,
EnableMultipleHttp2Connections = EnableMultipleHttp2Connections,
MaxSendMessageSize = MaxSendMessageSize,
MaxReceiveMessageSize = MaxReceiveMessageSize
},
LoggerFactory
);
}

public override void Clear()
Expand Down Expand Up @@ -333,6 +364,11 @@ static YdbConnectionOption()
AddOption(new YdbConnectionOption<bool>(BoolExtractor, (builder, enableMultipleHttp2Connections) =>
builder.EnableMultipleHttp2Connections = enableMultipleHttp2Connections),
"EnableMultipleHttp2Connections", "Enable Multiple Http2 Connections");
AddOption(new YdbConnectionOption<int>(IntExtractor, (builder, maxSendMessageSize) =>
builder.MaxSendMessageSize = maxSendMessageSize), "MaxSendMessageSize", "Max Send Message Size");
AddOption(new YdbConnectionOption<int>(IntExtractor, (builder, maxReceiveMessageSize) =>
builder.MaxReceiveMessageSize = maxReceiveMessageSize),
"MaxReceiveMessageSize", "Max Receive Message Size");
}

private static void AddOption(YdbConnectionOption option, params string[] keys)
Expand Down
11 changes: 7 additions & 4 deletions src/Ydb.Sdk/src/DriverConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,19 @@ public class DriverConfig
public ICredentialsProvider? Credentials { get; }

public TimeSpan KeepAlivePingDelay { get; init; } =
TimeSpan.FromSeconds(SocketHttpHandlerDefaults.DefaultKeepAlivePingSeconds);
TimeSpan.FromSeconds(GrpcDefaultSettings.DefaultKeepAlivePingSeconds);

public TimeSpan KeepAlivePingTimeout { get; init; } =
TimeSpan.FromSeconds(SocketHttpHandlerDefaults.DefaultKeepAlivePingTimeoutSeconds);

public bool EnableMultipleHttp2Connections { get; init; }
TimeSpan.FromSeconds(GrpcDefaultSettings.DefaultKeepAlivePingTimeoutSeconds);

public string? User { get; init; }
public string? Password { get; init; }

public bool EnableMultipleHttp2Connections { get; init; }

public int MaxSendMessageSize { get; init; } = GrpcDefaultSettings.MaxSendMessageSize;
public int MaxReceiveMessageSize { get; init; } = GrpcDefaultSettings.MaxReceiveMessageSize;

internal X509Certificate2Collection CustomServerCertificates { get; } = new();
internal TimeSpan EndpointDiscoveryInterval = TimeSpan.FromMinutes(1);
internal TimeSpan EndpointDiscoveryTimeout = TimeSpan.FromSeconds(10);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace Ydb.Sdk;

internal static class SocketHttpHandlerDefaults
internal static class GrpcDefaultSettings
{
/// <summary>
/// Default interval (in seconds) for sending keep-alive ping messages.
Expand All @@ -11,4 +11,8 @@ internal static class SocketHttpHandlerDefaults
/// Default timeout (in seconds) for receiving a response to a keep-alive ping.
/// </summary>
internal const int DefaultKeepAlivePingTimeoutSeconds = 10;

internal const int MaxSendMessageSize = 64 * 1024 * 1024; // 64 Mb

internal const int MaxReceiveMessageSize = 64 * 1024 * 1024; // 64 Mb
}
4 changes: 3 additions & 1 deletion src/Ydb.Sdk/src/Pool/ChannelPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ public GrpcChannel CreateChannel(string endpoint)
var channelOptions = new GrpcChannelOptions
{
LoggerFactory = _loggerFactory,
DisposeHttpClient = true
DisposeHttpClient = true,
MaxSendMessageSize = _config.MaxSendMessageSize,
MaxReceiveMessageSize = _config.MaxReceiveMessageSize
};

var httpHandler = new SocketsHttpHandler
Expand Down
32 changes: 19 additions & 13 deletions src/Ydb.Sdk/tests/Ado/YdbConnectionStringBuilderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,20 @@ public class YdbConnectionStringBuilderTests
[Fact]
public void InitDefaultValues_WhenEmptyConstructorInvoke_ReturnDefaultConnectionString()
{
var connectionString = new YdbConnectionStringBuilder();
var ydbConnectionStringBuilder = new YdbConnectionStringBuilder();

Assert.Equal(2136, connectionString.Port);
Assert.Equal("localhost", connectionString.Host);
Assert.Equal("/local", connectionString.Database);
Assert.Equal(100, connectionString.MaxSessionPool);
Assert.Null(connectionString.User);
Assert.Null(connectionString.Password);
Assert.Equal(10, connectionString.KeepAlivePingDelay);
Assert.Equal(10, connectionString.KeepAlivePingTimeout);
Assert.Equal("", connectionString.ConnectionString);
Assert.False(connectionString.EnableMultipleHttp2Connections);
Assert.Equal(2136, ydbConnectionStringBuilder.Port);
Assert.Equal("localhost", ydbConnectionStringBuilder.Host);
Assert.Equal("/local", ydbConnectionStringBuilder.Database);
Assert.Equal(100, ydbConnectionStringBuilder.MaxSessionPool);
Assert.Null(ydbConnectionStringBuilder.User);
Assert.Null(ydbConnectionStringBuilder.Password);
Assert.Equal(10, ydbConnectionStringBuilder.KeepAlivePingDelay);
Assert.Equal(10, ydbConnectionStringBuilder.KeepAlivePingTimeout);
Assert.Equal("", ydbConnectionStringBuilder.ConnectionString);
Assert.False(ydbConnectionStringBuilder.EnableMultipleHttp2Connections);
Assert.Equal(64 * 1024 * 1024, ydbConnectionStringBuilder.MaxSendMessageSize);
Assert.Equal(64 * 1024 * 1024, ydbConnectionStringBuilder.MaxReceiveMessageSize);
}

[Fact]
Expand All @@ -38,7 +40,8 @@ public void InitConnectionStringBuilder_WhenExpectedKeys_ReturnUpdatedConnection
var connectionString =
new YdbConnectionStringBuilder("Host=server;Port=2135;Database=/my/path;User=Kirill;UseTls=true;" +
"KeepAlivePingDelay=30;KeepAlivePingTimeout=60;" +
"EnableMultipleHttp2Connections=true");
"EnableMultipleHttp2Connections=true;" +
"MaxSendMessageSize=1000000;MaxReceiveMessageSize=1000000");

Assert.Equal(2135, connectionString.Port);
Assert.Equal("server", connectionString.Host);
Expand All @@ -49,9 +52,12 @@ public void InitConnectionStringBuilder_WhenExpectedKeys_ReturnUpdatedConnection
Assert.Equal(60, connectionString.KeepAlivePingTimeout);
Assert.Null(connectionString.Password);
Assert.True(connectionString.EnableMultipleHttp2Connections);
Assert.Equal(1000000, connectionString.MaxSendMessageSize);
Assert.Equal(1000000, connectionString.MaxReceiveMessageSize);
Assert.Equal("Host=server;Port=2135;Database=/my/path;User=Kirill;UseTls=True;" +
"KeepAlivePingDelay=30;KeepAlivePingTimeout=60;" +
"EnableMultipleHttp2Connections=True", connectionString.ConnectionString);
"EnableMultipleHttp2Connections=True;" +
"MaxSendMessageSize=1000000;MaxReceiveMessageSize=1000000", connectionString.ConnectionString);
}

[Fact]
Expand Down
48 changes: 44 additions & 4 deletions src/Ydb.Sdk/tests/Topic/ReaderIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,21 @@ public class ReaderIntegrationTests : IClassFixture<DriverFixture>
{
private readonly IDriver _driver;
private readonly string _topicName;
private readonly TopicClient _topicClient;

public ReaderIntegrationTests(DriverFixture driverFixture)
{
_driver = driverFixture.Driver;
_topicName = "reader_topic_" + Utils.Net;
_topicName = $"reader{Random.Shared.Next()}_topic_" + Utils.Net;
_topicClient = new TopicClient(_driver);
}

[Fact]
public async Task StressTest_WhenReadingThenCommiting_ReturnMessages()
{
var topicClient = new TopicClient(_driver);
var topicSettings = new CreateTopicSettings { Path = _topicName };
topicSettings.Consumers.Add(new Consumer("Consumer"));
await topicClient.CreateTopic(topicSettings);
await _topicClient.CreateTopic(topicSettings);

await using var writer = new WriterBuilder<string>(_driver, _topicName)
{ ProducerId = "producerId" }.Build();
Expand Down Expand Up @@ -61,6 +62,45 @@ public async Task StressTest_WhenReadingThenCommiting_ReturnMessages()

await readerNext.DisposeAsync();

await topicClient.DropTopic(_topicName);
await _topicClient.DropTopic(_topicName);
}

// Fixed error on receiving the biggest messages
// Grpc.Core.RpcException: Status(StatusCode="ResourceExhausted", Detail="Received message exceeds the maximum configured message size.")
[Fact]
public async Task BigMessage_WhenClientSendingLargeMessage_ReturnReading()
{
const int messageSize = 100_000;
const int payloadSize = 200;

var topicSettings = new CreateTopicSettings { Path = _topicName };
topicSettings.Consumers.Add(new Consumer("Consumer"));
await _topicClient.CreateTopic(topicSettings);
await using var writer = new WriterBuilder<byte[]>(_driver, _topicName)
{ ProducerId = "producerId" }.Build();

var payload = new byte[payloadSize];
Random.Shared.NextBytes(payload);

// 20 Mb sending
for (var i = 0; i < messageSize; i++)
{
await writer.WriteAsync(payload);
}

await using var reader = new ReaderBuilder<byte[]>(_driver)
{
ConsumerName = "Consumer",
SubscribeSettings = { new SubscribeSettings(_topicName) }
}.Build();


// 20 Mb reading
for (var i = 0; i < messageSize; i++)
{
var message = await reader.ReadAsync();
Assert.Equal(payload, message.Data);
await message.CommitAsync();
}
}
}
Loading