Skip to content

Commit 8093a7f

Browse files
feat: supported cancellation token
1 parent dd6ab64 commit 8093a7f

File tree

3 files changed

+92
-2
lines changed

3 files changed

+92
-2
lines changed

src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,11 @@ public Task<WriteResult> WriteAsync(TValue data, CancellationToken cancellationT
4949

5050
public async Task<WriteResult> WriteAsync(Message<TValue> message, CancellationToken cancellationToken)
5151
{
52-
TaskCompletionSource<WriteResult> completeTask = new(cancellationToken);
52+
TaskCompletionSource<WriteResult> completeTask = new();
53+
cancellationToken.Register(
54+
() => completeTask.TrySetCanceled(cancellationToken),
55+
useSynchronizationContext: false
56+
);
5357

5458
var data = _serializer.Serialize(message.Data);
5559
var messageData = new MessageData

src/Ydb.Sdk/tests/Topic/WriterIntegrationTests.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
using System.Text;
21
using Xunit;
32
using Ydb.Sdk.Services.Topic;
43
using Ydb.Sdk.Services.Topic.Writer;

src/Ydb.Sdk/tests/Topic/WriterMockTests.cs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,4 +477,91 @@ public async Task WriteAsync_WhenStreamIsClosingOnProcessingWriteAckThenReconnec
477477
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(3));
478478
_mockStream.Verify(stream => stream.Current, Times.Once);
479479
}
480+
481+
[Fact]
482+
public async Task WriteAsync_WhenCancellationTokenIsClosed_ThrowCancellationException()
483+
{
484+
var cancellationTokenSource = new CancellationTokenSource();
485+
var nextCompleted = new TaskCompletionSource<bool>();
486+
_mockStream.Setup(stream => stream.Write(It.IsAny<FromClient>()))
487+
.Returns(Task.CompletedTask);
488+
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
489+
.ReturnsAsync(true)
490+
.Returns(new ValueTask<bool>(nextCompleted.Task));
491+
_mockStream.SetupSequence(stream => stream.Current)
492+
.Returns(new StreamWriteMessage.Types.FromServer
493+
{
494+
InitResponse = new StreamWriteMessage.Types.InitResponse
495+
{ LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" },
496+
Status = StatusIds.Types.StatusCode.Success
497+
})
498+
.Returns(new StreamWriteMessage.Types.FromServer
499+
{
500+
WriteResponse = new StreamWriteMessage.Types.WriteResponse
501+
{
502+
PartitionId = 1,
503+
Acks =
504+
{
505+
new StreamWriteMessage.Types.WriteResponse.Types.WriteAck
506+
{
507+
SeqNo = 1,
508+
Written = new StreamWriteMessage.Types.WriteResponse.Types.WriteAck.Types.Written
509+
{ Offset = 0 }
510+
}
511+
}
512+
},
513+
Status = StatusIds.Types.StatusCode.Success
514+
});
515+
using var writer = new WriterBuilder<long>(_mockIDriver.Object, "/topic")
516+
{ ProducerId = "producerId" }.Build();
517+
518+
var task = writer.WriteAsync(123L, cancellationTokenSource.Token);
519+
cancellationTokenSource.Cancel();
520+
nextCompleted.SetResult(true);
521+
522+
await Assert.ThrowsAsync<TaskCanceledException>(() => task);
523+
}
524+
525+
[Fact]
526+
public async Task WriteAsync_WhenTaskIsAcceptedBeforeCancel_ThrowCancellationException()
527+
{
528+
var cancellationTokenSource = new CancellationTokenSource();
529+
var nextCompleted = new TaskCompletionSource<bool>();
530+
_mockStream.Setup(stream => stream.Write(It.IsAny<FromClient>()))
531+
.Returns(Task.CompletedTask);
532+
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
533+
.ReturnsAsync(true)
534+
.Returns(new ValueTask<bool>(nextCompleted.Task));
535+
_mockStream.SetupSequence(stream => stream.Current)
536+
.Returns(new StreamWriteMessage.Types.FromServer
537+
{
538+
InitResponse = new StreamWriteMessage.Types.InitResponse
539+
{ LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" },
540+
Status = StatusIds.Types.StatusCode.Success
541+
})
542+
.Returns(new StreamWriteMessage.Types.FromServer
543+
{
544+
WriteResponse = new StreamWriteMessage.Types.WriteResponse
545+
{
546+
PartitionId = 1,
547+
Acks =
548+
{
549+
new StreamWriteMessage.Types.WriteResponse.Types.WriteAck
550+
{
551+
SeqNo = 1,
552+
Written = new StreamWriteMessage.Types.WriteResponse.Types.WriteAck.Types.Written
553+
{ Offset = 0 }
554+
}
555+
}
556+
},
557+
Status = StatusIds.Types.StatusCode.Success
558+
});
559+
using var writer = new WriterBuilder<long>(_mockIDriver.Object, "/topic")
560+
{ ProducerId = "producerId" }.Build();
561+
562+
var task = writer.WriteAsync(123L, cancellationTokenSource.Token);
563+
nextCompleted.SetResult(true);
564+
await Assert.ThrowsAsync<TaskCanceledException>(() => task);
565+
cancellationTokenSource.Cancel();
566+
}
480567
}

0 commit comments

Comments
 (0)