diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 6095b08f..6c31dd0e 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -156,7 +156,7 @@ jobs: strategy: fail-fast: false matrix: - ydb-version: [ 'latest', '24.1' ] + ydb-version: [ 'latest', 'trunk' ] dotnet-version: [ 6.0.x, 7.0.x ] include: - dotnet-version: 6.0.x @@ -181,7 +181,7 @@ jobs: uses: actions/setup-dotnet@v4 with: dotnet-version: ${{ matrix.dotnet-version }} - - name: Run ADO.NET examples + - name: Run ADO.NET example run: | docker cp ydb-local:/ydb_certs/ca.pem ~/ cd ./examples/src/AdoNet @@ -190,3 +190,7 @@ jobs: run: | cd ./examples/src/DapperExample dotnet run + - name: YDB Topic example + run: | + cd ./examples/src/Topic + dotnet run diff --git a/CHANGELOG.md b/CHANGELOG.md index 74db305f..c75094d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +- Fixed Writer: possible creation of a session after `DisposeAsync()`, which this could happen when there are canceled tasks in `InFlightMessages`. +- Dev: `Writer.MoveNext()` changed exception on cancelToken from `WriterException` to `TaskCanceledException`. +- Dev: changed log level from `Warning` to `Information` in `(Reader / Writer).Initialize()` when it is disposed. + ## v0.15.0 - Dev: added `ValueTask GetAuthInfoAsync()` in ICredentialProvider. - Feat: `Writer.DisposeAsync()` waits for all in-flight messages to complete. diff --git a/examples/src/Topic/Program.cs b/examples/src/Topic/Program.cs new file mode 100644 index 00000000..b6000cfd --- /dev/null +++ b/examples/src/Topic/Program.cs @@ -0,0 +1,84 @@ +using Microsoft.Extensions.Logging; +using Ydb.Sdk; +using Ydb.Sdk.Services.Topic; +using Ydb.Sdk.Services.Topic.Reader; +using Ydb.Sdk.Services.Topic.Writer; + +const int countMessages = 100; +const string topicName = "topic_name"; + +var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole().SetMinimumLevel(LogLevel.Information)); + +var logger = loggerFactory.CreateLogger(); + +var config = new DriverConfig( + endpoint: "grpc://localhost:2136", + database: "/local" +); + +await using var driver = await Driver.CreateInitialized( + config: config, + loggerFactory: loggerFactory +); + +var topicClient = new TopicClient(driver); + +await topicClient.CreateTopic(new CreateTopicSettings +{ + Path = topicName, + Consumers = { new Consumer("Consumer_Example") } +}); + +var readerCts = new CancellationTokenSource(); + +var writerJob = Task.Run(async () => +{ + // ReSharper disable once AccessToDisposedClosure + await using var writer = new WriterBuilder(driver, topicName) + { + ProducerId = "ProducerId_Example" + }.Build(); + + for (var i = 0; i < countMessages; i++) + { + await writer.WriteAsync($"Message num {i}: Hello Example YDB Topics!"); + } + + readerCts.CancelAfter(TimeSpan.FromSeconds(3)); +}); + +var readerJob = Task.Run(async () => +{ + // ReSharper disable once AccessToDisposedClosure + await using var reader = new ReaderBuilder(driver) + { + ConsumerName = "Consumer_Example", + SubscribeSettings = { new SubscribeSettings(topicName) } + }.Build(); + + try + { + while (!readerCts.IsCancellationRequested) + { + var message = await reader.ReadAsync(readerCts.Token); + + logger.LogInformation("Received message: [{MessageData}]", message.Data); + + try + { + await message.CommitAsync(); + } + catch (Exception e) + { + logger.LogError(e, "Failed commit message"); + } + } + } + catch (OperationCanceledException) + { + } +}); + +await writerJob; +await readerJob; +await topicClient.DropTopic(new DropTopicSettings { Path = topicName }); \ No newline at end of file diff --git a/examples/src/Topic/Topic.csproj b/examples/src/Topic/Topic.csproj new file mode 100644 index 00000000..b6cf30e9 --- /dev/null +++ b/examples/src/Topic/Topic.csproj @@ -0,0 +1,17 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + + + + diff --git a/examples/src/YdbExamples.sln b/examples/src/YdbExamples.sln index 331e0962..b2f31d27 100644 --- a/examples/src/YdbExamples.sln +++ b/examples/src/YdbExamples.sln @@ -15,6 +15,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DapperExample", "DapperExam EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "YC", "YC\YC.csproj", "{753E4F33-CB08-47B9-864F-4CC037B278C4}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Topic", "Topic\Topic.csproj", "{0FB9A1C2-4B0C-4AE4-9FA2-E0ED37802A6E}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -45,6 +47,10 @@ Global {753E4F33-CB08-47B9-864F-4CC037B278C4}.Debug|Any CPU.Build.0 = Debug|Any CPU {753E4F33-CB08-47B9-864F-4CC037B278C4}.Release|Any CPU.ActiveCfg = Release|Any CPU {753E4F33-CB08-47B9-864F-4CC037B278C4}.Release|Any CPU.Build.0 = Release|Any CPU + {0FB9A1C2-4B0C-4AE4-9FA2-E0ED37802A6E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {0FB9A1C2-4B0C-4AE4-9FA2-E0ED37802A6E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {0FB9A1C2-4B0C-4AE4-9FA2-E0ED37802A6E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {0FB9A1C2-4B0C-4AE4-9FA2-E0ED37802A6E}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs b/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs index 0ce47759..74769cac 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs @@ -96,7 +96,7 @@ private async Task Initialize() { if (_disposeCts.IsCancellationRequested) { - _logger.LogWarning("Reader writer is canceled because it has been disposed"); + _logger.LogDebug("Initialize Reader[{ReaderConfig}] is stopped because it has been disposed", _config); return; } @@ -335,6 +335,8 @@ private async Task RunProcessingStreamResponse() throw new ArgumentOutOfRangeException(); } } + + Logger.LogInformation("ReaderSession[{SessionId}]: ResponseStream is closed", SessionId); } catch (Driver.TransportException e) { @@ -579,6 +581,8 @@ public override async ValueTask DisposeAsync() { await _runProcessingStreamRequest; await Stream.RequestStreamComplete(); + Logger.LogInformation("ReaderSession[{SessionId}]: RequestStream is closed", SessionId); + await _runProcessingStreamResponse; // waiting all ack's commits _lifecycleReaderSessionCts.Cancel(); diff --git a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs index 711f0404..180a00dd 100644 --- a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs +++ b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs @@ -38,7 +38,7 @@ protected async void ReconnectSession() return; } - Logger.LogInformation("TopicSession[{SessionId}] has been deactivated, starting to reconnect", SessionId); + Logger.LogDebug("TopicSession[{SessionId}] has been deactivated, starting to reconnect", SessionId); await _initialize(); } diff --git a/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs index 714e1875..78889dad 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs @@ -31,6 +31,7 @@ internal class Writer : IWriter private volatile TaskCompletionSource _tcsBufferAvailableEvent = new(); private volatile IWriteSession _session = null!; private volatile int _limitBufferMaxSize; + private volatile bool _isStopped; internal Writer(IDriver driver, WriterConfig config, ISerializer serializer) { @@ -52,9 +53,7 @@ public async Task WriteAsync(Message message, CancellationT { TaskCompletionSource tcs = new(); await using var registrationUserCancellationTokenRegistration = cancellationToken.Register( - () => tcs.TrySetException( - new WriterException("The write operation was canceled before it could be completed") - ), useSynchronizationContext: false + () => tcs.TrySetCanceled(), useSynchronizationContext: false ); await using var writerDisposedCancellationTokenRegistration = _disposeCts.Token.Register( () => tcs.TrySetException(new WriterException($"Writer[{_config}] is disposed")), @@ -194,9 +193,9 @@ private async Task Initialize() try { - if (_disposeCts.IsCancellationRequested && _inFlightMessages.IsEmpty) + if (_isStopped) { - _logger.LogWarning("Initialize writer is canceled because it has been disposed"); + _logger.LogDebug("Initialize Writer[{WriterConfig}] is stopped because it has been disposed", _config); return; } @@ -313,7 +312,6 @@ private async Task Initialize() } _session = newSession; - newSession.RunProcessingWriteAck(); WakeUpWorker(); // attempt send buffer } finally @@ -367,6 +365,8 @@ public async ValueTask DisposeAsync() } } + _isStopped = true; + await _session.DisposeAsync(); _logger.LogInformation("Writer[{WriterConfig}] is disposed", _config); @@ -443,6 +443,7 @@ internal class WriterSession : TopicSession _inFlightMessages; + private readonly Task _processingResponseStream; private long _seqNum; @@ -466,6 +467,8 @@ ConcurrentQueue inFlightMessages _config = config; _inFlightMessages = inFlightMessages; Volatile.Write(ref _seqNum, lastSeqNo); // happens-before for Volatile.Read + + _processingResponseStream = RunProcessingWriteAck(); } public async Task Write(ConcurrentQueue toSendBuffer) @@ -513,7 +516,7 @@ public async Task Write(ConcurrentQueue toSendBuffer) } } - internal async void RunProcessingWriteAck() + private async Task RunProcessingWriteAck() { try { @@ -573,7 +576,7 @@ Completing task on exception... } } - Logger.LogWarning("WriterSession[{SessionId}]: stream is closed", SessionId); + Logger.LogInformation("WriterSession[{SessionId}]: ResponseStream is closed", SessionId); } catch (Driver.TransportException e) { @@ -601,6 +604,7 @@ public override async ValueTask DisposeAsync() Logger.LogDebug("WriterSession[{SessionId}]: start dispose process", SessionId); await Stream.RequestStreamComplete(); + await _processingResponseStream; Stream.Dispose(); } diff --git a/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs b/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs index 844c4bcc..217aec5b 100644 --- a/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs +++ b/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs @@ -358,6 +358,7 @@ public async Task Initialize_WhenNotSupportedCodec_ThrowWriterExceptionOnWriteAs public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSession_ShouldReconnectAndReturnWriteResult() { var moveTcs = new TaskCompletionSource(); + var moveTcsRetry = new TaskCompletionSource(); _mockStream.SetupSequence(stream => stream.Write(It.IsAny())) .Returns(Task.CompletedTask) @@ -367,12 +368,17 @@ public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSession_Should return new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled)); }) .Returns(Task.CompletedTask) - .Returns(Task.CompletedTask); + .Returns(() => + { + moveTcsRetry.SetResult(true); + + return Task.CompletedTask; + }); _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(true) .Returns(new ValueTask(moveTcs.Task)) .ReturnsAsync(true) - .ReturnsAsync(true) + .Returns(new ValueTask(moveTcsRetry.Task)) .Returns(_lastMoveNext); _mockStream.SetupSequence(stream => stream.Current) .Returns(new StreamWriteMessage.Types.FromServer @@ -569,7 +575,6 @@ public async Task WriteAsync_WhenStreamIsClosingOnProcessingWriteAck_ShouldRecon public async Task WriteAsync_WhenCancellationTokenIsClosed_ThrowCancellationException() { var cancellationTokenSource = new CancellationTokenSource(); - var nextCompleted = new TaskCompletionSource(); _mockStream.Setup(stream => stream.Write(It.IsAny())) .Returns(Task.CompletedTask); _mockStream.SetupSequence(stream => stream.MoveNextAsync()) @@ -582,10 +587,8 @@ public async Task WriteAsync_WhenCancellationTokenIsClosed_ThrowCancellationExce var task = writer.WriteAsync(123L, cancellationTokenSource.Token); cancellationTokenSource.Cancel(); - nextCompleted.SetResult(true); - Assert.Equal("The write operation was canceled before it could be completed", - (await Assert.ThrowsAsync(() => task)).Message); + await Assert.ThrowsAsync(() => task); } [Fact] @@ -638,6 +641,7 @@ public async Task WriteAsync_WhenInFlightBufferSendInInitialize_ReturnCompletedT var writeTcs2 = new TaskCompletionSource(); var writeTcs3 = new TaskCompletionSource(); var moveTcs = new TaskCompletionSource(); + var moveTcsRetry = new TaskCompletionSource(); _mockStream.SetupSequence(stream => stream.Write(It.IsAny())) .Returns(Task.CompletedTask) @@ -657,13 +661,17 @@ public async Task WriteAsync_WhenInFlightBufferSendInInitialize_ReturnCompletedT return Task.CompletedTask; }) .Returns(Task.CompletedTask) - .Returns(Task.CompletedTask); + .Returns(() => + { + moveTcsRetry.SetResult(true); + return Task.CompletedTask; + }); _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(true) .Returns(new ValueTask(moveTcs.Task)) .ReturnsAsync(true) - .ReturnsAsync(true) + .Returns(new ValueTask(moveTcsRetry.Task)) .Returns(_lastMoveNext); _mockStream.SetupSequence(stream => stream.Current) .Returns(new StreamWriteMessage.Types.FromServer @@ -713,8 +721,7 @@ public async Task WriteAsync_WhenInFlightBufferSendInInitialize_ReturnCompletedT moveTcs.SetResult(false); // Fail write ack stream => start reconnect - Assert.Equal("The write operation was canceled before it could be completed", - (await Assert.ThrowsAsync(() => runTaskWithCancel)).Message); + await Assert.ThrowsAsync(() => runTaskWithCancel); Assert.Equal(PersistenceStatus.AlreadyWritten, (await runTask1).Status); Assert.Equal(PersistenceStatus.Written, (await runTask2).Status); @@ -863,6 +870,7 @@ public async Task DisposeAsync_WhenInFlightMessages_WaitingInFlightMessages() { var tcsDetectedWrite = new TaskCompletionSource(); var writeTcs1 = new TaskCompletionSource(); + var moveTcsRetry = new TaskCompletionSource(); _mockStream.SetupSequence(stream => stream.Write(It.IsAny())) .Returns(Task.CompletedTask) @@ -872,12 +880,16 @@ public async Task DisposeAsync_WhenInFlightMessages_WaitingInFlightMessages() return Task.CompletedTask; }) .Returns(Task.CompletedTask) - .Returns(Task.CompletedTask); + .Returns(() => + { + moveTcsRetry.SetResult(true); + return Task.CompletedTask; + }); _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(true) .Returns(new ValueTask(writeTcs1.Task)) .ReturnsAsync(true) - .ReturnsAsync(true) + .Returns(new ValueTask(moveTcsRetry.Task)) .Returns(_lastMoveNext); _mockStream.SetupSequence(stream => stream.Current)