From 1f8090ab7a1ee15622e314dad22c4faac2bec052 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Tue, 24 Dec 2024 18:34:49 +0300 Subject: [PATCH 1/5] 1) Do not pessimize the node on Grpc.Core.StatusCode.Cancelled. 2) Dispose of WriterSession using dispose CancellationToken. 3) BidirectionalStream is internal class. 4) Move Metadata class to Ydb.Sdk.Services.Topic. 5) Fixed memory leak CancellationTokenRegistration. 6) Cancel writing tasks after disposing of Writer. --- src/Ydb.Sdk/src/Driver.cs | 6 +++ src/Ydb.Sdk/src/GrpcRequestSettings.cs | 1 + src/Ydb.Sdk/src/IDriver.cs | 5 +- src/Ydb.Sdk/src/Services/Topic/Metadata.cs | 3 ++ .../src/Services/Topic/Writer/Message.cs | 2 - .../src/Services/Topic/Writer/Writer.cs | 49 +++++++------------ src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs | 17 +++++++ 7 files changed, 48 insertions(+), 35 deletions(-) create mode 100644 src/Ydb.Sdk/src/Services/Topic/Metadata.cs diff --git a/src/Ydb.Sdk/src/Driver.cs b/src/Ydb.Sdk/src/Driver.cs index 912ef583..817263fd 100644 --- a/src/Ydb.Sdk/src/Driver.cs +++ b/src/Ydb.Sdk/src/Driver.cs @@ -97,6 +97,12 @@ protected override (string, GrpcChannel) GetChannel(long nodeId) protected override void OnRpcError(string endpoint, RpcException e) { Logger.LogWarning("gRPC error [{Status}] on channel {Endpoint}", e.Status, endpoint); + + if (e.StatusCode == Grpc.Core.StatusCode.Cancelled) + { + return; + } + if (!_endpointPool.PessimizeEndpoint(endpoint)) { return; diff --git a/src/Ydb.Sdk/src/GrpcRequestSettings.cs b/src/Ydb.Sdk/src/GrpcRequestSettings.cs index 933b54a5..9931cca1 100644 --- a/src/Ydb.Sdk/src/GrpcRequestSettings.cs +++ b/src/Ydb.Sdk/src/GrpcRequestSettings.cs @@ -11,6 +11,7 @@ public class GrpcRequestSettings public string TraceId { get; set; } = string.Empty; public TimeSpan TransportTimeout { get; set; } = TimeSpan.Zero; public ImmutableArray CustomClientHeaders { get; } = new(); + public CancellationToken CancellationToken = default; internal long NodeId { get; set; } internal Action TrailersHandler { get; set; } = _ => { }; diff --git a/src/Ydb.Sdk/src/IDriver.cs b/src/Ydb.Sdk/src/IDriver.cs index 1ca665f6..4e5a4373 100644 --- a/src/Ydb.Sdk/src/IDriver.cs +++ b/src/Ydb.Sdk/src/IDriver.cs @@ -144,7 +144,8 @@ protected CallOptions GetCallOptions(GrpcRequestSettings settings) } var options = new CallOptions( - headers: meta + headers: meta, + cancellationToken: settings.CancellationToken ); if (settings.TransportTimeout != TimeSpan.Zero) @@ -213,7 +214,7 @@ public async ValueTask MoveNextAsync() } } -public class BidirectionalStream : IBidirectionalStream +internal class BidirectionalStream : IBidirectionalStream { private readonly AsyncDuplexStreamingCall _stream; private readonly Action _rpcErrorAction; diff --git a/src/Ydb.Sdk/src/Services/Topic/Metadata.cs b/src/Ydb.Sdk/src/Services/Topic/Metadata.cs new file mode 100644 index 00000000..9053c937 --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/Metadata.cs @@ -0,0 +1,3 @@ +namespace Ydb.Sdk.Services.Topic; + +public record Metadata(string Key, byte[] Value); diff --git a/src/Ydb.Sdk/src/Services/Topic/Writer/Message.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/Message.cs index 1d0db4b1..449119ad 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Writer/Message.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/Message.cs @@ -13,5 +13,3 @@ public Message(TValue data) public List Metadata { get; } = new(); } - -public record Metadata(string Key, byte[] Value); diff --git a/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs index e7920f1a..bd8b8808 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs @@ -21,6 +21,7 @@ internal class Writer : IWriter private readonly WriterConfig _config; private readonly ILogger> _logger; private readonly ISerializer _serializer; + private readonly GrpcRequestSettings _writerGrpcRequestSettings; private readonly ConcurrentQueue _toSendBuffer = new(); private readonly ConcurrentQueue _inFlightMessages = new(); private readonly CancellationTokenSource _disposeCts = new(); @@ -37,6 +38,7 @@ internal Writer(IDriver driver, WriterConfig config, ISerializer seriali _logger = driver.LoggerFactory.CreateLogger>(); _serializer = serializer; _limitBufferMaxSize = config.BufferMaxSize; + _writerGrpcRequestSettings = new GrpcRequestSettings { CancellationToken = _disposeCts.Token }; StartWriteWorker(); } @@ -49,11 +51,15 @@ public Task WriteAsync(TValue data, CancellationToken cancellationT public async Task WriteAsync(Message message, CancellationToken cancellationToken) { TaskCompletionSource tcs = new(); - cancellationToken.Register( + await using var registrationUserCancellationTokenRegistration = cancellationToken.Register( () => tcs.TrySetException( new WriterException("The write operation was canceled before it could be completed") ), useSynchronizationContext: false ); + await using var writerDisposedCancellationTokenRegistration = _disposeCts.Token.Register( + () => tcs.TrySetException(new WriterException($"Writer[{_config}] is disposed")), + useSynchronizationContext: false + ); byte[] data; try @@ -184,10 +190,7 @@ private async Task Initialize() _logger.LogInformation("Writer session initialization started. WriterConfig: {WriterConfig}", _config); - var stream = _driver.BidirectionalStreamCall( - TopicService.StreamWriteMethod, - GrpcRequestSettings.DefaultInstance - ); + var stream = _driver.BidirectionalStreamCall(TopicService.StreamWriteMethod, _writerGrpcRequestSettings); var initRequest = new StreamWriteMessage.Types.InitRequest { Path = _config.TopicPath }; if (_config.ProducerId != null) @@ -304,22 +307,15 @@ private async Task Initialize() public void Dispose() { - try - { - _disposeCts.Cancel(); + _disposeCts.Cancel(); - _session.Dispose(); - } - finally - { - _disposeCts.Dispose(); - } + _session = new NotStartedWriterSession("Writer is disposed"); } } internal record MessageSending(MessageData MessageData, TaskCompletionSource Tcs); -internal interface IWriteSession : IDisposable +internal interface IWriteSession { Task Write(ConcurrentQueue toSendBuffer); } @@ -347,11 +343,6 @@ public Task Write(ConcurrentQueue toSendBuffer) return Task.CompletedTask; } - - public void Dispose() - { - // Do nothing - } } internal class DummyWriterSession : IWriteSession @@ -362,10 +353,6 @@ private DummyWriterSession() { } - public void Dispose() - { - } - public Task Write(ConcurrentQueue toSendBuffer) { return Task.CompletedTask; @@ -505,14 +492,14 @@ Completing task on exception... catch (Driver.TransportException e) { Logger.LogError(e, "WriterSession[{SessionId}] have error on processing writeAck", SessionId); - + } + catch (ObjectDisposedException) + { + Logger.LogWarning("WriterSession[{SessionId}]: stream is closed", SessionId); + } + finally + { ReconnectSession(); - - return; } - - Logger.LogWarning("WriterSession[{SessionId}]: stream is closed", SessionId); - - ReconnectSession(); } } diff --git a/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs b/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs index 84f7f56c..3c60e880 100644 --- a/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs +++ b/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs @@ -704,6 +704,23 @@ public async Task WriteAsync_WhenInFlightBufferSendInInitialize_ReturnCompletedT _mockStream.Verify(stream => stream.Current, Times.Exactly(3)); } + [Fact] + public async Task WriteAsync_WhenWriterIsDisposed_ThrowWriterException() + { + _mockStream.SetupSequence(stream => stream.Write(It.IsAny())) + .Returns(Task.CompletedTask); + _mockStream.SetupSequence(stream => stream.MoveNextAsync()) + .ReturnsAsync(true); + SetupReadOneWriteAckMessage(); + + var writer = new WriterBuilder(_mockIDriver.Object, "/topic") + { ProducerId = "producerId" }.Build(); + writer.Dispose(); + + Assert.Equal("Writer[TopicPath: /topic, ProducerId: producerId, Codec: Raw] is disposed", + (await Assert.ThrowsAsync(() => writer.WriteAsync("abacaba"))).Message); + } + private void SetupReadOneWriteAckMessage() { _mockStream.SetupSequence(stream => stream.Current) From 26d34268d3257b822f59b23f1cdbcb3d5de31fc5 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Tue, 24 Dec 2024 18:39:53 +0300 Subject: [PATCH 2/5] fix linter --- src/Ydb.Sdk/src/Driver.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Ydb.Sdk/src/Driver.cs b/src/Ydb.Sdk/src/Driver.cs index 817263fd..513eab09 100644 --- a/src/Ydb.Sdk/src/Driver.cs +++ b/src/Ydb.Sdk/src/Driver.cs @@ -102,7 +102,7 @@ protected override void OnRpcError(string endpoint, RpcException e) { return; } - + if (!_endpointPool.PessimizeEndpoint(endpoint)) { return; From 3e917d885b0fb949ff4f0ebc15180b061b808c0d Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Tue, 24 Dec 2024 18:43:47 +0300 Subject: [PATCH 3/5] Update CHANGELOG.md --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index af26b531..28141fe5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +- Do not pessimize the node on Grpc.Core.StatusCode.Cancelled. +- Dispose of WriterSession using dispose CancellationToken. +- BidirectionalStream is internal class. +- Move Metadata class to Ydb.Sdk.Services.Topic. +- Fixed memory leak CancellationTokenRegistration. +- Cancel writing tasks after disposing of Writer. + ## v0.9.3 - Fixed bug in Topic Writer: worker is stopped by disposeCts - Fixed bug in sql parser ADO.NET: deduplication declare param in YQL query From 424fb35b321e4b4e6d7af6cac0dec80739ca8b24 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Wed, 25 Dec 2024 11:28:33 +0300 Subject: [PATCH 4/5] Do not pessimize on Grpc.Core.StatusCode.DeadlineExceeded --- CHANGELOG.md | 2 +- src/Ydb.Sdk/src/Driver.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 28141fe5..5f18758f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -- Do not pessimize the node on Grpc.Core.StatusCode.Cancelled. +- Do not pessimize the node on Grpc.Core.StatusCode.Cancelled and Grpc.Core.StatusCode.DeadlineExceeded. - Dispose of WriterSession using dispose CancellationToken. - BidirectionalStream is internal class. - Move Metadata class to Ydb.Sdk.Services.Topic. diff --git a/src/Ydb.Sdk/src/Driver.cs b/src/Ydb.Sdk/src/Driver.cs index 513eab09..7ff27491 100644 --- a/src/Ydb.Sdk/src/Driver.cs +++ b/src/Ydb.Sdk/src/Driver.cs @@ -98,7 +98,7 @@ protected override void OnRpcError(string endpoint, RpcException e) { Logger.LogWarning("gRPC error [{Status}] on channel {Endpoint}", e.Status, endpoint); - if (e.StatusCode == Grpc.Core.StatusCode.Cancelled) + if (e.StatusCode is Grpc.Core.StatusCode.Cancelled or Grpc.Core.StatusCode.DeadlineExceeded) { return; } From 5cc2eba77fa00f3d7b1f23e17ace3ecdaedcbd60 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Wed, 25 Dec 2024 11:54:35 +0300 Subject: [PATCH 5/5] deleted catch on ObjectDisposedException --- src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs index bd8b8808..278c4501 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs @@ -488,15 +488,13 @@ Completing task on exception... _inFlightMessages.TryDequeue(out _); // Dequeue } } + + Logger.LogWarning("WriterSession[{SessionId}]: stream is closed", SessionId); } catch (Driver.TransportException e) { Logger.LogError(e, "WriterSession[{SessionId}] have error on processing writeAck", SessionId); } - catch (ObjectDisposedException) - { - Logger.LogWarning("WriterSession[{SessionId}]: stream is closed", SessionId); - } finally { ReconnectSession();