Skip to content

Commit 031088b

Browse files
fix tests
1 parent bb1a3cd commit 031088b

File tree

5 files changed

+59
-33
lines changed

5 files changed

+59
-33
lines changed

slo/src/TopicService/SloTopicContext.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public async Task Run(RunConfig config)
125125
{
126126
try
127127
{
128-
using var reader = new ReaderBuilder<string>(driver)
128+
await using var reader = new ReaderBuilder<string>(driver)
129129
{
130130
ConsumerName = ConsumerName,
131131
SubscribeSettings =

src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ private async Task RunProcessingStreamRequest()
356356
Logger.LogError(e, "ReaderSession[{SessionId}] have transport error on Write", SessionId);
357357

358358
ReconnectSession();
359-
359+
360360
_lifecycleReaderSessionCts.Cancel();
361361
}
362362
}
@@ -558,7 +558,7 @@ protected override MessageFromClient GetSendUpdateTokenRequest(string token)
558558
public override async ValueTask DisposeAsync()
559559
{
560560
Logger.LogInformation("ReaderSession[{SessionId}]: start dispose process", SessionId);
561-
561+
562562
_channelFromClientMessageSending.Writer.Complete();
563563

564564
try

src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -597,9 +597,9 @@ protected override MessageFromClient GetSendUpdateTokenRequest(string token)
597597
public override async ValueTask DisposeAsync()
598598
{
599599
Logger.LogInformation("WriterSession[{SessionId}]: start dispose process", SessionId);
600-
600+
601601
await Stream.RequestStreamComplete();
602-
602+
603603
Stream.Dispose();
604604
}
605605
}

src/Ydb.Sdk/tests/Topic/ReaderUnitTests.cs

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public class ReaderUnitTests
2626

2727
private readonly Mock<IDriver> _mockIDriver = new();
2828
private readonly Mock<ReaderStream> _mockStream = new();
29+
private readonly ValueTask<bool> _lastMoveNext;
2930

3031
public ReaderUnitTests()
3132
{
@@ -35,6 +36,16 @@ public ReaderUnitTests()
3536
).Returns(_mockStream.Object);
3637

3738
_mockIDriver.Setup(driver => driver.LoggerFactory).Returns(Utils.GetLoggerFactory);
39+
40+
var tcsLastMoveNext = new TaskCompletionSource<bool>();
41+
42+
_lastMoveNext = new ValueTask<bool>(tcsLastMoveNext.Task);
43+
_mockStream.Setup(stream => stream.RequestStreamComplete()).Returns(() =>
44+
{
45+
tcsLastMoveNext.TrySetResult(false);
46+
47+
return Task.CompletedTask;
48+
});
3849
}
3950

4051
/*
@@ -87,7 +98,7 @@ public async Task Initialize_WhenFailWriteMessage_ShouldRetryInitializeAndReadTh
8798
.ReturnsAsync(true)
8899
.Returns(new ValueTask<bool>(tcsMoveNext.Task))
89100
.Returns(new ValueTask<bool>(tcsCommitMessage.Task))
90-
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
101+
.Returns(_lastMoveNext);
91102

92103
_mockStream.SetupSequence(stream => stream.Current)
93104
.Returns(InitResponseFromServer)
@@ -180,7 +191,7 @@ public async Task Initialize_WhenFailMoveNextAsync_ShouldRetryInitializeAndReadT
180191
.ReturnsAsync(true)
181192
.Returns(new ValueTask<bool>(tcsMoveNext.Task))
182193
.Returns(new ValueTask<bool>(tcsCommitMessage.Task))
183-
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
194+
.Returns(_lastMoveNext);
184195

185196
_mockStream.SetupSequence(stream => stream.Current)
186197
.Returns(InitResponseFromServer)
@@ -279,7 +290,7 @@ public async Task Initialize_WhenInitResponseStatusIsRetryable_ShouldRetryInitia
279290
.ReturnsAsync(true)
280291
.Returns(new ValueTask<bool>(tcsMoveNext.Task))
281292
.Returns(new ValueTask<bool>(tcsCommitMessage.Task))
282-
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
293+
.Returns(_lastMoveNext);
283294

284295
_mockStream.SetupSequence(stream => stream.Current)
285296
.Returns(new FromServer
@@ -435,7 +446,7 @@ public async Task
435446
.ReturnsAsync(true)
436447
.Returns(new ValueTask<bool>(tcsMoveNext.Task))
437448
.Returns(new ValueTask<bool>(tcsCommitMessage.Task))
438-
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
449+
.Returns(_lastMoveNext);
439450

440451
_mockStream.SetupSequence(stream => stream.Current)
441452
.Returns(InitResponseFromServer)
@@ -579,7 +590,7 @@ public async Task
579590
.Returns(new ValueTask<bool>(tcsCommitMessage1.Task))
580591
.Returns(new ValueTask<bool>(tcsCommitMessage2.Task))
581592
.Returns(new ValueTask<bool>(tcsCommitMessage3.Task))
582-
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
593+
.Returns(_lastMoveNext);
583594

584595
_mockStream.SetupSequence(stream => stream.Current)
585596
.Returns(InitResponseFromServer)
@@ -684,9 +695,7 @@ public async Task
684695
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.Current
685696
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.MoveNextAsync() [Maybe]
686697
*/
687-
#pragma warning disable xUnit1004
688-
[Fact(Skip = "FLAP TEST")]
689-
#pragma warning restore xUnit1004
698+
[Fact]
690699
public async Task
691700
RunProcessingTopic_WhenReadRequestAfterInitializeThrowTransportException_ShouldRetryInitializeAndReadThenCommitMessage()
692701
{
@@ -736,7 +745,7 @@ public async Task
736745
.ReturnsAsync(true)
737746
.Returns(new ValueTask<bool>(tcsMoveNextThird.Task))
738747
.Returns(new ValueTask<bool>(tcsCommitMessage.Task))
739-
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
748+
.Returns(_lastMoveNext);
740749

741750
_mockStream.SetupSequence(stream => stream.Current)
742751
.Returns(InitResponseFromServer)
@@ -872,7 +881,7 @@ public async Task
872881
.ReturnsAsync(true)
873882
.Returns(new ValueTask<bool>(tcsMoveNextThird.Task))
874883
.Returns(new ValueTask<bool>(tcsCommitMessage.Task))
875-
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
884+
.Returns(_lastMoveNext);
876885

877886
_mockStream.SetupSequence(stream => stream.Current)
878887
.Returns(InitResponseFromServer)
@@ -1048,7 +1057,7 @@ public async Task
10481057
.ReturnsAsync(true)
10491058
.Returns(new ValueTask<bool>(tcsMoveNextThird.Task))
10501059
.Returns(new ValueTask<bool>(tcsCommitMessage.Task))
1051-
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
1060+
.Returns(_lastMoveNext);
10521061

10531062
_mockStream.SetupSequence(stream => stream.Current)
10541063
.Returns(InitResponseFromServer)
@@ -1183,7 +1192,7 @@ public async Task
11831192
.Returns(new ValueTask<bool>(tcsMoveNextThird.Task))
11841193
.Returns(new ValueTask<bool>(tcsCommitMessage.Task))
11851194
.ReturnsAsync(true)
1186-
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
1195+
.Returns(_lastMoveNext);
11871196

11881197
_mockStream.SetupSequence(stream => stream.Current)
11891198
.Returns(InitResponseFromServer)
@@ -1288,7 +1297,7 @@ public async Task
12881297
.Returns(new ValueTask<bool>(tcsMoveNext.Task))
12891298
.Returns(new ValueTask<bool>(stopPartitionSessionRequest.Task))
12901299
.ReturnsAsync(true)
1291-
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
1300+
.Returns(_lastMoveNext);
12921301

12931302
_mockStream.SetupSequence(stream => stream.Current)
12941303
.Returns(InitResponseFromServer)
@@ -1376,7 +1385,7 @@ public async Task ReadAsync_WhenFailDeserializer_ThrowReaderExceptionAndInvokeRe
13761385
.ReturnsAsync(true)
13771386
.ReturnsAsync(true)
13781387
.Returns(new ValueTask<bool>(tcsMoveNext.Task))
1379-
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
1388+
.Returns(_lastMoveNext);
13801389

13811390
_mockStream.SetupSequence(stream => stream.Current)
13821391
.Returns(InitResponseFromServer)
@@ -1452,7 +1461,7 @@ public async Task ReadAsync_WhenTokenIsUpdatedOneTime_SuccessUpdateToken()
14521461
.ReturnsAsync(true)
14531462
.Returns(new ValueTask<bool>(tcsMoveNext.Task))
14541463
.Returns(new ValueTask<bool>(tcsCommitMessage.Task))
1455-
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
1464+
.Returns(_lastMoveNext);
14561465

14571466
_mockStream.SetupSequence(stream => stream.Current)
14581467
.Returns(InitResponseFromServer)

src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ public class WriterUnitTests
1818
{
1919
private readonly Mock<IDriver> _mockIDriver = new();
2020
private readonly Mock<WriterStream> _mockStream = new();
21+
private readonly ValueTask<bool> _lastMoveNext;
2122

2223
public WriterUnitTests()
2324
{
@@ -27,6 +28,16 @@ public WriterUnitTests()
2728
).Returns(_mockStream.Object);
2829

2930
_mockIDriver.Setup(driver => driver.LoggerFactory).Returns(Utils.GetLoggerFactory);
31+
32+
var tcsLastMoveNext = new TaskCompletionSource<bool>();
33+
34+
_lastMoveNext = new ValueTask<bool>(tcsLastMoveNext.Task);
35+
_mockStream.Setup(stream => stream.RequestStreamComplete()).Returns(() =>
36+
{
37+
tcsLastMoveNext.TrySetResult(false);
38+
39+
return Task.CompletedTask;
40+
});
3041
}
3142

3243
private class FailSerializer : ISerializer<int>
@@ -43,6 +54,12 @@ public async Task WriteAsync_WhenSerializeThrowException_ThrowWriterException()
4354
await using var writer = new WriterBuilder<int>(_mockIDriver.Object, "/topic-1")
4455
{ ProducerId = "producerId", Serializer = new FailSerializer() }.Build();
4556

57+
_mockStream.Setup(stream => stream.Write(It.IsAny<FromClient>()))
58+
.Returns(Task.CompletedTask);
59+
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
60+
.ReturnsAsync(true)
61+
.Returns(_lastMoveNext);
62+
4663
Assert.Equal("Error when serializing message data",
4764
(await Assert.ThrowsAsync<WriterException>(() => writer.WriteAsync(123))).Message);
4865
}
@@ -78,7 +95,7 @@ public async Task Initialize_WhenStreamClosedByServer_ShouldRetryInitializeAndRe
7895
.ReturnsAsync(false)
7996
.ReturnsAsync(true)
8097
.Returns(() => new ValueTask<bool>(taskNextComplete.Task))
81-
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
98+
.Returns(_lastMoveNext);
8299

83100
SetupReadOneWriteAckMessage();
84101

@@ -122,7 +139,7 @@ public async Task Initialize_WhenFailWriteMessage_ShouldRetryInitializeAndReturn
122139
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
123140
.ReturnsAsync(true)
124141
.Returns(() => new ValueTask<bool>(taskNextComplete.Task))
125-
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
142+
.Returns(_lastMoveNext);
126143

127144
SetupReadOneWriteAckMessage();
128145

@@ -170,7 +187,7 @@ public async Task Initialize_WhenFailMoveNextAsync_ShouldRetryInitializeAndRetur
170187
new RpcException(new Grpc.Core.Status(Grpc.Core.StatusCode.DeadlineExceeded, "Some message"))))
171188
.ReturnsAsync(true)
172189
.Returns(() => new ValueTask<bool>(taskNextComplete.Task))
173-
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
190+
.Returns(_lastMoveNext);
174191

175192
SetupReadOneWriteAckMessage();
176193

@@ -218,8 +235,7 @@ public async Task Initialize_WhenInitResponseStatusIsRetryable_ShouldRetryInitia
218235
.ReturnsAsync(true)
219236
.ReturnsAsync(true)
220237
.Returns(() => new ValueTask<bool>(taskNextComplete.Task))
221-
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
222-
238+
.Returns(_lastMoveNext);
223239

224240
_mockStream.SetupSequence(stream => stream.Current)
225241
.Returns(new StreamWriteMessage.Types.FromServer
@@ -357,7 +373,7 @@ public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSession_Should
357373
.Returns(new ValueTask<bool>(moveTcs.Task))
358374
.ReturnsAsync(true)
359375
.ReturnsAsync(true)
360-
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
376+
.Returns(_lastMoveNext);
361377
_mockStream.SetupSequence(stream => stream.Current)
362378
.Returns(new StreamWriteMessage.Types.FromServer
363379
{
@@ -431,7 +447,7 @@ public async Task WriteAsync_WhenTransportExceptionOnProcessingWriteAck_ShouldRe
431447
.ThrowsAsync(new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled)))
432448
.ReturnsAsync(true)
433449
.Returns(() => new ValueTask<bool>(moveTcs.Task)) // retry init writer session
434-
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
450+
.Returns(_lastMoveNext);
435451
_mockStream.SetupSequence(stream => stream.Current)
436452
.Returns(new StreamWriteMessage.Types.FromServer
437453
{
@@ -506,7 +522,7 @@ public async Task WriteAsync_WhenStreamIsClosingOnProcessingWriteAck_ShouldRecon
506522
.ReturnsAsync(false)
507523
.ReturnsAsync(true)
508524
.Returns(() => new ValueTask<bool>(moveTcs.Task)) // retry init writer session
509-
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
525+
.Returns(_lastMoveNext);
510526
_mockStream.SetupSequence(stream => stream.Current)
511527
.Returns(new StreamWriteMessage.Types.FromServer
512528
{
@@ -558,7 +574,7 @@ public async Task WriteAsync_WhenCancellationTokenIsClosed_ThrowCancellationExce
558574
.Returns(Task.CompletedTask);
559575
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
560576
.ReturnsAsync(true)
561-
.Returns(new ValueTask<bool>(nextCompleted.Task));
577+
.Returns(_lastMoveNext);
562578
SetupReadOneWriteAckMessage();
563579

564580
await using var writer = new WriterBuilder<long>(_mockIDriver.Object, "/topic-11")
@@ -581,7 +597,8 @@ public async Task WriteAsync_WhenTaskIsAcceptedBeforeCancel_ReturnWrittenStatus(
581597
.Returns(Task.CompletedTask);
582598
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
583599
.ReturnsAsync(true)
584-
.Returns(new ValueTask<bool>(nextCompleted.Task));
600+
.Returns(new ValueTask<bool>(nextCompleted.Task))
601+
.Returns(_lastMoveNext);
585602
SetupReadOneWriteAckMessage();
586603

587604
await using var writer = new WriterBuilder<long>(_mockIDriver.Object, "/topic-12")
@@ -647,7 +664,7 @@ public async Task WriteAsync_WhenInFlightBufferSendInInitialize_ReturnCompletedT
647664
.Returns(new ValueTask<bool>(moveTcs.Task))
648665
.ReturnsAsync(true)
649666
.ReturnsAsync(true)
650-
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
667+
.Returns(_lastMoveNext);
651668
_mockStream.SetupSequence(stream => stream.Current)
652669
.Returns(new StreamWriteMessage.Types.FromServer
653670
{
@@ -784,7 +801,7 @@ public async Task WriteAsync_WhenTokenIsUpdatedOneTime_SuccessUpdateToken()
784801
.Returns(new ValueTask<bool>(writeTcs1.Task))
785802
.Returns(new ValueTask<bool>(writeTcs2.Task))
786803
.Returns(new ValueTask<bool>(writeTcs3.Task))
787-
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
804+
.Returns(_lastMoveNext);
788805

789806
SetupReadOneWriteAckMessage()
790807
.Returns(new StreamWriteMessage.Types.FromServer
@@ -861,7 +878,7 @@ public async Task DisposeAsync_WhenInFlightMessages_WaitingInFlightMessages()
861878
.Returns(new ValueTask<bool>(writeTcs1.Task))
862879
.ReturnsAsync(true)
863880
.ReturnsAsync(true)
864-
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
881+
.Returns(_lastMoveNext);
865882

866883
_mockStream.SetupSequence(stream => stream.Current)
867884
.Returns(new StreamWriteMessage.Types.FromServer

0 commit comments

Comments
 (0)