diff --git a/CHANGELOG.md b/CHANGELOG.md index af26b531..5f18758f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +- Do not pessimize the node on Grpc.Core.StatusCode.Cancelled and Grpc.Core.StatusCode.DeadlineExceeded. +- Dispose of WriterSession using dispose CancellationToken. +- BidirectionalStream is internal class. +- Move Metadata class to Ydb.Sdk.Services.Topic. +- Fixed memory leak CancellationTokenRegistration. +- Cancel writing tasks after disposing of Writer. + ## v0.9.3 - Fixed bug in Topic Writer: worker is stopped by disposeCts - Fixed bug in sql parser ADO.NET: deduplication declare param in YQL query diff --git a/src/Ydb.Sdk/src/Driver.cs b/src/Ydb.Sdk/src/Driver.cs index 912ef583..7ff27491 100644 --- a/src/Ydb.Sdk/src/Driver.cs +++ b/src/Ydb.Sdk/src/Driver.cs @@ -97,6 +97,12 @@ protected override (string, GrpcChannel) GetChannel(long nodeId) protected override void OnRpcError(string endpoint, RpcException e) { Logger.LogWarning("gRPC error [{Status}] on channel {Endpoint}", e.Status, endpoint); + + if (e.StatusCode is Grpc.Core.StatusCode.Cancelled or Grpc.Core.StatusCode.DeadlineExceeded) + { + return; + } + if (!_endpointPool.PessimizeEndpoint(endpoint)) { return; diff --git a/src/Ydb.Sdk/src/GrpcRequestSettings.cs b/src/Ydb.Sdk/src/GrpcRequestSettings.cs index 933b54a5..9931cca1 100644 --- a/src/Ydb.Sdk/src/GrpcRequestSettings.cs +++ b/src/Ydb.Sdk/src/GrpcRequestSettings.cs @@ -11,6 +11,7 @@ public class GrpcRequestSettings public string TraceId { get; set; } = string.Empty; public TimeSpan TransportTimeout { get; set; } = TimeSpan.Zero; public ImmutableArray CustomClientHeaders { get; } = new(); + public CancellationToken CancellationToken = default; internal long NodeId { get; set; } internal Action TrailersHandler { get; set; } = _ => { }; diff --git a/src/Ydb.Sdk/src/IDriver.cs b/src/Ydb.Sdk/src/IDriver.cs index 1ca665f6..4e5a4373 100644 --- a/src/Ydb.Sdk/src/IDriver.cs +++ b/src/Ydb.Sdk/src/IDriver.cs @@ -144,7 +144,8 @@ protected CallOptions GetCallOptions(GrpcRequestSettings settings) } var options = new CallOptions( - headers: meta + headers: meta, + cancellationToken: settings.CancellationToken ); if (settings.TransportTimeout != TimeSpan.Zero) @@ -213,7 +214,7 @@ public async ValueTask MoveNextAsync() } } -public class BidirectionalStream : IBidirectionalStream +internal class BidirectionalStream : IBidirectionalStream { private readonly AsyncDuplexStreamingCall _stream; private readonly Action _rpcErrorAction; diff --git a/src/Ydb.Sdk/src/Services/Topic/Metadata.cs b/src/Ydb.Sdk/src/Services/Topic/Metadata.cs new file mode 100644 index 00000000..9053c937 --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/Metadata.cs @@ -0,0 +1,3 @@ +namespace Ydb.Sdk.Services.Topic; + +public record Metadata(string Key, byte[] Value); diff --git a/src/Ydb.Sdk/src/Services/Topic/Writer/Message.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/Message.cs index 1d0db4b1..449119ad 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Writer/Message.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/Message.cs @@ -13,5 +13,3 @@ public Message(TValue data) public List Metadata { get; } = new(); } - -public record Metadata(string Key, byte[] Value); diff --git a/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs index e7920f1a..278c4501 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs @@ -21,6 +21,7 @@ internal class Writer : IWriter private readonly WriterConfig _config; private readonly ILogger> _logger; private readonly ISerializer _serializer; + private readonly GrpcRequestSettings _writerGrpcRequestSettings; private readonly ConcurrentQueue _toSendBuffer = new(); private readonly ConcurrentQueue _inFlightMessages = new(); private readonly CancellationTokenSource _disposeCts = new(); @@ -37,6 +38,7 @@ internal Writer(IDriver driver, WriterConfig config, ISerializer seriali _logger = driver.LoggerFactory.CreateLogger>(); _serializer = serializer; _limitBufferMaxSize = config.BufferMaxSize; + _writerGrpcRequestSettings = new GrpcRequestSettings { CancellationToken = _disposeCts.Token }; StartWriteWorker(); } @@ -49,11 +51,15 @@ public Task WriteAsync(TValue data, CancellationToken cancellationT public async Task WriteAsync(Message message, CancellationToken cancellationToken) { TaskCompletionSource tcs = new(); - cancellationToken.Register( + await using var registrationUserCancellationTokenRegistration = cancellationToken.Register( () => tcs.TrySetException( new WriterException("The write operation was canceled before it could be completed") ), useSynchronizationContext: false ); + await using var writerDisposedCancellationTokenRegistration = _disposeCts.Token.Register( + () => tcs.TrySetException(new WriterException($"Writer[{_config}] is disposed")), + useSynchronizationContext: false + ); byte[] data; try @@ -184,10 +190,7 @@ private async Task Initialize() _logger.LogInformation("Writer session initialization started. WriterConfig: {WriterConfig}", _config); - var stream = _driver.BidirectionalStreamCall( - TopicService.StreamWriteMethod, - GrpcRequestSettings.DefaultInstance - ); + var stream = _driver.BidirectionalStreamCall(TopicService.StreamWriteMethod, _writerGrpcRequestSettings); var initRequest = new StreamWriteMessage.Types.InitRequest { Path = _config.TopicPath }; if (_config.ProducerId != null) @@ -304,22 +307,15 @@ private async Task Initialize() public void Dispose() { - try - { - _disposeCts.Cancel(); + _disposeCts.Cancel(); - _session.Dispose(); - } - finally - { - _disposeCts.Dispose(); - } + _session = new NotStartedWriterSession("Writer is disposed"); } } internal record MessageSending(MessageData MessageData, TaskCompletionSource Tcs); -internal interface IWriteSession : IDisposable +internal interface IWriteSession { Task Write(ConcurrentQueue toSendBuffer); } @@ -347,11 +343,6 @@ public Task Write(ConcurrentQueue toSendBuffer) return Task.CompletedTask; } - - public void Dispose() - { - // Do nothing - } } internal class DummyWriterSession : IWriteSession @@ -362,10 +353,6 @@ private DummyWriterSession() { } - public void Dispose() - { - } - public Task Write(ConcurrentQueue toSendBuffer) { return Task.CompletedTask; @@ -501,18 +488,16 @@ Completing task on exception... _inFlightMessages.TryDequeue(out _); // Dequeue } } + + Logger.LogWarning("WriterSession[{SessionId}]: stream is closed", SessionId); } catch (Driver.TransportException e) { Logger.LogError(e, "WriterSession[{SessionId}] have error on processing writeAck", SessionId); - + } + finally + { ReconnectSession(); - - return; } - - Logger.LogWarning("WriterSession[{SessionId}]: stream is closed", SessionId); - - ReconnectSession(); } } diff --git a/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs b/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs index 84f7f56c..3c60e880 100644 --- a/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs +++ b/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs @@ -704,6 +704,23 @@ public async Task WriteAsync_WhenInFlightBufferSendInInitialize_ReturnCompletedT _mockStream.Verify(stream => stream.Current, Times.Exactly(3)); } + [Fact] + public async Task WriteAsync_WhenWriterIsDisposed_ThrowWriterException() + { + _mockStream.SetupSequence(stream => stream.Write(It.IsAny())) + .Returns(Task.CompletedTask); + _mockStream.SetupSequence(stream => stream.MoveNextAsync()) + .ReturnsAsync(true); + SetupReadOneWriteAckMessage(); + + var writer = new WriterBuilder(_mockIDriver.Object, "/topic") + { ProducerId = "producerId" }.Build(); + writer.Dispose(); + + Assert.Equal("Writer[TopicPath: /topic, ProducerId: producerId, Codec: Raw] is disposed", + (await Assert.ThrowsAsync(() => writer.WriteAsync("abacaba"))).Message); + } + private void SetupReadOneWriteAckMessage() { _mockStream.SetupSequence(stream => stream.Current)