Skip to content

Commit eb71c99

Browse files
fix setting SeqNo in message (inFlightBuffer already has seqNo) and added check on IsCanceled TCS
1 parent 4aa0f35 commit eb71c99

File tree

3 files changed

+112
-2
lines changed

3 files changed

+112
-2
lines changed

src/Ydb.Sdk/src/Services/Topic/Writer/Message.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public Message(TValue data)
1313

1414
public List<Metadata> Metadata { get; } = new();
1515

16-
// long SeqNo = 0 spec
16+
internal long SeqNo { get; set; } = 0;
1717
}
1818

1919
public record Metadata(string Key, byte[] Value);

src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,13 @@ private async Task Initialize()
217217
var copyInFlightMessages = new ConcurrentQueue<MessageSending>();
218218
while (_inFlightMessages.TryDequeue(out var sendData))
219219
{
220+
if (sendData.Tcs.Task.IsCanceled)
221+
{
222+
_logger.LogWarning("Message[SeqNo={SeqNo}] is cancelled", sendData.MessageData.SeqNo);
223+
224+
continue;
225+
}
226+
220227
copyInFlightMessages.Enqueue(sendData);
221228
}
222229

@@ -336,7 +343,7 @@ public async Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
336343
{
337344
var messageData = sendData.MessageData;
338345

339-
messageData.SeqNo = ++currentSeqNum;
346+
messageData.SeqNo = messageData.SeqNo > 0 ? messageData.SeqNo : ++currentSeqNum;
340347
writeMessage.Messages.Add(messageData);
341348
_inFlightMessages.Enqueue(sendData);
342349
}

src/Ydb.Sdk/tests/Topic/WriterMockTests.cs

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,4 +564,107 @@ public async Task WriteAsync_WhenTaskIsAcceptedBeforeCancel_ThrowCancellationExc
564564
Assert.Equal(PersistenceStatus.Written, (await task).Status);
565565
cancellationTokenSource.Cancel();
566566
}
567+
568+
/*
569+
* Performed invocations:
570+
571+
Mock<IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>:1> (stream):
572+
573+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } })
574+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync()
575+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Current
576+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync()
577+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Write({ "writeRequest": { "messages": [ { "seqNo": "1", "createdAt": "2024-11-26T14:03:57.473289Z", "data": "AAAAAAAAAGQ=", "uncompressedSize": "8" } ], "codec": 1 } })
578+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Write({ "writeRequest": { "messages": [ { "seqNo": "2", "createdAt": "2024-11-26T14:03:57.475008Z", "data": "AAAAAAAAAGQ=", "uncompressedSize": "8" } ], "codec": 1 } })
579+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } })
580+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync()
581+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Current
582+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Write({ "writeRequest": { "messages": [ { "seqNo": "2", "createdAt": "2024-11-26T14:03:57.475008Z", "data": "AAAAAAAAAGQ=", "uncompressedSize": "8" } ], "codec": 1 } })
583+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync()
584+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Current
585+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync()
586+
*/
587+
[Fact]
588+
public async Task WriteAsync_WhenCancelTaskInOnOfTwoMessagesInFlightBuffer_ReturnCancelExceptionAndWriteResult()
589+
{
590+
var moveFirstNextSource = new TaskCompletionSource<bool>();
591+
var moveSecondNextSource = new TaskCompletionSource<bool>();
592+
var moveThirdNextSource = new TaskCompletionSource<bool>();
593+
var nextCompleted = new TaskCompletionSource();
594+
_mockStream.SetupSequence(stream => stream.Write(It.IsAny<FromClient>()))
595+
.Returns(Task.CompletedTask)
596+
.Returns(Task.CompletedTask)
597+
.ThrowsAsync(new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled)))
598+
.Returns(() =>
599+
{
600+
moveFirstNextSource.SetResult(false);
601+
return Task.CompletedTask;
602+
})
603+
.Returns(() =>
604+
{
605+
nextCompleted.SetResult(); // for seqNo
606+
return Task.CompletedTask;
607+
});
608+
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
609+
.ReturnsAsync(true)
610+
.Returns(new ValueTask<bool>(moveFirstNextSource.Task))
611+
.Returns(new ValueTask<bool>(moveSecondNextSource.Task))
612+
.Returns(new ValueTask<bool>(moveThirdNextSource.Task))
613+
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
614+
_mockStream.SetupSequence(stream => stream.Current)
615+
.Returns(new StreamWriteMessage.Types.FromServer
616+
{
617+
InitResponse = new StreamWriteMessage.Types.InitResponse
618+
{ LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" },
619+
Status = StatusIds.Types.StatusCode.Success
620+
})
621+
.Returns(new StreamWriteMessage.Types.FromServer
622+
{
623+
InitResponse = new StreamWriteMessage.Types.InitResponse
624+
{ LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" },
625+
Status = StatusIds.Types.StatusCode.Success
626+
})
627+
.Returns(new StreamWriteMessage.Types.FromServer
628+
{
629+
WriteResponse = new StreamWriteMessage.Types.WriteResponse
630+
{
631+
PartitionId = 1,
632+
Acks =
633+
{
634+
new StreamWriteMessage.Types.WriteResponse.Types.WriteAck
635+
{
636+
SeqNo = 2,
637+
Written = new StreamWriteMessage.Types.WriteResponse.Types.WriteAck.Types.Written
638+
{ Offset = 0 }
639+
}
640+
}
641+
},
642+
Status = StatusIds.Types.StatusCode.Success
643+
});
644+
using var writer = new WriterBuilder<long>(_mockIDriver.Object, "/topic")
645+
{ ProducerId = "producerId" }.Build();
646+
647+
var ctx = new CancellationTokenSource();
648+
var runTaskWithCancel = writer.WriteAsync(100L, ctx.Token);
649+
// ReSharper disable once MethodSupportsCancellation
650+
var runTask = writer.WriteAsync(100L);
651+
652+
// ReSharper disable once MethodSupportsCancellation
653+
var writerExceptionAfterResetSession = await Assert.ThrowsAsync<WriterException>(() => writer.WriteAsync(100));
654+
Assert.Equal("Transport error in the WriterSession on write messages",
655+
writerExceptionAfterResetSession.Message);
656+
Assert.Equal(StatusCode.Cancelled, writerExceptionAfterResetSession.Status.StatusCode);
657+
658+
ctx.Cancel(); // reconnect write invoke cancel on cancellation token
659+
moveSecondNextSource.SetResult(true);
660+
await nextCompleted.Task;
661+
moveThirdNextSource.SetResult(true);
662+
663+
Assert.Equal(PersistenceStatus.Written, (await runTask).Status);
664+
_mockStream.Verify(stream => stream.Write(It.IsAny<FromClient>()), Times.Exactly(5));
665+
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(5));
666+
_mockStream.Verify(stream => stream.Current, Times.Exactly(3));
667+
668+
await Assert.ThrowsAsync<TaskCanceledException>(() => runTaskWithCancel);
669+
}
567670
}

0 commit comments

Comments
 (0)