diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 4ca1164c..0b9e8a8f 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -33,7 +33,6 @@ jobs: run: | cd src dotnet test --filter "Category=Unit" -f ${{ matrix.dotnet-target-framework }} - ado-net-tests: runs-on: ubuntu-22.04 strategy: @@ -74,8 +73,47 @@ jobs: docker cp ydb-local:/ydb_certs/ca.pem ~/ cd src dotnet test --filter "(FullyQualifiedName~Ado) | (FullyQualifiedName~Dapper)" -l "console;verbosity=normal" - - core-integration-tests: + topic-tests: + runs-on: ubuntu-22.04 + strategy: + fail-fast: false + matrix: + ydb-version: [ 'trunk' ] + dotnet-version: [ 6.0.x, 7.0.x ] + include: + - dotnet-version: 6.0.x + dotnet-target-framework: net6.0 + - dotnet-version: 7.0.x + dotnet-target-framework: net7.0 + services: + ydb: + image: cr.yandex/yc/yandex-docker-local-ydb:${{ matrix.ydb-version }} + ports: + - 2135:2135 + - 2136:2136 + - 8765:8765 + env: + YDB_LOCAL_SURVIVE_RESTART: true + YDB_USE_IN_MEMORY_PDISKS: true + options: '--name ydb-local -h localhost' + env: + OS: ubuntu-22.04 + YDB_VERSION: ${{ matrix.ydb-version }} + YDB_CONNECTION_STRING: grpc://localhost:2136/local + YDB_CONNECTION_STRING_SECURE: grpcs://localhost:2135/local + steps: + - name: Checkout code + uses: actions/checkout@v4 + - name: Install Dotnet + uses: actions/setup-dotnet@v4 + with: + dotnet-version: ${{ matrix.dotnet-version }} + - name: Run Topic tests + run: | + docker cp ydb-local:/ydb_certs/ca.pem ~/ + cd src + dotnet test --filter "FullyQualifiedName~Topic" -l "console;verbosity=normal" + integration-tests: runs-on: ubuntu-22.04 strategy: fail-fast: false diff --git a/src/Ydb.Sdk/src/IDriver.cs b/src/Ydb.Sdk/src/IDriver.cs index 97d1a080..da293395 100644 --- a/src/Ydb.Sdk/src/IDriver.cs +++ b/src/Ydb.Sdk/src/IDriver.cs @@ -6,21 +6,21 @@ namespace Ydb.Sdk; public interface IDriver : IAsyncDisposable, IDisposable { - internal Task UnaryCall( + public Task UnaryCall( Method method, TRequest request, GrpcRequestSettings settings) where TRequest : class where TResponse : class; - internal ServerStream ServerStreamCall( + public ServerStream ServerStreamCall( Method method, TRequest request, GrpcRequestSettings settings) where TRequest : class where TResponse : class; - internal BidirectionalStream BidirectionalStreamCall( + public IBidirectionalStream BidirectionalStreamCall( Method method, GrpcRequestSettings settings) where TRequest : class @@ -29,6 +29,15 @@ internal BidirectionalStream BidirectionalStreamCall : IDisposable +{ + public Task Write(TRequest request); + + public ValueTask MoveNextAsync(); + + public TResponse Current { get; } +} + public abstract class BaseDriver : IDriver { protected readonly DriverConfig Config; @@ -95,7 +104,7 @@ public ServerStream ServerStreamCall( return new ServerStream(call, e => { OnRpcError(endpoint, e); }); } - public BidirectionalStream BidirectionalStreamCall( + public IBidirectionalStream BidirectionalStreamCall( Method method, GrpcRequestSettings settings) where TRequest : class @@ -213,7 +222,7 @@ public async ValueTask MoveNextAsync() } } -public sealed class BidirectionalStream : IDisposable +public class BidirectionalStream : IBidirectionalStream { private readonly AsyncDuplexStreamingCall _stream; private readonly Action _rpcErrorAction; diff --git a/src/Ydb.Sdk/src/Services/Topic/Deserializer.cs b/src/Ydb.Sdk/src/Services/Topic/Deserializer.cs index c0b8b11f..5d4b3515 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Deserializer.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Deserializer.cs @@ -55,14 +55,7 @@ private class Int64Deserializer : IDeserializer { public long Deserialize(byte[] data) { - if (data.Length != 8) - { - throw new ArgumentException( - $"Deserializer encountered data of length ${data.Length}. Expecting data length to be 8"); - } - - return ((long)data[0] << 56) | ((long)data[1] << 48) | ((long)data[2] << 40) | ((long)data[3] << 32) | - ((long)data[4] << 24) | ((long)data[5] << 16) | ((long)data[6] << 8) | data[7]; + return BitConverter.ToInt64(data); } } @@ -70,13 +63,7 @@ private class Int32Deserializer : IDeserializer { public int Deserialize(byte[] data) { - if (data.Length != 4) - { - throw new ArgumentException( - $"Deserializer encountered data of length ${data.Length}. Expecting data length to be 4"); - } - - return (data[0] << 24) | (data[1] << 16) | (data[2] << 8) | data[3]; + return BitConverter.ToInt32(data); } } diff --git a/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs b/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs index 84648c71..486d93cf 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs @@ -1,22 +1,28 @@ namespace Ydb.Sdk.Services.Topic; -public class YdbTopicException : Exception +public class WriterException : Exception { - protected YdbTopicException(string message) : base(message) + public WriterException(string message) : base(message) { + Status = new Status(StatusCode.Unspecified); } -} -public class YdbWriterException : YdbTopicException -{ - public YdbWriterException(string message) : base(message) + public WriterException(string message, Status status) : base(message + ": " + status) + { + Status = status; + } + + public WriterException(string message, Driver.TransportException e) : base(message, e) { + Status = e.Status; } + + public Status Status { get; } } -public class YdbReaderException : YdbTopicException +public class ReaderException : Exception { - protected YdbReaderException(string message) : base(message) + protected ReaderException(string message) : base(message) { } } diff --git a/src/Ydb.Sdk/src/Services/Topic/IWriter.cs b/src/Ydb.Sdk/src/Services/Topic/IWriter.cs index 5df9cfed..1df99eae 100644 --- a/src/Ydb.Sdk/src/Services/Topic/IWriter.cs +++ b/src/Ydb.Sdk/src/Services/Topic/IWriter.cs @@ -2,9 +2,9 @@ namespace Ydb.Sdk.Services.Topic; -public interface IWriter +public interface IWriter : IDisposable { - public Task WriteAsync(TValue data); + public Task WriteAsync(TValue data, CancellationToken cancellationToken = default); - public Task WriteAsync(Message message); + public Task WriteAsync(Message message, CancellationToken cancellationToken = default); } diff --git a/src/Ydb.Sdk/src/Services/Topic/Reader/ReaderBuilder.cs b/src/Ydb.Sdk/src/Services/Topic/Reader/ReaderBuilder.cs index ebfc67f6..97306ed6 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Reader/ReaderBuilder.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Reader/ReaderBuilder.cs @@ -22,7 +22,7 @@ public Task> Build() // Deserializer ?? (IDeserializer)( // Deserializers.DefaultDeserializers.TryGetValue(typeof(TValue), out var deserializer) // ? deserializer - // : throw new YdbWriterException("The serializer is not set") + // : throw new WriterException("The serializer is not set") // ) // ); // diff --git a/src/Ydb.Sdk/src/Services/Topic/Serializer.cs b/src/Ydb.Sdk/src/Services/Topic/Serializer.cs index 58423169..184f911b 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Serializer.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Serializer.cs @@ -49,17 +49,7 @@ private class Int64Serializer : ISerializer { public byte[] Serialize(long data) { - return new[] - { - (byte)(data >> 56), - (byte)(data >> 48), - (byte)(data >> 40), - (byte)(data >> 32), - (byte)(data >> 24), - (byte)(data >> 16), - (byte)(data >> 8), - (byte)data - }; + return BitConverter.GetBytes(data); } } @@ -67,13 +57,7 @@ private class Int32Serializer : ISerializer { public byte[] Serialize(int data) { - return new[] - { - (byte)(data >> 24), - (byte)(data >> 16), - (byte)(data >> 8), - (byte)data - }; + return BitConverter.GetBytes(data); } } diff --git a/src/Ydb.Sdk/src/Services/Topic/TopicClient.cs b/src/Ydb.Sdk/src/Services/Topic/TopicClient.cs index 265814df..4f70dcfd 100644 --- a/src/Ydb.Sdk/src/Services/Topic/TopicClient.cs +++ b/src/Ydb.Sdk/src/Services/Topic/TopicClient.cs @@ -6,9 +6,9 @@ namespace Ydb.Sdk.Services.Topic; public class TopicClient { - private readonly Driver _driver; + private readonly IDriver _driver; - public TopicClient(Driver driver) + public TopicClient(IDriver driver) { _driver = driver; } diff --git a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs index 0d425331..0c4141ab 100644 --- a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs +++ b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs @@ -5,55 +5,46 @@ namespace Ydb.Sdk.Services.Topic; internal abstract class TopicSession : IDisposable { private readonly Func _initialize; + private readonly Action _resetSessionOnTransportError; - protected readonly BidirectionalStream Stream; + protected readonly IBidirectionalStream Stream; protected readonly ILogger Logger; protected readonly string SessionId; private int _isActive = 1; - private bool _disposed; - protected TopicSession(BidirectionalStream stream, ILogger logger, - string sessionId, Func initialize) + protected TopicSession( + IBidirectionalStream stream, + ILogger logger, + string sessionId, + Func initialize, + Action resetSessionOnTransportError) { Stream = stream; Logger = logger; SessionId = sessionId; _initialize = initialize; + _resetSessionOnTransportError = resetSessionOnTransportError; } - protected async void ReconnectSession() + protected async void ReconnectSession(WriterException exception) { if (Interlocked.CompareExchange(ref _isActive, 0, 1) == 0) { - Logger.LogWarning("Skipping reconnect. A reconnect session has already been initiated"); + Logger.LogDebug("Skipping reconnect. A reconnect session has already been initiated"); return; } + _resetSessionOnTransportError(exception); + Logger.LogInformation("WriterSession[{SessionId}] has been deactivated, starting to reconnect", SessionId); - while (!_disposed) - { - try - { - await _initialize(); - break; - } - catch (Exception e) - { - Logger.LogError(e, "Unable to reconnect the session due to the following error"); - } - } + await _initialize(); } public void Dispose() { - lock (this) - { - _disposed = true; - } - Stream.Dispose(); } } diff --git a/src/Ydb.Sdk/src/Services/Topic/Writer/WriteResult.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/WriteResult.cs index 1320c5a4..1d6284b0 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Writer/WriteResult.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/WriteResult.cs @@ -19,7 +19,7 @@ internal WriteResult(StreamWriteMessage.Types.WriteResponse.Types.WriteAck ack) break; case StreamWriteMessage.Types.WriteResponse.Types.WriteAck.MessageWriteStatusOneofCase.None: default: - throw new YdbWriterException($"Unexpected WriteAck status: {ack.MessageWriteStatusCase}"); + throw new WriterException($"Unexpected WriteAck status: {ack.MessageWriteStatusCase}"); } } diff --git a/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs index 8e09e6f0..79b25bbb 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs @@ -1,5 +1,4 @@ using System.Collections.Concurrent; -using System.Transactions; using Google.Protobuf; using Google.Protobuf.WellKnownTypes; using Microsoft.Extensions.Logging; @@ -12,7 +11,7 @@ namespace Ydb.Sdk.Services.Topic.Writer; using MessageData = StreamWriteMessage.Types.WriteRequest.Types.MessageData; using MessageFromClient = StreamWriteMessage.Types.FromClient; using MessageFromServer = StreamWriteMessage.Types.FromServer; -using WriterStream = BidirectionalStream< +using WriterStream = IBidirectionalStream< StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer >; @@ -23,130 +22,288 @@ internal class Writer : IWriter private readonly WriterConfig _config; private readonly ILogger> _logger; private readonly ISerializer _serializer; - - private readonly ConcurrentQueue _inFlightMessages = new(); private readonly ConcurrentQueue _toSendBuffer = new(); - private readonly SemaphoreSlim _writeSemaphoreSlim = new(1); + private readonly ConcurrentQueue _inFlightMessages = new(); + private readonly CancellationTokenSource _disposeTokenSource = new(); - private volatile WriterSession _session = null!; + private volatile TaskCompletionSource _tcsWakeUp = new(); + private volatile IWriteSession _session = new NotStartedWriterSession("Session not started!"); + + private int _limitBufferMaxSize; internal Writer(IDriver driver, WriterConfig config, ISerializer serializer) { _driver = driver; _config = config; - _serializer = serializer; _logger = driver.LoggerFactory.CreateLogger>(); + _serializer = serializer; + _limitBufferMaxSize = config.BufferMaxSize; + + StartWriteWorker(); } - internal async Task Initialize() + public Task WriteAsync(TValue data, CancellationToken cancellationToken) { - _logger.LogInformation("Writer session initialization started. WriterConfig: {WriterConfig}", _config); + return WriteAsync(new Message(data), cancellationToken); + } - var stream = _driver.BidirectionalStreamCall( - TopicService.StreamWriteMethod, - GrpcRequestSettings.DefaultInstance + public async Task WriteAsync(Message message, CancellationToken cancellationToken) + { + TaskCompletionSource tcs = new(); + cancellationToken.Register( + () => tcs.TrySetCanceled(cancellationToken), + useSynchronizationContext: false ); - var initRequest = new StreamWriteMessage.Types.InitRequest { Path = _config.TopicPath }; - if (_config.ProducerId != null) + var data = _serializer.Serialize(message.Data); + var messageData = new MessageData { - initRequest.ProducerId = _config.ProducerId; - } + Data = ByteString.CopyFrom(data), + CreatedAt = Timestamp.FromDateTime(message.Timestamp.ToUniversalTime()), + UncompressedSize = data.Length + }; - if (_config.MessageGroupId != null) + foreach (var metadata in message.Metadata) { - initRequest.MessageGroupId = _config.MessageGroupId; + messageData.MetadataItems.Add(new MetadataItem + { Key = metadata.Key, Value = ByteString.CopyFrom(metadata.Value) }); } - _logger.LogDebug("Sending initialization request for the write stream: {InitRequest}", initRequest); - - await stream.Write(new MessageFromClient { InitRequest = initRequest }); - if (!await stream.MoveNextAsync()) + while (true) { - throw new YdbWriterException( - $"Stream unexpectedly closed by YDB server. Current InitRequest: {initRequest}"); - } + var curLimitBufferSize = Volatile.Read(ref _limitBufferMaxSize); - var receivedInitMessage = stream.Current; + if ( // sending one biggest message anyway + (curLimitBufferSize == _config.BufferMaxSize && data.Length > curLimitBufferSize) + || curLimitBufferSize >= data.Length) + { + if (Interlocked.CompareExchange(ref _limitBufferMaxSize, + curLimitBufferSize - data.Length, curLimitBufferSize) == curLimitBufferSize) + { + _toSendBuffer.Enqueue(new MessageSending(messageData, tcs)); + WakeUpWorker(); - Status.FromProto(receivedInitMessage.Status, receivedInitMessage.Issues).EnsureSuccess(); + break; + } - var initResponse = receivedInitMessage.InitResponse; + // Next try on race condition + continue; + } - _logger.LogDebug("Received a response for the initialization request on the write stream: {InitResponse}", - initResponse); + _logger.LogWarning( + "Buffer overflow: the data size [{DataLength}] exceeds the current buffer limit ({CurLimitBufferSize}) [BufferMaxSize = {BufferMaxSize}]", + data.Length, curLimitBufferSize, _config.BufferMaxSize); - if (!initResponse.SupportedCodecs.Codecs.Contains((int)_config.Codec)) - { - throw new YdbWriterException($"Topic[{_config.TopicPath}] is not supported codec: {_config.Codec}"); + throw new WriterException("Buffer overflow"); } - _session = new WriterSession(_config, stream, initResponse, Initialize, _logger); - - await _writeSemaphoreSlim.WaitAsync(); try { - _logger.LogDebug("Retrying to send pending in-flight messages after stream restart"); + var writeResult = await tcs.Task; - await _session.Write(_inFlightMessages, _inFlightMessages); + return writeResult; } finally { - _writeSemaphoreSlim.Release(); + Interlocked.Add(ref _limitBufferMaxSize, data.Length); } - - _ = _session.RunProcessingWriteAck(_inFlightMessages); } - public Task WriteAsync(TValue data) + private async void StartWriteWorker() { - return WriteAsync(new Message(data)); + await Initialize(); + + while (!_disposeTokenSource.Token.IsCancellationRequested) + { + await _tcsWakeUp.Task; + _tcsWakeUp = new TaskCompletionSource(); + + await _session.Write(_toSendBuffer); + } } - public async Task WriteAsync(Message message) + private void WakeUpWorker() { - TaskCompletionSource completeTask = new(); + _tcsWakeUp.TrySetResult(); + } - var data = _serializer.Serialize(message.Data); - var messageData = new MessageData + private async Task Initialize() + { + try { - Data = ByteString.CopyFrom(data), - CreatedAt = Timestamp.FromDateTime(message.Timestamp), - UncompressedSize = data.Length - }; + if (_disposeTokenSource.IsCancellationRequested) + { + return; + } - foreach (var metadata in message.Metadata) - { - messageData.MetadataItems.Add( - new MetadataItem { Key = metadata.Key, Value = ByteString.CopyFrom(metadata.Value) } + _logger.LogInformation("Writer session initialization started. WriterConfig: {WriterConfig}", _config); + + var stream = _driver.BidirectionalStreamCall( + TopicService.StreamWriteMethod, + GrpcRequestSettings.DefaultInstance ); - } - _toSendBuffer.Enqueue(new MessageSending(messageData, completeTask)); + var initRequest = new StreamWriteMessage.Types.InitRequest { Path = _config.TopicPath }; + if (_config.ProducerId != null) + { + initRequest.ProducerId = _config.ProducerId; + } + + if (_config.PartitionId != null) + { + initRequest.PartitionId = _config.PartitionId.Value; + } + + _logger.LogDebug("Sending initialization request for the write stream: {InitRequest}", initRequest); + + await stream.Write(new MessageFromClient { InitRequest = initRequest }); + if (!await stream.MoveNextAsync()) + { + _session = new NotStartedWriterSession( + $"Stream unexpectedly closed by YDB server. Current InitRequest: {initRequest}"); + + _ = Task.Run(Initialize, _disposeTokenSource.Token); + + return; + } + + var receivedInitMessage = stream.Current; + + var status = Status.FromProto(receivedInitMessage.Status, receivedInitMessage.Issues); + + if (status.IsNotSuccess) + { + _session = new NotStartedWriterSession("Initialization failed", status); + + if (status.StatusCode != StatusCode.SchemeError) + { + _ = Task.Run(Initialize, _disposeTokenSource.Token); + } + + _logger.LogCritical("Writer initialization failed to start. Reason: {Status}", status); + + return; + } + + var initResponse = receivedInitMessage.InitResponse; - if (_toSendBuffer.IsEmpty) // concurrent sending + _logger.LogDebug("Received a response for the initialization request on the writer stream: {InitResponse}", + initResponse); + + if (initResponse.SupportedCodecs != null && + !initResponse.SupportedCodecs.Codecs.Contains((int)_config.Codec)) + { + _logger.LogCritical( + "Writer initialization failed to start. Reason: topic[Path=\"{TopicPath}\"] is not supported codec {Codec}", + _config.TopicPath, _config.Codec); + + _session = new NotStartedWriterSession( + $"Topic[Path=\"{_config.TopicPath}\"] is not supported codec: {_config.Codec}"); + return; + } + + var newSession = new WriterSession( + _config, + stream, + initResponse, + Initialize, + e => { _session = new NotStartedWriterSession(e); }, + _logger, + _inFlightMessages + ); + + if (!_inFlightMessages.IsEmpty) + { + var copyInFlightMessages = new ConcurrentQueue(); + while (_inFlightMessages.TryDequeue(out var sendData)) + { + if (sendData.Tcs.Task.IsCanceled) + { + _logger.LogWarning("Message[SeqNo={SeqNo}] is cancelled", sendData.MessageData.SeqNo); + + continue; + } + + copyInFlightMessages.Enqueue(sendData); + } + + await newSession.Write(copyInFlightMessages); // retry prev in flight messages + } + + _session = newSession; + newSession.RunProcessingWriteAck(); + } + catch (Driver.TransportException e) { - return await completeTask.Task; + _logger.LogError(e, "Unable to connect the session"); + + _session = new NotStartedWriterSession( + new WriterException("Transport error on creating WriterSession", e)); + + _ = Task.Run(Initialize, _disposeTokenSource.Token); } + } - await _writeSemaphoreSlim.WaitAsync(); + public void Dispose() + { try { - await _session.Write(_toSendBuffer, _inFlightMessages); + _disposeTokenSource.Cancel(); + + _session.Dispose(); } finally { - _writeSemaphoreSlim.Release(); + _disposeTokenSource.Dispose(); } + } +} + +internal record MessageSending(MessageData MessageData, TaskCompletionSource Tcs); + +internal interface IWriteSession : IDisposable +{ + Task Write(ConcurrentQueue toSendBuffer); +} + +internal class NotStartedWriterSession : IWriteSession +{ + private readonly WriterException _reasonException; - return await completeTask.Task; + public NotStartedWriterSession(string reasonExceptionMessage) + { + _reasonException = new WriterException(reasonExceptionMessage); + } + + public NotStartedWriterSession(string reasonExceptionMessage, Status status) + { + _reasonException = new WriterException(reasonExceptionMessage, status); + } + + public NotStartedWriterSession(WriterException reasonException) + { + _reasonException = reasonException; + } + + public Task Write(ConcurrentQueue toSendBuffer) + { + while (toSendBuffer.TryDequeue(out var messageSending)) + { + messageSending.Tcs.TrySetException(_reasonException); + } + + return Task.CompletedTask; + } + + public void Dispose() + { } } -// No thread safe -internal class WriterSession : TopicSession +internal class WriterSession : TopicSession, IWriteSession { private readonly WriterConfig _config; + private readonly ConcurrentQueue _inFlightMessages; private long _seqNum; @@ -155,13 +312,59 @@ public WriterSession( WriterStream stream, InitResponse initResponse, Func initialize, - ILogger logger) : base(stream, logger, initResponse.SessionId, initialize) + Action resetSessionOnTransportError, + ILogger logger, + ConcurrentQueue inFlightMessages + ) : base( + stream, + logger, + initResponse.SessionId, + initialize, + resetSessionOnTransportError + ) { _config = config; + _inFlightMessages = inFlightMessages; Volatile.Write(ref _seqNum, initResponse.LastSeqNo); // happens-before for Volatile.Read } - internal async Task RunProcessingWriteAck(ConcurrentQueue inFlightMessages) + public async Task Write(ConcurrentQueue toSendBuffer) + { + try + { + var writeMessage = new StreamWriteMessage.Types.WriteRequest + { + Codec = (int)_config.Codec + }; + + var currentSeqNum = Volatile.Read(ref _seqNum); + + while (toSendBuffer.TryDequeue(out var sendData)) + { + var messageData = sendData.MessageData; + + if (messageData.SeqNo == default) + { + messageData.SeqNo = ++currentSeqNum; + } + + writeMessage.Messages.Add(messageData); + _inFlightMessages.Enqueue(sendData); + } + + Volatile.Write(ref _seqNum, currentSeqNum); + await Stream.Write(new MessageFromClient { WriteRequest = writeMessage }); + } + catch (Driver.TransportException e) + { + Logger.LogError(e, "WriterSession[{SessionId}] have error on Write, last SeqNo={SeqNo}", + SessionId, Volatile.Read(ref _seqNum)); + + ReconnectSession(new WriterException("Transport error in the WriterSession on write messages", e)); + } + } + + internal async void RunProcessingWriteAck() { try { @@ -182,7 +385,7 @@ internal async Task RunProcessingWriteAck(ConcurrentQueue inFlig foreach (var ack in messageFromServer.WriteResponse.Acks) { - if (!inFlightMessages.TryPeek(out var messageFromClient)) + if (!_inFlightMessages.TryPeek(out var messageFromClient)) { Logger.LogCritical("No client message was found upon receipt of an acknowledgement: {WriteAck}", ack); @@ -209,60 +412,29 @@ Completing task on exception... Client SeqNo: {SeqNo}, WriteAck: {WriteAck}", messageFromClient.MessageData.SeqNo, ack); - messageFromClient.TaskCompletionSource.SetException(new YdbWriterException( + messageFromClient.Tcs.TrySetException(new WriterException( $"Client SeqNo[{messageFromClient.MessageData.SeqNo}] is less then server's WriteAck[{ack}]")); } else { - messageFromClient.TaskCompletionSource.SetResult(new WriteResult(ack)); + messageFromClient.Tcs.TrySetResult(new WriteResult(ack)); } - inFlightMessages.TryDequeue(out _); // Dequeue + _inFlightMessages.TryDequeue(out _); // Dequeue } } } - catch (Exception e) + catch (Driver.TransportException e) { Logger.LogError(e, "WriterSession[{SessionId}] have error on processing writeAck", SessionId); - } - finally - { - ReconnectSession(); - } - } - internal async Task Write(ConcurrentQueue toSendBuffer, - ConcurrentQueue inFlightMessages) - { - try - { - var writeMessage = new StreamWriteMessage.Types.WriteRequest - { - Codec = (int)_config.Codec - }; - - var currentSeqNum = Volatile.Read(ref _seqNum); + ReconnectSession(new WriterException("Transport error in the WriterSession on processing writeAck", e)); - while (toSendBuffer.TryDequeue(out var sendData)) - { - var messageData = sendData.MessageData; - - messageData.SeqNo = ++currentSeqNum; - writeMessage.Messages.Add(messageData); - inFlightMessages.Enqueue(sendData); - } - - Volatile.Write(ref _seqNum, currentSeqNum); - await Stream.Write(new MessageFromClient { WriteRequest = writeMessage }); + return; } - catch (TransactionException e) - { - Logger.LogError(e, "WriterSession[{SessionId}] have error on Write, last SeqNo={SeqNo}", - SessionId, Volatile.Read(ref _seqNum)); - ReconnectSession(); - } + Logger.LogWarning("WriterSession[{SessionId}]: stream is closed", SessionId); + + ReconnectSession(new WriterException("WriterStream is closed")); } } - -internal record MessageSending(MessageData MessageData, TaskCompletionSource TaskCompletionSource); diff --git a/src/Ydb.Sdk/src/Services/Topic/Writer/WriterBuilder.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/WriterBuilder.cs index 2f565aad..ea4a3907 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Writer/WriterBuilder.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/WriterBuilder.cs @@ -2,31 +2,63 @@ namespace Ydb.Sdk.Services.Topic.Writer; public class WriterBuilder { - private readonly WriterConfig _config; - private readonly Driver _driver; + private readonly IDriver _driver; - public WriterBuilder(Driver driver, WriterConfig config) + public WriterBuilder(IDriver driver, string topicPath) { _driver = driver; - _config = config; + TopicPath = topicPath; } + /// + /// Full path of topic to write to. + /// + public string TopicPath { get; } + + /// + /// Producer identifier of client data stream. + /// Used for message deduplication by sequence numbers. + /// + public string? ProducerId { get; set; } + + /// + /// Codec that is used for data compression. + /// See enum Codec above for values. + /// + public Codec Codec { get; set; } = Codec.Raw; // TODO Supported only Raw + + /// + /// Maximum size (in bytes) of all messages batched in one Message Set, excluding protocol framing overhead. + /// This limit is applied after the first message has been added to the batch, + /// regardless of the first message's size, this is to ensure that messages that exceed buffer size are produced. + /// + public int BufferMaxSize { get; set; } = 1024 * 1024; // 1 Mb + + /// + /// Explicit partition id to write to. + /// + public long? PartitionId { get; set; } + public ISerializer? Serializer { get; set; } - public async Task> Build() + public IWriter Build() { - var writer = new Writer( + var config = new WriterConfig( + topicPath: TopicPath, + producerId: ProducerId, + codec: Codec, + bufferMaxSize: BufferMaxSize, + partitionId: PartitionId + ); + + return new Writer( _driver, - _config, + config, Serializer ?? (ISerializer)( Serializers.DefaultSerializers.TryGetValue(typeof(TValue), out var serializer) ? serializer - : throw new YdbWriterException("The serializer is not set") + : throw new WriterException("The serializer is not set") ) ); - - await writer.Initialize(); - - return writer; } } diff --git a/src/Ydb.Sdk/src/Services/Topic/Writer/WriterConfig.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/WriterConfig.cs index bf5dba66..1bedc5e2 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Writer/WriterConfig.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/WriterConfig.cs @@ -2,35 +2,32 @@ namespace Ydb.Sdk.Services.Topic.Writer; -public class WriterConfig +internal class WriterConfig { - /// Full path of topic to write to. - public WriterConfig(string topicPath) + internal WriterConfig( + string topicPath, + string? producerId, + Codec codec, + int bufferMaxSize, + long? partitionId + ) { TopicPath = topicPath; + ProducerId = producerId; + Codec = codec; + BufferMaxSize = bufferMaxSize; + PartitionId = partitionId; } - /// - /// Full path of topic to write to. - /// public string TopicPath { get; } - /// - /// Producer identifier of client data stream. - /// Used for message deduplication by sequence numbers. - /// - public string? ProducerId { get; set; } + public string? ProducerId { get; } - /// - /// All messages with given pair (producer_id, message_group_id) go to single partition in order of writes. - /// - public string? MessageGroupId { get; set; } + public Codec Codec { get; } - /// - /// Codec that is used for data compression. - /// See enum Codec above for values. - /// - public Codec Codec { get; set; } = Codec.Raw; // TODO Supported only Raw + public int BufferMaxSize { get; } + + public long? PartitionId { get; } public override string ToString() { @@ -41,11 +38,6 @@ public override string ToString() toString.Append(", ProducerId: ").Append(ProducerId); } - if (MessageGroupId != null) - { - toString.Append(", MessageGroupId: ").Append(MessageGroupId); - } - return toString.Append(", Codec: ").Append(Codec).Append(']').ToString(); } } diff --git a/src/Ydb.Sdk/src/Status.cs b/src/Ydb.Sdk/src/Status.cs index f362a2e5..6cd45cb1 100644 --- a/src/Ydb.Sdk/src/Status.cs +++ b/src/Ydb.Sdk/src/Status.cs @@ -248,6 +248,7 @@ internal static Status ConvertStatus(this Grpc.Core.Status rpcStatus) Grpc.Core.StatusCode.DeadlineExceeded => StatusCode.ClientTransportTimeout, Grpc.Core.StatusCode.ResourceExhausted => StatusCode.ClientTransportResourceExhausted, Grpc.Core.StatusCode.Unimplemented => StatusCode.ClientTransportUnimplemented, + Grpc.Core.StatusCode.Cancelled => StatusCode.Cancelled, _ => StatusCode.ClientTransportUnknown }, new List { new(rpcStatus.Detail) } diff --git a/src/Ydb.Sdk/tests/Fixture/DriverFixture.cs b/src/Ydb.Sdk/tests/Fixture/DriverFixture.cs index e98b628f..450de26d 100644 --- a/src/Ydb.Sdk/tests/Fixture/DriverFixture.cs +++ b/src/Ydb.Sdk/tests/Fixture/DriverFixture.cs @@ -2,13 +2,13 @@ namespace Ydb.Sdk.Tests.Fixture; -public abstract class DriverFixture : IAsyncLifetime +public class DriverFixture : IAsyncLifetime { - protected readonly Driver Driver; + public Driver Driver { get; } - protected DriverFixture(DriverConfig? driverConfig = null) + public DriverFixture() { - driverConfig ??= new DriverConfig( + var driverConfig = new DriverConfig( endpoint: "grpc://localhost:2136", database: "/local" ); @@ -16,7 +16,9 @@ protected DriverFixture(DriverConfig? driverConfig = null) Driver = new Driver(driverConfig, Utils.GetLoggerFactory()); } - protected abstract void ClientDispose(); + protected virtual void ClientDispose() + { + } public Task InitializeAsync() { diff --git a/src/Ydb.Sdk/tests/Topic/SerializerDeserializerUnitTests.cs b/src/Ydb.Sdk/tests/Topic/SerializerDeserializerUnitTests.cs new file mode 100644 index 00000000..11d8517b --- /dev/null +++ b/src/Ydb.Sdk/tests/Topic/SerializerDeserializerUnitTests.cs @@ -0,0 +1,27 @@ +using Xunit; +using Ydb.Sdk.Services.Topic; + +namespace Ydb.Sdk.Tests.Topic; + +public class SerializerDeserializerUnitTests +{ + [Fact] + public void SerializeDeserialize_WhenSerializer64Deserializer32_ReturnInt32() + { + Assert.Equal(32, Deserializers.Int32.Deserialize(Serializers.Int32.Serialize(32))); + } + + [Fact] + public void SerializeDeserialize_WhenSerializer64Deserializer64_ReturnInt64() + { + Assert.Equal(32 * 1_000_000_000L, + Deserializers.Int64.Deserialize(Serializers.Int64.Serialize(32 * 1_000_000_000L))); + } + + [Fact] + public void SerializeDeserialize_WhenSerializerUtf8DeserializerUtf8_ReturnString() + { + Assert.Equal("abacaba", + Deserializers.Utf8.Deserialize(Serializers.Utf8.Serialize("abacaba"))); + } +} diff --git a/src/Ydb.Sdk/tests/Topic/WriterIntegrationTests.cs b/src/Ydb.Sdk/tests/Topic/WriterIntegrationTests.cs new file mode 100644 index 00000000..b0219d2d --- /dev/null +++ b/src/Ydb.Sdk/tests/Topic/WriterIntegrationTests.cs @@ -0,0 +1,128 @@ +using Google.Protobuf.WellKnownTypes; +using Xunit; +using Ydb.Sdk.Services.Topic; +using Ydb.Sdk.Services.Topic.Writer; +using Ydb.Sdk.Tests.Fixture; +using Ydb.Topic; +using Ydb.Topic.V1; +using Consumer = Ydb.Sdk.Services.Topic.Consumer; + +namespace Ydb.Sdk.Tests.Topic; + +public class WriterIntegrationTests : IClassFixture +{ + private readonly IDriver _driver; + private readonly string _topicName; + + public WriterIntegrationTests(DriverFixture driverFixture) + { + _driver = driverFixture.Driver; + _topicName = "topic_" + Utils.Net; + } + + [Fact] + public async Task WriteAsync_WhenOneMessage_ReturnWritten() + { + var topicClient = new TopicClient(_driver); + var topicSettings = new CreateTopicSettings + { + Path = _topicName + }; + await topicClient.CreateTopic(topicSettings); + + using var writer = new WriterBuilder(_driver, _topicName) { ProducerId = "producerId" }.Build(); + + var result = await writer.WriteAsync("abacaba"); + + Assert.Equal(PersistenceStatus.Written, result.Status); + + await topicClient.DropTopic(new DropTopicSettings { Path = _topicName }); + } + + [Fact] + public async Task WriteAsync_WhenTopicNotFound_ReturnNotFoundException() + { + using var writer = new WriterBuilder(_driver, _topicName + "_not_found") + { ProducerId = "producerId" }.Build(); + + Assert.Equal(StatusCode.SchemeError, (await Assert.ThrowsAsync( + () => writer.WriteAsync("hello world"))).Status.StatusCode); + } + + [Fact] + public async Task WriteAsync_When1000Messages_ReturnWriteResultIsPersisted() + { + const int messageCount = 1000; + var topicName = _topicName + "_stress"; + var topicClient = new TopicClient(_driver); + var topicSettings = new CreateTopicSettings { Path = topicName }; + topicSettings.Consumers.Add(new Consumer("Consumer")); + await topicClient.CreateTopic(topicSettings); + + using var writer = new WriterBuilder(_driver, topicName) { ProducerId = "producerId" }.Build(); + + var tasks = new List(); + for (var i = 0; i < messageCount; i++) + { + var i1 = i; + tasks.Add(Task.Run(async () => + { + // ReSharper disable once AccessToDisposedClosure + var message = await writer.WriteAsync(i1); + + Assert.Equal(PersistenceStatus.Written, message.Status); + })); + } + + await Task.WhenAll(tasks); + + var initStream = _driver.BidirectionalStreamCall(TopicService.StreamReadMethod, new GrpcRequestSettings()); + await initStream.Write(new StreamReadMessage.Types.FromClient + { + InitRequest = new StreamReadMessage.Types.InitRequest + { + Consumer = "Consumer", ReaderName = "reader-test", TopicsReadSettings = + { + new StreamReadMessage.Types.InitRequest.Types.TopicReadSettings + { ReadFrom = new Timestamp(), Path = topicName } + } + } + }); + + var ans = 0; + + await initStream.MoveNextAsync(); + await initStream.Write(new StreamReadMessage.Types.FromClient + { + ReadRequest = new StreamReadMessage.Types.ReadRequest + { + BytesSize = 2000 * messageCount * sizeof(int) + } + }); + await initStream.MoveNextAsync(); + var startRequest = initStream.Current.StartPartitionSessionRequest; + await initStream.Write(new StreamReadMessage.Types.FromClient + { + StartPartitionSessionResponse = new StreamReadMessage.Types.StartPartitionSessionResponse + { + CommitOffset = startRequest.CommittedOffset, + PartitionSessionId = startRequest.PartitionSession.PartitionSessionId + } + }); + var receivedMessageCount = 0; + while (receivedMessageCount < messageCount) + { + await initStream.MoveNextAsync(); + ans += initStream.Current.ReadResponse.PartitionData.Sum(data => data.Batches.Sum(batch => + batch.MessageData.Sum(message => + { + receivedMessageCount++; + return Deserializers.Int32.Deserialize(message.Data.ToByteArray()); + }))); + } + + Assert.Equal(messageCount * (messageCount - 1) / 2, ans); + + await topicClient.DropTopic(new DropTopicSettings { Path = topicName }); + } +} diff --git a/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs b/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs new file mode 100644 index 00000000..607ea9be --- /dev/null +++ b/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs @@ -0,0 +1,670 @@ +using Grpc.Core; +using Moq; +using Xunit; +using Xunit.Abstractions; +using Ydb.Issue; +using Ydb.Sdk.Services.Topic; +using Ydb.Sdk.Services.Topic.Writer; +using Ydb.Topic; +using Codec = Ydb.Sdk.Services.Topic.Codec; + +namespace Ydb.Sdk.Tests.Topic; + +using WriterStream = IBidirectionalStream; +using FromClient = StreamWriteMessage.Types.FromClient; + +public class WriterUnitTests +{ + private readonly ITestOutputHelper _testOutputHelper; + private readonly Mock _mockIDriver = new(); + private readonly Mock _mockStream = new(); + + public WriterUnitTests(ITestOutputHelper testOutputHelper) + { + _testOutputHelper = testOutputHelper; + _mockIDriver.Setup(driver => driver.BidirectionalStreamCall( + It.IsAny>(), + It.IsAny()) + ).Returns(_mockStream.Object); + + _mockIDriver.Setup(driver => driver.LoggerFactory).Returns(Utils.GetLoggerFactory); + } + + [Fact] + public async Task Initialize_WhenStreamIsClosedByServer_ThrowWriterExceptionOnWriteAsyncAndTryNextInitialize() + { + var moveNextTry = new TaskCompletionSource(); + var taskNextComplete = new TaskCompletionSource(); + + _mockStream.Setup(stream => stream.Write(It.IsAny())) + .Returns(Task.CompletedTask); + _mockStream.SetupSequence(stream => stream.MoveNextAsync()) + .ReturnsAsync(false) + .Returns(() => + { + taskNextComplete.SetResult(); + return new ValueTask(moveNextTry.Task); + }); + + using var writer = new WriterBuilder(_mockIDriver.Object, "/topic") + { ProducerId = "producerId" }.Build(); + + Assert.Equal("Stream unexpectedly closed by YDB server. " + + "Current InitRequest: { \"path\": \"/topic\", \"producerId\": \"producerId\" }", + (await Assert.ThrowsAsync(() => writer.WriteAsync(100))).Message); + + await taskNextComplete.Task; + // check attempt repeated!!! + _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Exactly(2)); + _mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(2)); + } + + [Fact] + public async Task Initialize_WhenFailWriteMessage_ThrowWriterExceptionOnWriteAsyncAndTryNextInitialize() + { + var taskSource = new TaskCompletionSource(); + var taskNextComplete = new TaskCompletionSource(); + _mockStream.SetupSequence(stream => stream.Write(It.IsAny())) + .ThrowsAsync(new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled))) + .Returns(() => + { + taskNextComplete.SetResult(); + return taskSource.Task; + }); + + using var writer = new WriterBuilder(_mockIDriver.Object, "/topic") + { ProducerId = "producerId" }.Build(); + + var writerException = await Assert.ThrowsAsync(() => writer.WriteAsync("abacaba")); + Assert.Equal("Transport error on creating WriterSession", writerException.Message); + Assert.Equal(StatusCode.Cancelled, writerException.Status.StatusCode); + + await taskNextComplete.Task; + // check attempt repeated!!! + _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Exactly(2)); + } + + [Fact] + public async Task Initialize_WhenFailMoveNextAsync_ThrowWriterExceptionOnWriteAsyncAndTryNextInitialize() + { + var taskSource = new TaskCompletionSource(); + var taskNextComplete = new TaskCompletionSource(); + _mockStream.Setup(stream => stream.Write(It.IsAny())) + .Returns(Task.CompletedTask); + _mockStream.SetupSequence(stream => stream.MoveNextAsync()) + .ThrowsAsync(new Driver.TransportException( + new RpcException(new Grpc.Core.Status(Grpc.Core.StatusCode.DeadlineExceeded, "Some message")))) + .Returns(() => + { + taskNextComplete.SetResult(); + return new ValueTask(taskSource.Task); + }); + + using var writer = new WriterBuilder(_mockIDriver.Object, "/topic") + { ProducerId = "producerId" }.Build(); + + var writerException = await Assert.ThrowsAsync(() => writer.WriteAsync("abacaba")); + Assert.Equal("Transport error on creating WriterSession", writerException.Message); + Assert.Equal(StatusCode.ClientTransportTimeout, writerException.Status.StatusCode); + + await taskNextComplete.Task; + // check attempt repeated!!! + _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Exactly(2)); + _mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(2)); + } + + [Fact] + public async Task Initialize_WhenInitResponseNotSuccess_ThrowWriterExceptionOnWriteAsyncAndTryNextInitialize() + { + var taskSource = new TaskCompletionSource(); + var taskNextComplete = new TaskCompletionSource(); + _mockStream.Setup(stream => stream.Write(It.IsAny())) + .Returns(Task.CompletedTask); + _mockStream.SetupSequence(stream => stream.MoveNextAsync()) + .Returns(new ValueTask(true)) + .Returns(() => + { + taskNextComplete.SetResult(); + return new ValueTask(taskSource.Task); + }); + _mockStream.Setup(stream => stream.Current) + .Returns(new StreamWriteMessage.Types.FromServer + { + Status = StatusIds.Types.StatusCode.BadSession, + Issues = { new IssueMessage { Message = "Some message" } } + }); + + using var writer = new WriterBuilder(_mockIDriver.Object, "/topic") + { ProducerId = "producerId" }.Build(); + + Assert.Equal("Initialization failed: Status: BadSession, Issues:\n[0] Fatal: Some message\n", + (await Assert.ThrowsAsync(() => writer.WriteAsync(123L))).Message); + + await taskNextComplete.Task; + // check attempt repeated!!! + _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Exactly(2)); + _mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(2)); + } + + [Fact] + public async Task Initialize_WhenInitResponseIsSchemaError_ThrowWriterExceptionOnWriteAsyncAndStopInitializing() + { + _mockStream.Setup(stream => stream.Write(It.IsAny())) + .Returns(Task.CompletedTask); + _mockStream.Setup(stream => stream.MoveNextAsync()) + .Returns(new ValueTask(true)); + _mockStream.Setup(stream => stream.Current) + .Returns(new StreamWriteMessage.Types.FromServer + { + Status = StatusIds.Types.StatusCode.SchemeError, + Issues = { new IssueMessage { Message = "Topic not found" } } + }); + + using var writer = new WriterBuilder(_mockIDriver.Object, "/topic") + { ProducerId = "producerId" }.Build(); + + Assert.Equal("Initialization failed: Status: SchemeError, Issues:\n[0] Fatal: Topic not found\n", + (await Assert.ThrowsAsync(() => writer.WriteAsync(123L))).Message); + + // check not attempt repeated!!! + _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Once); + _mockStream.Verify(stream => stream.MoveNextAsync(), Times.Once); + } + + [Fact] + public async Task Initialize_WhenNotSupportedCodec_ThrowWriterExceptionOnWriteAsyncAndStopInitializing() + { + _mockStream.Setup(stream => stream.Write(It.IsAny())) + .Returns(Task.CompletedTask); + _mockStream.Setup(stream => stream.MoveNextAsync()) + .Returns(new ValueTask(true)); + _mockStream.Setup(stream => stream.Current) + .Returns(new StreamWriteMessage.Types.FromServer + { + InitResponse = new StreamWriteMessage.Types.InitResponse + { + LastSeqNo = 1, PartitionId = 1, SessionId = "SessionId", + SupportedCodecs = new SupportedCodecs { Codecs = { 2 /* Gzip */, 3 /* Lzop */ } } + }, + Status = StatusIds.Types.StatusCode.Success + }); + + using var writer = new WriterBuilder(_mockIDriver.Object, "/topic") + { ProducerId = "producerId", Codec = Codec.Raw }.Build(); + + Assert.Equal("Topic[Path=\"/topic\"] is not supported codec: Raw", + (await Assert.ThrowsAsync(() => writer.WriteAsync(123L))).Message); + + // check not attempt repeated!!! + _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Once); + _mockStream.Verify(stream => stream.MoveNextAsync(), Times.Once); + } + + [Fact] + public async Task WriteAsyncStress_WhenBufferIsOverflow_ThrowWriterExceptionOnBufferOverflow() + { + const int countBatchSendingSize = 1000; + const int batchTasksSize = 100; + const int bufferSize = 100; + const int messageSize = sizeof(int); + + Assert.True(batchTasksSize > bufferSize / 4); + Assert.True(bufferSize % 4 == 0); + + var taskSource = new TaskCompletionSource(); + _mockStream.Setup(stream => stream.Write(It.IsAny())) + .Returns(Task.CompletedTask); + var mockNextAsync = _mockStream.SetupSequence(stream => stream.MoveNextAsync()) + .Returns(new ValueTask(true)) + .Returns(new ValueTask(taskSource.Task)); + var sequentialResult = _mockStream.SetupSequence(stream => stream.Current) + .Returns(new StreamWriteMessage.Types.FromServer + { + InitResponse = new StreamWriteMessage.Types.InitResponse + { LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" }, + Status = StatusIds.Types.StatusCode.Success + }); + using var writer = new WriterBuilder(_mockIDriver.Object, "/topic") + { ProducerId = "producerId", BufferMaxSize = bufferSize /* bytes */ }.Build(); + + for (var attempt = 0; attempt < countBatchSendingSize; attempt++) + { + _testOutputHelper.WriteLine($"Processing attempt {attempt}"); + + var tasks = new List>(); + var serverAck = new StreamWriteMessage.Types.FromServer + { + WriteResponse = new StreamWriteMessage.Types.WriteResponse { PartitionId = 1 }, + Status = StatusIds.Types.StatusCode.Success + }; + for (var i = 0; i < batchTasksSize; i++) + { + tasks.Add(writer.WriteAsync(100)); + serverAck.WriteResponse.Acks.Add(new StreamWriteMessage.Types.WriteResponse.Types.WriteAck + { + SeqNo = bufferSize / messageSize * attempt + i + 1, + Written = new StreamWriteMessage.Types.WriteResponse.Types.WriteAck.Types.Written + { Offset = i * messageSize + bufferSize * attempt } + }); + } + + sequentialResult.Returns(() => + { + // ReSharper disable once AccessToModifiedClosure + Volatile.Write(ref taskSource, new TaskCompletionSource()); + mockNextAsync.Returns(new ValueTask(Volatile.Read(ref taskSource).Task)); + return serverAck; + }); + taskSource.SetResult(true); + + var countSuccess = 0; + var countErrors = 0; + foreach (var task in tasks) + { + try + { + var res = await task; + countSuccess++; + Assert.Equal(PersistenceStatus.Written, res.Status); + } + catch (WriterException e) + { + countErrors++; + Assert.Equal("Buffer overflow", e.Message); + } + } + + Assert.Equal(bufferSize / messageSize, countSuccess); + Assert.Equal(batchTasksSize - bufferSize / messageSize, countErrors); + } + } + + /* + * Performed invocations: + + Mock:1> (stream): + + IBidirectionalStream.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } }) + IBidirectionalStream.MoveNextAsync() + IBidirectionalStream.Current + IBidirectionalStream.MoveNextAsync() + IBidirectionalStream.Write({ "writeRequest": { "messages": [ { "seqNo": "1", "createdAt": "2024-11-22T10:08:58.732882Z", "data": "AAAAAAAAAGQ=", "uncompressedSize": "8" } ], "codec": 1 } }) + IBidirectionalStream.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } }) + IBidirectionalStream.MoveNextAsync() + IBidirectionalStream.Current + IBidirectionalStream.MoveNextAsync() + IBidirectionalStream.Write({ "writeRequest": { "messages": [ { "seqNo": "1", "createdAt": "2024-11-22T10:08:58.732882Z", "data": "AAAAAAAAAGQ=", "uncompressedSize": "8" } ], "codec": 1 } }) + IBidirectionalStream.Current + IBidirectionalStream.MoveNextAsync() + */ + [Fact] + public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSessionThenReconnectSession_ReturnWriteResult() + { + var moveFirstNextSource = new TaskCompletionSource(); + var moveSecondNextSource = new TaskCompletionSource(); + var moveThirdNextSource = new TaskCompletionSource(); + var nextCompleted = new TaskCompletionSource(); + _mockStream.SetupSequence(stream => stream.Write(It.IsAny())) + .Returns(Task.CompletedTask) + .ThrowsAsync(new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled))) + .Returns(() => + { + moveFirstNextSource.SetResult(false); + return Task.CompletedTask; + }) + .Returns(() => + { + nextCompleted.SetResult(); + return Task.CompletedTask; + }); + _mockStream.SetupSequence(stream => stream.MoveNextAsync()) + .ReturnsAsync(true) + .Returns(new ValueTask(moveFirstNextSource.Task)) + .Returns(new ValueTask(moveSecondNextSource.Task)) + .Returns(new ValueTask(moveThirdNextSource.Task)) + .Returns(new ValueTask(new TaskCompletionSource().Task)); + _mockStream.SetupSequence(stream => stream.Current) + .Returns(new StreamWriteMessage.Types.FromServer + { + InitResponse = new StreamWriteMessage.Types.InitResponse + { LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" }, + Status = StatusIds.Types.StatusCode.Success + }) + .Returns(new StreamWriteMessage.Types.FromServer + { + InitResponse = new StreamWriteMessage.Types.InitResponse + { LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" }, + Status = StatusIds.Types.StatusCode.Success + }) + .Returns(new StreamWriteMessage.Types.FromServer + { + WriteResponse = new StreamWriteMessage.Types.WriteResponse + { + PartitionId = 1, + Acks = + { + new StreamWriteMessage.Types.WriteResponse.Types.WriteAck + { + SeqNo = 1, + Written = new StreamWriteMessage.Types.WriteResponse.Types.WriteAck.Types.Written + { Offset = 0 } + } + } + }, + Status = StatusIds.Types.StatusCode.Success + }); + using var writer = new WriterBuilder(_mockIDriver.Object, "/topic") + { ProducerId = "producerId" }.Build(); + + var runTask = writer.WriteAsync(100L); + + var writerExceptionAfterResetSession = await Assert.ThrowsAsync(() => writer.WriteAsync(100)); + Assert.Equal("Transport error in the WriterSession on write messages", + writerExceptionAfterResetSession.Message); + Assert.Equal(StatusCode.Cancelled, writerExceptionAfterResetSession.Status.StatusCode); + + moveSecondNextSource.SetResult(true); + await nextCompleted.Task; + moveThirdNextSource.SetResult(true); + + Assert.Equal(PersistenceStatus.Written, (await runTask).Status); + _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Exactly(4)); + _mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(5)); + _mockStream.Verify(stream => stream.Current, Times.Exactly(3)); + } + + /* + * Performed invocations: + + Mock:1> (stream): + + IBidirectionalStream.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } }) + IBidirectionalStream.MoveNextAsync() + IBidirectionalStream.Current + IBidirectionalStream.MoveNextAsync() + IBidirectionalStream.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } }) + IBidirectionalStream.MoveNextAsync() + */ + [Fact] + public async Task WriteAsync_WhenTransportExceptionOnProcessingWriteAckThenReconnectSession_ReturnWriterException() + { + var nextCompleted = new TaskCompletionSource(); + _mockStream.Setup(stream => stream.Write(It.IsAny())) + .Returns(Task.CompletedTask); + _mockStream.SetupSequence(stream => stream.MoveNextAsync()) + .ReturnsAsync(true) + .ThrowsAsync(new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled))) + .Returns(() => + { + nextCompleted.SetResult(); + return new ValueTask(new TaskCompletionSource().Task); + }); // retry init writer session + _mockStream.SetupSequence(stream => stream.Current) + .Returns(new StreamWriteMessage.Types.FromServer + { + InitResponse = new StreamWriteMessage.Types.InitResponse + { LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" }, + Status = StatusIds.Types.StatusCode.Success + }) + .Returns(new StreamWriteMessage.Types.FromServer + { + InitResponse = new StreamWriteMessage.Types.InitResponse + { LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" }, + Status = StatusIds.Types.StatusCode.Success + }); + using var writer = new WriterBuilder(_mockIDriver.Object, "/topic") + { ProducerId = "producerId" }.Build(); + + await nextCompleted.Task; + var writerExceptionAfterResetSession = await Assert.ThrowsAsync(() => writer.WriteAsync(100)); + Assert.Equal("Transport error in the WriterSession on processing writeAck", + writerExceptionAfterResetSession.Message); + Assert.Equal(StatusCode.Cancelled, writerExceptionAfterResetSession.Status.StatusCode); + + _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Exactly(2)); + _mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(3)); + _mockStream.Verify(stream => stream.Current, Times.Once); + } + + /* + * Performed invocations: + + Mock:1> (stream): + + IBidirectionalStream.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } }) + IBidirectionalStream.MoveNextAsync() + IBidirectionalStream.Current + IBidirectionalStream.MoveNextAsync() + IBidirectionalStream.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } }) + IBidirectionalStream.MoveNextAsync() + */ + [Fact] + public async Task WriteAsync_WhenStreamIsClosingOnProcessingWriteAckThenReconnectSession_ReturnWriterException() + { + var nextCompleted = new TaskCompletionSource(); + _mockStream.Setup(stream => stream.Write(It.IsAny())) + .Returns(Task.CompletedTask); + _mockStream.SetupSequence(stream => stream.MoveNextAsync()) + .ReturnsAsync(true) + .ReturnsAsync(false) + .Returns(() => + { + nextCompleted.SetResult(); + return new ValueTask(new TaskCompletionSource().Task); + }); // retry init writer session + _mockStream.SetupSequence(stream => stream.Current) + .Returns(new StreamWriteMessage.Types.FromServer + { + InitResponse = new StreamWriteMessage.Types.InitResponse + { LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" }, + Status = StatusIds.Types.StatusCode.Success + }) + .Returns(new StreamWriteMessage.Types.FromServer + { + InitResponse = new StreamWriteMessage.Types.InitResponse + { LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" }, + Status = StatusIds.Types.StatusCode.Success + }); + using var writer = new WriterBuilder(_mockIDriver.Object, "/topic") + { ProducerId = "producerId" }.Build(); + + await nextCompleted.Task; + var writerExceptionAfterResetSession = await Assert.ThrowsAsync(() => writer.WriteAsync(100)); + Assert.Equal("WriterStream is closed", writerExceptionAfterResetSession.Message); + Assert.Equal(StatusCode.Unspecified, writerExceptionAfterResetSession.Status.StatusCode); + + _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Exactly(2)); + _mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(3)); + _mockStream.Verify(stream => stream.Current, Times.Once); + } + + [Fact] + public async Task WriteAsync_WhenCancellationTokenIsClosed_ThrowCancellationException() + { + var cancellationTokenSource = new CancellationTokenSource(); + var nextCompleted = new TaskCompletionSource(); + _mockStream.Setup(stream => stream.Write(It.IsAny())) + .Returns(Task.CompletedTask); + _mockStream.SetupSequence(stream => stream.MoveNextAsync()) + .ReturnsAsync(true) + .Returns(new ValueTask(nextCompleted.Task)); + _mockStream.SetupSequence(stream => stream.Current) + .Returns(new StreamWriteMessage.Types.FromServer + { + InitResponse = new StreamWriteMessage.Types.InitResponse + { LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" }, + Status = StatusIds.Types.StatusCode.Success + }) + .Returns(new StreamWriteMessage.Types.FromServer + { + WriteResponse = new StreamWriteMessage.Types.WriteResponse + { + PartitionId = 1, + Acks = + { + new StreamWriteMessage.Types.WriteResponse.Types.WriteAck + { + SeqNo = 1, + Written = new StreamWriteMessage.Types.WriteResponse.Types.WriteAck.Types.Written + { Offset = 0 } + } + } + }, + Status = StatusIds.Types.StatusCode.Success + }); + using var writer = new WriterBuilder(_mockIDriver.Object, "/topic") + { ProducerId = "producerId" }.Build(); + + var task = writer.WriteAsync(123L, cancellationTokenSource.Token); + cancellationTokenSource.Cancel(); + nextCompleted.SetResult(true); + + await Assert.ThrowsAsync(() => task); + } + + [Fact] + public async Task WriteAsync_WhenTaskIsAcceptedBeforeCancel_ThrowCancellationException() + { + var cancellationTokenSource = new CancellationTokenSource(); + var nextCompleted = new TaskCompletionSource(); + _mockStream.Setup(stream => stream.Write(It.IsAny())) + .Returns(Task.CompletedTask); + _mockStream.SetupSequence(stream => stream.MoveNextAsync()) + .ReturnsAsync(true) + .Returns(new ValueTask(nextCompleted.Task)); + _mockStream.SetupSequence(stream => stream.Current) + .Returns(new StreamWriteMessage.Types.FromServer + { + InitResponse = new StreamWriteMessage.Types.InitResponse + { LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" }, + Status = StatusIds.Types.StatusCode.Success + }) + .Returns(new StreamWriteMessage.Types.FromServer + { + WriteResponse = new StreamWriteMessage.Types.WriteResponse + { + PartitionId = 1, + Acks = + { + new StreamWriteMessage.Types.WriteResponse.Types.WriteAck + { + SeqNo = 1, + Written = new StreamWriteMessage.Types.WriteResponse.Types.WriteAck.Types.Written + { Offset = 0 } + } + } + }, + Status = StatusIds.Types.StatusCode.Success + }); + using var writer = new WriterBuilder(_mockIDriver.Object, "/topic") + { ProducerId = "producerId" }.Build(); + + var task = writer.WriteAsync(123L, cancellationTokenSource.Token); + nextCompleted.SetResult(true); + Assert.Equal(PersistenceStatus.Written, (await task).Status); + cancellationTokenSource.Cancel(); + } + + /* + * Performed invocations: + + Mock:1> (stream): + + IBidirectionalStream.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } }) + IBidirectionalStream.MoveNextAsync() + IBidirectionalStream.Current + IBidirectionalStream.MoveNextAsync() + IBidirectionalStream.Write({ "writeRequest": { "messages": [ { "seqNo": "1", "createdAt": "2024-11-26T14:03:57.473289Z", "data": "AAAAAAAAAGQ=", "uncompressedSize": "8" } ], "codec": 1 } }) + IBidirectionalStream.Write({ "writeRequest": { "messages": [ { "seqNo": "2", "createdAt": "2024-11-26T14:03:57.475008Z", "data": "AAAAAAAAAGQ=", "uncompressedSize": "8" } ], "codec": 1 } }) + IBidirectionalStream.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } }) + IBidirectionalStream.MoveNextAsync() + IBidirectionalStream.Current + IBidirectionalStream.Write({ "writeRequest": { "messages": [ { "seqNo": "2", "createdAt": "2024-11-26T14:03:57.475008Z", "data": "AAAAAAAAAGQ=", "uncompressedSize": "8" } ], "codec": 1 } }) + IBidirectionalStream.MoveNextAsync() + IBidirectionalStream.Current + IBidirectionalStream.MoveNextAsync() + */ + [Fact] + public async Task WriteAsync_WhenCancelTaskInOnOfTwoMessagesInFlightBuffer_ReturnCancelExceptionAndWriteResult() + { + var moveFirstNextSource = new TaskCompletionSource(); + var moveSecondNextSource = new TaskCompletionSource(); + var moveThirdNextSource = new TaskCompletionSource(); + var nextCompleted = new TaskCompletionSource(); + _mockStream.SetupSequence(stream => stream.Write(It.IsAny())) + .Returns(Task.CompletedTask) + .Returns(Task.CompletedTask) + .ThrowsAsync(new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled))) + .Returns(() => + { + moveFirstNextSource.SetResult(false); + return Task.CompletedTask; + }) + .Returns(() => + { + nextCompleted.SetResult(); // for seqNo + return Task.CompletedTask; + }); + _mockStream.SetupSequence(stream => stream.MoveNextAsync()) + .ReturnsAsync(true) + .Returns(new ValueTask(moveFirstNextSource.Task)) + .Returns(new ValueTask(moveSecondNextSource.Task)) + .Returns(new ValueTask(moveThirdNextSource.Task)) + .Returns(new ValueTask(new TaskCompletionSource().Task)); + _mockStream.SetupSequence(stream => stream.Current) + .Returns(new StreamWriteMessage.Types.FromServer + { + InitResponse = new StreamWriteMessage.Types.InitResponse + { LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" }, + Status = StatusIds.Types.StatusCode.Success + }) + .Returns(new StreamWriteMessage.Types.FromServer + { + InitResponse = new StreamWriteMessage.Types.InitResponse + { LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" }, + Status = StatusIds.Types.StatusCode.Success + }) + .Returns(new StreamWriteMessage.Types.FromServer + { + WriteResponse = new StreamWriteMessage.Types.WriteResponse + { + PartitionId = 1, + Acks = + { + new StreamWriteMessage.Types.WriteResponse.Types.WriteAck + { + SeqNo = 2, + Written = new StreamWriteMessage.Types.WriteResponse.Types.WriteAck.Types.Written + { Offset = 0 } + } + } + }, + Status = StatusIds.Types.StatusCode.Success + }); + using var writer = new WriterBuilder(_mockIDriver.Object, "/topic") + { ProducerId = "producerId" }.Build(); + + var ctx = new CancellationTokenSource(); + var runTaskWithCancel = writer.WriteAsync(100L, ctx.Token); + // ReSharper disable once MethodSupportsCancellation + var runTask = writer.WriteAsync(100L); + + // ReSharper disable once MethodSupportsCancellation + var writerExceptionAfterResetSession = await Assert.ThrowsAsync(() => writer.WriteAsync(100)); + Assert.Equal("Transport error in the WriterSession on write messages", + writerExceptionAfterResetSession.Message); + Assert.Equal(StatusCode.Cancelled, writerExceptionAfterResetSession.Status.StatusCode); + + ctx.Cancel(); // reconnect write invoke cancel on cancellation token + moveSecondNextSource.SetResult(true); + await nextCompleted.Task; + moveThirdNextSource.SetResult(true); + + Assert.Equal(PersistenceStatus.Written, (await runTask).Status); + _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Exactly(5)); + _mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(5)); + _mockStream.Verify(stream => stream.Current, Times.Exactly(3)); + + await Assert.ThrowsAsync(() => runTaskWithCancel); + } +} diff --git a/src/Ydb.Sdk/tests/Utils.cs b/src/Ydb.Sdk/tests/Utils.cs index 4da30819..825cad9c 100644 --- a/src/Ydb.Sdk/tests/Utils.cs +++ b/src/Ydb.Sdk/tests/Utils.cs @@ -47,7 +47,7 @@ public static async Task ExecuteSchemeQuery( internal static ILoggerFactory GetLoggerFactory() { return new ServiceCollection() - .AddLogging(configure => configure.AddConsole().SetMinimumLevel(LogLevel.Information)) + .AddLogging(configure => configure.AddConsole().SetMinimumLevel(LogLevel.Debug)) .BuildServiceProvider() .GetService() ?? NullLoggerFactory.Instance; }