Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
- Fixed bug in Topic Writer: worker is stopped by disposeCts
- Fixed bug in sql parser ADO.NET: deduplication declare param in YQL query
- Deleted property BufferOverflowRetryTimeoutMs

## v0.9.2
- Fixed bug: delete deadline grpc timeout on AttachStream

Expand Down
8 changes: 7 additions & 1 deletion slo/src/Internal/SloContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@ public abstract class SloContext<T> where T : IDisposable
{
// ReSharper disable once StaticMemberInGenericType
protected static readonly ILoggerFactory Factory =
LoggerFactory.Create(builder => builder.AddConsole().SetMinimumLevel(LogLevel.Information));
LoggerFactory.Create(builder =>
{
builder.AddConsole().SetMinimumLevel(LogLevel.Information);
builder.AddFilter("Ydb.Sdk.Ado", LogLevel.Debug);
builder.AddFilter("Ydb.Sdk.Services.Query", LogLevel.Debug);
builder.AddFilter("Ydb.Sdk.Services.Topic", LogLevel.Debug);
});

protected static readonly ILogger Logger = Factory.CreateLogger<SloContext<T>>();

Expand Down
8 changes: 7 additions & 1 deletion src/Ydb.Sdk/src/Ado/Internal/SqlParser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ internal static ParsedResult Parse(string sql)

var newYql = new StringBuilder();
var paramNames = new List<string>();
var foundParamNames = new HashSet<string>();

var prevToken = 0;

Expand Down Expand Up @@ -114,7 +115,12 @@ internal static ParsedResult Parse(string sql)

var originalParamName = $"${sql[prevToken .. curToken]}";

paramNames.Add(originalParamName);
if (!foundParamNames.Contains(originalParamName))
{
paramNames.Add(originalParamName);
}

foundParamNames.Add(originalParamName);
newYql.Append(originalParamName);
prevToken = curToken;

Expand Down
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/src/Driver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ private async Task<Status> DiscoverEndpoints()

var resultProto = response.Operation.Result.Unpack<ListEndpointsResult>();

Logger.LogInformation(
Logger.LogDebug(
"Successfully discovered endpoints: {EndpointsCount}, self location: {SelfLocation}, sdk info: {SdkInfo}",
resultProto.Endpoints.Count, resultProto.SelfLocation, _sdkInfo);

Expand Down
48 changes: 35 additions & 13 deletions src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ internal class Writer<TValue> : IWriter<TValue>
private readonly CancellationTokenSource _disposeCts = new();

private volatile TaskCompletionSource _tcsWakeUp = new();
private volatile TaskCompletionSource _tcsBufferAvailableEvent = new();
private volatile IWriteSession _session = null!;

private int _limitBufferMaxSize;
private volatile int _limitBufferMaxSize;

internal Writer(IDriver driver, WriterConfig config, ISerializer<TValue> serializer)
{
Expand Down Expand Up @@ -79,7 +79,7 @@ public async Task<WriteResult> WriteAsync(Message<TValue> message, CancellationT

while (true)
{
var curLimitBufferSize = Volatile.Read(ref _limitBufferMaxSize);
var curLimitBufferSize = _limitBufferMaxSize;

if ( // sending one biggest message anyway
(curLimitBufferSize == _config.BufferMaxSize && data.Length > curLimitBufferSize)
Expand All @@ -104,9 +104,9 @@ public async Task<WriteResult> WriteAsync(Message<TValue> message, CancellationT

try
{
await Task.Delay(_config.BufferOverflowRetryTimeoutMs, cancellationToken);
await WaitBufferAvailable(cancellationToken);
}
catch (TaskCanceledException)
catch (OperationCanceledException)
{
throw new WriterException("Buffer overflow");
}
Expand All @@ -121,24 +121,46 @@ public async Task<WriteResult> WriteAsync(Message<TValue> message, CancellationT
finally
{
Interlocked.Add(ref _limitBufferMaxSize, data.Length);

_tcsBufferAvailableEvent.TrySetResult();
}
}

private async Task WaitBufferAvailable(CancellationToken cancellationToken)
{
var tcsBufferAvailableEvent = _tcsBufferAvailableEvent;

await tcsBufferAvailableEvent.Task.WaitAsync(cancellationToken);

Interlocked.CompareExchange(
ref _tcsBufferAvailableEvent,
new TaskCompletionSource(),
tcsBufferAvailableEvent
);
}

private async void StartWriteWorker()
{
await Initialize();

while (!_disposeCts.Token.IsCancellationRequested)
try
{
await _tcsWakeUp.Task;
_tcsWakeUp = new TaskCompletionSource();

if (_toSendBuffer.IsEmpty)
while (!_disposeCts.Token.IsCancellationRequested)
{
continue;
}
await _tcsWakeUp.Task.WaitAsync(_disposeCts.Token);
_tcsWakeUp = new TaskCompletionSource();

await _session.Write(_toSendBuffer);
if (_toSendBuffer.IsEmpty)
{
continue;
}

await _session.Write(_toSendBuffer);
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("WriteWorker[{WriterConfig}] is disposed", _config);
}
}

Expand Down
15 changes: 1 addition & 14 deletions src/Ydb.Sdk/src/Services/Topic/Writer/WriterBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,27 +48,14 @@ public WriterBuilder(IDriver driver, string topicPath)
/// </remarks>
public ISerializer<TValue>? Serializer { get; set; }


/// <summary>
/// Represents the timeout duration, in milliseconds, used when a buffer overflow is detected.
/// This timeout specifies how long the system should wait before attempting to retry the operation.
/// </summary>
/// <remarks>
/// This timeout is important for managing system performance and stability.
/// Too short a timeout could lead to rapid retry attempts, potentially causing further resource contention
/// and degrading system performance. Conversely, too long a timeout might delay processing significantly.
/// </remarks>
public int BufferOverflowRetryTimeoutMs { get; set; } = 10;

public IWriter<TValue> Build()
{
var config = new WriterConfig(
topicPath: TopicPath,
producerId: ProducerId,
codec: Codec,
bufferMaxSize: BufferMaxSize,
partitionId: PartitionId,
bufferOverflowRetryTimeoutMs: BufferOverflowRetryTimeoutMs
partitionId: PartitionId
);

return new Writer<TValue>(
Expand Down
10 changes: 3 additions & 7 deletions src/Ydb.Sdk/src/Services/Topic/Writer/WriterConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@ internal WriterConfig(
string? producerId,
Codec codec,
int bufferMaxSize,
long? partitionId,
int bufferOverflowRetryTimeoutMs)
long? partitionId)
{
TopicPath = topicPath;
ProducerId = producerId;
Codec = codec;
BufferMaxSize = bufferMaxSize;
PartitionId = partitionId;
BufferOverflowRetryTimeoutMs = bufferOverflowRetryTimeoutMs;
}

public string TopicPath { get; }
Expand All @@ -30,17 +28,15 @@ internal WriterConfig(

public long? PartitionId { get; }

public int BufferOverflowRetryTimeoutMs { get; }

public override string ToString()
{
var toString = new StringBuilder().Append("[TopicPath: ").Append(TopicPath);
var toString = new StringBuilder().Append("TopicPath: ").Append(TopicPath);

if (ProducerId != null)
{
toString.Append(", ProducerId: ").Append(ProducerId);
}

return toString.Append(", Codec: ").Append(Codec).Append(']').ToString();
return toString.Append(", Codec: ").Append(Codec).ToString();
}
}
9 changes: 9 additions & 0 deletions src/Ydb.Sdk/tests/Ado/Internal/SqlParserTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -241,4 +241,13 @@ public void Parse_WhenMultilineStringLiterals_ReturnSql()
SELECT $param; SELECT $p2; SELECT $p_3;", sql);
Assert.Equal(new[] { "$param", "$p2", "$p_3" }, paramNames);
}

[Fact]
public void Parse_WhenRepeatedOneParam_ReturnThisParamInParamNames()
{
var (sql, paramNames) = SqlParser.Parse("SELECT @a, @a, @a;");

Assert.Equal("SELECT $a, $a, $a;", sql);
Assert.Equal(new[] { "$a" }, paramNames);
}
}
3 changes: 2 additions & 1 deletion src/Ydb.Sdk/tests/Topic/WriterIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public async Task WriteAsync_When1000Messages_ReturnWriteResultIsPersisted()
topicSettings.Consumers.Add(new Consumer("Consumer"));
await topicClient.CreateTopic(topicSettings);

using var writer = new WriterBuilder<int>(_driver, topicName) { ProducerId = "producerId" }.Build();
using var writer = new WriterBuilder<int>(_driver, topicName)
{ ProducerId = "producerId" }.Build();

var tasks = new List<Task>();
for (var i = 0; i < messageCount; i++)
Expand Down
93 changes: 1 addition & 92 deletions src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using Grpc.Core;
using Moq;
using Xunit;
using Xunit.Abstractions;
using Ydb.Issue;
using Ydb.Sdk.Services.Topic;
using Ydb.Sdk.Services.Topic.Writer;
Expand All @@ -15,13 +14,11 @@ namespace Ydb.Sdk.Tests.Topic;

public class WriterUnitTests
{
private readonly ITestOutputHelper _testOutputHelper;
private readonly Mock<IDriver> _mockIDriver = new();
private readonly Mock<WriterStream> _mockStream = new();

public WriterUnitTests(ITestOutputHelper testOutputHelper)
public WriterUnitTests()
{
_testOutputHelper = testOutputHelper;
_mockIDriver.Setup(driver => driver.BidirectionalStreamCall(
It.IsAny<Method<FromClient, StreamWriteMessage.Types.FromServer>>(),
It.IsAny<GrpcRequestSettings>())
Expand Down Expand Up @@ -317,94 +314,6 @@ public async Task Initialize_WhenNotSupportedCodec_ThrowWriterExceptionOnWriteAs
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.Once);
}

[Fact]
public async Task WriteAsyncStress_WhenBufferIsOverflow_ThrowWriterExceptionOnBufferOverflow()
{
const int countBatchSendingSize = 1000;
const int batchTasksSize = 100;
const int bufferSize = 100;
const int messageSize = sizeof(int);

Assert.True(batchTasksSize > bufferSize / 4);
Assert.True(bufferSize % 4 == 0);

var taskSource = new TaskCompletionSource<bool>();
_mockStream.Setup(stream => stream.Write(It.IsAny<FromClient>()))
.Returns(Task.CompletedTask);
var mockNextAsync = _mockStream.SetupSequence(stream => stream.MoveNextAsync())
.Returns(new ValueTask<bool>(true))
.Returns(new ValueTask<bool>(taskSource.Task));
var sequentialResult = _mockStream.SetupSequence(stream => stream.Current)
.Returns(new StreamWriteMessage.Types.FromServer
{
InitResponse = new StreamWriteMessage.Types.InitResponse
{ LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" },
Status = StatusIds.Types.StatusCode.Success
});
using var writer = new WriterBuilder<int>(_mockIDriver.Object, "/topic")
{
ProducerId = "producerId",
BufferMaxSize = bufferSize /* bytes */,
BufferOverflowRetryTimeoutMs = 1_000
}.Build();

for (var attempt = 0; attempt < countBatchSendingSize; attempt++)
{
_testOutputHelper.WriteLine($"Processing attempt {attempt}");
var cts = new CancellationTokenSource();

var tasks = new List<Task<WriteResult>>();
var serverAck = new StreamWriteMessage.Types.FromServer
{
WriteResponse = new StreamWriteMessage.Types.WriteResponse { PartitionId = 1 },
Status = StatusIds.Types.StatusCode.Success
};
for (var i = 0; i < batchTasksSize; i++)
{
tasks.Add(writer.WriteAsync(100, cts.Token));
serverAck.WriteResponse.Acks.Add(new StreamWriteMessage.Types.WriteResponse.Types.WriteAck
{
SeqNo = bufferSize / messageSize * attempt + i + 1,
Written = new StreamWriteMessage.Types.WriteResponse.Types.WriteAck.Types.Written
{ Offset = i * messageSize + bufferSize * attempt }
});
}

sequentialResult.Returns(() =>
{
// ReSharper disable once AccessToModifiedClosure
Volatile.Write(ref taskSource, new TaskCompletionSource<bool>());
mockNextAsync.Returns(() =>
{
cts.Cancel();
return new ValueTask<bool>(Volatile.Read(ref taskSource).Task);
});
return serverAck;
});
taskSource.SetResult(true);

var countSuccess = 0;
var countErrors = 0;
foreach (var task in tasks)
{
try
{
var res = await task;
countSuccess++;
Assert.Equal(PersistenceStatus.Written, res.Status);
}
catch (WriterException e)
{
countErrors++;
Assert.Equal("Buffer overflow", e.Message);
}
}

Assert.Equal(bufferSize / messageSize, countSuccess);
Assert.Equal(batchTasksSize - bufferSize / messageSize, countErrors);
}
}

/*
* Performed invocations:

Expand Down
Loading