Skip to content

Commit 2b27665

Browse files
IWriter added cancelToken
1 parent 6df57bf commit 2b27665

File tree

3 files changed

+61
-9
lines changed

3 files changed

+61
-9
lines changed

src/Ydb.Sdk/src/Services/Topic/IWriter.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ namespace Ydb.Sdk.Services.Topic;
44

55
public interface IWriter<TValue> : IDisposable
66
{
7-
public Task<WriteResult> WriteAsync(TValue data);
7+
public Task<WriteResult> WriteAsync(TValue data, CancellationToken cancellationToken = default);
88

9-
public Task<WriteResult> WriteAsync(Message<TValue> message);
9+
public Task<WriteResult> WriteAsync(Message<TValue> message, CancellationToken cancellationToken = default);
1010
}

src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,14 @@ internal Writer(IDriver driver, WriterConfig config, ISerializer<TValue> seriali
4242
StartWriteWorker();
4343
}
4444

45-
public Task<WriteResult> WriteAsync(TValue data)
45+
public Task<WriteResult> WriteAsync(TValue data, CancellationToken cancellationToken)
4646
{
47-
return WriteAsync(new Message<TValue>(data));
47+
return WriteAsync(new Message<TValue>(data), cancellationToken);
4848
}
4949

50-
public async Task<WriteResult> WriteAsync(Message<TValue> message)
50+
public async Task<WriteResult> WriteAsync(Message<TValue> message, CancellationToken cancellationToken)
5151
{
52-
TaskCompletionSource<WriteResult> completeTask = new();
52+
TaskCompletionSource<WriteResult> completeTask = new(cancellationToken);
5353

5454
var data = _serializer.Serialize(message.Data);
5555
var messageData = new MessageData
@@ -207,7 +207,7 @@ private async Task Initialize()
207207
_logger,
208208
_inFlightMessages
209209
);
210-
210+
211211
if (!_inFlightMessages.IsEmpty)
212212
{
213213
var copyInFlightMessages = new ConcurrentQueue<MessageSending>();

src/Ydb.Sdk/tests/Topic/WriterMockTests.cs

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSessionThenRec
318318
return Task.CompletedTask;
319319
});
320320
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
321-
.Returns(new ValueTask<bool>(true))
321+
.ReturnsAsync(true)
322322
.Returns(new ValueTask<bool>(moveFirstNextSource.Task))
323323
.Returns(new ValueTask<bool>(moveSecondNextSource.Task))
324324
.Returns(new ValueTask<bool>(moveThirdNextSource.Task))
@@ -392,7 +392,7 @@ public async Task WriteAsync_WhenTransportExceptionOnProcessingWriteAckThenRecon
392392
_mockStream.Setup(stream => stream.Write(It.IsAny<FromClient>()))
393393
.Returns(Task.CompletedTask);
394394
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
395-
.Returns(new ValueTask<bool>(true))
395+
.ReturnsAsync(true)
396396
.ThrowsAsync(new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled)))
397397
.Returns(() =>
398398
{
@@ -425,4 +425,56 @@ public async Task WriteAsync_WhenTransportExceptionOnProcessingWriteAckThenRecon
425425
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(3));
426426
_mockStream.Verify(stream => stream.Current, Times.Once);
427427
}
428+
429+
/*
430+
* Performed invocations:
431+
432+
Mock<IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>:1> (stream):
433+
434+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } })
435+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync()
436+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Current
437+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync()
438+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } })
439+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync()
440+
*/
441+
[Fact]
442+
public async Task WriteAsync_WhenStreamIsClosingOnProcessingWriteAckThenReconnectSession_ReturnWriterException()
443+
{
444+
var nextCompleted = new TaskCompletionSource();
445+
_mockStream.Setup(stream => stream.Write(It.IsAny<FromClient>()))
446+
.Returns(Task.CompletedTask);
447+
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
448+
.ReturnsAsync(true)
449+
.ReturnsAsync(false)
450+
.Returns(() =>
451+
{
452+
nextCompleted.SetResult();
453+
return new ValueTask<bool>(new TaskCompletionSource<bool>().Task);
454+
}); // retry init writer session
455+
_mockStream.SetupSequence(stream => stream.Current)
456+
.Returns(new StreamWriteMessage.Types.FromServer
457+
{
458+
InitResponse = new StreamWriteMessage.Types.InitResponse
459+
{ LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" },
460+
Status = StatusIds.Types.StatusCode.Success
461+
})
462+
.Returns(new StreamWriteMessage.Types.FromServer
463+
{
464+
InitResponse = new StreamWriteMessage.Types.InitResponse
465+
{ LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" },
466+
Status = StatusIds.Types.StatusCode.Success
467+
});
468+
using var writer = new WriterBuilder<long>(_mockIDriver.Object, new WriterConfig("/topic")
469+
{ ProducerId = "producerId" }).Build();
470+
471+
await nextCompleted.Task;
472+
var writerExceptionAfterResetSession = await Assert.ThrowsAsync<WriterException>(() => writer.WriteAsync(100));
473+
Assert.Equal("WriterStream is closed", writerExceptionAfterResetSession.Message);
474+
Assert.Equal(StatusCode.Unspecified, writerExceptionAfterResetSession.Status.StatusCode);
475+
476+
_mockStream.Verify(stream => stream.Write(It.IsAny<FromClient>()), Times.Exactly(2));
477+
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(3));
478+
_mockStream.Verify(stream => stream.Current, Times.Once);
479+
}
428480
}

0 commit comments

Comments
 (0)