Skip to content

Commit 1cde514

Browse files
skip canceled messages in Write Worker
1 parent 4b3ccbc commit 1cde514

File tree

1 file changed

+7
-7
lines changed
  • src/Ydb.Sdk/src/Services/Topic/Writer

1 file changed

+7
-7
lines changed

src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -234,13 +234,6 @@ private async Task Initialize()
234234
var lastSeqNo = initResponse.LastSeqNo;
235235
while (_inFlightMessages.TryDequeue(out var sendData))
236236
{
237-
if (sendData.Tcs.Task.IsFaulted)
238-
{
239-
_logger.LogWarning("Message[SeqNo={SeqNo}] is cancelled", sendData.MessageData.SeqNo);
240-
241-
continue;
242-
}
243-
244237
if (lastSeqNo >= sendData.MessageData.SeqNo)
245238
{
246239
_logger.LogWarning(
@@ -397,6 +390,13 @@ public async Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
397390

398391
while (toSendBuffer.TryDequeue(out var sendData))
399392
{
393+
if (sendData.Tcs.Task.IsFaulted)
394+
{
395+
Logger.LogWarning("Message[SeqNo={SeqNo}] is cancelled", sendData.MessageData.SeqNo);
396+
397+
continue;
398+
}
399+
400400
var messageData = sendData.MessageData;
401401

402402
if (messageData.SeqNo == default)

0 commit comments

Comments
 (0)