diff --git a/CHANGELOG.md b/CHANGELOG.md index e804db27..9234adf2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +- Topic Writer updated release candidate + ## v0.9.0-rc0 - Topic Writer release candidate - Fixed: grpc requests go via proxy on Grpc.NET.Client >= 2.44 diff --git a/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs b/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs index 486d93cf..76543d90 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs @@ -4,20 +4,15 @@ public class WriterException : Exception { public WriterException(string message) : base(message) { - Status = new Status(StatusCode.Unspecified); } public WriterException(string message, Status status) : base(message + ": " + status) { - Status = status; } - public WriterException(string message, Driver.TransportException e) : base(message, e) + public WriterException(string message, Exception inner) : base(message, inner) { - Status = e.Status; } - - public Status Status { get; } } public class ReaderException : Exception diff --git a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs index 0c4141ab..5b6ff824 100644 --- a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs +++ b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs @@ -5,7 +5,6 @@ namespace Ydb.Sdk.Services.Topic; internal abstract class TopicSession : IDisposable { private readonly Func _initialize; - private readonly Action _resetSessionOnTransportError; protected readonly IBidirectionalStream Stream; protected readonly ILogger Logger; @@ -17,17 +16,15 @@ protected TopicSession( IBidirectionalStream stream, ILogger logger, string sessionId, - Func initialize, - Action resetSessionOnTransportError) + Func initialize) { Stream = stream; Logger = logger; SessionId = sessionId; _initialize = initialize; - _resetSessionOnTransportError = resetSessionOnTransportError; } - protected async void ReconnectSession(WriterException exception) + protected async void ReconnectSession() { if (Interlocked.CompareExchange(ref _isActive, 0, 1) == 0) { @@ -36,9 +33,7 @@ protected async void ReconnectSession(WriterException exception) return; } - _resetSessionOnTransportError(exception); - - Logger.LogInformation("WriterSession[{SessionId}] has been deactivated, starting to reconnect", SessionId); + Logger.LogInformation("TopicSession[{SessionId}] has been deactivated, starting to reconnect", SessionId); await _initialize(); } diff --git a/src/Ydb.Sdk/src/Services/Topic/Writer/WriteResult.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/WriteResult.cs index 1d6284b0..160d2844 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Writer/WriteResult.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/WriteResult.cs @@ -4,6 +4,8 @@ namespace Ydb.Sdk.Services.Topic.Writer; public class WriteResult { + internal static readonly WriteResult Skipped = new(); + private readonly long _offset; internal WriteResult(StreamWriteMessage.Types.WriteResponse.Types.WriteAck ack) @@ -23,6 +25,11 @@ internal WriteResult(StreamWriteMessage.Types.WriteResponse.Types.WriteAck ack) } } + private WriteResult() + { + Status = PersistenceStatus.AlreadyWritten; + } + public PersistenceStatus Status { get; } public bool TryGetOffset(out long offset) diff --git a/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs index 79b25bbb..544bb065 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs @@ -7,7 +7,6 @@ namespace Ydb.Sdk.Services.Topic.Writer; -using InitResponse = StreamWriteMessage.Types.InitResponse; using MessageData = StreamWriteMessage.Types.WriteRequest.Types.MessageData; using MessageFromClient = StreamWriteMessage.Types.FromClient; using MessageFromServer = StreamWriteMessage.Types.FromServer; @@ -24,10 +23,10 @@ internal class Writer : IWriter private readonly ISerializer _serializer; private readonly ConcurrentQueue _toSendBuffer = new(); private readonly ConcurrentQueue _inFlightMessages = new(); - private readonly CancellationTokenSource _disposeTokenSource = new(); + private readonly CancellationTokenSource _disposeCts = new(); private volatile TaskCompletionSource _tcsWakeUp = new(); - private volatile IWriteSession _session = new NotStartedWriterSession("Session not started!"); + private volatile IWriteSession _session = null!; private int _limitBufferMaxSize; @@ -51,18 +50,27 @@ public async Task WriteAsync(Message message, CancellationT { TaskCompletionSource tcs = new(); cancellationToken.Register( - () => tcs.TrySetCanceled(cancellationToken), - useSynchronizationContext: false + () => tcs.TrySetException( + new WriterException("The write operation was canceled before it could be completed") + ), useSynchronizationContext: false ); - var data = _serializer.Serialize(message.Data); + byte[] data; + try + { + data = _serializer.Serialize(message.Data); + } + catch (Exception e) + { + throw new WriterException("Error when serializing message data", e); + } + var messageData = new MessageData { Data = ByteString.CopyFrom(data), CreatedAt = Timestamp.FromDateTime(message.Timestamp.ToUniversalTime()), UncompressedSize = data.Length }; - foreach (var metadata in message.Metadata) { messageData.MetadataItems.Add(new MetadataItem @@ -94,7 +102,14 @@ public async Task WriteAsync(Message message, CancellationT "Buffer overflow: the data size [{DataLength}] exceeds the current buffer limit ({CurLimitBufferSize}) [BufferMaxSize = {BufferMaxSize}]", data.Length, curLimitBufferSize, _config.BufferMaxSize); - throw new WriterException("Buffer overflow"); + try + { + await Task.Delay(_config.BufferOverflowRetryTimeoutMs, cancellationToken); + } + catch (TaskCanceledException) + { + throw new WriterException("Buffer overflow"); + } } try @@ -113,11 +128,16 @@ private async void StartWriteWorker() { await Initialize(); - while (!_disposeTokenSource.Token.IsCancellationRequested) + while (!_disposeCts.Token.IsCancellationRequested) { await _tcsWakeUp.Task; _tcsWakeUp = new TaskCompletionSource(); + if (_toSendBuffer.IsEmpty) + { + continue; + } + await _session.Write(_toSendBuffer); } } @@ -129,10 +149,14 @@ private void WakeUpWorker() private async Task Initialize() { + _session = DummyWriterSession.Instance; + try { - if (_disposeTokenSource.IsCancellationRequested) + if (_disposeCts.IsCancellationRequested) { + _logger.LogWarning("Initialize writer is canceled because it has been disposed"); + return; } @@ -159,10 +183,10 @@ private async Task Initialize() await stream.Write(new MessageFromClient { InitRequest = initRequest }); if (!await stream.MoveNextAsync()) { - _session = new NotStartedWriterSession( - $"Stream unexpectedly closed by YDB server. Current InitRequest: {initRequest}"); + _logger.LogError("Stream unexpectedly closed by YDB server. Current InitRequest: {initRequest}", + initRequest); - _ = Task.Run(Initialize, _disposeTokenSource.Token); + _ = Task.Run(Initialize, _disposeCts.Token); return; } @@ -173,14 +197,18 @@ private async Task Initialize() if (status.IsNotSuccess) { - _session = new NotStartedWriterSession("Initialization failed", status); - - if (status.StatusCode != StatusCode.SchemeError) + if (RetrySettings.DefaultInstance.GetRetryRule(status.StatusCode).Policy != RetryPolicy.None) { - _ = Task.Run(Initialize, _disposeTokenSource.Token); + _logger.LogError("Writer initialization failed to start. Reason: {Status}", status); + + _ = Task.Run(Initialize, _disposeCts.Token); } + else + { + _logger.LogCritical("Writer initialization failed to start. Reason: {Status}", status); - _logger.LogCritical("Writer initialization failed to start. Reason: {Status}", status); + _session = new NotStartedWriterSession("Initialization failed", status); + } return; } @@ -202,45 +230,53 @@ private async Task Initialize() return; } - var newSession = new WriterSession( - _config, - stream, - initResponse, - Initialize, - e => { _session = new NotStartedWriterSession(e); }, - _logger, - _inFlightMessages - ); - - if (!_inFlightMessages.IsEmpty) + var copyInFlightMessages = new ConcurrentQueue(); + var lastSeqNo = initResponse.LastSeqNo; + while (_inFlightMessages.TryDequeue(out var sendData)) { - var copyInFlightMessages = new ConcurrentQueue(); - while (_inFlightMessages.TryDequeue(out var sendData)) + if (lastSeqNo >= sendData.MessageData.SeqNo) { - if (sendData.Tcs.Task.IsCanceled) - { - _logger.LogWarning("Message[SeqNo={SeqNo}] is cancelled", sendData.MessageData.SeqNo); + _logger.LogWarning( + "Message[SeqNo={SeqNo}] has been skipped because its sequence number " + + "is less than or equal to the last processed server's SeqNo[{LastSeqNo}]", + sendData.MessageData.SeqNo, lastSeqNo); - continue; - } + sendData.Tcs.TrySetResult(WriteResult.Skipped); - copyInFlightMessages.Enqueue(sendData); + continue; } - await newSession.Write(copyInFlightMessages); // retry prev in flight messages + + // Calculate the next sequence number from the calculated previous messages. + lastSeqNo = Math.Max(lastSeqNo, sendData.MessageData.SeqNo); + + copyInFlightMessages.Enqueue(sendData); + } + + var newSession = new WriterSession( + config: _config, + stream: stream, + lastSeqNo: lastSeqNo, + sessionId: initResponse.SessionId, + initialize: Initialize, + logger: _logger, + inFlightMessages: _inFlightMessages + ); + + if (!copyInFlightMessages.IsEmpty) + { + await newSession.Write(copyInFlightMessages); // retry prev in flight messages } _session = newSession; newSession.RunProcessingWriteAck(); + WakeUpWorker(); // attempt send buffer } catch (Driver.TransportException e) { - _logger.LogError(e, "Unable to connect the session"); + _logger.LogError(e, "Transport error on creating WriterSession"); - _session = new NotStartedWriterSession( - new WriterException("Transport error on creating WriterSession", e)); - - _ = Task.Run(Initialize, _disposeTokenSource.Token); + _ = Task.Run(Initialize, _disposeCts.Token); } } @@ -248,13 +284,13 @@ public void Dispose() { try { - _disposeTokenSource.Cancel(); + _disposeCts.Cancel(); _session.Dispose(); } finally { - _disposeTokenSource.Dispose(); + _disposeCts.Dispose(); } } } @@ -280,11 +316,6 @@ public NotStartedWriterSession(string reasonExceptionMessage, Status status) _reasonException = new WriterException(reasonExceptionMessage, status); } - public NotStartedWriterSession(WriterException reasonException) - { - _reasonException = reasonException; - } - public Task Write(ConcurrentQueue toSendBuffer) { while (toSendBuffer.TryDequeue(out var messageSending)) @@ -297,6 +328,25 @@ public Task Write(ConcurrentQueue toSendBuffer) public void Dispose() { + // Do nothing + } +} + +internal class DummyWriterSession : IWriteSession +{ + internal static readonly DummyWriterSession Instance = new(); + + private DummyWriterSession() + { + } + + public void Dispose() + { + } + + public Task Write(ConcurrentQueue toSendBuffer) + { + return Task.CompletedTask; } } @@ -310,22 +360,21 @@ internal class WriterSession : TopicSession initialize, - Action resetSessionOnTransportError, ILogger logger, ConcurrentQueue inFlightMessages ) : base( stream, logger, - initResponse.SessionId, - initialize, - resetSessionOnTransportError + sessionId, + initialize ) { _config = config; _inFlightMessages = inFlightMessages; - Volatile.Write(ref _seqNum, initResponse.LastSeqNo); // happens-before for Volatile.Read + Volatile.Write(ref _seqNum, lastSeqNo); // happens-before for Volatile.Read } public async Task Write(ConcurrentQueue toSendBuffer) @@ -341,6 +390,13 @@ public async Task Write(ConcurrentQueue toSendBuffer) while (toSendBuffer.TryDequeue(out var sendData)) { + if (sendData.Tcs.Task.IsFaulted) + { + Logger.LogWarning("Message[SeqNo={SeqNo}] is cancelled", sendData.MessageData.SeqNo); + + continue; + } + var messageData = sendData.MessageData; if (messageData.SeqNo == default) @@ -357,10 +413,10 @@ public async Task Write(ConcurrentQueue toSendBuffer) } catch (Driver.TransportException e) { - Logger.LogError(e, "WriterSession[{SessionId}] have error on Write, last SeqNo={SeqNo}", + Logger.LogError(e, "WriterSession[{SessionId}] have transport error on Write, last SeqNo={SeqNo}", SessionId, Volatile.Read(ref _seqNum)); - ReconnectSession(new WriterException("Transport error in the WriterSession on write messages", e)); + ReconnectSession(); } } @@ -377,7 +433,7 @@ internal async void RunProcessingWriteAck() if (status.IsNotSuccess) { - Logger.LogWarning( + Logger.LogError( "WriterSession[{SessionId}] received unsuccessful status while processing writeAck: {Status}", SessionId, status); return; @@ -428,13 +484,13 @@ Completing task on exception... { Logger.LogError(e, "WriterSession[{SessionId}] have error on processing writeAck", SessionId); - ReconnectSession(new WriterException("Transport error in the WriterSession on processing writeAck", e)); + ReconnectSession(); return; } Logger.LogWarning("WriterSession[{SessionId}]: stream is closed", SessionId); - ReconnectSession(new WriterException("WriterStream is closed")); + ReconnectSession(); } } diff --git a/src/Ydb.Sdk/src/Services/Topic/Writer/WriterBuilder.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/WriterBuilder.cs index ea4a3907..3064ceec 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Writer/WriterBuilder.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/WriterBuilder.cs @@ -39,8 +39,27 @@ public WriterBuilder(IDriver driver, string topicPath) /// public long? PartitionId { get; set; } + /// + /// The serializer to use to serialize values. + /// + /// + /// If your value serializer throws an exception, this will be + /// wrapped in a WriterException with unspecified status. + /// public ISerializer? Serializer { get; set; } + + /// + /// Represents the timeout duration, in milliseconds, used when a buffer overflow is detected. + /// This timeout specifies how long the system should wait before attempting to retry the operation. + /// + /// + /// This timeout is important for managing system performance and stability. + /// Too short a timeout could lead to rapid retry attempts, potentially causing further resource contention + /// and degrading system performance. Conversely, too long a timeout might delay processing significantly. + /// + public int BufferOverflowRetryTimeoutMs { get; set; } = 10; + public IWriter Build() { var config = new WriterConfig( @@ -48,7 +67,8 @@ public IWriter Build() producerId: ProducerId, codec: Codec, bufferMaxSize: BufferMaxSize, - partitionId: PartitionId + partitionId: PartitionId, + bufferOverflowRetryTimeoutMs: BufferOverflowRetryTimeoutMs ); return new Writer( diff --git a/src/Ydb.Sdk/src/Services/Topic/Writer/WriterConfig.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/WriterConfig.cs index 1bedc5e2..460ca697 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Writer/WriterConfig.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/WriterConfig.cs @@ -9,14 +9,15 @@ internal WriterConfig( string? producerId, Codec codec, int bufferMaxSize, - long? partitionId - ) + long? partitionId, + int bufferOverflowRetryTimeoutMs) { TopicPath = topicPath; ProducerId = producerId; Codec = codec; BufferMaxSize = bufferMaxSize; PartitionId = partitionId; + BufferOverflowRetryTimeoutMs = bufferOverflowRetryTimeoutMs; } public string TopicPath { get; } @@ -29,6 +30,8 @@ internal WriterConfig( public long? PartitionId { get; } + public int BufferOverflowRetryTimeoutMs { get; } + public override string ToString() { var toString = new StringBuilder().Append("[TopicPath: ").Append(TopicPath); diff --git a/src/Ydb.Sdk/tests/Topic/WriterIntegrationTests.cs b/src/Ydb.Sdk/tests/Topic/WriterIntegrationTests.cs index b0219d2d..924f0383 100644 --- a/src/Ydb.Sdk/tests/Topic/WriterIntegrationTests.cs +++ b/src/Ydb.Sdk/tests/Topic/WriterIntegrationTests.cs @@ -45,8 +45,10 @@ public async Task WriteAsync_WhenTopicNotFound_ReturnNotFoundException() using var writer = new WriterBuilder(_driver, _topicName + "_not_found") { ProducerId = "producerId" }.Build(); - Assert.Equal(StatusCode.SchemeError, (await Assert.ThrowsAsync( - () => writer.WriteAsync("hello world"))).Status.StatusCode); + Assert.Contains( + $"Initialization failed: Status: SchemeError, Issues:\n[500017] Error: no path 'local/{_topicName + "_not_found"}'", + (await Assert.ThrowsAsync(() => writer.WriteAsync("hello world"))).Message + ); } [Fact] diff --git a/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs b/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs index 607ea9be..baf8e78a 100644 --- a/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs +++ b/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs @@ -30,124 +30,239 @@ public WriterUnitTests(ITestOutputHelper testOutputHelper) _mockIDriver.Setup(driver => driver.LoggerFactory).Returns(Utils.GetLoggerFactory); } + private class FailSerializer : ISerializer + { + public byte[] Serialize(int data) + { + throw new Exception("Some serialize exception"); + } + } + [Fact] - public async Task Initialize_WhenStreamIsClosedByServer_ThrowWriterExceptionOnWriteAsyncAndTryNextInitialize() + public async Task WriteAsync_WhenSerializeThrowException_ThrowWriterException() { - var moveNextTry = new TaskCompletionSource(); - var taskNextComplete = new TaskCompletionSource(); + using var writer = new WriterBuilder(_mockIDriver.Object, "/topic") + { ProducerId = "producerId", Serializer = new FailSerializer() }.Build(); - _mockStream.Setup(stream => stream.Write(It.IsAny())) - .Returns(Task.CompletedTask); - _mockStream.SetupSequence(stream => stream.MoveNextAsync()) - .ReturnsAsync(false) + Assert.Equal("Error when serializing message data", + (await Assert.ThrowsAsync(() => writer.WriteAsync(123))).Message); + } + + /* + * Performed invocations: + + Mock:1> (stream): + + IBidirectionalStream.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } }) + IBidirectionalStream.MoveNextAsync() <- return false + IBidirectionalStream.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } }) + IBidirectionalStream.MoveNextAsync() <- return true + IBidirectionalStream.Current + IBidirectionalStream.MoveNextAsync() + IBidirectionalStream.Write({ "writeRequest": { "messages": [ { "seqNo": "1", "createdAt": "2024-12-03T10:46:43.954622Z", "data": "ZAAAAA==", "uncompressedSize": "4" } ], "codec": 1 } }) + IBidirectionalStream.Current + IBidirectionalStream.MoveNextAsync() + */ + [Fact] + public async Task Initialize_WhenStreamClosedByServer_ShouldRetryInitializeAndReturnWrittenMessageStatus() + { + var taskNextComplete = new TaskCompletionSource(); + _mockStream.SetupSequence(stream => stream.Write(It.IsAny())) + .Returns(Task.CompletedTask) + .Returns(Task.CompletedTask) .Returns(() => { - taskNextComplete.SetResult(); - return new ValueTask(moveNextTry.Task); + taskNextComplete.SetResult(true); + return Task.CompletedTask; }); + _mockStream.SetupSequence(stream => stream.MoveNextAsync()) + .ReturnsAsync(false) + .ReturnsAsync(true) + .Returns(() => new ValueTask(taskNextComplete.Task)) + .Returns(new ValueTask(new TaskCompletionSource().Task)); + + SetupReadOneWriteAckMessage(); using var writer = new WriterBuilder(_mockIDriver.Object, "/topic") { ProducerId = "producerId" }.Build(); - Assert.Equal("Stream unexpectedly closed by YDB server. " + - "Current InitRequest: { \"path\": \"/topic\", \"producerId\": \"producerId\" }", - (await Assert.ThrowsAsync(() => writer.WriteAsync(100))).Message); + Assert.Equal(PersistenceStatus.Written, (await writer.WriteAsync(100)).Status); - await taskNextComplete.Task; // check attempt repeated!!! - _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Exactly(2)); - _mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(2)); + _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Exactly(3)); + _mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(4)); + _mockStream.Verify(stream => stream.Current, Times.Exactly(2)); } + /* + * Performed invocations: + + Mock:1> (stream): + + IBidirectionalStream.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } }) <- Driver.TransportException + IBidirectionalStream.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } }) + IBidirectionalStream.MoveNextAsync() <- return true + IBidirectionalStream.Current + IBidirectionalStream.MoveNextAsync() <- return await write operation ValueTask + IBidirectionalStream.Write({ "writeRequest": { "messages": [ { "seqNo": "1", "createdAt": "2024-12-03T10:59:47.408712Z", "data": "YWJhY2FiYQ==", "uncompressedSize": "7" } ], "codec": 1 } }) + IBidirectionalStream.Current + IBidirectionalStream.MoveNextAsync() <- sleep + */ [Fact] - public async Task Initialize_WhenFailWriteMessage_ThrowWriterExceptionOnWriteAsyncAndTryNextInitialize() + public async Task Initialize_WhenFailWriteMessage_ShouldRetryInitializeAndReturnWrittenMessageStatus() { - var taskSource = new TaskCompletionSource(); - var taskNextComplete = new TaskCompletionSource(); + var taskNextComplete = new TaskCompletionSource(); _mockStream.SetupSequence(stream => stream.Write(It.IsAny())) .ThrowsAsync(new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled))) + .Returns(Task.CompletedTask) .Returns(() => { - taskNextComplete.SetResult(); - return taskSource.Task; + taskNextComplete.SetResult(true); + return Task.CompletedTask; }); + _mockStream.SetupSequence(stream => stream.MoveNextAsync()) + .ReturnsAsync(true) + .Returns(() => new ValueTask(taskNextComplete.Task)) + .Returns(new ValueTask(new TaskCompletionSource().Task)); + + SetupReadOneWriteAckMessage(); using var writer = new WriterBuilder(_mockIDriver.Object, "/topic") { ProducerId = "producerId" }.Build(); - var writerException = await Assert.ThrowsAsync(() => writer.WriteAsync("abacaba")); - Assert.Equal("Transport error on creating WriterSession", writerException.Message); - Assert.Equal(StatusCode.Cancelled, writerException.Status.StatusCode); + Assert.Equal(PersistenceStatus.Written, (await writer.WriteAsync("abacaba")).Status); - await taskNextComplete.Task; // check attempt repeated!!! - _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Exactly(2)); + _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Exactly(3)); + _mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(3)); + _mockStream.Verify(stream => stream.Current, Times.Exactly(2)); } + /* + * Performed invocations: + + Mock:1> (stream): + + IBidirectionalStream.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } }) + IBidirectionalStream.MoveNextAsync() <- throw exception + IBidirectionalStream.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } }) + IBidirectionalStream.MoveNextAsync() + IBidirectionalStream.Current + IBidirectionalStream.MoveNextAsync() + IBidirectionalStream.Write({ "writeRequest": { "messages": [ { "seqNo": "1", "createdAt": "2024-12-03T11:07:42.201080Z", "data": "YWJhY2FiYQ==", "uncompressedSize": "7" } ], "codec": 1 } }) + IBidirectionalStream.Current + IBidirectionalStream.MoveNextAsync() + */ [Fact] - public async Task Initialize_WhenFailMoveNextAsync_ThrowWriterExceptionOnWriteAsyncAndTryNextInitialize() + public async Task Initialize_WhenFailMoveNextAsync_ShouldRetryInitializeAndReturnWrittenMessageStatus() { - var taskSource = new TaskCompletionSource(); - var taskNextComplete = new TaskCompletionSource(); - _mockStream.Setup(stream => stream.Write(It.IsAny())) - .Returns(Task.CompletedTask); - _mockStream.SetupSequence(stream => stream.MoveNextAsync()) - .ThrowsAsync(new Driver.TransportException( - new RpcException(new Grpc.Core.Status(Grpc.Core.StatusCode.DeadlineExceeded, "Some message")))) + var taskNextComplete = new TaskCompletionSource(); + _mockStream.SetupSequence(stream => stream.Write(It.IsAny())) + .Returns(Task.CompletedTask) + .Returns(Task.CompletedTask) .Returns(() => { - taskNextComplete.SetResult(); - return new ValueTask(taskSource.Task); + taskNextComplete.SetResult(true); + return Task.CompletedTask; }); + _mockStream.SetupSequence(stream => stream.MoveNextAsync()) + .ThrowsAsync(new Driver.TransportException( + new RpcException(new Grpc.Core.Status(Grpc.Core.StatusCode.DeadlineExceeded, "Some message")))) + .ReturnsAsync(true) + .Returns(() => new ValueTask(taskNextComplete.Task)) + .Returns(new ValueTask(new TaskCompletionSource().Task)); + + SetupReadOneWriteAckMessage(); using var writer = new WriterBuilder(_mockIDriver.Object, "/topic") { ProducerId = "producerId" }.Build(); - var writerException = await Assert.ThrowsAsync(() => writer.WriteAsync("abacaba")); - Assert.Equal("Transport error on creating WriterSession", writerException.Message); - Assert.Equal(StatusCode.ClientTransportTimeout, writerException.Status.StatusCode); + Assert.Equal(PersistenceStatus.Written, (await writer.WriteAsync("abacaba")).Status); - await taskNextComplete.Task; // check attempt repeated!!! - _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Exactly(2)); - _mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(2)); + _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Exactly(3)); + _mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(4)); + _mockStream.Verify(stream => stream.Current, Times.Exactly(2)); } + /* + * Performed invocations: + + Mock:1> (stream): + + IBidirectionalStream.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } }) + IBidirectionalStream.MoveNextAsync() + IBidirectionalStream.Current + IBidirectionalStream.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } }) + IBidirectionalStream.MoveNextAsync() + IBidirectionalStream.Current + IBidirectionalStream.MoveNextAsync() + IBidirectionalStream.Write({ "writeRequest": { "messages": [ { "seqNo": "1", "createdAt": "2024-12-03T11:42:03.516900Z", "data": "ewAAAAAAAAA=", "uncompressedSize": "8" } ], "codec": 1 } }) + IBidirectionalStream.Current + IBidirectionalStream.MoveNextAsync() + */ [Fact] - public async Task Initialize_WhenInitResponseNotSuccess_ThrowWriterExceptionOnWriteAsyncAndTryNextInitialize() + public async Task Initialize_WhenInitResponseStatusIsRetryable_ShouldRetryInitializeAndReturnWrittenMessageStatus() { - var taskSource = new TaskCompletionSource(); - var taskNextComplete = new TaskCompletionSource(); - _mockStream.Setup(stream => stream.Write(It.IsAny())) - .Returns(Task.CompletedTask); - _mockStream.SetupSequence(stream => stream.MoveNextAsync()) - .Returns(new ValueTask(true)) + var taskNextComplete = new TaskCompletionSource(); + _mockStream.SetupSequence(stream => stream.Write(It.IsAny())) + .Returns(Task.CompletedTask) + .Returns(Task.CompletedTask) .Returns(() => { - taskNextComplete.SetResult(); - return new ValueTask(taskSource.Task); + taskNextComplete.SetResult(true); + return Task.CompletedTask; }); - _mockStream.Setup(stream => stream.Current) + _mockStream.SetupSequence(stream => stream.MoveNextAsync()) + .ReturnsAsync(true) + .ReturnsAsync(true) + .Returns(() => new ValueTask(taskNextComplete.Task)) + .Returns(new ValueTask(new TaskCompletionSource().Task)); + + + _mockStream.SetupSequence(stream => stream.Current) .Returns(new StreamWriteMessage.Types.FromServer { Status = StatusIds.Types.StatusCode.BadSession, Issues = { new IssueMessage { Message = "Some message" } } + }) + .Returns(new StreamWriteMessage.Types.FromServer + { + InitResponse = new StreamWriteMessage.Types.InitResponse + { LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" }, + Status = StatusIds.Types.StatusCode.Success + }) + .Returns(new StreamWriteMessage.Types.FromServer + { + WriteResponse = new StreamWriteMessage.Types.WriteResponse + { + PartitionId = 1, + Acks = + { + new StreamWriteMessage.Types.WriteResponse.Types.WriteAck + { + SeqNo = 1, + Written = new StreamWriteMessage.Types.WriteResponse.Types.WriteAck.Types.Written + { Offset = 0 } + } + } + }, + Status = StatusIds.Types.StatusCode.Success }); using var writer = new WriterBuilder(_mockIDriver.Object, "/topic") { ProducerId = "producerId" }.Build(); - Assert.Equal("Initialization failed: Status: BadSession, Issues:\n[0] Fatal: Some message\n", - (await Assert.ThrowsAsync(() => writer.WriteAsync(123L))).Message); + Assert.Equal(PersistenceStatus.Written, (await writer.WriteAsync(123L)).Status); - await taskNextComplete.Task; // check attempt repeated!!! - _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Exactly(2)); - _mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(2)); + _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Exactly(3)); + _mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(4)); + _mockStream.Verify(stream => stream.Current, Times.Exactly(3)); } [Fact] - public async Task Initialize_WhenInitResponseIsSchemaError_ThrowWriterExceptionOnWriteAsyncAndStopInitializing() + public async Task Initialize_WhenInitResponseStatusIsRetryable_ThrowWriterExceptionOnWriteAsyncAndStopInitializing() { _mockStream.Setup(stream => stream.Write(It.IsAny())) .Returns(Task.CompletedTask); @@ -165,6 +280,8 @@ public async Task Initialize_WhenInitResponseIsSchemaError_ThrowWriterExceptionO Assert.Equal("Initialization failed: Status: SchemeError, Issues:\n[0] Fatal: Topic not found\n", (await Assert.ThrowsAsync(() => writer.WriteAsync(123L))).Message); + Assert.Equal("Initialization failed: Status: SchemeError, Issues:\n[0] Fatal: Topic not found\n", + (await Assert.ThrowsAsync(() => writer.WriteAsync(1L))).Message); // check not attempt repeated!!! _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Once); @@ -225,11 +342,16 @@ public async Task WriteAsyncStress_WhenBufferIsOverflow_ThrowWriterExceptionOnBu Status = StatusIds.Types.StatusCode.Success }); using var writer = new WriterBuilder(_mockIDriver.Object, "/topic") - { ProducerId = "producerId", BufferMaxSize = bufferSize /* bytes */ }.Build(); + { + ProducerId = "producerId", + BufferMaxSize = bufferSize /* bytes */, + BufferOverflowRetryTimeoutMs = 1_000 + }.Build(); for (var attempt = 0; attempt < countBatchSendingSize; attempt++) { _testOutputHelper.WriteLine($"Processing attempt {attempt}"); + var cts = new CancellationTokenSource(); var tasks = new List>(); var serverAck = new StreamWriteMessage.Types.FromServer @@ -239,7 +361,7 @@ public async Task WriteAsyncStress_WhenBufferIsOverflow_ThrowWriterExceptionOnBu }; for (var i = 0; i < batchTasksSize; i++) { - tasks.Add(writer.WriteAsync(100)); + tasks.Add(writer.WriteAsync(100, cts.Token)); serverAck.WriteResponse.Acks.Add(new StreamWriteMessage.Types.WriteResponse.Types.WriteAck { SeqNo = bufferSize / messageSize * attempt + i + 1, @@ -252,7 +374,11 @@ public async Task WriteAsyncStress_WhenBufferIsOverflow_ThrowWriterExceptionOnBu { // ReSharper disable once AccessToModifiedClosure Volatile.Write(ref taskSource, new TaskCompletionSource()); - mockNextAsync.Returns(new ValueTask(Volatile.Read(ref taskSource).Task)); + mockNextAsync.Returns(() => + { + cts.Cancel(); + return new ValueTask(Volatile.Read(ref taskSource).Task); + }); return serverAck; }); taskSource.SetResult(true); @@ -288,40 +414,34 @@ public async Task WriteAsyncStress_WhenBufferIsOverflow_ThrowWriterExceptionOnBu IBidirectionalStream.MoveNextAsync() IBidirectionalStream.Current IBidirectionalStream.MoveNextAsync() - IBidirectionalStream.Write({ "writeRequest": { "messages": [ { "seqNo": "1", "createdAt": "2024-11-22T10:08:58.732882Z", "data": "AAAAAAAAAGQ=", "uncompressedSize": "8" } ], "codec": 1 } }) + IBidirectionalStream.Write({ "writeRequest": { "messages": [ { "seqNo": "1", "createdAt": "2024-12-03T12:44:23.276086Z", "data": "ZAAAAAAAAAA=", "uncompressedSize": "8" } ], "codec": 1 } }) IBidirectionalStream.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } }) IBidirectionalStream.MoveNextAsync() IBidirectionalStream.Current + IBidirectionalStream.Write({ "writeRequest": { "messages": [ { "seqNo": "1", "createdAt": "2024-12-03T12:44:23.276086Z", "data": "ZAAAAAAAAAA=", "uncompressedSize": "8" } ], "codec": 1 } }) IBidirectionalStream.MoveNextAsync() - IBidirectionalStream.Write({ "writeRequest": { "messages": [ { "seqNo": "1", "createdAt": "2024-11-22T10:08:58.732882Z", "data": "AAAAAAAAAGQ=", "uncompressedSize": "8" } ], "codec": 1 } }) IBidirectionalStream.Current IBidirectionalStream.MoveNextAsync() */ [Fact] - public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSessionThenReconnectSession_ReturnWriteResult() + public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSession_ShouldReconnectAndReturnWriteResult() { - var moveFirstNextSource = new TaskCompletionSource(); - var moveSecondNextSource = new TaskCompletionSource(); - var moveThirdNextSource = new TaskCompletionSource(); - var nextCompleted = new TaskCompletionSource(); + var moveTcs = new TaskCompletionSource(); + _mockStream.SetupSequence(stream => stream.Write(It.IsAny())) .Returns(Task.CompletedTask) - .ThrowsAsync(new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled))) - .Returns(() => + .Throws(() => { - moveFirstNextSource.SetResult(false); - return Task.CompletedTask; + moveTcs.SetResult(false); + return new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled)); }) - .Returns(() => - { - nextCompleted.SetResult(); - return Task.CompletedTask; - }); + .Returns(Task.CompletedTask) + .Returns(Task.CompletedTask); _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(true) - .Returns(new ValueTask(moveFirstNextSource.Task)) - .Returns(new ValueTask(moveSecondNextSource.Task)) - .Returns(new ValueTask(moveThirdNextSource.Task)) + .Returns(new ValueTask(moveTcs.Task)) + .ReturnsAsync(true) + .ReturnsAsync(true) .Returns(new ValueTask(new TaskCompletionSource().Task)); _mockStream.SetupSequence(stream => stream.Current) .Returns(new StreamWriteMessage.Types.FromServer @@ -356,18 +476,7 @@ public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSessionThenRec using var writer = new WriterBuilder(_mockIDriver.Object, "/topic") { ProducerId = "producerId" }.Build(); - var runTask = writer.WriteAsync(100L); - - var writerExceptionAfterResetSession = await Assert.ThrowsAsync(() => writer.WriteAsync(100)); - Assert.Equal("Transport error in the WriterSession on write messages", - writerExceptionAfterResetSession.Message); - Assert.Equal(StatusCode.Cancelled, writerExceptionAfterResetSession.Status.StatusCode); - - moveSecondNextSource.SetResult(true); - await nextCompleted.Task; - moveThirdNextSource.SetResult(true); - - Assert.Equal(PersistenceStatus.Written, (await runTask).Status); + Assert.Equal(PersistenceStatus.Written, (await writer.WriteAsync(100L)).Status); _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Exactly(4)); _mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(5)); _mockStream.Verify(stream => stream.Current, Times.Exactly(3)); @@ -381,24 +490,33 @@ public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSessionThenRec IBidirectionalStream.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } }) IBidirectionalStream.MoveNextAsync() IBidirectionalStream.Current - IBidirectionalStream.MoveNextAsync() + IBidirectionalStream.MoveNextAsync() <- transport exception IBidirectionalStream.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } }) + IBidirectionalStream.MoveNextAsync() <- return true + IBidirectionalStream.Current + IBidirectionalStream.MoveNextAsync() <- return true after write message + IBidirectionalStream.Write({ "writeRequest": { "messages": [ { "seqNo": "1", "createdAt": "2024-12-03T14:06:06.408114Z", "data": "ZAAAAAAAAAA=", "uncompressedSize": "8" } ], "codec": 1 } }) + IBidirectionalStream.Current IBidirectionalStream.MoveNextAsync() */ [Fact] - public async Task WriteAsync_WhenTransportExceptionOnProcessingWriteAckThenReconnectSession_ReturnWriterException() + public async Task WriteAsync_WhenTransportExceptionOnProcessingWriteAck_ShouldReconnectThenReturnWriteResult() { - var nextCompleted = new TaskCompletionSource(); - _mockStream.Setup(stream => stream.Write(It.IsAny())) - .Returns(Task.CompletedTask); + var moveTcs = new TaskCompletionSource(); + _mockStream.SetupSequence(stream => stream.Write(It.IsAny())) + .Returns(Task.CompletedTask) + .Returns(Task.CompletedTask) + .Returns(() => + { + moveTcs.SetResult(true); + return Task.CompletedTask; + }); _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(true) .ThrowsAsync(new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled))) - .Returns(() => - { - nextCompleted.SetResult(); - return new ValueTask(new TaskCompletionSource().Task); - }); // retry init writer session + .ReturnsAsync(true) + .Returns(() => new ValueTask(moveTcs.Task)) // retry init writer session + .Returns(new ValueTask(new TaskCompletionSource().Task)); _mockStream.SetupSequence(stream => stream.Current) .Returns(new StreamWriteMessage.Types.FromServer { @@ -411,19 +529,32 @@ public async Task WriteAsync_WhenTransportExceptionOnProcessingWriteAckThenRecon InitResponse = new StreamWriteMessage.Types.InitResponse { LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" }, Status = StatusIds.Types.StatusCode.Success + }) + .Returns(new StreamWriteMessage.Types.FromServer + { + WriteResponse = new StreamWriteMessage.Types.WriteResponse + { + PartitionId = 1, + Acks = + { + new StreamWriteMessage.Types.WriteResponse.Types.WriteAck + { + SeqNo = 1, + Written = new StreamWriteMessage.Types.WriteResponse.Types.WriteAck.Types.Written + { Offset = 0 } + } + } + }, + Status = StatusIds.Types.StatusCode.Success }); using var writer = new WriterBuilder(_mockIDriver.Object, "/topic") { ProducerId = "producerId" }.Build(); - await nextCompleted.Task; - var writerExceptionAfterResetSession = await Assert.ThrowsAsync(() => writer.WriteAsync(100)); - Assert.Equal("Transport error in the WriterSession on processing writeAck", - writerExceptionAfterResetSession.Message); - Assert.Equal(StatusCode.Cancelled, writerExceptionAfterResetSession.Status.StatusCode); + Assert.Equal(PersistenceStatus.Written, (await writer.WriteAsync(100L)).Status); - _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Exactly(2)); - _mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(3)); - _mockStream.Verify(stream => stream.Current, Times.Once); + _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Exactly(3)); + _mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(5)); + _mockStream.Verify(stream => stream.Current, Times.Exactly(3)); } /* @@ -437,21 +568,30 @@ public async Task WriteAsync_WhenTransportExceptionOnProcessingWriteAckThenRecon IBidirectionalStream.MoveNextAsync() IBidirectionalStream.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } }) IBidirectionalStream.MoveNextAsync() + IBidirectionalStream.Current + IBidirectionalStream.MoveNextAsync() + IBidirectionalStream.Write({ "writeRequest": { "messages": [ { "seqNo": "1", "createdAt": "2024-12-03T14:12:59.548210Z", "data": "ZAAAAAAAAAA=", "uncompressedSize": "8" } ], "codec": 1 } }) + IBidirectionalStream.Current + IBidirectionalStream.MoveNextAsync() */ [Fact] - public async Task WriteAsync_WhenStreamIsClosingOnProcessingWriteAckThenReconnectSession_ReturnWriterException() + public async Task WriteAsync_WhenStreamIsClosingOnProcessingWriteAck_ShouldReconnectThenReturnWriteResult() { - var nextCompleted = new TaskCompletionSource(); - _mockStream.Setup(stream => stream.Write(It.IsAny())) - .Returns(Task.CompletedTask); + var moveTcs = new TaskCompletionSource(); + _mockStream.SetupSequence(stream => stream.Write(It.IsAny())) + .Returns(Task.CompletedTask) + .Returns(Task.CompletedTask) + .Returns(() => + { + moveTcs.SetResult(true); + return Task.CompletedTask; + }); _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(true) .ReturnsAsync(false) - .Returns(() => - { - nextCompleted.SetResult(); - return new ValueTask(new TaskCompletionSource().Task); - }); // retry init writer session + .ReturnsAsync(true) + .Returns(() => new ValueTask(moveTcs.Task)) // retry init writer session + .Returns(new ValueTask(new TaskCompletionSource().Task)); _mockStream.SetupSequence(stream => stream.Current) .Returns(new StreamWriteMessage.Types.FromServer { @@ -459,36 +599,6 @@ public async Task WriteAsync_WhenStreamIsClosingOnProcessingWriteAckThenReconnec { LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" }, Status = StatusIds.Types.StatusCode.Success }) - .Returns(new StreamWriteMessage.Types.FromServer - { - InitResponse = new StreamWriteMessage.Types.InitResponse - { LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" }, - Status = StatusIds.Types.StatusCode.Success - }); - using var writer = new WriterBuilder(_mockIDriver.Object, "/topic") - { ProducerId = "producerId" }.Build(); - - await nextCompleted.Task; - var writerExceptionAfterResetSession = await Assert.ThrowsAsync(() => writer.WriteAsync(100)); - Assert.Equal("WriterStream is closed", writerExceptionAfterResetSession.Message); - Assert.Equal(StatusCode.Unspecified, writerExceptionAfterResetSession.Status.StatusCode); - - _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Exactly(2)); - _mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(3)); - _mockStream.Verify(stream => stream.Current, Times.Once); - } - - [Fact] - public async Task WriteAsync_WhenCancellationTokenIsClosed_ThrowCancellationException() - { - var cancellationTokenSource = new CancellationTokenSource(); - var nextCompleted = new TaskCompletionSource(); - _mockStream.Setup(stream => stream.Write(It.IsAny())) - .Returns(Task.CompletedTask); - _mockStream.SetupSequence(stream => stream.MoveNextAsync()) - .ReturnsAsync(true) - .Returns(new ValueTask(nextCompleted.Task)); - _mockStream.SetupSequence(stream => stream.Current) .Returns(new StreamWriteMessage.Types.FromServer { InitResponse = new StreamWriteMessage.Types.InitResponse @@ -512,6 +622,30 @@ public async Task WriteAsync_WhenCancellationTokenIsClosed_ThrowCancellationExce }, Status = StatusIds.Types.StatusCode.Success }); + + using var writer = new WriterBuilder(_mockIDriver.Object, "/topic") + { ProducerId = "producerId" }.Build(); + + + Assert.Equal(PersistenceStatus.Written, (await writer.WriteAsync(100L)).Status); + + _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Exactly(3)); + _mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(5)); + _mockStream.Verify(stream => stream.Current, Times.Exactly(3)); + } + + [Fact] + public async Task WriteAsync_WhenCancellationTokenIsClosed_ThrowCancellationException() + { + var cancellationTokenSource = new CancellationTokenSource(); + var nextCompleted = new TaskCompletionSource(); + _mockStream.Setup(stream => stream.Write(It.IsAny())) + .Returns(Task.CompletedTask); + _mockStream.SetupSequence(stream => stream.MoveNextAsync()) + .ReturnsAsync(true) + .Returns(new ValueTask(nextCompleted.Task)); + SetupReadOneWriteAckMessage(); + using var writer = new WriterBuilder(_mockIDriver.Object, "/topic") { ProducerId = "producerId" }.Build(); @@ -519,11 +653,12 @@ public async Task WriteAsync_WhenCancellationTokenIsClosed_ThrowCancellationExce cancellationTokenSource.Cancel(); nextCompleted.SetResult(true); - await Assert.ThrowsAsync(() => task); + Assert.Equal("The write operation was canceled before it could be completed", + (await Assert.ThrowsAsync(() => task)).Message); } [Fact] - public async Task WriteAsync_WhenTaskIsAcceptedBeforeCancel_ThrowCancellationException() + public async Task WriteAsync_WhenTaskIsAcceptedBeforeCancel_ReturnWrittenStatus() { var cancellationTokenSource = new CancellationTokenSource(); var nextCompleted = new TaskCompletionSource(); @@ -532,30 +667,8 @@ public async Task WriteAsync_WhenTaskIsAcceptedBeforeCancel_ThrowCancellationExc _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(true) .Returns(new ValueTask(nextCompleted.Task)); - _mockStream.SetupSequence(stream => stream.Current) - .Returns(new StreamWriteMessage.Types.FromServer - { - InitResponse = new StreamWriteMessage.Types.InitResponse - { LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" }, - Status = StatusIds.Types.StatusCode.Success - }) - .Returns(new StreamWriteMessage.Types.FromServer - { - WriteResponse = new StreamWriteMessage.Types.WriteResponse - { - PartitionId = 1, - Acks = - { - new StreamWriteMessage.Types.WriteResponse.Types.WriteAck - { - SeqNo = 1, - Written = new StreamWriteMessage.Types.WriteResponse.Types.WriteAck.Types.Written - { Offset = 0 } - } - } - }, - Status = StatusIds.Types.StatusCode.Success - }); + SetupReadOneWriteAckMessage(); + using var writer = new WriterBuilder(_mockIDriver.Object, "/topic") { ProducerId = "producerId" }.Build(); @@ -565,6 +678,7 @@ public async Task WriteAsync_WhenTaskIsAcceptedBeforeCancel_ThrowCancellationExc cancellationTokenSource.Cancel(); } + /* * Performed invocations: @@ -574,42 +688,50 @@ public async Task WriteAsync_WhenTaskIsAcceptedBeforeCancel_ThrowCancellationExc IBidirectionalStream.MoveNextAsync() IBidirectionalStream.Current IBidirectionalStream.MoveNextAsync() - IBidirectionalStream.Write({ "writeRequest": { "messages": [ { "seqNo": "1", "createdAt": "2024-11-26T14:03:57.473289Z", "data": "AAAAAAAAAGQ=", "uncompressedSize": "8" } ], "codec": 1 } }) - IBidirectionalStream.Write({ "writeRequest": { "messages": [ { "seqNo": "2", "createdAt": "2024-11-26T14:03:57.475008Z", "data": "AAAAAAAAAGQ=", "uncompressedSize": "8" } ], "codec": 1 } }) + IBidirectionalStream.Write({ "writeRequest": { "messages": [ { "seqNo": "1", "createdAt": "2024-12-03T15:43:34.479478Z", "data": "ZAAAAAAAAAA=", "uncompressedSize": "8" } ], "codec": 1 } }) + IBidirectionalStream.Write({ "writeRequest": { "messages": [ { "seqNo": "2", "createdAt": "2024-12-03T15:43:34.481385Z", "data": "ZAAAAAAAAAA=", "uncompressedSize": "8" } ], "codec": 1 } }) + IBidirectionalStream.Write({ "writeRequest": { "messages": [ { "seqNo": "3", "createdAt": "2024-12-03T15:43:34.481425Z", "data": "ZAAAAAAAAAA=", "uncompressedSize": "8" } ], "codec": 1 } }) IBidirectionalStream.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } }) IBidirectionalStream.MoveNextAsync() IBidirectionalStream.Current - IBidirectionalStream.Write({ "writeRequest": { "messages": [ { "seqNo": "2", "createdAt": "2024-11-26T14:03:57.475008Z", "data": "AAAAAAAAAGQ=", "uncompressedSize": "8" } ], "codec": 1 } }) + IBidirectionalStream.Write({ "writeRequest": { "messages": [ { "seqNo": "3", "createdAt": "2024-12-03T15:43:34.481425Z", "data": "ZAAAAAAAAAA=", "uncompressedSize": "8" } ], "codec": 1 } }) IBidirectionalStream.MoveNextAsync() IBidirectionalStream.Current IBidirectionalStream.MoveNextAsync() */ [Fact] - public async Task WriteAsync_WhenCancelTaskInOnOfTwoMessagesInFlightBuffer_ReturnCancelExceptionAndWriteResult() + public async Task WriteAsync_WhenInFlightBufferSendInInitialize_ReturnCompletedTasks() { - var moveFirstNextSource = new TaskCompletionSource(); - var moveSecondNextSource = new TaskCompletionSource(); - var moveThirdNextSource = new TaskCompletionSource(); - var nextCompleted = new TaskCompletionSource(); + var writeTcs1 = new TaskCompletionSource(); + var writeTcs2 = new TaskCompletionSource(); + var writeTcs3 = new TaskCompletionSource(); + var moveTcs = new TaskCompletionSource(); + _mockStream.SetupSequence(stream => stream.Write(It.IsAny())) .Returns(Task.CompletedTask) - .Returns(Task.CompletedTask) - .ThrowsAsync(new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled))) .Returns(() => { - moveFirstNextSource.SetResult(false); + writeTcs1.SetResult(); return Task.CompletedTask; }) .Returns(() => { - nextCompleted.SetResult(); // for seqNo + writeTcs2.SetResult(); return Task.CompletedTask; - }); + }) + .Returns(() => + { + writeTcs3.SetResult(); + return Task.CompletedTask; + }) + .Returns(Task.CompletedTask) + .Returns(Task.CompletedTask); + _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(true) - .Returns(new ValueTask(moveFirstNextSource.Task)) - .Returns(new ValueTask(moveSecondNextSource.Task)) - .Returns(new ValueTask(moveThirdNextSource.Task)) + .Returns(new ValueTask(moveTcs.Task)) + .ReturnsAsync(true) + .ReturnsAsync(true) .Returns(new ValueTask(new TaskCompletionSource().Task)); _mockStream.SetupSequence(stream => stream.Current) .Returns(new StreamWriteMessage.Types.FromServer @@ -621,7 +743,7 @@ public async Task WriteAsync_WhenCancelTaskInOnOfTwoMessagesInFlightBuffer_Retur .Returns(new StreamWriteMessage.Types.FromServer { InitResponse = new StreamWriteMessage.Types.InitResponse - { LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" }, + { LastSeqNo = 2, PartitionId = 1, SessionId = "SessionId" }, Status = StatusIds.Types.StatusCode.Success }) .Returns(new StreamWriteMessage.Types.FromServer @@ -633,7 +755,7 @@ public async Task WriteAsync_WhenCancelTaskInOnOfTwoMessagesInFlightBuffer_Retur { new StreamWriteMessage.Types.WriteResponse.Types.WriteAck { - SeqNo = 2, + SeqNo = 3, Written = new StreamWriteMessage.Types.WriteResponse.Types.WriteAck.Types.Written { Offset = 0 } } @@ -646,25 +768,54 @@ public async Task WriteAsync_WhenCancelTaskInOnOfTwoMessagesInFlightBuffer_Retur var ctx = new CancellationTokenSource(); var runTaskWithCancel = writer.WriteAsync(100L, ctx.Token); + await writeTcs1.Task; + ctx.Cancel(); // reconnect write invoke cancel on cancellation token + // ReSharper disable once MethodSupportsCancellation - var runTask = writer.WriteAsync(100L); + var runTask1 = writer.WriteAsync(100L); + await writeTcs2.Task; // ReSharper disable once MethodSupportsCancellation - var writerExceptionAfterResetSession = await Assert.ThrowsAsync(() => writer.WriteAsync(100)); - Assert.Equal("Transport error in the WriterSession on write messages", - writerExceptionAfterResetSession.Message); - Assert.Equal(StatusCode.Cancelled, writerExceptionAfterResetSession.Status.StatusCode); + var runTask2 = writer.WriteAsync(100); + await writeTcs3.Task; - ctx.Cancel(); // reconnect write invoke cancel on cancellation token - moveSecondNextSource.SetResult(true); - await nextCompleted.Task; - moveThirdNextSource.SetResult(true); + moveTcs.SetResult(false); // Fail write ack stream => start reconnect - Assert.Equal(PersistenceStatus.Written, (await runTask).Status); - _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Exactly(5)); + Assert.Equal("The write operation was canceled before it could be completed", + (await Assert.ThrowsAsync(() => runTaskWithCancel)).Message); + Assert.Equal(PersistenceStatus.AlreadyWritten, (await runTask1).Status); + Assert.Equal(PersistenceStatus.Written, (await runTask2).Status); + + _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Exactly(6)); _mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(5)); _mockStream.Verify(stream => stream.Current, Times.Exactly(3)); + } - await Assert.ThrowsAsync(() => runTaskWithCancel); + private void SetupReadOneWriteAckMessage() + { + _mockStream.SetupSequence(stream => stream.Current) + .Returns(new StreamWriteMessage.Types.FromServer + { + InitResponse = new StreamWriteMessage.Types.InitResponse + { LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" }, + Status = StatusIds.Types.StatusCode.Success + }) + .Returns(new StreamWriteMessage.Types.FromServer + { + WriteResponse = new StreamWriteMessage.Types.WriteResponse + { + PartitionId = 1, + Acks = + { + new StreamWriteMessage.Types.WriteResponse.Types.WriteAck + { + SeqNo = 1, + Written = new StreamWriteMessage.Types.WriteResponse.Types.WriteAck.Types.Written + { Offset = 0 } + } + } + }, + Status = StatusIds.Types.StatusCode.Success + }); } }