Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/TopicSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ internal abstract class TopicSession<TFromClient, TFromServer> : IDisposable

private int _isActive = 1;

public bool IsActive => Volatile.Read(ref _isActive) == 1;

protected TopicSession(
IBidirectionalStream<TFromClient, TFromServer> stream,
ILogger logger,
Expand Down
94 changes: 62 additions & 32 deletions src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ internal class Writer<TValue> : IWriter<TValue>
private readonly ConcurrentQueue<MessageSending> _toSendBuffer = new();
private readonly ConcurrentQueue<MessageSending> _inFlightMessages = new();
private readonly CancellationTokenSource _disposeCts = new();
private readonly SemaphoreSlim _clearInFlightMessagesSemaphoreSlim = new(1);

private volatile TaskCompletionSource _tcsWakeUp = new();
private volatile TaskCompletionSource _tcsBufferAvailableEvent = new();
Expand Down Expand Up @@ -161,7 +162,18 @@ private async void StartWriteWorker()
continue;
}

await _session.Write(_toSendBuffer);
await _clearInFlightMessagesSemaphoreSlim.WaitAsync(_disposeCts.Token);
try
{
if (_session.IsActive)
{
await _session.Write(_toSendBuffer);
}
}
finally
{
_clearInFlightMessagesSemaphoreSlim.Release();
}
}
}
catch (OperationCanceledException)
Expand Down Expand Up @@ -255,54 +267,66 @@ private async Task Initialize()
return;
}

var copyInFlightMessages = new ConcurrentQueue<MessageSending>();
var lastSeqNo = initResponse.LastSeqNo;
while (_inFlightMessages.TryDequeue(out var sendData))
await _clearInFlightMessagesSemaphoreSlim.WaitAsync(_disposeCts.Token);
try
{
if (lastSeqNo >= sendData.MessageData.SeqNo)
var copyInFlightMessages = new ConcurrentQueue<MessageSending>();
var lastSeqNo = initResponse.LastSeqNo;
while (_inFlightMessages.TryDequeue(out var sendData))
{
_logger.LogWarning(
"Message[SeqNo={SeqNo}] has been skipped because its sequence number " +
"is less than or equal to the last processed server's SeqNo[{LastSeqNo}]",
sendData.MessageData.SeqNo, lastSeqNo);
if (lastSeqNo >= sendData.MessageData.SeqNo)
{
_logger.LogWarning(
"Message[SeqNo={SeqNo}] has been skipped because its sequence number " +
"is less than or equal to the last processed server's SeqNo[{LastSeqNo}]",
sendData.MessageData.SeqNo, lastSeqNo);

sendData.Tcs.TrySetResult(WriteResult.Skipped);
sendData.Tcs.TrySetResult(WriteResult.Skipped);

continue;
}
continue;
}


// Calculate the next sequence number from the calculated previous messages.
lastSeqNo = Math.Max(lastSeqNo, sendData.MessageData.SeqNo);
// Calculate the next sequence number from the calculated previous messages.
lastSeqNo = Math.Max(lastSeqNo, sendData.MessageData.SeqNo);

copyInFlightMessages.Enqueue(sendData);
}
copyInFlightMessages.Enqueue(sendData);
}

var newSession = new WriterSession(
config: _config,
stream: stream,
lastSeqNo: lastSeqNo,
sessionId: initResponse.SessionId,
initialize: Initialize,
logger: _logger,
inFlightMessages: _inFlightMessages
);

if (!copyInFlightMessages.IsEmpty)
{
await newSession.Write(copyInFlightMessages); // retry prev in flight messages
}

var newSession = new WriterSession(
config: _config,
stream: stream,
lastSeqNo: lastSeqNo,
sessionId: initResponse.SessionId,
initialize: Initialize,
logger: _logger,
inFlightMessages: _inFlightMessages
);

if (!copyInFlightMessages.IsEmpty)
_session = newSession;
newSession.RunProcessingWriteAck();
WakeUpWorker(); // attempt send buffer
}
finally
{
await newSession.Write(copyInFlightMessages); // retry prev in flight messages
_clearInFlightMessagesSemaphoreSlim.Release();
}

_session = newSession;
newSession.RunProcessingWriteAck();
WakeUpWorker(); // attempt send buffer
}
catch (Driver.TransportException e)
{
_logger.LogError(e, "Transport error on creating WriterSession");

_ = Task.Run(Initialize, _disposeCts.Token);
}
catch (OperationCanceledException)
{
_logger.LogWarning("Initialize writer is canceled because it has been disposed");
}
}

public void Dispose()
Expand All @@ -318,6 +342,8 @@ internal record MessageSending(MessageData MessageData, TaskCompletionSource<Wri
internal interface IWriteSession
{
Task Write(ConcurrentQueue<MessageSending> toSendBuffer);

bool IsActive { get; }
}

internal class NotStartedWriterSession : IWriteSession
Expand All @@ -343,6 +369,8 @@ public Task Write(ConcurrentQueue<MessageSending> toSendBuffer)

return Task.CompletedTask;
}

public bool IsActive => true;
}

internal class DummyWriterSession : IWriteSession
Expand All @@ -357,6 +385,8 @@ public Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
{
return Task.CompletedTask;
}

public bool IsActive => false;
}

internal class WriterSession : TopicSession<MessageFromClient, MessageFromServer>, IWriteSession
Expand Down
1 change: 1 addition & 0 deletions src/Ydb.Sdk/tests/Ado/YdbCommandTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public async Task CloseAsync_WhenDoubleInvoke_Idempotent()
var ydbDataReader = await ydbCommand.ExecuteReaderAsync();

Assert.True(await ydbDataReader.NextResultAsync());
Assert.False(await ydbDataReader.NextResultAsync());
await ydbDataReader.CloseAsync();
await ydbDataReader.CloseAsync();
Assert.False(await ydbDataReader.NextResultAsync());
Expand Down
9 changes: 6 additions & 3 deletions src/Ydb.Sdk/tests/Ado/YdbSchemaTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ public async Task GetSchema_WhenTablesWithStatsCollection_ReturnAllTables()
{
tableNames.Remove(row["table_name"].ToString()!);

Assert.NotNull(row["rows_estimate"]);
Assert.NotNull(row["creation_time"]);
Assert.Equal(DBNull.Value, row["modification_time"]);
Assert.NotNull(row["modification_time"]);
}

Assert.Empty(tableNames);
Expand All @@ -96,15 +97,17 @@ public async Task GetSchema_WhenTablesWithStatsCollection_ReturnAllTables()
Assert.Equal(1, singleTable1.Rows.Count);
Assert.Equal(table1, singleTable1.Rows[0]["table_name"].ToString());
Assert.Equal("TABLE", singleTable1.Rows[0]["table_type"].ToString());
Assert.NotNull(singleTable1.Rows[0]["rows_estimate"]);
Assert.NotNull(singleTable1.Rows[0]["creation_time"]);
Assert.Equal(DBNull.Value, singleTable1.Rows[0]["modification_time"]);
Assert.NotNull(singleTable1.Rows[0]["modification_time"]);

var singleTable2 = await ydbConnection.GetSchemaAsync("TablesWithStats", new[] { table2, null });
Assert.Equal(1, singleTable2.Rows.Count);
Assert.Equal(table2, singleTable2.Rows[0]["table_name"].ToString());
Assert.Equal("TABLE", singleTable2.Rows[0]["table_type"].ToString());
Assert.NotNull(singleTable2.Rows[0]["rows_estimate"]);
Assert.NotNull(singleTable2.Rows[0]["creation_time"]);
Assert.Equal(DBNull.Value, singleTable2.Rows[0]["modification_time"]);
Assert.NotNull(singleTable2.Rows[0]["modification_time"]);

// not found case
var notFound = await ydbConnection.GetSchemaAsync("Tables", new[] { "not_found", null });
Expand Down
8 changes: 4 additions & 4 deletions src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ public async Task Initialize_WhenNotSupportedCodec_ThrowWriterExceptionOnWriteAs
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Write({ "writeRequest": { "messages": [ { "seqNo": "1", "createdAt": "2024-12-03T12:44:23.276086Z", "data": "ZAAAAAAAAAA=", "uncompressedSize": "8" } ], "codec": 1 } })
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync()
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Current
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync()
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync() [Maybe]
*/
[Fact]
public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSession_ShouldReconnectAndReturnWriteResult()
Expand Down Expand Up @@ -391,7 +391,7 @@ public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSession_Should

Assert.Equal(PersistenceStatus.Written, (await writer.WriteAsync(100L)).Status);
_mockStream.Verify(stream => stream.Write(It.IsAny<FromClient>()), Times.Exactly(4));
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(5));
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.AtLeast(4));
_mockStream.Verify(stream => stream.Current, Times.Exactly(3));
}

Expand All @@ -410,7 +410,7 @@ public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSession_Should
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync() <- return true after write message
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Write({ "writeRequest": { "messages": [ { "seqNo": "1", "createdAt": "2024-12-03T14:06:06.408114Z", "data": "ZAAAAAAAAAA=", "uncompressedSize": "8" } ], "codec": 1 } })
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Current
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync()
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync() [Maybe]
*/
[Fact]
public async Task WriteAsync_WhenTransportExceptionOnProcessingWriteAck_ShouldReconnectThenReturnWriteResult()
Expand Down Expand Up @@ -466,7 +466,7 @@ public async Task WriteAsync_WhenTransportExceptionOnProcessingWriteAck_ShouldRe
Assert.Equal(PersistenceStatus.Written, (await writer.WriteAsync(100L)).Status);

_mockStream.Verify(stream => stream.Write(It.IsAny<FromClient>()), Times.Exactly(3));
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(5));
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.AtLeast(4));
_mockStream.Verify(stream => stream.Current, Times.Exactly(3));
}

Expand Down
Loading