diff --git a/src/Ydb.Sdk/src/Driver.cs b/src/Ydb.Sdk/src/Driver.cs index b9dbc604..5abe3ae9 100644 --- a/src/Ydb.Sdk/src/Driver.cs +++ b/src/Ydb.Sdk/src/Driver.cs @@ -367,8 +367,7 @@ public async ValueTask MoveNextAsync() } } - internal sealed class BidirectionalStream : IAsyncEnumerator, - IAsyncEnumerable + internal sealed class BidirectionalStream : IDisposable { private readonly AsyncDuplexStreamingCall _bidirectionalStream; private readonly Action _rpcErrorAction; @@ -394,13 +393,6 @@ public async Task Write(TRequest request) } } - public ValueTask DisposeAsync() - { - _bidirectionalStream.Dispose(); - - return default; - } - public async ValueTask MoveNextAsync() { try @@ -417,9 +409,9 @@ public async ValueTask MoveNextAsync() public TResponse Current => _bidirectionalStream.ResponseStream.Current; - public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = new()) + public void Dispose() { - return this; + _bidirectionalStream.Dispose(); } } diff --git a/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs b/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs index b9b80a1c..cd1c08fd 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs @@ -7,9 +7,9 @@ protected YdbTopicException(string message) : base(message) } } -public class YdbProducerException : YdbTopicException +public class YdbWriterException : YdbTopicException { - public YdbProducerException(string message) : base(message) + public YdbWriterException(string message) : base(message) { } } diff --git a/src/Ydb.Sdk/src/Services/Topic/IProducer.cs b/src/Ydb.Sdk/src/Services/Topic/IProducer.cs deleted file mode 100644 index f528e1c7..00000000 --- a/src/Ydb.Sdk/src/Services/Topic/IProducer.cs +++ /dev/null @@ -1,8 +0,0 @@ -namespace Ydb.Sdk.Services.Topic; - -public interface IProducer -{ - public Task SendAsync(TValue data); - - public Task SendAsync(Message message); -} diff --git a/src/Ydb.Sdk/src/Services/Topic/IWriter.cs b/src/Ydb.Sdk/src/Services/Topic/IWriter.cs new file mode 100644 index 00000000..eabe571f --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/IWriter.cs @@ -0,0 +1,8 @@ +namespace Ydb.Sdk.Services.Topic; + +public interface IWriter +{ + public Task WriteAsync(TValue data); + + public Task WriteAsync(Message message); +} diff --git a/src/Ydb.Sdk/src/Services/Topic/Message.cs b/src/Ydb.Sdk/src/Services/Topic/Message.cs new file mode 100644 index 00000000..820923a2 --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/Message.cs @@ -0,0 +1,17 @@ +namespace Ydb.Sdk.Services.Topic; + +public class Message +{ + public Message(TValue data) + { + Data = data; + } + + public DateTime Timestamp { get; set; } = DateTime.Now; + + public TValue Data { get; } + + public List Metadata { get; } = new(); +} + +public record Metadata(string Key, byte[] Value); diff --git a/src/Ydb.Sdk/src/Services/Topic/Producer.cs b/src/Ydb.Sdk/src/Services/Topic/Producer.cs deleted file mode 100644 index a7d9dc3b..00000000 --- a/src/Ydb.Sdk/src/Services/Topic/Producer.cs +++ /dev/null @@ -1,88 +0,0 @@ -// using System.Collections.Concurrent; -// using Microsoft.Extensions.Logging; -// using Ydb.Topic; - -namespace Ydb.Sdk.Services.Topic; - -// using ProducerStream = Driver.BidirectionalStream< -// StreamWriteMessage.Types.FromClient, -// StreamWriteMessage.Types.FromServer -// >; - -internal class Producer : IProducer -{ - // private readonly Driver _driver; - // private readonly ILogger> _logger; - // private readonly long _partitionId; - // private readonly string _sessionId; - // private readonly ISerializer _serializer; - // - // private long _seqNum; - // - // private readonly ConcurrentQueue _inFlightMessages; - // private volatile ProducerStream _stream; - // - // internal Producer( - // ProducerConfig producerConfig, - // StreamWriteMessage.Types.InitResponse initResponse, - // ProducerStream stream, - // ISerializer serializer) - // { - // _driver = producerConfig.Driver; - // _stream = stream; - // _serializer = serializer; - // _logger = producerConfig.Driver.LoggerFactory.CreateLogger>(); - // _partitionId = initResponse.PartitionId; - // _sessionId = initResponse.SessionId; - // _seqNum = initResponse.LastSeqNo; - // _inFlightMessages = new ConcurrentQueue(); - // } - - public Task SendAsync(TValue data) - { - throw new NotImplementedException(); - } - - public Task SendAsync(Message message) - { - throw new NotImplementedException(); - } -} - -public class Message -{ - public Message(TValue data) - { - Data = data; - } - - public DateTime Timestamp { get; set; } - - public TValue Data { get; } - - public List Metadata { get; } = new(); -} - -public record Metadata(string Key, byte[] Value); - -public class SendResult -{ - public SendResult(State status) - { - State = status; - } - - public State State { get; } -} - -public enum State -{ - Written, - AlreadyWritten -} - -internal enum ProducerState -{ - Ready - // Broken -} diff --git a/src/Ydb.Sdk/src/Services/Topic/ProducerBuilder.cs b/src/Ydb.Sdk/src/Services/Topic/ProducerBuilder.cs deleted file mode 100644 index 668c147b..00000000 --- a/src/Ydb.Sdk/src/Services/Topic/ProducerBuilder.cs +++ /dev/null @@ -1,60 +0,0 @@ -using Ydb.Topic; -using Ydb.Topic.V1; - -namespace Ydb.Sdk.Services.Topic; - -public class ProducerBuilder -{ - private readonly ProducerConfig _config; - - public ProducerBuilder(ProducerConfig config) - { - _config = config; - } - - public ISerializer? Serializer { get; set; } - - public async Task> Build() - { - var stream = _config.Driver.BidirectionalStreamCall(TopicService.StreamWriteMethod, - GrpcRequestSettings.DefaultInstance); - - var initRequest = new StreamWriteMessage.Types.InitRequest { Path = _config.TopicPath }; - if (_config.ProducerId != null) - { - initRequest.ProducerId = _config.ProducerId; - } - - if (_config.MessageGroupId != null) - { - initRequest.MessageGroupId = _config.MessageGroupId; - } - - await stream.Write(new StreamWriteMessage.Types.FromClient { InitRequest = initRequest }); - if (!await stream.MoveNextAsync()) - { - throw new YdbProducerException("Write stream is closed by YDB server"); - } - - var receivedInitMessage = stream.Current; - - Status.FromProto(receivedInitMessage.Status, receivedInitMessage.Issues).EnsureSuccess(); - - var initResponse = receivedInitMessage.InitResponse; - - if (!initResponse.SupportedCodecs.Codecs.Contains((int)_config.Codec)) - { - throw new YdbProducerException($"Topic is not supported codec: {_config.Codec}"); - } - - throw new NotImplementedException(); - // return new Producer( - // _config, initResponse, stream, - // Serializer ?? (ISerializer)( - // Serializers.DefaultSerializers.TryGetValue(typeof(TValue), out var serializer) - // ? serializer - // : throw new YdbProducerException("The serializer is not set") - // ) - // ); - } -} diff --git a/src/Ydb.Sdk/src/Services/Topic/ProducerConfig.cs b/src/Ydb.Sdk/src/Services/Topic/ProducerConfig.cs deleted file mode 100644 index fcf72af3..00000000 --- a/src/Ydb.Sdk/src/Services/Topic/ProducerConfig.cs +++ /dev/null @@ -1,16 +0,0 @@ -namespace Ydb.Sdk.Services.Topic; - -public class ProducerConfig -{ - public ProducerConfig(Driver driver, string topicPath) - { - Driver = driver; - TopicPath = topicPath; - } - - public Driver Driver { get; } - public string TopicPath { get; } - public string? ProducerId { get; set; } - public string? MessageGroupId { get; set; } - public Codec Codec { get; set; } = Codec.Raw; // TODO Supported only Raw -} diff --git a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs new file mode 100644 index 00000000..e7a1fa6f --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs @@ -0,0 +1,59 @@ +using Microsoft.Extensions.Logging; + +namespace Ydb.Sdk.Services.Topic; + +internal abstract class TopicSession : IDisposable +{ + private readonly Func _initialize; + + protected readonly Driver.BidirectionalStream Stream; + protected readonly ILogger Logger; + protected readonly string SessionId; + + private int _isActive = 1; + private bool _disposed; + + protected TopicSession(Driver.BidirectionalStream stream, ILogger logger, + string sessionId, Func initialize) + { + Stream = stream; + Logger = logger; + SessionId = sessionId; + _initialize = initialize; + } + + protected async void ReconnectSession() + { + if (Interlocked.CompareExchange(ref _isActive, 0, 1) == 0) + { + Logger.LogWarning("Skipping reconnect. A reconnect session has already been initiated"); + + return; + } + + Logger.LogInformation("WriterSession[{SessionId}] has been deactivated, starting to reconnect", SessionId); + + while (!_disposed) + { + try + { + await _initialize(); + break; + } + catch (Exception e) + { + Logger.LogError(e, "Unable to reconnect the session due to the following error"); + } + } + } + + public void Dispose() + { + lock (this) + { + _disposed = true; + } + + Stream.Dispose(); + } +} diff --git a/src/Ydb.Sdk/src/Services/Topic/WriteResult.cs b/src/Ydb.Sdk/src/Services/Topic/WriteResult.cs new file mode 100644 index 00000000..1291946d --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/WriteResult.cs @@ -0,0 +1,40 @@ +using Ydb.Topic; + +namespace Ydb.Sdk.Services.Topic; + +public class WriteResult +{ + private readonly long _offset; + + internal WriteResult(StreamWriteMessage.Types.WriteResponse.Types.WriteAck ack) + { + switch (ack.MessageWriteStatusCase) + { + case StreamWriteMessage.Types.WriteResponse.Types.WriteAck.MessageWriteStatusOneofCase.Written: + Status = PersistenceStatus.Written; + _offset = ack.Written.Offset; + break; + case StreamWriteMessage.Types.WriteResponse.Types.WriteAck.MessageWriteStatusOneofCase.Skipped: + Status = PersistenceStatus.AlreadyWritten; + break; + case StreamWriteMessage.Types.WriteResponse.Types.WriteAck.MessageWriteStatusOneofCase.None: + default: + throw new YdbWriterException($"Unexpected WriteAck status: {ack.MessageWriteStatusCase}"); + } + } + + public PersistenceStatus Status { get; } + + public bool TryGetOffset(out long offset) + { + offset = _offset; + + return Status == PersistenceStatus.Written; + } +} + +public enum PersistenceStatus +{ + Written, + AlreadyWritten +} diff --git a/src/Ydb.Sdk/src/Services/Topic/Writer.cs b/src/Ydb.Sdk/src/Services/Topic/Writer.cs new file mode 100644 index 00000000..0e3c8b15 --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/Writer.cs @@ -0,0 +1,268 @@ +using System.Collections.Concurrent; +using System.Transactions; +using Google.Protobuf; +using Google.Protobuf.WellKnownTypes; +using Microsoft.Extensions.Logging; +using Ydb.Topic; +using Ydb.Topic.V1; + +namespace Ydb.Sdk.Services.Topic; + +using InitResponse = StreamWriteMessage.Types.InitResponse; +using MessageData = StreamWriteMessage.Types.WriteRequest.Types.MessageData; +using MessageFromClient = StreamWriteMessage.Types.FromClient; +using MessageFromServer = StreamWriteMessage.Types.FromServer; +using WriterStream = Driver.BidirectionalStream< + StreamWriteMessage.Types.FromClient, + StreamWriteMessage.Types.FromServer +>; + +internal class Writer : IWriter +{ + private readonly Driver _driver; + private readonly WriterConfig _config; + private readonly ILogger> _logger; + private readonly ISerializer _serializer; + + private readonly ConcurrentQueue _inFlightMessages = new(); + private readonly ConcurrentQueue _toSendBuffer = new(); + private readonly SemaphoreSlim _writeSemaphoreSlim = new(1); + + private volatile WriterSession _session = null!; + + internal Writer(Driver driver, WriterConfig config, ISerializer serializer) + { + _driver = driver; + _config = config; + _serializer = serializer; + _logger = driver.LoggerFactory.CreateLogger>(); + } + + internal async Task Initialize() + { + _logger.LogInformation("Writer session initialization started. WriterConfig: {WriterConfig}", _config); + + var stream = _driver.BidirectionalStreamCall( + TopicService.StreamWriteMethod, + GrpcRequestSettings.DefaultInstance + ); + + var initRequest = new StreamWriteMessage.Types.InitRequest { Path = _config.TopicPath }; + if (_config.ProducerId != null) + { + initRequest.ProducerId = _config.ProducerId; + } + + if (_config.MessageGroupId != null) + { + initRequest.MessageGroupId = _config.MessageGroupId; + } + + _logger.LogDebug("Sending initialization request for the write stream: {InitRequest}", initRequest); + + await stream.Write(new MessageFromClient { InitRequest = initRequest }); + if (!await stream.MoveNextAsync()) + { + throw new YdbWriterException( + $"Stream unexpectedly closed by YDB server. Current InitRequest: {initRequest}"); + } + + var receivedInitMessage = stream.Current; + + Status.FromProto(receivedInitMessage.Status, receivedInitMessage.Issues).EnsureSuccess(); + + var initResponse = receivedInitMessage.InitResponse; + + _logger.LogDebug("Received a response for the initialization request on the write stream: {InitResponse}", + initResponse); + + if (!initResponse.SupportedCodecs.Codecs.Contains((int)_config.Codec)) + { + throw new YdbWriterException($"Topic[{_config.TopicPath}] is not supported codec: {_config.Codec}"); + } + + _session = new WriterSession(_config, stream, initResponse, Initialize, _logger); + + await _writeSemaphoreSlim.WaitAsync(); + try + { + _logger.LogDebug("Retrying to send pending in-flight messages after stream restart"); + + await _session.Write(_inFlightMessages, _inFlightMessages); + } + finally + { + _writeSemaphoreSlim.Release(); + } + + _ = _session.RunProcessingWriteAck(_inFlightMessages); + } + + public Task WriteAsync(TValue data) + { + return WriteAsync(new Message(data)); + } + + public async Task WriteAsync(Message message) + { + TaskCompletionSource completeTask = new(); + + var data = _serializer.Serialize(message.Data); + var messageData = new MessageData + { + Data = ByteString.CopyFrom(data), + CreatedAt = Timestamp.FromDateTime(message.Timestamp), + UncompressedSize = data.Length + }; + + foreach (var metadata in message.Metadata) + { + messageData.MetadataItems.Add( + new MetadataItem { Key = metadata.Key, Value = ByteString.CopyFrom(metadata.Value) } + ); + } + + _toSendBuffer.Enqueue(new MessageSending(messageData, completeTask)); + + if (_toSendBuffer.IsEmpty) // concurrent sending + { + return await completeTask.Task; + } + + await _writeSemaphoreSlim.WaitAsync(); + try + { + await _session.Write(_toSendBuffer, _inFlightMessages); + } + finally + { + _writeSemaphoreSlim.Release(); + } + + return await completeTask.Task; + } +} + +// No thread safe +internal class WriterSession : TopicSession +{ + private readonly WriterConfig _config; + + private long _seqNum; + + public WriterSession( + WriterConfig config, + WriterStream stream, + InitResponse initResponse, + Func initialize, + ILogger logger) : base(stream, logger, initResponse.SessionId, initialize) + { + _config = config; + Volatile.Write(ref _seqNum, initResponse.LastSeqNo); // happens-before for Volatile.Read + } + + internal async Task RunProcessingWriteAck(ConcurrentQueue inFlightMessages) + { + try + { + Logger.LogInformation("WriterSession[{SessionId}] is running processing writeAck", SessionId); + + while (await Stream.MoveNextAsync()) + { + var messageFromServer = Stream.Current; + var status = Status.FromProto(messageFromServer.Status, messageFromServer.Issues); + + if (status.IsNotSuccess) + { + Logger.LogWarning( + "WriterSession[{SessionId}] received unsuccessful status while processing writeAck: {Status}", + SessionId, status); + return; + } + + foreach (var ack in messageFromServer.WriteResponse.Acks) + { + if (!inFlightMessages.TryPeek(out var messageFromClient)) + { + Logger.LogCritical("No client message was found upon receipt of an acknowledgement: {WriteAck}", + ack); + + break; + } + + if (messageFromClient.MessageData.SeqNo > ack.SeqNo) + { + Logger.LogCritical( + @"The sequence number of the client's message in the queue is greater than the server's write acknowledgment number. +Skipping the WriteAck... +Client SeqNo: {SeqNo}, WriteAck: {WriteAck}", + messageFromClient.MessageData.SeqNo, ack); + + continue; + } + + if (messageFromClient.MessageData.SeqNo < ack.SeqNo) + { + Logger.LogCritical( + @"The sequence number of the client's message in the queue is less than the server's write acknowledgment number. +Completing task on exception... +Client SeqNo: {SeqNo}, WriteAck: {WriteAck}", + messageFromClient.MessageData.SeqNo, ack); + + messageFromClient.TaskCompletionSource.SetException(new YdbWriterException( + $"Client SeqNo[{messageFromClient.MessageData.SeqNo}] is less then server's WriteAck[{ack}]")); + } + else + { + messageFromClient.TaskCompletionSource.SetResult(new WriteResult(ack)); + } + + inFlightMessages.TryDequeue(out _); // Dequeue + } + } + } + catch (Exception e) + { + Logger.LogError(e, "WriterSession[{SessionId}] have error on processing writeAck", SessionId); + } + finally + { + ReconnectSession(); + } + } + + internal async Task Write(ConcurrentQueue toSendBuffer, + ConcurrentQueue inFlightMessages) + { + try + { + var writeMessage = new StreamWriteMessage.Types.WriteRequest + { + Codec = (int)_config.Codec + }; + + var currentSeqNum = Volatile.Read(ref _seqNum); + + while (toSendBuffer.TryDequeue(out var sendData)) + { + var messageData = sendData.MessageData; + + messageData.SeqNo = ++currentSeqNum; + writeMessage.Messages.Add(messageData); + inFlightMessages.Enqueue(sendData); + } + + Volatile.Write(ref _seqNum, currentSeqNum); + await Stream.Write(new MessageFromClient { WriteRequest = writeMessage }); + } + catch (TransactionException e) + { + Logger.LogError(e, "WriterSession[{SessionId}] have error on Write, last SeqNo={SeqNo}", + SessionId, Volatile.Read(ref _seqNum)); + + ReconnectSession(); + } + } +} + +internal record MessageSending(MessageData MessageData, TaskCompletionSource TaskCompletionSource); diff --git a/src/Ydb.Sdk/src/Services/Topic/WriterBuilder.cs b/src/Ydb.Sdk/src/Services/Topic/WriterBuilder.cs new file mode 100644 index 00000000..f7110786 --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/WriterBuilder.cs @@ -0,0 +1,32 @@ +namespace Ydb.Sdk.Services.Topic; + +public class WriterBuilder +{ + private readonly WriterConfig _config; + private readonly Driver _driver; + + public WriterBuilder(Driver driver, WriterConfig config) + { + _driver = driver; + _config = config; + } + + public ISerializer? Serializer { get; set; } + + public async Task> Build() + { + var writer = new Writer( + _driver, + _config, + Serializer ?? (ISerializer)( + Serializers.DefaultSerializers.TryGetValue(typeof(TValue), out var serializer) + ? serializer + : throw new YdbWriterException("The serializer is not set") + ) + ); + + await writer.Initialize(); + + return writer; + } +} diff --git a/src/Ydb.Sdk/src/Services/Topic/WriterConfig.cs b/src/Ydb.Sdk/src/Services/Topic/WriterConfig.cs new file mode 100644 index 00000000..f0e1b56d --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/WriterConfig.cs @@ -0,0 +1,33 @@ +using System.Text; + +namespace Ydb.Sdk.Services.Topic; + +public class WriterConfig +{ + public WriterConfig(string topicPath) + { + TopicPath = topicPath; + } + + public string TopicPath { get; } + public string? ProducerId { get; set; } + public string? MessageGroupId { get; set; } + public Codec Codec { get; set; } = Codec.Raw; // TODO Supported only Raw + + public override string ToString() + { + var toString = new StringBuilder().Append("[TopicPath: ").Append(TopicPath); + + if (ProducerId != null) + { + toString.Append(", ProducerId: ").Append(ProducerId); + } + + if (MessageGroupId != null) + { + toString.Append(", MessageGroupId: ").Append(MessageGroupId); + } + + return toString.Append(", Codec: ").Append(Codec).Append(']').ToString(); + } +}