diff --git a/CHANGELOG.md b/CHANGELOG.md index 01205b3f..0728fd55 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +- Fixed bug in Topic Writer: worker is stopped by disposeCts +- Fixed bug in sql parser ADO.NET: deduplication declare param in YQL query +- Deleted property BufferOverflowRetryTimeoutMs + ## v0.9.2 - Fixed bug: delete deadline grpc timeout on AttachStream diff --git a/slo/src/Internal/SloContext.cs b/slo/src/Internal/SloContext.cs index b3554b14..d607e1e9 100644 --- a/slo/src/Internal/SloContext.cs +++ b/slo/src/Internal/SloContext.cs @@ -11,7 +11,13 @@ public abstract class SloContext where T : IDisposable { // ReSharper disable once StaticMemberInGenericType protected static readonly ILoggerFactory Factory = - LoggerFactory.Create(builder => builder.AddConsole().SetMinimumLevel(LogLevel.Information)); + LoggerFactory.Create(builder => + { + builder.AddConsole().SetMinimumLevel(LogLevel.Information); + builder.AddFilter("Ydb.Sdk.Ado", LogLevel.Debug); + builder.AddFilter("Ydb.Sdk.Services.Query", LogLevel.Debug); + builder.AddFilter("Ydb.Sdk.Services.Topic", LogLevel.Debug); + }); protected static readonly ILogger Logger = Factory.CreateLogger>(); diff --git a/src/Ydb.Sdk/src/Ado/Internal/SqlParser.cs b/src/Ydb.Sdk/src/Ado/Internal/SqlParser.cs index c72a42ae..4e9fe47b 100644 --- a/src/Ydb.Sdk/src/Ado/Internal/SqlParser.cs +++ b/src/Ydb.Sdk/src/Ado/Internal/SqlParser.cs @@ -17,6 +17,7 @@ internal static ParsedResult Parse(string sql) var newYql = new StringBuilder(); var paramNames = new List(); + var foundParamNames = new HashSet(); var prevToken = 0; @@ -114,7 +115,12 @@ internal static ParsedResult Parse(string sql) var originalParamName = $"${sql[prevToken .. curToken]}"; - paramNames.Add(originalParamName); + if (!foundParamNames.Contains(originalParamName)) + { + paramNames.Add(originalParamName); + } + + foundParamNames.Add(originalParamName); newYql.Append(originalParamName); prevToken = curToken; diff --git a/src/Ydb.Sdk/src/Driver.cs b/src/Ydb.Sdk/src/Driver.cs index 4f42fe3b..912ef583 100644 --- a/src/Ydb.Sdk/src/Driver.cs +++ b/src/Ydb.Sdk/src/Driver.cs @@ -155,7 +155,7 @@ private async Task DiscoverEndpoints() var resultProto = response.Operation.Result.Unpack(); - Logger.LogInformation( + Logger.LogDebug( "Successfully discovered endpoints: {EndpointsCount}, self location: {SelfLocation}, sdk info: {SdkInfo}", resultProto.Endpoints.Count, resultProto.SelfLocation, _sdkInfo); diff --git a/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs index 544bb065..e7920f1a 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs @@ -26,9 +26,9 @@ internal class Writer : IWriter private readonly CancellationTokenSource _disposeCts = new(); private volatile TaskCompletionSource _tcsWakeUp = new(); + private volatile TaskCompletionSource _tcsBufferAvailableEvent = new(); private volatile IWriteSession _session = null!; - - private int _limitBufferMaxSize; + private volatile int _limitBufferMaxSize; internal Writer(IDriver driver, WriterConfig config, ISerializer serializer) { @@ -79,7 +79,7 @@ public async Task WriteAsync(Message message, CancellationT while (true) { - var curLimitBufferSize = Volatile.Read(ref _limitBufferMaxSize); + var curLimitBufferSize = _limitBufferMaxSize; if ( // sending one biggest message anyway (curLimitBufferSize == _config.BufferMaxSize && data.Length > curLimitBufferSize) @@ -104,9 +104,9 @@ public async Task WriteAsync(Message message, CancellationT try { - await Task.Delay(_config.BufferOverflowRetryTimeoutMs, cancellationToken); + await WaitBufferAvailable(cancellationToken); } - catch (TaskCanceledException) + catch (OperationCanceledException) { throw new WriterException("Buffer overflow"); } @@ -121,24 +121,46 @@ public async Task WriteAsync(Message message, CancellationT finally { Interlocked.Add(ref _limitBufferMaxSize, data.Length); + + _tcsBufferAvailableEvent.TrySetResult(); } } + private async Task WaitBufferAvailable(CancellationToken cancellationToken) + { + var tcsBufferAvailableEvent = _tcsBufferAvailableEvent; + + await tcsBufferAvailableEvent.Task.WaitAsync(cancellationToken); + + Interlocked.CompareExchange( + ref _tcsBufferAvailableEvent, + new TaskCompletionSource(), + tcsBufferAvailableEvent + ); + } + private async void StartWriteWorker() { await Initialize(); - while (!_disposeCts.Token.IsCancellationRequested) + try { - await _tcsWakeUp.Task; - _tcsWakeUp = new TaskCompletionSource(); - - if (_toSendBuffer.IsEmpty) + while (!_disposeCts.Token.IsCancellationRequested) { - continue; - } + await _tcsWakeUp.Task.WaitAsync(_disposeCts.Token); + _tcsWakeUp = new TaskCompletionSource(); - await _session.Write(_toSendBuffer); + if (_toSendBuffer.IsEmpty) + { + continue; + } + + await _session.Write(_toSendBuffer); + } + } + catch (OperationCanceledException) + { + _logger.LogInformation("WriteWorker[{WriterConfig}] is disposed", _config); } } diff --git a/src/Ydb.Sdk/src/Services/Topic/Writer/WriterBuilder.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/WriterBuilder.cs index 3064ceec..93b186ee 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Writer/WriterBuilder.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/WriterBuilder.cs @@ -48,18 +48,6 @@ public WriterBuilder(IDriver driver, string topicPath) /// public ISerializer? Serializer { get; set; } - - /// - /// Represents the timeout duration, in milliseconds, used when a buffer overflow is detected. - /// This timeout specifies how long the system should wait before attempting to retry the operation. - /// - /// - /// This timeout is important for managing system performance and stability. - /// Too short a timeout could lead to rapid retry attempts, potentially causing further resource contention - /// and degrading system performance. Conversely, too long a timeout might delay processing significantly. - /// - public int BufferOverflowRetryTimeoutMs { get; set; } = 10; - public IWriter Build() { var config = new WriterConfig( @@ -67,8 +55,7 @@ public IWriter Build() producerId: ProducerId, codec: Codec, bufferMaxSize: BufferMaxSize, - partitionId: PartitionId, - bufferOverflowRetryTimeoutMs: BufferOverflowRetryTimeoutMs + partitionId: PartitionId ); return new Writer( diff --git a/src/Ydb.Sdk/src/Services/Topic/Writer/WriterConfig.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/WriterConfig.cs index 460ca697..81102ec4 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Writer/WriterConfig.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/WriterConfig.cs @@ -9,15 +9,13 @@ internal WriterConfig( string? producerId, Codec codec, int bufferMaxSize, - long? partitionId, - int bufferOverflowRetryTimeoutMs) + long? partitionId) { TopicPath = topicPath; ProducerId = producerId; Codec = codec; BufferMaxSize = bufferMaxSize; PartitionId = partitionId; - BufferOverflowRetryTimeoutMs = bufferOverflowRetryTimeoutMs; } public string TopicPath { get; } @@ -30,17 +28,15 @@ internal WriterConfig( public long? PartitionId { get; } - public int BufferOverflowRetryTimeoutMs { get; } - public override string ToString() { - var toString = new StringBuilder().Append("[TopicPath: ").Append(TopicPath); + var toString = new StringBuilder().Append("TopicPath: ").Append(TopicPath); if (ProducerId != null) { toString.Append(", ProducerId: ").Append(ProducerId); } - return toString.Append(", Codec: ").Append(Codec).Append(']').ToString(); + return toString.Append(", Codec: ").Append(Codec).ToString(); } } diff --git a/src/Ydb.Sdk/tests/Ado/Internal/SqlParserTests.cs b/src/Ydb.Sdk/tests/Ado/Internal/SqlParserTests.cs index ca96aff6..61d54a4f 100644 --- a/src/Ydb.Sdk/tests/Ado/Internal/SqlParserTests.cs +++ b/src/Ydb.Sdk/tests/Ado/Internal/SqlParserTests.cs @@ -241,4 +241,13 @@ public void Parse_WhenMultilineStringLiterals_ReturnSql() SELECT $param; SELECT $p2; SELECT $p_3;", sql); Assert.Equal(new[] { "$param", "$p2", "$p_3" }, paramNames); } + + [Fact] + public void Parse_WhenRepeatedOneParam_ReturnThisParamInParamNames() + { + var (sql, paramNames) = SqlParser.Parse("SELECT @a, @a, @a;"); + + Assert.Equal("SELECT $a, $a, $a;", sql); + Assert.Equal(new[] { "$a" }, paramNames); + } } diff --git a/src/Ydb.Sdk/tests/Topic/WriterIntegrationTests.cs b/src/Ydb.Sdk/tests/Topic/WriterIntegrationTests.cs index 924f0383..50f3ebea 100644 --- a/src/Ydb.Sdk/tests/Topic/WriterIntegrationTests.cs +++ b/src/Ydb.Sdk/tests/Topic/WriterIntegrationTests.cs @@ -61,7 +61,8 @@ public async Task WriteAsync_When1000Messages_ReturnWriteResultIsPersisted() topicSettings.Consumers.Add(new Consumer("Consumer")); await topicClient.CreateTopic(topicSettings); - using var writer = new WriterBuilder(_driver, topicName) { ProducerId = "producerId" }.Build(); + using var writer = new WriterBuilder(_driver, topicName) + { ProducerId = "producerId" }.Build(); var tasks = new List(); for (var i = 0; i < messageCount; i++) diff --git a/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs b/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs index baf8e78a..b0501f00 100644 --- a/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs +++ b/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs @@ -1,7 +1,6 @@ using Grpc.Core; using Moq; using Xunit; -using Xunit.Abstractions; using Ydb.Issue; using Ydb.Sdk.Services.Topic; using Ydb.Sdk.Services.Topic.Writer; @@ -15,13 +14,11 @@ namespace Ydb.Sdk.Tests.Topic; public class WriterUnitTests { - private readonly ITestOutputHelper _testOutputHelper; private readonly Mock _mockIDriver = new(); private readonly Mock _mockStream = new(); - public WriterUnitTests(ITestOutputHelper testOutputHelper) + public WriterUnitTests() { - _testOutputHelper = testOutputHelper; _mockIDriver.Setup(driver => driver.BidirectionalStreamCall( It.IsAny>(), It.IsAny()) @@ -317,94 +314,6 @@ public async Task Initialize_WhenNotSupportedCodec_ThrowWriterExceptionOnWriteAs _mockStream.Verify(stream => stream.MoveNextAsync(), Times.Once); } - [Fact] - public async Task WriteAsyncStress_WhenBufferIsOverflow_ThrowWriterExceptionOnBufferOverflow() - { - const int countBatchSendingSize = 1000; - const int batchTasksSize = 100; - const int bufferSize = 100; - const int messageSize = sizeof(int); - - Assert.True(batchTasksSize > bufferSize / 4); - Assert.True(bufferSize % 4 == 0); - - var taskSource = new TaskCompletionSource(); - _mockStream.Setup(stream => stream.Write(It.IsAny())) - .Returns(Task.CompletedTask); - var mockNextAsync = _mockStream.SetupSequence(stream => stream.MoveNextAsync()) - .Returns(new ValueTask(true)) - .Returns(new ValueTask(taskSource.Task)); - var sequentialResult = _mockStream.SetupSequence(stream => stream.Current) - .Returns(new StreamWriteMessage.Types.FromServer - { - InitResponse = new StreamWriteMessage.Types.InitResponse - { LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" }, - Status = StatusIds.Types.StatusCode.Success - }); - using var writer = new WriterBuilder(_mockIDriver.Object, "/topic") - { - ProducerId = "producerId", - BufferMaxSize = bufferSize /* bytes */, - BufferOverflowRetryTimeoutMs = 1_000 - }.Build(); - - for (var attempt = 0; attempt < countBatchSendingSize; attempt++) - { - _testOutputHelper.WriteLine($"Processing attempt {attempt}"); - var cts = new CancellationTokenSource(); - - var tasks = new List>(); - var serverAck = new StreamWriteMessage.Types.FromServer - { - WriteResponse = new StreamWriteMessage.Types.WriteResponse { PartitionId = 1 }, - Status = StatusIds.Types.StatusCode.Success - }; - for (var i = 0; i < batchTasksSize; i++) - { - tasks.Add(writer.WriteAsync(100, cts.Token)); - serverAck.WriteResponse.Acks.Add(new StreamWriteMessage.Types.WriteResponse.Types.WriteAck - { - SeqNo = bufferSize / messageSize * attempt + i + 1, - Written = new StreamWriteMessage.Types.WriteResponse.Types.WriteAck.Types.Written - { Offset = i * messageSize + bufferSize * attempt } - }); - } - - sequentialResult.Returns(() => - { - // ReSharper disable once AccessToModifiedClosure - Volatile.Write(ref taskSource, new TaskCompletionSource()); - mockNextAsync.Returns(() => - { - cts.Cancel(); - return new ValueTask(Volatile.Read(ref taskSource).Task); - }); - return serverAck; - }); - taskSource.SetResult(true); - - var countSuccess = 0; - var countErrors = 0; - foreach (var task in tasks) - { - try - { - var res = await task; - countSuccess++; - Assert.Equal(PersistenceStatus.Written, res.Status); - } - catch (WriterException e) - { - countErrors++; - Assert.Equal("Buffer overflow", e.Message); - } - } - - Assert.Equal(bufferSize / messageSize, countSuccess); - Assert.Equal(batchTasksSize - bufferSize / messageSize, countErrors); - } - } - /* * Performed invocations: