Skip to content

Commit e2f5c62

Browse files
fix: race condition on inflight messages (#249)
* fix: race condition on _inFlightMessages * dev: fix flap test with SessionBusy in ADO.NET * fix: flap tests in Writer
1 parent 2ec52c2 commit e2f5c62

File tree

5 files changed

+75
-39
lines changed

5 files changed

+75
-39
lines changed

src/Ydb.Sdk/src/Services/Topic/TopicSession.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ internal abstract class TopicSession<TFromClient, TFromServer> : IDisposable
1212

1313
private int _isActive = 1;
1414

15+
public bool IsActive => Volatile.Read(ref _isActive) == 1;
16+
1517
protected TopicSession(
1618
IBidirectionalStream<TFromClient, TFromServer> stream,
1719
ILogger logger,

src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs

Lines changed: 62 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ internal class Writer<TValue> : IWriter<TValue>
2525
private readonly ConcurrentQueue<MessageSending> _toSendBuffer = new();
2626
private readonly ConcurrentQueue<MessageSending> _inFlightMessages = new();
2727
private readonly CancellationTokenSource _disposeCts = new();
28+
private readonly SemaphoreSlim _clearInFlightMessagesSemaphoreSlim = new(1);
2829

2930
private volatile TaskCompletionSource _tcsWakeUp = new();
3031
private volatile TaskCompletionSource _tcsBufferAvailableEvent = new();
@@ -161,7 +162,18 @@ private async void StartWriteWorker()
161162
continue;
162163
}
163164

164-
await _session.Write(_toSendBuffer);
165+
await _clearInFlightMessagesSemaphoreSlim.WaitAsync(_disposeCts.Token);
166+
try
167+
{
168+
if (_session.IsActive)
169+
{
170+
await _session.Write(_toSendBuffer);
171+
}
172+
}
173+
finally
174+
{
175+
_clearInFlightMessagesSemaphoreSlim.Release();
176+
}
165177
}
166178
}
167179
catch (OperationCanceledException)
@@ -255,54 +267,66 @@ private async Task Initialize()
255267
return;
256268
}
257269

258-
var copyInFlightMessages = new ConcurrentQueue<MessageSending>();
259-
var lastSeqNo = initResponse.LastSeqNo;
260-
while (_inFlightMessages.TryDequeue(out var sendData))
270+
await _clearInFlightMessagesSemaphoreSlim.WaitAsync(_disposeCts.Token);
271+
try
261272
{
262-
if (lastSeqNo >= sendData.MessageData.SeqNo)
273+
var copyInFlightMessages = new ConcurrentQueue<MessageSending>();
274+
var lastSeqNo = initResponse.LastSeqNo;
275+
while (_inFlightMessages.TryDequeue(out var sendData))
263276
{
264-
_logger.LogWarning(
265-
"Message[SeqNo={SeqNo}] has been skipped because its sequence number " +
266-
"is less than or equal to the last processed server's SeqNo[{LastSeqNo}]",
267-
sendData.MessageData.SeqNo, lastSeqNo);
277+
if (lastSeqNo >= sendData.MessageData.SeqNo)
278+
{
279+
_logger.LogWarning(
280+
"Message[SeqNo={SeqNo}] has been skipped because its sequence number " +
281+
"is less than or equal to the last processed server's SeqNo[{LastSeqNo}]",
282+
sendData.MessageData.SeqNo, lastSeqNo);
268283

269-
sendData.Tcs.TrySetResult(WriteResult.Skipped);
284+
sendData.Tcs.TrySetResult(WriteResult.Skipped);
270285

271-
continue;
272-
}
286+
continue;
287+
}
273288

274289

275-
// Calculate the next sequence number from the calculated previous messages.
276-
lastSeqNo = Math.Max(lastSeqNo, sendData.MessageData.SeqNo);
290+
// Calculate the next sequence number from the calculated previous messages.
291+
lastSeqNo = Math.Max(lastSeqNo, sendData.MessageData.SeqNo);
277292

278-
copyInFlightMessages.Enqueue(sendData);
279-
}
293+
copyInFlightMessages.Enqueue(sendData);
294+
}
295+
296+
var newSession = new WriterSession(
297+
config: _config,
298+
stream: stream,
299+
lastSeqNo: lastSeqNo,
300+
sessionId: initResponse.SessionId,
301+
initialize: Initialize,
302+
logger: _logger,
303+
inFlightMessages: _inFlightMessages
304+
);
305+
306+
if (!copyInFlightMessages.IsEmpty)
307+
{
308+
await newSession.Write(copyInFlightMessages); // retry prev in flight messages
309+
}
280310

281-
var newSession = new WriterSession(
282-
config: _config,
283-
stream: stream,
284-
lastSeqNo: lastSeqNo,
285-
sessionId: initResponse.SessionId,
286-
initialize: Initialize,
287-
logger: _logger,
288-
inFlightMessages: _inFlightMessages
289-
);
290-
291-
if (!copyInFlightMessages.IsEmpty)
311+
_session = newSession;
312+
newSession.RunProcessingWriteAck();
313+
WakeUpWorker(); // attempt send buffer
314+
}
315+
finally
292316
{
293-
await newSession.Write(copyInFlightMessages); // retry prev in flight messages
317+
_clearInFlightMessagesSemaphoreSlim.Release();
294318
}
295-
296-
_session = newSession;
297-
newSession.RunProcessingWriteAck();
298-
WakeUpWorker(); // attempt send buffer
299319
}
300320
catch (Driver.TransportException e)
301321
{
302322
_logger.LogError(e, "Transport error on creating WriterSession");
303323

304324
_ = Task.Run(Initialize, _disposeCts.Token);
305325
}
326+
catch (OperationCanceledException)
327+
{
328+
_logger.LogWarning("Initialize writer is canceled because it has been disposed");
329+
}
306330
}
307331

308332
public void Dispose()
@@ -318,6 +342,8 @@ internal record MessageSending(MessageData MessageData, TaskCompletionSource<Wri
318342
internal interface IWriteSession
319343
{
320344
Task Write(ConcurrentQueue<MessageSending> toSendBuffer);
345+
346+
bool IsActive { get; }
321347
}
322348

323349
internal class NotStartedWriterSession : IWriteSession
@@ -343,6 +369,8 @@ public Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
343369

344370
return Task.CompletedTask;
345371
}
372+
373+
public bool IsActive => true;
346374
}
347375

348376
internal class DummyWriterSession : IWriteSession
@@ -357,6 +385,8 @@ public Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
357385
{
358386
return Task.CompletedTask;
359387
}
388+
389+
public bool IsActive => false;
360390
}
361391

362392
internal class WriterSession : TopicSession<MessageFromClient, MessageFromServer>, IWriteSession

src/Ydb.Sdk/tests/Ado/YdbCommandTests.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ public async Task CloseAsync_WhenDoubleInvoke_Idempotent()
156156
var ydbDataReader = await ydbCommand.ExecuteReaderAsync();
157157

158158
Assert.True(await ydbDataReader.NextResultAsync());
159+
Assert.False(await ydbDataReader.NextResultAsync());
159160
await ydbDataReader.CloseAsync();
160161
await ydbDataReader.CloseAsync();
161162
Assert.False(await ydbDataReader.NextResultAsync());

src/Ydb.Sdk/tests/Ado/YdbSchemaTests.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,9 @@ public async Task GetSchema_WhenTablesWithStatsCollection_ReturnAllTables()
8686
{
8787
tableNames.Remove(row["table_name"].ToString()!);
8888

89+
Assert.NotNull(row["rows_estimate"]);
8990
Assert.NotNull(row["creation_time"]);
90-
Assert.Equal(DBNull.Value, row["modification_time"]);
91+
Assert.NotNull(row["modification_time"]);
9192
}
9293

9394
Assert.Empty(tableNames);
@@ -96,15 +97,17 @@ public async Task GetSchema_WhenTablesWithStatsCollection_ReturnAllTables()
9697
Assert.Equal(1, singleTable1.Rows.Count);
9798
Assert.Equal(table1, singleTable1.Rows[0]["table_name"].ToString());
9899
Assert.Equal("TABLE", singleTable1.Rows[0]["table_type"].ToString());
100+
Assert.NotNull(singleTable1.Rows[0]["rows_estimate"]);
99101
Assert.NotNull(singleTable1.Rows[0]["creation_time"]);
100-
Assert.Equal(DBNull.Value, singleTable1.Rows[0]["modification_time"]);
102+
Assert.NotNull(singleTable1.Rows[0]["modification_time"]);
101103

102104
var singleTable2 = await ydbConnection.GetSchemaAsync("TablesWithStats", new[] { table2, null });
103105
Assert.Equal(1, singleTable2.Rows.Count);
104106
Assert.Equal(table2, singleTable2.Rows[0]["table_name"].ToString());
105107
Assert.Equal("TABLE", singleTable2.Rows[0]["table_type"].ToString());
108+
Assert.NotNull(singleTable2.Rows[0]["rows_estimate"]);
106109
Assert.NotNull(singleTable2.Rows[0]["creation_time"]);
107-
Assert.Equal(DBNull.Value, singleTable2.Rows[0]["modification_time"]);
110+
Assert.NotNull(singleTable2.Rows[0]["modification_time"]);
108111

109112
// not found case
110113
var notFound = await ydbConnection.GetSchemaAsync("Tables", new[] { "not_found", null });

src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ public async Task Initialize_WhenNotSupportedCodec_ThrowWriterExceptionOnWriteAs
334334
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Write({ "writeRequest": { "messages": [ { "seqNo": "1", "createdAt": "2024-12-03T12:44:23.276086Z", "data": "ZAAAAAAAAAA=", "uncompressedSize": "8" } ], "codec": 1 } })
335335
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync()
336336
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Current
337-
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync()
337+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync() [Maybe]
338338
*/
339339
[Fact]
340340
public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSession_ShouldReconnectAndReturnWriteResult()
@@ -391,7 +391,7 @@ public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSession_Should
391391

392392
Assert.Equal(PersistenceStatus.Written, (await writer.WriteAsync(100L)).Status);
393393
_mockStream.Verify(stream => stream.Write(It.IsAny<FromClient>()), Times.Exactly(4));
394-
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(5));
394+
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.AtLeast(4));
395395
_mockStream.Verify(stream => stream.Current, Times.Exactly(3));
396396
}
397397

@@ -410,7 +410,7 @@ public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSession_Should
410410
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync() <- return true after write message
411411
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Write({ "writeRequest": { "messages": [ { "seqNo": "1", "createdAt": "2024-12-03T14:06:06.408114Z", "data": "ZAAAAAAAAAA=", "uncompressedSize": "8" } ], "codec": 1 } })
412412
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Current
413-
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync()
413+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync() [Maybe]
414414
*/
415415
[Fact]
416416
public async Task WriteAsync_WhenTransportExceptionOnProcessingWriteAck_ShouldReconnectThenReturnWriteResult()
@@ -466,7 +466,7 @@ public async Task WriteAsync_WhenTransportExceptionOnProcessingWriteAck_ShouldRe
466466
Assert.Equal(PersistenceStatus.Written, (await writer.WriteAsync(100L)).Status);
467467

468468
_mockStream.Verify(stream => stream.Write(It.IsAny<FromClient>()), Times.Exactly(3));
469-
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(5));
469+
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.AtLeast(4));
470470
_mockStream.Verify(stream => stream.Current, Times.Exactly(3));
471471
}
472472

0 commit comments

Comments
 (0)