From 77e34ca0c5b7920fa6520b01378093d889b937d5 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Wed, 23 Oct 2024 15:40:04 +0300 Subject: [PATCH 1/6] feat: Init Producer implementation --- src/Ydb.Sdk/src/Services/Topic/Message.cs | 17 ++ src/Ydb.Sdk/src/Services/Topic/Producer.cs | 235 +++++++++++++----- .../src/Services/Topic/ProducerBuilder.cs | 55 +--- src/Ydb.Sdk/src/Services/Topic/SendResult.cs | 40 +++ .../src/Services/Topic/TopicSession.cs | 46 ++++ 5 files changed, 290 insertions(+), 103 deletions(-) create mode 100644 src/Ydb.Sdk/src/Services/Topic/Message.cs create mode 100644 src/Ydb.Sdk/src/Services/Topic/SendResult.cs create mode 100644 src/Ydb.Sdk/src/Services/Topic/TopicSession.cs diff --git a/src/Ydb.Sdk/src/Services/Topic/Message.cs b/src/Ydb.Sdk/src/Services/Topic/Message.cs new file mode 100644 index 00000000..820923a2 --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/Message.cs @@ -0,0 +1,17 @@ +namespace Ydb.Sdk.Services.Topic; + +public class Message +{ + public Message(TValue data) + { + Data = data; + } + + public DateTime Timestamp { get; set; } = DateTime.Now; + + public TValue Data { get; } + + public List Metadata { get; } = new(); +} + +public record Metadata(string Key, byte[] Value); diff --git a/src/Ydb.Sdk/src/Services/Topic/Producer.cs b/src/Ydb.Sdk/src/Services/Topic/Producer.cs index a7d9dc3b..8d2f4f72 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Producer.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Producer.cs @@ -1,88 +1,203 @@ -// using System.Collections.Concurrent; -// using Microsoft.Extensions.Logging; -// using Ydb.Topic; +using System.Collections.Concurrent; +using System.Transactions; +using Google.Protobuf; +using Google.Protobuf.WellKnownTypes; +using Microsoft.Extensions.Logging; +using Ydb.Topic; +using Ydb.Topic.V1; namespace Ydb.Sdk.Services.Topic; -// using ProducerStream = Driver.BidirectionalStream< -// StreamWriteMessage.Types.FromClient, -// StreamWriteMessage.Types.FromServer -// >; +using InitResponse = StreamWriteMessage.Types.InitResponse; +using MessageData = StreamWriteMessage.Types.WriteRequest.Types.MessageData; +using MessageFromClient = StreamWriteMessage.Types.FromClient; +using ProducerStream = Driver.BidirectionalStream< + StreamWriteMessage.Types.FromClient, + StreamWriteMessage.Types.FromServer +>; internal class Producer : IProducer { - // private readonly Driver _driver; - // private readonly ILogger> _logger; - // private readonly long _partitionId; - // private readonly string _sessionId; - // private readonly ISerializer _serializer; - // - // private long _seqNum; - // - // private readonly ConcurrentQueue _inFlightMessages; - // private volatile ProducerStream _stream; - // - // internal Producer( - // ProducerConfig producerConfig, - // StreamWriteMessage.Types.InitResponse initResponse, - // ProducerStream stream, - // ISerializer serializer) - // { - // _driver = producerConfig.Driver; - // _stream = stream; - // _serializer = serializer; - // _logger = producerConfig.Driver.LoggerFactory.CreateLogger>(); - // _partitionId = initResponse.PartitionId; - // _sessionId = initResponse.SessionId; - // _seqNum = initResponse.LastSeqNo; - // _inFlightMessages = new ConcurrentQueue(); - // } + private readonly ProducerConfig _config; + private readonly ILogger> _logger; + private readonly ISerializer _serializer; + + private readonly ConcurrentQueue _inFlightMessages = new(); + private readonly ConcurrentQueue _toSendBuffer = new(); + private readonly SemaphoreSlim _writeSemaphoreSlim = new(1); + + private volatile ProducerSession _session = null!; + + internal Producer(ProducerConfig producerConfig, ISerializer serializer) + { + _config = producerConfig; + _serializer = serializer; + _logger = producerConfig.Driver.LoggerFactory.CreateLogger>(); + } + + internal async Task Initialize() + { + var stream = _config.Driver.BidirectionalStreamCall(TopicService.StreamWriteMethod, + GrpcRequestSettings.DefaultInstance); + + var initRequest = new StreamWriteMessage.Types.InitRequest { Path = _config.TopicPath }; + if (_config.ProducerId != null) + { + initRequest.ProducerId = _config.ProducerId; + } + + if (_config.MessageGroupId != null) + { + initRequest.MessageGroupId = _config.MessageGroupId; + } + + await stream.Write(new MessageFromClient { InitRequest = initRequest }); + if (!await stream.MoveNextAsync()) + { + throw new YdbProducerException("Write stream is closed by YDB server"); + } + + var receivedInitMessage = stream.Current; + + Status.FromProto(receivedInitMessage.Status, receivedInitMessage.Issues).EnsureSuccess(); + + var initResponse = receivedInitMessage.InitResponse; + + if (!initResponse.SupportedCodecs.Codecs.Contains((int)_config.Codec)) + { + throw new YdbProducerException($"Topic is not supported codec: {_config.Codec}"); + } + + _session = new ProducerSession(_config, stream, initResponse, Initialize, _logger); + _ = _session.RunProcessingWriteAck(_inFlightMessages); + } public Task SendAsync(TValue data) { - throw new NotImplementedException(); + return SendAsync(new Message(data)); } - public Task SendAsync(Message message) + public async Task SendAsync(Message message) { - throw new NotImplementedException(); + TaskCompletionSource completeTask = new(); + + var data = _serializer.Serialize(message.Data); + var messageData = new MessageData + { + Data = ByteString.CopyFrom(data), + CreatedAt = Timestamp.FromDateTime(message.Timestamp), + UncompressedSize = data.Length + }; + + foreach (var metadata in message.Metadata) + { + messageData.MetadataItems.Add( + new MetadataItem { Key = metadata.Key, Value = ByteString.CopyFrom(metadata.Value) } + ); + } + + _toSendBuffer.Enqueue(new MessageSending(messageData, completeTask)); + + if (_toSendBuffer.IsEmpty) // concurrent sending + { + return await completeTask.Task; + } + + await _writeSemaphoreSlim.WaitAsync(); + try + { + await _session.Write(_toSendBuffer, _inFlightMessages); + } + finally + { + _writeSemaphoreSlim.Release(); + } + + return await completeTask.Task; } } -public class Message +// No thread safe +internal class ProducerSession : TopicSession { - public Message(TValue data) + private readonly ProducerConfig _config; + private readonly ProducerStream _stream; + + private long _seqNum; + + public ProducerSession( + ProducerConfig config, + ProducerStream stream, + InitResponse initResponse, + Func initialize, + ILogger logger) : base(logger, initResponse.SessionId, initialize) { - Data = data; + _config = config; + _stream = stream; + _seqNum = initResponse.LastSeqNo; } - public DateTime Timestamp { get; set; } + internal async Task RunProcessingWriteAck(ConcurrentQueue inFlightMessages) + { + await foreach (var messageFromServer in _stream) + { + var status = Status.FromProto(messageFromServer.Status, messageFromServer.Issues); - public TValue Data { get; } + if (status.IsNotSuccess) + { + Logger.LogWarning(""); + return; + } - public List Metadata { get; } = new(); -} + foreach (var ack in messageFromServer.WriteResponse.Acks) + { + if (!inFlightMessages.TryDequeue(out var messageFromClient)) + { + break; + } -public record Metadata(string Key, byte[] Value); + messageFromClient.TaskCompletionSource.SetResult(new SendResult(ack)); + } + } + } -public class SendResult -{ - public SendResult(State status) + internal async Task Write(ConcurrentQueue toSendBuffer, + ConcurrentQueue inFlightMessages) { - State = status; - } + try + { + var writeMessage = new StreamWriteMessage.Types.WriteRequest + { + Codec = (int)_config.Codec + }; - public State State { get; } -} + var currentSeqNum = Volatile.Read(ref _seqNum); -public enum State -{ - Written, - AlreadyWritten -} + while (toSendBuffer.TryDequeue(out var sendData)) + { + var messageData = sendData.MessageData; -internal enum ProducerState -{ - Ready - // Broken + messageData.SeqNo = ++currentSeqNum; + writeMessage.Messages.Add(messageData); + inFlightMessages.Enqueue(sendData); + } + + Volatile.Write(ref _seqNum, currentSeqNum); + await _stream.Write(new MessageFromClient { WriteRequest = writeMessage }); + } + catch (TransactionException e) + { + ReconnectSession(); + + Console.WriteLine(e); + throw; + } + } + + public ValueTask DisposeAsync() + { + return _stream.DisposeAsync(); + } } + +internal record MessageSending(MessageData MessageData, TaskCompletionSource TaskCompletionSource); diff --git a/src/Ydb.Sdk/src/Services/Topic/ProducerBuilder.cs b/src/Ydb.Sdk/src/Services/Topic/ProducerBuilder.cs index 668c147b..0c60b579 100644 --- a/src/Ydb.Sdk/src/Services/Topic/ProducerBuilder.cs +++ b/src/Ydb.Sdk/src/Services/Topic/ProducerBuilder.cs @@ -1,6 +1,3 @@ -using Ydb.Topic; -using Ydb.Topic.V1; - namespace Ydb.Sdk.Services.Topic; public class ProducerBuilder @@ -16,45 +13,17 @@ public ProducerBuilder(ProducerConfig config) public async Task> Build() { - var stream = _config.Driver.BidirectionalStreamCall(TopicService.StreamWriteMethod, - GrpcRequestSettings.DefaultInstance); - - var initRequest = new StreamWriteMessage.Types.InitRequest { Path = _config.TopicPath }; - if (_config.ProducerId != null) - { - initRequest.ProducerId = _config.ProducerId; - } - - if (_config.MessageGroupId != null) - { - initRequest.MessageGroupId = _config.MessageGroupId; - } - - await stream.Write(new StreamWriteMessage.Types.FromClient { InitRequest = initRequest }); - if (!await stream.MoveNextAsync()) - { - throw new YdbProducerException("Write stream is closed by YDB server"); - } - - var receivedInitMessage = stream.Current; - - Status.FromProto(receivedInitMessage.Status, receivedInitMessage.Issues).EnsureSuccess(); - - var initResponse = receivedInitMessage.InitResponse; - - if (!initResponse.SupportedCodecs.Codecs.Contains((int)_config.Codec)) - { - throw new YdbProducerException($"Topic is not supported codec: {_config.Codec}"); - } - - throw new NotImplementedException(); - // return new Producer( - // _config, initResponse, stream, - // Serializer ?? (ISerializer)( - // Serializers.DefaultSerializers.TryGetValue(typeof(TValue), out var serializer) - // ? serializer - // : throw new YdbProducerException("The serializer is not set") - // ) - // ); + var producer = new Producer( + _config, + Serializer ?? (ISerializer)( + Serializers.DefaultSerializers.TryGetValue(typeof(TValue), out var serializer) + ? serializer + : throw new YdbProducerException("The serializer is not set") + ) + ); + + await producer.Initialize(); + + return producer; } } diff --git a/src/Ydb.Sdk/src/Services/Topic/SendResult.cs b/src/Ydb.Sdk/src/Services/Topic/SendResult.cs new file mode 100644 index 00000000..fdc3dc2c --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/SendResult.cs @@ -0,0 +1,40 @@ +using Ydb.Topic; + +namespace Ydb.Sdk.Services.Topic; + +public class SendResult +{ + private readonly long _offset; + + internal SendResult(StreamWriteMessage.Types.WriteResponse.Types.WriteAck ack) + { + switch (ack.MessageWriteStatusCase) + { + case StreamWriteMessage.Types.WriteResponse.Types.WriteAck.MessageWriteStatusOneofCase.Written: + Status = PersistenceStatus.Written; + _offset = ack.Written.Offset; + break; + case StreamWriteMessage.Types.WriteResponse.Types.WriteAck.MessageWriteStatusOneofCase.Skipped: + Status = PersistenceStatus.AlreadyWritten; + break; + case StreamWriteMessage.Types.WriteResponse.Types.WriteAck.MessageWriteStatusOneofCase.None: + default: + throw new YdbProducerException($"Unexpected WriteAck status: {ack.MessageWriteStatusCase}"); + } + } + + public PersistenceStatus Status { get; } + + public bool TryGetOffset(out long offset) + { + offset = _offset; + + return Status == PersistenceStatus.Written; + } +} + +public enum PersistenceStatus +{ + Written, + AlreadyWritten +} diff --git a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs new file mode 100644 index 00000000..ada81bb1 --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs @@ -0,0 +1,46 @@ +using Microsoft.Extensions.Logging; + +namespace Ydb.Sdk.Services.Topic; + +internal abstract class TopicSession : IDisposable +{ + private readonly Func _initialize; + + protected readonly ILogger Logger; + protected readonly string SessionId; + + private int _isActive = 1; + + protected TopicSession(ILogger logger, string sessionId, Func initialize) + { + Logger = logger; + _initialize = initialize; + SessionId = sessionId; + } + + protected async void ReconnectSession() + { + if (Interlocked.CompareExchange(ref _isActive, 0, 1) == 0) + { + return; + } + + while (true) + { + try + { + await _initialize(); + break; + } + catch (Exception e) + { + Logger.LogError(e, "Error reconnect session!"); + } + } + } + + public void Dispose() + { + + } +} From d7383eaf942e10971d81223497e8f5a11723ea3b Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Wed, 23 Oct 2024 20:36:36 +0300 Subject: [PATCH 2/6] fix linter --- src/Ydb.Sdk/src/Services/Topic/Producer.cs | 50 +++++++++++++------ .../src/Services/Topic/ProducerConfig.cs | 19 +++++++ .../src/Services/Topic/TopicSession.cs | 14 +++--- 3 files changed, 62 insertions(+), 21 deletions(-) diff --git a/src/Ydb.Sdk/src/Services/Topic/Producer.cs b/src/Ydb.Sdk/src/Services/Topic/Producer.cs index 8d2f4f72..321314e3 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Producer.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Producer.cs @@ -37,8 +37,12 @@ internal Producer(ProducerConfig producerConfig, ISerializer serializer) internal async Task Initialize() { - var stream = _config.Driver.BidirectionalStreamCall(TopicService.StreamWriteMethod, - GrpcRequestSettings.DefaultInstance); + _logger.LogInformation("Producer session initialization started. ProducerConfig: {ProducerConfig}", _config); + + var stream = _config.Driver.BidirectionalStreamCall( + TopicService.StreamWriteMethod, + GrpcRequestSettings.DefaultInstance + ); var initRequest = new StreamWriteMessage.Types.InitRequest { Path = _config.TopicPath }; if (_config.ProducerId != null) @@ -54,7 +58,8 @@ internal async Task Initialize() await stream.Write(new MessageFromClient { InitRequest = initRequest }); if (!await stream.MoveNextAsync()) { - throw new YdbProducerException("Write stream is closed by YDB server"); + throw new YdbProducerException( + $"Stream unexpectedly closed by YDB server. Current InitRequest: {initRequest}"); } var receivedInitMessage = stream.Current; @@ -139,26 +144,41 @@ public ProducerSession( internal async Task RunProcessingWriteAck(ConcurrentQueue inFlightMessages) { - await foreach (var messageFromServer in _stream) + try { - var status = Status.FromProto(messageFromServer.Status, messageFromServer.Issues); - - if (status.IsNotSuccess) + Logger.LogInformation("ProducerSession[{SessionId}] is running processing writeAck", SessionId); + + await foreach (var messageFromServer in _stream) { - Logger.LogWarning(""); - return; - } + var status = Status.FromProto(messageFromServer.Status, messageFromServer.Issues); - foreach (var ack in messageFromServer.WriteResponse.Acks) - { - if (!inFlightMessages.TryDequeue(out var messageFromClient)) + if (status.IsNotSuccess) { - break; + Logger.LogWarning( + "ProducerSession[{SessionId}] received unsuccessful status while processing writeAck: {Status}", + SessionId, status); + return; } - messageFromClient.TaskCompletionSource.SetResult(new SendResult(ack)); + foreach (var ack in messageFromServer.WriteResponse.Acks) + { + if (!inFlightMessages.TryDequeue(out var messageFromClient)) + { + break; + } + + messageFromClient.TaskCompletionSource.SetResult(new SendResult(ack)); + } } } + catch (Exception e) + { + Logger.LogError(e, "ProducerSession[{SessionId}] have error on processing writeAck", SessionId); + } + finally + { + ReconnectSession(); + } } internal async Task Write(ConcurrentQueue toSendBuffer, diff --git a/src/Ydb.Sdk/src/Services/Topic/ProducerConfig.cs b/src/Ydb.Sdk/src/Services/Topic/ProducerConfig.cs index fcf72af3..ca98fe97 100644 --- a/src/Ydb.Sdk/src/Services/Topic/ProducerConfig.cs +++ b/src/Ydb.Sdk/src/Services/Topic/ProducerConfig.cs @@ -1,3 +1,5 @@ +using System.Text; + namespace Ydb.Sdk.Services.Topic; public class ProducerConfig @@ -13,4 +15,21 @@ public ProducerConfig(Driver driver, string topicPath) public string? ProducerId { get; set; } public string? MessageGroupId { get; set; } public Codec Codec { get; set; } = Codec.Raw; // TODO Supported only Raw + + public override string ToString() + { + var toString = new StringBuilder().Append("[TopicPath: ").Append(TopicPath); + + if (ProducerId != null) + { + toString.Append(", ProducerId: ").Append(ProducerId); + } + + if (MessageGroupId != null) + { + toString.Append(", MessageGroupId: ").Append(MessageGroupId); + } + + return toString.Append(", Codec: ").Append(Codec).Append(']').ToString(); + } } diff --git a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs index ada81bb1..352d50b4 100644 --- a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs +++ b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs @@ -5,26 +5,29 @@ namespace Ydb.Sdk.Services.Topic; internal abstract class TopicSession : IDisposable { private readonly Func _initialize; - + protected readonly ILogger Logger; protected readonly string SessionId; - + private int _isActive = 1; protected TopicSession(ILogger logger, string sessionId, Func initialize) { Logger = logger; - _initialize = initialize; SessionId = sessionId; + _initialize = initialize; } protected async void ReconnectSession() { if (Interlocked.CompareExchange(ref _isActive, 0, 1) == 0) { - return; + Logger.LogWarning("The reconnect has already been launched"); + + return; } + while (true) { try @@ -38,9 +41,8 @@ protected async void ReconnectSession() } } } - + public void Dispose() { - } } From bc838414ebf3462628010006c145bf50a1ab8948 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 24 Oct 2024 11:43:28 +0300 Subject: [PATCH 3/6] fix linter --- src/Ydb.Sdk/src/Services/Topic/Producer.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Ydb.Sdk/src/Services/Topic/Producer.cs b/src/Ydb.Sdk/src/Services/Topic/Producer.cs index 321314e3..e62d1589 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Producer.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Producer.cs @@ -147,7 +147,7 @@ internal async Task RunProcessingWriteAck(ConcurrentQueue inFlig try { Logger.LogInformation("ProducerSession[{SessionId}] is running processing writeAck", SessionId); - + await foreach (var messageFromServer in _stream) { var status = Status.FromProto(messageFromServer.Status, messageFromServer.Issues); From 5c68107976a885057cf1c166aad8e1c0ee2d2214 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Fri, 25 Oct 2024 15:20:32 +0300 Subject: [PATCH 4/6] fix --- src/Ydb.Sdk/src/Services/Topic/Exceptions.cs | 4 +- src/Ydb.Sdk/src/Services/Topic/IProducer.cs | 8 -- src/Ydb.Sdk/src/Services/Topic/IWriter.cs | 8 ++ .../src/Services/Topic/TopicSession.cs | 14 ++- .../Topic/{SendResult.cs => WriteResult.cs} | 6 +- .../Services/Topic/{Producer.cs => Writer.cs} | 110 +++++++++++++----- .../{ProducerBuilder.cs => WriterBuilder.cs} | 16 +-- .../{ProducerConfig.cs => WriterConfig.cs} | 4 +- 8 files changed, 112 insertions(+), 58 deletions(-) delete mode 100644 src/Ydb.Sdk/src/Services/Topic/IProducer.cs create mode 100644 src/Ydb.Sdk/src/Services/Topic/IWriter.cs rename src/Ydb.Sdk/src/Services/Topic/{SendResult.cs => WriteResult.cs} (81%) rename src/Ydb.Sdk/src/Services/Topic/{Producer.cs => Writer.cs} (56%) rename src/Ydb.Sdk/src/Services/Topic/{ProducerBuilder.cs => WriterBuilder.cs} (50%) rename src/Ydb.Sdk/src/Services/Topic/{ProducerConfig.cs => WriterConfig.cs} (90%) diff --git a/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs b/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs index b9b80a1c..cd1c08fd 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs @@ -7,9 +7,9 @@ protected YdbTopicException(string message) : base(message) } } -public class YdbProducerException : YdbTopicException +public class YdbWriterException : YdbTopicException { - public YdbProducerException(string message) : base(message) + public YdbWriterException(string message) : base(message) { } } diff --git a/src/Ydb.Sdk/src/Services/Topic/IProducer.cs b/src/Ydb.Sdk/src/Services/Topic/IProducer.cs deleted file mode 100644 index f528e1c7..00000000 --- a/src/Ydb.Sdk/src/Services/Topic/IProducer.cs +++ /dev/null @@ -1,8 +0,0 @@ -namespace Ydb.Sdk.Services.Topic; - -public interface IProducer -{ - public Task SendAsync(TValue data); - - public Task SendAsync(Message message); -} diff --git a/src/Ydb.Sdk/src/Services/Topic/IWriter.cs b/src/Ydb.Sdk/src/Services/Topic/IWriter.cs new file mode 100644 index 00000000..eabe571f --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/IWriter.cs @@ -0,0 +1,8 @@ +namespace Ydb.Sdk.Services.Topic; + +public interface IWriter +{ + public Task WriteAsync(TValue data); + + public Task WriteAsync(Message message); +} diff --git a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs index 352d50b4..a222a069 100644 --- a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs +++ b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs @@ -10,6 +10,7 @@ internal abstract class TopicSession : IDisposable protected readonly string SessionId; private int _isActive = 1; + private bool _disposed; protected TopicSession(ILogger logger, string sessionId, Func initialize) { @@ -22,13 +23,14 @@ protected async void ReconnectSession() { if (Interlocked.CompareExchange(ref _isActive, 0, 1) == 0) { - Logger.LogWarning("The reconnect has already been launched"); + Logger.LogWarning("Skipping reconnect. A reconnect session has already been initiated"); return; } - - while (true) + Logger.LogInformation("WriterSession[{SessionId}] has been deactivated, starting to reconnect", SessionId); + + while (!_disposed) { try { @@ -37,12 +39,16 @@ protected async void ReconnectSession() } catch (Exception e) { - Logger.LogError(e, "Error reconnect session!"); + Logger.LogError(e, "Unable to reconnect the session due to the following error"); } } } public void Dispose() { + lock (this) + { + _disposed = true; + } } } diff --git a/src/Ydb.Sdk/src/Services/Topic/SendResult.cs b/src/Ydb.Sdk/src/Services/Topic/WriteResult.cs similarity index 81% rename from src/Ydb.Sdk/src/Services/Topic/SendResult.cs rename to src/Ydb.Sdk/src/Services/Topic/WriteResult.cs index fdc3dc2c..1291946d 100644 --- a/src/Ydb.Sdk/src/Services/Topic/SendResult.cs +++ b/src/Ydb.Sdk/src/Services/Topic/WriteResult.cs @@ -2,11 +2,11 @@ namespace Ydb.Sdk.Services.Topic; -public class SendResult +public class WriteResult { private readonly long _offset; - internal SendResult(StreamWriteMessage.Types.WriteResponse.Types.WriteAck ack) + internal WriteResult(StreamWriteMessage.Types.WriteResponse.Types.WriteAck ack) { switch (ack.MessageWriteStatusCase) { @@ -19,7 +19,7 @@ internal SendResult(StreamWriteMessage.Types.WriteResponse.Types.WriteAck ack) break; case StreamWriteMessage.Types.WriteResponse.Types.WriteAck.MessageWriteStatusOneofCase.None: default: - throw new YdbProducerException($"Unexpected WriteAck status: {ack.MessageWriteStatusCase}"); + throw new YdbWriterException($"Unexpected WriteAck status: {ack.MessageWriteStatusCase}"); } } diff --git a/src/Ydb.Sdk/src/Services/Topic/Producer.cs b/src/Ydb.Sdk/src/Services/Topic/Writer.cs similarity index 56% rename from src/Ydb.Sdk/src/Services/Topic/Producer.cs rename to src/Ydb.Sdk/src/Services/Topic/Writer.cs index e62d1589..5120e88f 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Producer.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer.cs @@ -11,33 +11,33 @@ namespace Ydb.Sdk.Services.Topic; using InitResponse = StreamWriteMessage.Types.InitResponse; using MessageData = StreamWriteMessage.Types.WriteRequest.Types.MessageData; using MessageFromClient = StreamWriteMessage.Types.FromClient; -using ProducerStream = Driver.BidirectionalStream< +using WriterStream = Driver.BidirectionalStream< StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer >; -internal class Producer : IProducer +internal class Writer : IWriter { - private readonly ProducerConfig _config; - private readonly ILogger> _logger; + private readonly WriterConfig _config; + private readonly ILogger> _logger; private readonly ISerializer _serializer; private readonly ConcurrentQueue _inFlightMessages = new(); private readonly ConcurrentQueue _toSendBuffer = new(); private readonly SemaphoreSlim _writeSemaphoreSlim = new(1); - private volatile ProducerSession _session = null!; + private volatile WriterSession _session = null!; - internal Producer(ProducerConfig producerConfig, ISerializer serializer) + internal Writer(WriterConfig config, ISerializer serializer) { - _config = producerConfig; + _config = config; _serializer = serializer; - _logger = producerConfig.Driver.LoggerFactory.CreateLogger>(); + _logger = config.Driver.LoggerFactory.CreateLogger>(); } internal async Task Initialize() { - _logger.LogInformation("Producer session initialization started. ProducerConfig: {ProducerConfig}", _config); + _logger.LogInformation("Writer session initialization started. WriterConfig: {WriterConfig}", _config); var stream = _config.Driver.BidirectionalStreamCall( TopicService.StreamWriteMethod, @@ -55,10 +55,12 @@ internal async Task Initialize() initRequest.MessageGroupId = _config.MessageGroupId; } + _logger.LogDebug("Sending initialization request for the write stream: {InitRequest}", initRequest); + await stream.Write(new MessageFromClient { InitRequest = initRequest }); if (!await stream.MoveNextAsync()) { - throw new YdbProducerException( + throw new YdbWriterException( $"Stream unexpectedly closed by YDB server. Current InitRequest: {initRequest}"); } @@ -68,23 +70,39 @@ internal async Task Initialize() var initResponse = receivedInitMessage.InitResponse; + _logger.LogDebug("Received a response for the initialization request on the write stream: {InitResponse}", + initResponse); + if (!initResponse.SupportedCodecs.Codecs.Contains((int)_config.Codec)) { - throw new YdbProducerException($"Topic is not supported codec: {_config.Codec}"); + throw new YdbWriterException($"Topic[{_config.TopicPath}] is not supported codec: {_config.Codec}"); + } + + _session = new WriterSession(_config, stream, initResponse, Initialize, _logger); + + await _writeSemaphoreSlim.WaitAsync(); + try + { + _logger.LogDebug("Retrying to send pending in-flight messages after stream restart"); + + await _session.Write(_inFlightMessages, _inFlightMessages); + } + finally + { + _writeSemaphoreSlim.Release(); } - _session = new ProducerSession(_config, stream, initResponse, Initialize, _logger); _ = _session.RunProcessingWriteAck(_inFlightMessages); } - public Task SendAsync(TValue data) + public Task WriteAsync(TValue data) { - return SendAsync(new Message(data)); + return WriteAsync(new Message(data)); } - public async Task SendAsync(Message message) + public async Task WriteAsync(Message message) { - TaskCompletionSource completeTask = new(); + TaskCompletionSource completeTask = new(); var data = _serializer.Serialize(message.Data); var messageData = new MessageData @@ -123,16 +141,16 @@ public async Task SendAsync(Message message) } // No thread safe -internal class ProducerSession : TopicSession +internal class WriterSession : TopicSession { - private readonly ProducerConfig _config; - private readonly ProducerStream _stream; + private readonly WriterConfig _config; + private readonly WriterStream _stream; private long _seqNum; - public ProducerSession( - ProducerConfig config, - ProducerStream stream, + public WriterSession( + WriterConfig config, + WriterStream stream, InitResponse initResponse, Func initialize, ILogger logger) : base(logger, initResponse.SessionId, initialize) @@ -146,7 +164,7 @@ internal async Task RunProcessingWriteAck(ConcurrentQueue inFlig { try { - Logger.LogInformation("ProducerSession[{SessionId}] is running processing writeAck", SessionId); + Logger.LogInformation("WriterSession[{SessionId}] is running processing writeAck", SessionId); await foreach (var messageFromServer in _stream) { @@ -155,25 +173,55 @@ internal async Task RunProcessingWriteAck(ConcurrentQueue inFlig if (status.IsNotSuccess) { Logger.LogWarning( - "ProducerSession[{SessionId}] received unsuccessful status while processing writeAck: {Status}", + "WriterSession[{SessionId}] received unsuccessful status while processing writeAck: {Status}", SessionId, status); return; } foreach (var ack in messageFromServer.WriteResponse.Acks) { - if (!inFlightMessages.TryDequeue(out var messageFromClient)) + if (!inFlightMessages.TryPeek(out var messageFromClient)) { + Logger.LogCritical("No client message was found upon receipt of an acknowledgement: {WriteAck}", + ack); + break; } - messageFromClient.TaskCompletionSource.SetResult(new SendResult(ack)); + if (messageFromClient.MessageData.SeqNo > ack.SeqNo) + { + Logger.LogCritical( + @"The sequence number of the client's message in the queue is greater than the server's write acknowledgment number. +Skipping the WriteAck... +Client SeqNo: {SeqNo}, WriteAck: {WriteAck}", + messageFromClient.MessageData.SeqNo, ack); + + continue; + } + + if (messageFromClient.MessageData.SeqNo < ack.SeqNo) + { + Logger.LogCritical( + @"The sequence number of the client's message in the queue is less than the server's write acknowledgment number. +Completing task on exception... +Client SeqNo: {SeqNo}, WriteAck: {WriteAck}", + messageFromClient.MessageData.SeqNo, ack); + + messageFromClient.TaskCompletionSource.SetException(new YdbWriterException( + $"Client SeqNo[{messageFromClient.MessageData.SeqNo}] is less then server's WriteAck[{ack}]")); + } + else + { + messageFromClient.TaskCompletionSource.SetResult(new WriteResult(ack)); + } + + inFlightMessages.TryDequeue(out _); // Dequeue } } } catch (Exception e) { - Logger.LogError(e, "ProducerSession[{SessionId}] have error on processing writeAck", SessionId); + Logger.LogError(e, "WriterSession[{SessionId}] have error on processing writeAck", SessionId); } finally { @@ -207,10 +255,10 @@ internal async Task Write(ConcurrentQueue toSendBuffer, } catch (TransactionException e) { - ReconnectSession(); + Logger.LogError(e, "WriterSession[{SessionId}] have error on Write, last SeqNo={SeqNo}", + SessionId, Volatile.Read(ref _seqNum)); - Console.WriteLine(e); - throw; + ReconnectSession(); } } @@ -220,4 +268,4 @@ public ValueTask DisposeAsync() } } -internal record MessageSending(MessageData MessageData, TaskCompletionSource TaskCompletionSource); +internal record MessageSending(MessageData MessageData, TaskCompletionSource TaskCompletionSource); diff --git a/src/Ydb.Sdk/src/Services/Topic/ProducerBuilder.cs b/src/Ydb.Sdk/src/Services/Topic/WriterBuilder.cs similarity index 50% rename from src/Ydb.Sdk/src/Services/Topic/ProducerBuilder.cs rename to src/Ydb.Sdk/src/Services/Topic/WriterBuilder.cs index 0c60b579..df39f54c 100644 --- a/src/Ydb.Sdk/src/Services/Topic/ProducerBuilder.cs +++ b/src/Ydb.Sdk/src/Services/Topic/WriterBuilder.cs @@ -1,29 +1,29 @@ namespace Ydb.Sdk.Services.Topic; -public class ProducerBuilder +public class WriterBuilder { - private readonly ProducerConfig _config; + private readonly WriterConfig _config; - public ProducerBuilder(ProducerConfig config) + public WriterBuilder(WriterConfig config) { _config = config; } public ISerializer? Serializer { get; set; } - public async Task> Build() + public async Task> Build() { - var producer = new Producer( + var writer = new Writer( _config, Serializer ?? (ISerializer)( Serializers.DefaultSerializers.TryGetValue(typeof(TValue), out var serializer) ? serializer - : throw new YdbProducerException("The serializer is not set") + : throw new YdbWriterException("The serializer is not set") ) ); - await producer.Initialize(); + await writer.Initialize(); - return producer; + return writer; } } diff --git a/src/Ydb.Sdk/src/Services/Topic/ProducerConfig.cs b/src/Ydb.Sdk/src/Services/Topic/WriterConfig.cs similarity index 90% rename from src/Ydb.Sdk/src/Services/Topic/ProducerConfig.cs rename to src/Ydb.Sdk/src/Services/Topic/WriterConfig.cs index ca98fe97..f9dd2c6b 100644 --- a/src/Ydb.Sdk/src/Services/Topic/ProducerConfig.cs +++ b/src/Ydb.Sdk/src/Services/Topic/WriterConfig.cs @@ -2,9 +2,9 @@ namespace Ydb.Sdk.Services.Topic; -public class ProducerConfig +public class WriterConfig { - public ProducerConfig(Driver driver, string topicPath) + public WriterConfig(Driver driver, string topicPath) { Driver = driver; TopicPath = topicPath; From ca941e93d06757994cab47c06c609af95cfbba59 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Fri, 25 Oct 2024 15:26:21 +0300 Subject: [PATCH 5/6] fix --- src/Ydb.Sdk/src/Services/Topic/Writer.cs | 8 +++++--- src/Ydb.Sdk/src/Services/Topic/WriterBuilder.cs | 5 ++++- src/Ydb.Sdk/src/Services/Topic/WriterConfig.cs | 4 +--- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/Ydb.Sdk/src/Services/Topic/Writer.cs b/src/Ydb.Sdk/src/Services/Topic/Writer.cs index 5120e88f..a2f51ffe 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Writer.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer.cs @@ -18,6 +18,7 @@ namespace Ydb.Sdk.Services.Topic; internal class Writer : IWriter { + private readonly Driver _driver; private readonly WriterConfig _config; private readonly ILogger> _logger; private readonly ISerializer _serializer; @@ -28,18 +29,19 @@ internal class Writer : IWriter private volatile WriterSession _session = null!; - internal Writer(WriterConfig config, ISerializer serializer) + internal Writer(Driver driver, WriterConfig config, ISerializer serializer) { + _driver = driver; _config = config; _serializer = serializer; - _logger = config.Driver.LoggerFactory.CreateLogger>(); + _logger = driver.LoggerFactory.CreateLogger>(); } internal async Task Initialize() { _logger.LogInformation("Writer session initialization started. WriterConfig: {WriterConfig}", _config); - var stream = _config.Driver.BidirectionalStreamCall( + var stream = _driver.BidirectionalStreamCall( TopicService.StreamWriteMethod, GrpcRequestSettings.DefaultInstance ); diff --git a/src/Ydb.Sdk/src/Services/Topic/WriterBuilder.cs b/src/Ydb.Sdk/src/Services/Topic/WriterBuilder.cs index df39f54c..f7110786 100644 --- a/src/Ydb.Sdk/src/Services/Topic/WriterBuilder.cs +++ b/src/Ydb.Sdk/src/Services/Topic/WriterBuilder.cs @@ -3,9 +3,11 @@ namespace Ydb.Sdk.Services.Topic; public class WriterBuilder { private readonly WriterConfig _config; + private readonly Driver _driver; - public WriterBuilder(WriterConfig config) + public WriterBuilder(Driver driver, WriterConfig config) { + _driver = driver; _config = config; } @@ -14,6 +16,7 @@ public WriterBuilder(WriterConfig config) public async Task> Build() { var writer = new Writer( + _driver, _config, Serializer ?? (ISerializer)( Serializers.DefaultSerializers.TryGetValue(typeof(TValue), out var serializer) diff --git a/src/Ydb.Sdk/src/Services/Topic/WriterConfig.cs b/src/Ydb.Sdk/src/Services/Topic/WriterConfig.cs index f9dd2c6b..f0e1b56d 100644 --- a/src/Ydb.Sdk/src/Services/Topic/WriterConfig.cs +++ b/src/Ydb.Sdk/src/Services/Topic/WriterConfig.cs @@ -4,13 +4,11 @@ namespace Ydb.Sdk.Services.Topic; public class WriterConfig { - public WriterConfig(Driver driver, string topicPath) + public WriterConfig(string topicPath) { - Driver = driver; TopicPath = topicPath; } - public Driver Driver { get; } public string TopicPath { get; } public string? ProducerId { get; set; } public string? MessageGroupId { get; set; } From 8bf79ad4ac72840438f605c7f1b7c1e6c7f0754b Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Fri, 25 Oct 2024 15:40:42 +0300 Subject: [PATCH 6/6] fix --- src/Ydb.Sdk/src/Driver.cs | 14 +++----------- .../src/Services/Topic/TopicSession.cs | 9 +++++++-- src/Ydb.Sdk/src/Services/Topic/Writer.cs | 19 +++++++------------ 3 files changed, 17 insertions(+), 25 deletions(-) diff --git a/src/Ydb.Sdk/src/Driver.cs b/src/Ydb.Sdk/src/Driver.cs index b9dbc604..5abe3ae9 100644 --- a/src/Ydb.Sdk/src/Driver.cs +++ b/src/Ydb.Sdk/src/Driver.cs @@ -367,8 +367,7 @@ public async ValueTask MoveNextAsync() } } - internal sealed class BidirectionalStream : IAsyncEnumerator, - IAsyncEnumerable + internal sealed class BidirectionalStream : IDisposable { private readonly AsyncDuplexStreamingCall _bidirectionalStream; private readonly Action _rpcErrorAction; @@ -394,13 +393,6 @@ public async Task Write(TRequest request) } } - public ValueTask DisposeAsync() - { - _bidirectionalStream.Dispose(); - - return default; - } - public async ValueTask MoveNextAsync() { try @@ -417,9 +409,9 @@ public async ValueTask MoveNextAsync() public TResponse Current => _bidirectionalStream.ResponseStream.Current; - public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = new()) + public void Dispose() { - return this; + _bidirectionalStream.Dispose(); } } diff --git a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs index a222a069..e7a1fa6f 100644 --- a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs +++ b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs @@ -2,18 +2,21 @@ namespace Ydb.Sdk.Services.Topic; -internal abstract class TopicSession : IDisposable +internal abstract class TopicSession : IDisposable { private readonly Func _initialize; + protected readonly Driver.BidirectionalStream Stream; protected readonly ILogger Logger; protected readonly string SessionId; private int _isActive = 1; private bool _disposed; - protected TopicSession(ILogger logger, string sessionId, Func initialize) + protected TopicSession(Driver.BidirectionalStream stream, ILogger logger, + string sessionId, Func initialize) { + Stream = stream; Logger = logger; SessionId = sessionId; _initialize = initialize; @@ -50,5 +53,7 @@ public void Dispose() { _disposed = true; } + + Stream.Dispose(); } } diff --git a/src/Ydb.Sdk/src/Services/Topic/Writer.cs b/src/Ydb.Sdk/src/Services/Topic/Writer.cs index a2f51ffe..0e3c8b15 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Writer.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer.cs @@ -11,6 +11,7 @@ namespace Ydb.Sdk.Services.Topic; using InitResponse = StreamWriteMessage.Types.InitResponse; using MessageData = StreamWriteMessage.Types.WriteRequest.Types.MessageData; using MessageFromClient = StreamWriteMessage.Types.FromClient; +using MessageFromServer = StreamWriteMessage.Types.FromServer; using WriterStream = Driver.BidirectionalStream< StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer @@ -143,10 +144,9 @@ public async Task WriteAsync(Message message) } // No thread safe -internal class WriterSession : TopicSession +internal class WriterSession : TopicSession { private readonly WriterConfig _config; - private readonly WriterStream _stream; private long _seqNum; @@ -155,11 +155,10 @@ public WriterSession( WriterStream stream, InitResponse initResponse, Func initialize, - ILogger logger) : base(logger, initResponse.SessionId, initialize) + ILogger logger) : base(stream, logger, initResponse.SessionId, initialize) { _config = config; - _stream = stream; - _seqNum = initResponse.LastSeqNo; + Volatile.Write(ref _seqNum, initResponse.LastSeqNo); // happens-before for Volatile.Read } internal async Task RunProcessingWriteAck(ConcurrentQueue inFlightMessages) @@ -168,8 +167,9 @@ internal async Task RunProcessingWriteAck(ConcurrentQueue inFlig { Logger.LogInformation("WriterSession[{SessionId}] is running processing writeAck", SessionId); - await foreach (var messageFromServer in _stream) + while (await Stream.MoveNextAsync()) { + var messageFromServer = Stream.Current; var status = Status.FromProto(messageFromServer.Status, messageFromServer.Issues); if (status.IsNotSuccess) @@ -253,7 +253,7 @@ internal async Task Write(ConcurrentQueue toSendBuffer, } Volatile.Write(ref _seqNum, currentSeqNum); - await _stream.Write(new MessageFromClient { WriteRequest = writeMessage }); + await Stream.Write(new MessageFromClient { WriteRequest = writeMessage }); } catch (TransactionException e) { @@ -263,11 +263,6 @@ internal async Task Write(ConcurrentQueue toSendBuffer, ReconnectSession(); } } - - public ValueTask DisposeAsync() - { - return _stream.DisposeAsync(); - } } internal record MessageSending(MessageData MessageData, TaskCompletionSource TaskCompletionSource);