diff --git a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs index 5b6ff824..59428c2d 100644 --- a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs +++ b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs @@ -12,6 +12,8 @@ internal abstract class TopicSession : IDisposable private int _isActive = 1; + public bool IsActive => Volatile.Read(ref _isActive) == 1; + protected TopicSession( IBidirectionalStream stream, ILogger logger, diff --git a/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs index 278c4501..baddb0ba 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs @@ -25,6 +25,7 @@ internal class Writer : IWriter private readonly ConcurrentQueue _toSendBuffer = new(); private readonly ConcurrentQueue _inFlightMessages = new(); private readonly CancellationTokenSource _disposeCts = new(); + private readonly SemaphoreSlim _clearInFlightMessagesSemaphoreSlim = new(1); private volatile TaskCompletionSource _tcsWakeUp = new(); private volatile TaskCompletionSource _tcsBufferAvailableEvent = new(); @@ -161,7 +162,18 @@ private async void StartWriteWorker() continue; } - await _session.Write(_toSendBuffer); + await _clearInFlightMessagesSemaphoreSlim.WaitAsync(_disposeCts.Token); + try + { + if (_session.IsActive) + { + await _session.Write(_toSendBuffer); + } + } + finally + { + _clearInFlightMessagesSemaphoreSlim.Release(); + } } } catch (OperationCanceledException) @@ -255,47 +267,55 @@ private async Task Initialize() return; } - var copyInFlightMessages = new ConcurrentQueue(); - var lastSeqNo = initResponse.LastSeqNo; - while (_inFlightMessages.TryDequeue(out var sendData)) + await _clearInFlightMessagesSemaphoreSlim.WaitAsync(_disposeCts.Token); + try { - if (lastSeqNo >= sendData.MessageData.SeqNo) + var copyInFlightMessages = new ConcurrentQueue(); + var lastSeqNo = initResponse.LastSeqNo; + while (_inFlightMessages.TryDequeue(out var sendData)) { - _logger.LogWarning( - "Message[SeqNo={SeqNo}] has been skipped because its sequence number " + - "is less than or equal to the last processed server's SeqNo[{LastSeqNo}]", - sendData.MessageData.SeqNo, lastSeqNo); + if (lastSeqNo >= sendData.MessageData.SeqNo) + { + _logger.LogWarning( + "Message[SeqNo={SeqNo}] has been skipped because its sequence number " + + "is less than or equal to the last processed server's SeqNo[{LastSeqNo}]", + sendData.MessageData.SeqNo, lastSeqNo); - sendData.Tcs.TrySetResult(WriteResult.Skipped); + sendData.Tcs.TrySetResult(WriteResult.Skipped); - continue; - } + continue; + } - // Calculate the next sequence number from the calculated previous messages. - lastSeqNo = Math.Max(lastSeqNo, sendData.MessageData.SeqNo); + // Calculate the next sequence number from the calculated previous messages. + lastSeqNo = Math.Max(lastSeqNo, sendData.MessageData.SeqNo); - copyInFlightMessages.Enqueue(sendData); - } + copyInFlightMessages.Enqueue(sendData); + } + + var newSession = new WriterSession( + config: _config, + stream: stream, + lastSeqNo: lastSeqNo, + sessionId: initResponse.SessionId, + initialize: Initialize, + logger: _logger, + inFlightMessages: _inFlightMessages + ); + + if (!copyInFlightMessages.IsEmpty) + { + await newSession.Write(copyInFlightMessages); // retry prev in flight messages + } - var newSession = new WriterSession( - config: _config, - stream: stream, - lastSeqNo: lastSeqNo, - sessionId: initResponse.SessionId, - initialize: Initialize, - logger: _logger, - inFlightMessages: _inFlightMessages - ); - - if (!copyInFlightMessages.IsEmpty) + _session = newSession; + newSession.RunProcessingWriteAck(); + WakeUpWorker(); // attempt send buffer + } + finally { - await newSession.Write(copyInFlightMessages); // retry prev in flight messages + _clearInFlightMessagesSemaphoreSlim.Release(); } - - _session = newSession; - newSession.RunProcessingWriteAck(); - WakeUpWorker(); // attempt send buffer } catch (Driver.TransportException e) { @@ -303,6 +323,10 @@ private async Task Initialize() _ = Task.Run(Initialize, _disposeCts.Token); } + catch (OperationCanceledException) + { + _logger.LogWarning("Initialize writer is canceled because it has been disposed"); + } } public void Dispose() @@ -318,6 +342,8 @@ internal record MessageSending(MessageData MessageData, TaskCompletionSource toSendBuffer); + + bool IsActive { get; } } internal class NotStartedWriterSession : IWriteSession @@ -343,6 +369,8 @@ public Task Write(ConcurrentQueue toSendBuffer) return Task.CompletedTask; } + + public bool IsActive => true; } internal class DummyWriterSession : IWriteSession @@ -357,6 +385,8 @@ public Task Write(ConcurrentQueue toSendBuffer) { return Task.CompletedTask; } + + public bool IsActive => false; } internal class WriterSession : TopicSession, IWriteSession diff --git a/src/Ydb.Sdk/tests/Ado/YdbCommandTests.cs b/src/Ydb.Sdk/tests/Ado/YdbCommandTests.cs index d99b86b0..325dd714 100644 --- a/src/Ydb.Sdk/tests/Ado/YdbCommandTests.cs +++ b/src/Ydb.Sdk/tests/Ado/YdbCommandTests.cs @@ -156,6 +156,7 @@ public async Task CloseAsync_WhenDoubleInvoke_Idempotent() var ydbDataReader = await ydbCommand.ExecuteReaderAsync(); Assert.True(await ydbDataReader.NextResultAsync()); + Assert.False(await ydbDataReader.NextResultAsync()); await ydbDataReader.CloseAsync(); await ydbDataReader.CloseAsync(); Assert.False(await ydbDataReader.NextResultAsync()); diff --git a/src/Ydb.Sdk/tests/Ado/YdbSchemaTests.cs b/src/Ydb.Sdk/tests/Ado/YdbSchemaTests.cs index 32296ef4..b14316dd 100644 --- a/src/Ydb.Sdk/tests/Ado/YdbSchemaTests.cs +++ b/src/Ydb.Sdk/tests/Ado/YdbSchemaTests.cs @@ -86,8 +86,9 @@ public async Task GetSchema_WhenTablesWithStatsCollection_ReturnAllTables() { tableNames.Remove(row["table_name"].ToString()!); + Assert.NotNull(row["rows_estimate"]); Assert.NotNull(row["creation_time"]); - Assert.Equal(DBNull.Value, row["modification_time"]); + Assert.NotNull(row["modification_time"]); } Assert.Empty(tableNames); @@ -96,15 +97,17 @@ public async Task GetSchema_WhenTablesWithStatsCollection_ReturnAllTables() Assert.Equal(1, singleTable1.Rows.Count); Assert.Equal(table1, singleTable1.Rows[0]["table_name"].ToString()); Assert.Equal("TABLE", singleTable1.Rows[0]["table_type"].ToString()); + Assert.NotNull(singleTable1.Rows[0]["rows_estimate"]); Assert.NotNull(singleTable1.Rows[0]["creation_time"]); - Assert.Equal(DBNull.Value, singleTable1.Rows[0]["modification_time"]); + Assert.NotNull(singleTable1.Rows[0]["modification_time"]); var singleTable2 = await ydbConnection.GetSchemaAsync("TablesWithStats", new[] { table2, null }); Assert.Equal(1, singleTable2.Rows.Count); Assert.Equal(table2, singleTable2.Rows[0]["table_name"].ToString()); Assert.Equal("TABLE", singleTable2.Rows[0]["table_type"].ToString()); + Assert.NotNull(singleTable2.Rows[0]["rows_estimate"]); Assert.NotNull(singleTable2.Rows[0]["creation_time"]); - Assert.Equal(DBNull.Value, singleTable2.Rows[0]["modification_time"]); + Assert.NotNull(singleTable2.Rows[0]["modification_time"]); // not found case var notFound = await ydbConnection.GetSchemaAsync("Tables", new[] { "not_found", null }); diff --git a/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs b/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs index 3c60e880..1b402702 100644 --- a/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs +++ b/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs @@ -334,7 +334,7 @@ public async Task Initialize_WhenNotSupportedCodec_ThrowWriterExceptionOnWriteAs IBidirectionalStream.Write({ "writeRequest": { "messages": [ { "seqNo": "1", "createdAt": "2024-12-03T12:44:23.276086Z", "data": "ZAAAAAAAAAA=", "uncompressedSize": "8" } ], "codec": 1 } }) IBidirectionalStream.MoveNextAsync() IBidirectionalStream.Current - IBidirectionalStream.MoveNextAsync() + IBidirectionalStream.MoveNextAsync() [Maybe] */ [Fact] public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSession_ShouldReconnectAndReturnWriteResult() @@ -391,7 +391,7 @@ public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSession_Should Assert.Equal(PersistenceStatus.Written, (await writer.WriteAsync(100L)).Status); _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Exactly(4)); - _mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(5)); + _mockStream.Verify(stream => stream.MoveNextAsync(), Times.AtLeast(4)); _mockStream.Verify(stream => stream.Current, Times.Exactly(3)); } @@ -410,7 +410,7 @@ public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSession_Should IBidirectionalStream.MoveNextAsync() <- return true after write message IBidirectionalStream.Write({ "writeRequest": { "messages": [ { "seqNo": "1", "createdAt": "2024-12-03T14:06:06.408114Z", "data": "ZAAAAAAAAAA=", "uncompressedSize": "8" } ], "codec": 1 } }) IBidirectionalStream.Current - IBidirectionalStream.MoveNextAsync() + IBidirectionalStream.MoveNextAsync() [Maybe] */ [Fact] public async Task WriteAsync_WhenTransportExceptionOnProcessingWriteAck_ShouldReconnectThenReturnWriteResult() @@ -466,7 +466,7 @@ public async Task WriteAsync_WhenTransportExceptionOnProcessingWriteAck_ShouldRe Assert.Equal(PersistenceStatus.Written, (await writer.WriteAsync(100L)).Status); _mockStream.Verify(stream => stream.Write(It.IsAny()), Times.Exactly(3)); - _mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(5)); + _mockStream.Verify(stream => stream.MoveNextAsync(), Times.AtLeast(4)); _mockStream.Verify(stream => stream.Current, Times.Exactly(3)); }