Skip to content

Commit 6b52027

Browse files
dev: reader first unit test on start fail
1 parent ab8fb03 commit 6b52027

File tree

3 files changed

+51
-20
lines changed

3 files changed

+51
-20
lines changed

src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,8 @@ public void Dispose()
204204
try
205205
{
206206
_disposeCts.Cancel();
207+
208+
_receivedMessagesChannel.Writer.Complete();
207209
}
208210
finally
209211
{
@@ -246,7 +248,6 @@ internal class ReaderSession<TValue> : TopicSession<MessageFromClient, MessageFr
246248
Channel.CreateUnbounded<MessageFromClient>(
247249
new UnboundedChannelOptions
248250
{
249-
SingleWriter = true,
250251
SingleReader = true,
251252
AllowSynchronousContinuations = false
252253
}

src/Ydb.Sdk/tests/Topic/ReaderUnitTests.cs

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
using Google.Protobuf;
2+
using Google.Protobuf.WellKnownTypes;
23
using Grpc.Core;
34
using Moq;
5+
using Xunit;
46
using Ydb.Sdk.Services.Topic.Reader;
57
using Ydb.Topic;
68

@@ -25,23 +27,42 @@ public ReaderUnitTests()
2527
_mockIDriver.Setup(driver => driver.LoggerFactory).Returns(Utils.GetLoggerFactory);
2628
}
2729

28-
// [Fact]
30+
[Fact]
2931
public async Task Initialize_WhenFailWriteMessage_ShouldRetryInitializeAndReadThenCommitMessage()
3032
{
31-
var tcs = new TaskCompletionSource<bool>();
33+
var tcsMoveNext = new TaskCompletionSource<bool>();
34+
var tcsCommitMessage = new TaskCompletionSource<bool>();
3235

3336
_mockStream.SetupSequence(stream => stream.Write(It.IsAny<FromClient>()))
37+
// Write Throws Exception
3438
.ThrowsAsync(new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled)))
39+
// Write init
3540
.Returns(Task.CompletedTask)
41+
// Write ReadRequest { 200 bytes }
3642
.Returns(Task.CompletedTask)
37-
.Returns(Task.CompletedTask);
43+
// Write StartSessionPartitionRequest
44+
.Returns(() =>
45+
{
46+
tcsMoveNext.SetResult(true);
47+
48+
return Task.CompletedTask;
49+
})
50+
// Write ReadRequest { 50 bytes }
51+
.Returns(Task.CompletedTask)
52+
// Write CommitRequest
53+
.Returns(() =>
54+
{
55+
tcsCommitMessage.SetResult(true);
56+
57+
return Task.CompletedTask;
58+
});
3859

3960
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
40-
.Returns(new ValueTask<bool>(true))
41-
.Returns(new ValueTask<bool>(true))
42-
.Returns(new ValueTask<bool>(true))
43-
.Returns(new ValueTask<bool>(true))
44-
.Returns(new ValueTask<bool>(tcs.Task));
61+
.ReturnsAsync(true)
62+
.ReturnsAsync(true)
63+
.Returns(new ValueTask<bool>(tcsMoveNext.Task))
64+
.Returns(new ValueTask<bool>(tcsCommitMessage.Task))
65+
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
4566

4667
_mockStream.SetupSequence(stream => stream.Current)
4768
.Returns(new FromServer
@@ -64,20 +85,27 @@ public async Task Initialize_WhenFailWriteMessage_ShouldRetryInitializeAndReadTh
6485
.Returns(
6586
new FromServer
6687
{
88+
Status = StatusIds.Types.StatusCode.Success,
6789
ReadResponse = new StreamReadMessage.Types.ReadResponse
6890
{
6991
BytesSize = 50, PartitionData =
7092
{
7193
new StreamReadMessage.Types.ReadResponse.Types.PartitionData
7294
{
95+
PartitionSessionId = 1,
7396
Batches =
7497
{
7598
new StreamReadMessage.Types.ReadResponse.Types.Batch
7699
{
100+
ProducerId = "ProducerId",
77101
MessageData =
78102
{
79103
new StreamReadMessage.Types.ReadResponse.Types.MessageData
80-
{ Data = ByteString.CopyFrom(BitConverter.GetBytes(100)) }
104+
{
105+
Data = ByteString.CopyFrom(BitConverter.GetBytes(100)),
106+
Offset = 1,
107+
CreatedAt = new Timestamp()
108+
}
81109
}
82110
}
83111
}
@@ -89,6 +117,7 @@ public async Task Initialize_WhenFailWriteMessage_ShouldRetryInitializeAndReadTh
89117
.Returns(
90118
new FromServer
91119
{
120+
Status = StatusIds.Types.StatusCode.Success,
92121
CommitOffsetResponse =
93122
new StreamReadMessage.Types.CommitOffsetResponse
94123
{
@@ -97,7 +126,7 @@ public async Task Initialize_WhenFailWriteMessage_ShouldRetryInitializeAndReadTh
97126
new StreamReadMessage.Types.CommitOffsetResponse.Types.PartitionCommittedOffset
98127
{
99128
PartitionSessionId = 1,
100-
CommittedOffset = 50
129+
CommittedOffset = 2
101130
}
102131
}
103132
}
@@ -112,8 +141,8 @@ public async Task Initialize_WhenFailWriteMessage_ShouldRetryInitializeAndReadTh
112141
SubscribeSettings = { new SubscribeSettings("/topic") }
113142
}.Build();
114143

115-
await reader.ReadAsync();
116-
// await message.CommitAsync();
117-
// Assert.Equal(100, message.Data);
144+
var message = await reader.ReadAsync();
145+
await message.CommitAsync();
146+
Assert.Equal(100, message.Data);
118147
}
119148
}

src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using Ydb.Sdk.Services.Topic.Writer;
77
using Ydb.Topic;
88
using Codec = Ydb.Sdk.Services.Topic.Codec;
9+
using Range = Moq.Range;
910

1011
namespace Ydb.Sdk.Tests.Topic;
1112

@@ -132,7 +133,7 @@ public async Task Initialize_WhenFailWriteMessage_ShouldRetryInitializeAndReturn
132133
// check attempt repeated!!!
133134
_mockStream.Verify(stream => stream.Write(It.IsAny<FromClient>()), Times.Exactly(3));
134135
_mockStream.Verify(stream => stream.MoveNextAsync(),
135-
Times.AtLeast(2)); // run processing ack may not be able to start on time
136+
Times.Between(2, 3, Range.Inclusive)); // run processing ack may not be able to start on time
136137
_mockStream.Verify(stream => stream.Current, Times.Exactly(2));
137138
}
138139

@@ -180,7 +181,7 @@ public async Task Initialize_WhenFailMoveNextAsync_ShouldRetryInitializeAndRetur
180181
// check attempt repeated!!!
181182
_mockStream.Verify(stream => stream.Write(It.IsAny<FromClient>()), Times.Exactly(3));
182183
_mockStream.Verify(stream => stream.MoveNextAsync(),
183-
Times.AtLeast(3)); // run processing ack may not be able to start on time
184+
Times.Between(3, 4, Range.Inclusive)); // run processing ack may not be able to start on time
184185
_mockStream.Verify(stream => stream.Current, Times.Exactly(2));
185186
}
186187

@@ -257,7 +258,7 @@ public async Task Initialize_WhenInitResponseStatusIsRetryable_ShouldRetryInitia
257258
// check attempt repeated!!!
258259
_mockStream.Verify(stream => stream.Write(It.IsAny<FromClient>()), Times.Exactly(3));
259260
_mockStream.Verify(stream => stream.MoveNextAsync(),
260-
Times.AtLeast(3)); // run processing ack may not be able to start on time
261+
Times.Between(3, 4, Range.Inclusive)); // run processing ack may not be able to start on time
261262
_mockStream.Verify(stream => stream.Current, Times.Exactly(3));
262263
}
263264

@@ -391,7 +392,7 @@ public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSession_Should
391392

392393
Assert.Equal(PersistenceStatus.Written, (await writer.WriteAsync(100L)).Status);
393394
_mockStream.Verify(stream => stream.Write(It.IsAny<FromClient>()), Times.Exactly(4));
394-
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.AtLeast(4));
395+
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.Between(4, 5, Range.Inclusive));
395396
_mockStream.Verify(stream => stream.Current, Times.Exactly(3));
396397
}
397398

@@ -466,7 +467,7 @@ public async Task WriteAsync_WhenTransportExceptionOnProcessingWriteAck_ShouldRe
466467
Assert.Equal(PersistenceStatus.Written, (await writer.WriteAsync(100L)).Status);
467468

468469
_mockStream.Verify(stream => stream.Write(It.IsAny<FromClient>()), Times.Exactly(3));
469-
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.AtLeast(4));
470+
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.Between(4, 5, Range.Inclusive));
470471
_mockStream.Verify(stream => stream.Current, Times.Exactly(3));
471472
}
472473

@@ -543,7 +544,7 @@ public async Task WriteAsync_WhenStreamIsClosingOnProcessingWriteAck_ShouldRecon
543544
Assert.Equal(PersistenceStatus.Written, (await writer.WriteAsync(100L)).Status);
544545

545546
_mockStream.Verify(stream => stream.Write(It.IsAny<FromClient>()), Times.Exactly(3));
546-
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.AtLeast(4));
547+
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.Between(4, 5, Range.Inclusive));
547548
_mockStream.Verify(stream => stream.Current, Times.Exactly(3));
548549
}
549550

0 commit comments

Comments
 (0)