Skip to content

Commit f1ebe19

Browse files
dev: ydb topic example & stop init task with canceled tasks in inFlight buffer
1 parent b40dbb4 commit f1ebe19

File tree

2 files changed

+28
-9
lines changed

2 files changed

+28
-9
lines changed

src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,6 @@ private async Task Initialize()
312312
}
313313

314314
_session = newSession;
315-
newSession.RunProcessingWriteAck();
316315
WakeUpWorker(); // attempt send buffer
317316
}
318317
finally
@@ -444,6 +443,7 @@ internal class WriterSession : TopicSession<MessageFromClient, MessageFromServer
444443
{
445444
private readonly WriterConfig _config;
446445
private readonly ConcurrentQueue<MessageSending> _inFlightMessages;
446+
private readonly Task _processingResponseStream;
447447

448448
private long _seqNum;
449449

@@ -467,6 +467,8 @@ ConcurrentQueue<MessageSending> inFlightMessages
467467
_config = config;
468468
_inFlightMessages = inFlightMessages;
469469
Volatile.Write(ref _seqNum, lastSeqNo); // happens-before for Volatile.Read
470+
471+
_processingResponseStream = RunProcessingWriteAck();
470472
}
471473

472474
public async Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
@@ -514,7 +516,7 @@ public async Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
514516
}
515517
}
516518

517-
internal async void RunProcessingWriteAck()
519+
private async Task RunProcessingWriteAck()
518520
{
519521
try
520522
{
@@ -574,7 +576,7 @@ Completing task on exception...
574576
}
575577
}
576578

577-
Logger.LogWarning("WriterSession[{SessionId}]: stream is closed", SessionId);
579+
Logger.LogInformation("WriterSession[{SessionId}]: stream is closed", SessionId);
578580
}
579581
catch (Driver.TransportException e)
580582
{
@@ -606,6 +608,7 @@ public override async ValueTask DisposeAsync()
606608
Logger.LogDebug("WriterSession[{SessionId}]: start dispose process", SessionId);
607609

608610
await Stream.RequestStreamComplete();
611+
await _processingResponseStream;
609612

610613
Stream.Dispose();
611614
}

src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,7 @@ public async Task Initialize_WhenNotSupportedCodec_ThrowWriterExceptionOnWriteAs
358358
public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSession_ShouldReconnectAndReturnWriteResult()
359359
{
360360
var moveTcs = new TaskCompletionSource<bool>();
361+
var moveTcsRetry = new TaskCompletionSource<bool>();
361362

362363
_mockStream.SetupSequence(stream => stream.Write(It.IsAny<FromClient>()))
363364
.Returns(Task.CompletedTask)
@@ -367,12 +368,17 @@ public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSession_Should
367368
return new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled));
368369
})
369370
.Returns(Task.CompletedTask)
370-
.Returns(Task.CompletedTask);
371+
.Returns(() =>
372+
{
373+
moveTcsRetry.SetResult(true);
374+
375+
return Task.CompletedTask;
376+
});
371377
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
372378
.ReturnsAsync(true)
373379
.Returns(new ValueTask<bool>(moveTcs.Task))
374380
.ReturnsAsync(true)
375-
.ReturnsAsync(true)
381+
.Returns(new ValueTask<bool>(moveTcsRetry.Task))
376382
.Returns(_lastMoveNext);
377383
_mockStream.SetupSequence(stream => stream.Current)
378384
.Returns(new StreamWriteMessage.Types.FromServer
@@ -635,6 +641,7 @@ public async Task WriteAsync_WhenInFlightBufferSendInInitialize_ReturnCompletedT
635641
var writeTcs2 = new TaskCompletionSource();
636642
var writeTcs3 = new TaskCompletionSource();
637643
var moveTcs = new TaskCompletionSource<bool>();
644+
var moveTcsRetry = new TaskCompletionSource<bool>();
638645

639646
_mockStream.SetupSequence(stream => stream.Write(It.IsAny<FromClient>()))
640647
.Returns(Task.CompletedTask)
@@ -654,13 +661,17 @@ public async Task WriteAsync_WhenInFlightBufferSendInInitialize_ReturnCompletedT
654661
return Task.CompletedTask;
655662
})
656663
.Returns(Task.CompletedTask)
657-
.Returns(Task.CompletedTask);
664+
.Returns(() =>
665+
{
666+
moveTcsRetry.SetResult(true);
667+
return Task.CompletedTask;
668+
});
658669

659670
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
660671
.ReturnsAsync(true)
661672
.Returns(new ValueTask<bool>(moveTcs.Task))
662673
.ReturnsAsync(true)
663-
.ReturnsAsync(true)
674+
.Returns(new ValueTask<bool>(moveTcsRetry.Task))
664675
.Returns(_lastMoveNext);
665676
_mockStream.SetupSequence(stream => stream.Current)
666677
.Returns(new StreamWriteMessage.Types.FromServer
@@ -859,6 +870,7 @@ public async Task DisposeAsync_WhenInFlightMessages_WaitingInFlightMessages()
859870
{
860871
var tcsDetectedWrite = new TaskCompletionSource();
861872
var writeTcs1 = new TaskCompletionSource<bool>();
873+
var moveTcsRetry = new TaskCompletionSource<bool>();
862874

863875
_mockStream.SetupSequence(stream => stream.Write(It.IsAny<FromClient>()))
864876
.Returns(Task.CompletedTask)
@@ -868,12 +880,16 @@ public async Task DisposeAsync_WhenInFlightMessages_WaitingInFlightMessages()
868880
return Task.CompletedTask;
869881
})
870882
.Returns(Task.CompletedTask)
871-
.Returns(Task.CompletedTask);
883+
.Returns(() =>
884+
{
885+
moveTcsRetry.SetResult(true);
886+
return Task.CompletedTask;
887+
});
872888
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
873889
.ReturnsAsync(true)
874890
.Returns(new ValueTask<bool>(writeTcs1.Task))
875891
.ReturnsAsync(true)
876-
.ReturnsAsync(true)
892+
.Returns(new ValueTask<bool>(moveTcsRetry.Task))
877893
.Returns(_lastMoveNext);
878894

879895
_mockStream.SetupSequence(stream => stream.Current)

0 commit comments

Comments
 (0)