Skip to content

Commit 6df57bf

Browse files
new init WriterSession strategy
1 parent df674fc commit 6df57bf

File tree

2 files changed

+57
-4
lines changed

2 files changed

+57
-4
lines changed

src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -203,10 +203,11 @@ private async Task Initialize()
203203
stream,
204204
initResponse,
205205
Initialize,
206-
e => _session = new NotStartedWriterSession(e),
206+
e => { _session = new NotStartedWriterSession(e); },
207207
_logger,
208208
_inFlightMessages
209209
);
210+
210211
if (!_inFlightMessages.IsEmpty)
211212
{
212213
var copyInFlightMessages = new ConcurrentQueue<MessageSending>();
@@ -219,6 +220,7 @@ private async Task Initialize()
219220
}
220221

221222
_session = newSession;
223+
newSession.RunProcessingWriteAck();
222224
}
223225
catch (Driver.TransportException e)
224226
{
@@ -314,8 +316,6 @@ ConcurrentQueue<MessageSending> inFlightMessages
314316
_config = config;
315317
_inFlightMessages = inFlightMessages;
316318
Volatile.Write(ref _seqNum, initResponse.LastSeqNo); // happens-before for Volatile.Read
317-
318-
RunProcessingWriteAck();
319319
}
320320

321321
public async Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
@@ -350,7 +350,7 @@ public async Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
350350
}
351351
}
352352

353-
private async void RunProcessingWriteAck()
353+
internal async void RunProcessingWriteAck()
354354
{
355355
try
356356
{

src/Ydb.Sdk/tests/Topic/WriterMockTests.cs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,4 +372,57 @@ public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSessionThenRec
372372
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(5));
373373
_mockStream.Verify(stream => stream.Current, Times.Exactly(3));
374374
}
375+
376+
/*
377+
* Performed invocations:
378+
379+
Mock<IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>:1> (stream):
380+
381+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } })
382+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync()
383+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Current
384+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync()
385+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } })
386+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync()
387+
*/
388+
[Fact]
389+
public async Task WriteAsync_WhenTransportExceptionOnProcessingWriteAckThenReconnectSession_ReturnWriterException()
390+
{
391+
var nextCompleted = new TaskCompletionSource();
392+
_mockStream.Setup(stream => stream.Write(It.IsAny<FromClient>()))
393+
.Returns(Task.CompletedTask);
394+
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
395+
.Returns(new ValueTask<bool>(true))
396+
.ThrowsAsync(new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled)))
397+
.Returns(() =>
398+
{
399+
nextCompleted.SetResult();
400+
return new ValueTask<bool>(new TaskCompletionSource<bool>().Task);
401+
}); // retry init writer session
402+
_mockStream.SetupSequence(stream => stream.Current)
403+
.Returns(new StreamWriteMessage.Types.FromServer
404+
{
405+
InitResponse = new StreamWriteMessage.Types.InitResponse
406+
{ LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" },
407+
Status = StatusIds.Types.StatusCode.Success
408+
})
409+
.Returns(new StreamWriteMessage.Types.FromServer
410+
{
411+
InitResponse = new StreamWriteMessage.Types.InitResponse
412+
{ LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" },
413+
Status = StatusIds.Types.StatusCode.Success
414+
});
415+
using var writer = new WriterBuilder<long>(_mockIDriver.Object, new WriterConfig("/topic")
416+
{ ProducerId = "producerId" }).Build();
417+
418+
await nextCompleted.Task;
419+
var writerExceptionAfterResetSession = await Assert.ThrowsAsync<WriterException>(() => writer.WriteAsync(100));
420+
Assert.Equal("Transport error in the WriterSession on processing writeAck",
421+
writerExceptionAfterResetSession.Message);
422+
Assert.Equal(StatusCode.Cancelled, writerExceptionAfterResetSession.Status.StatusCode);
423+
424+
_mockStream.Verify(stream => stream.Write(It.IsAny<FromClient>()), Times.Exactly(2));
425+
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(3));
426+
_mockStream.Verify(stream => stream.Current, Times.Once);
427+
}
375428
}

0 commit comments

Comments
 (0)