Skip to content

Commit cb32d5e

Browse files
feat: mock tests on init WriterSession
1 parent 39b766d commit cb32d5e

File tree

5 files changed

+228
-23
lines changed

5 files changed

+228
-23
lines changed

src/Ydb.Sdk/src/IDriver.cs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public ServerStream<TResponse> ServerStreamCall<TRequest, TResponse>(
2020
where TRequest : class
2121
where TResponse : class;
2222

23-
public BidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TRequest, TResponse>(
23+
public IBidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TRequest, TResponse>(
2424
Method<TRequest, TResponse> method,
2525
GrpcRequestSettings settings)
2626
where TRequest : class
@@ -29,6 +29,13 @@ public BidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TRequest
2929
ILoggerFactory LoggerFactory { get; }
3030
}
3131

32+
public interface IBidirectionalStream<in TRequest, out TResponse> : IDisposable
33+
{
34+
public Task Write(TRequest request);
35+
public ValueTask<bool> MoveNextAsync();
36+
public TResponse Current { get; }
37+
}
38+
3239
public abstract class BaseDriver : IDriver
3340
{
3441
protected readonly DriverConfig Config;
@@ -95,7 +102,7 @@ public ServerStream<TResponse> ServerStreamCall<TRequest, TResponse>(
95102
return new ServerStream<TResponse>(call, e => { OnRpcError(endpoint, e); });
96103
}
97104

98-
public BidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TRequest, TResponse>(
105+
public IBidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TRequest, TResponse>(
99106
Method<TRequest, TResponse> method,
100107
GrpcRequestSettings settings)
101108
where TRequest : class
@@ -213,16 +220,11 @@ public async ValueTask<bool> MoveNextAsync()
213220
}
214221
}
215222

216-
public class BidirectionalStream<TRequest, TResponse> : IDisposable
223+
public class BidirectionalStream<TRequest, TResponse> : IBidirectionalStream<TRequest, TResponse>
217224
{
218225
private readonly AsyncDuplexStreamingCall<TRequest, TResponse> _stream;
219226
private readonly Action<RpcException> _rpcErrorAction;
220227

221-
public BidirectionalStream()
222-
{
223-
224-
}
225-
226228
internal BidirectionalStream(
227229
AsyncDuplexStreamingCall<TRequest, TResponse> stream,
228230
Action<RpcException> rpcErrorAction)

src/Ydb.Sdk/src/Services/Topic/TopicSession.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ internal abstract class TopicSession<TFromClient, TFromServer> : IDisposable
66
{
77
private readonly Func<Task> _initialize;
88

9-
protected readonly BidirectionalStream<TFromClient, TFromServer> Stream;
9+
protected readonly IBidirectionalStream<TFromClient, TFromServer> Stream;
1010
protected readonly ILogger Logger;
1111
protected readonly string SessionId;
1212

1313
private int _isActive = 1;
1414

15-
protected TopicSession(BidirectionalStream<TFromClient, TFromServer> stream, ILogger logger,
15+
protected TopicSession(IBidirectionalStream<TFromClient, TFromServer> stream, ILogger logger,
1616
string sessionId, Func<Task> initialize)
1717
{
1818
Stream = stream;

src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ namespace Ydb.Sdk.Services.Topic.Writer;
1111
using MessageData = StreamWriteMessage.Types.WriteRequest.Types.MessageData;
1212
using MessageFromClient = StreamWriteMessage.Types.FromClient;
1313
using MessageFromServer = StreamWriteMessage.Types.FromServer;
14-
using WriterStream = BidirectionalStream<
14+
using WriterStream = IBidirectionalStream<
1515
StreamWriteMessage.Types.FromClient,
1616
StreamWriteMessage.Types.FromServer
1717
>;
@@ -172,6 +172,8 @@ private async Task Initialize()
172172
_ = Task.Run(Initialize, _disposeTokenSource.Token);
173173
}
174174

175+
_logger.LogCritical("Writer initialization failed to start. Reason: {Status}", status);
176+
175177
return;
176178
}
177179

@@ -183,11 +185,12 @@ private async Task Initialize()
183185
if (initResponse.SupportedCodecs != null &&
184186
!initResponse.SupportedCodecs.Codecs.Contains((int)_config.Codec))
185187
{
186-
_logger.LogCritical("Topic[{TopicPath}] is not supported codec: {Codec}", _config.TopicPath,
187-
_config.Codec);
188+
_logger.LogCritical(
189+
"Writer initialization failed to start. Reason: topic[Path=\"{TopicPath}\"] is not supported codec {Codec}",
190+
_config.TopicPath, _config.Codec);
188191

189192
_session = new NotStartedWriterSession(
190-
$"Topic[{_config.TopicPath}] is not supported codec: {_config.Codec}");
193+
$"Topic[Path=\"{_config.TopicPath}\"] is not supported codec: {_config.Codec}");
191194
return;
192195
}
193196

@@ -199,6 +202,8 @@ private async Task Initialize()
199202

200203
_session = new NotStartedWriterSession(
201204
new WriterException("Transport error on creating write session", e));
205+
206+
_ = Task.Run(Initialize, _disposeTokenSource.Token);
202207
}
203208
}
204209

@@ -245,7 +250,7 @@ public NotStartedWriterSession(WriterException reasonException)
245250

246251
public Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
247252
{
248-
foreach (var messageSending in toSendBuffer)
253+
while (toSendBuffer.TryDequeue(out var messageSending))
249254
{
250255
messageSending.TaskCompletionSource.SetException(_reasonException);
251256
}

src/Ydb.Sdk/src/Status.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ internal static Status ConvertStatus(this Grpc.Core.Status rpcStatus)
248248
Grpc.Core.StatusCode.DeadlineExceeded => StatusCode.ClientTransportTimeout,
249249
Grpc.Core.StatusCode.ResourceExhausted => StatusCode.ClientTransportResourceExhausted,
250250
Grpc.Core.StatusCode.Unimplemented => StatusCode.ClientTransportUnimplemented,
251+
Grpc.Core.StatusCode.Cancelled => StatusCode.Cancelled,
251252
_ => StatusCode.ClientTransportUnknown
252253
},
253254
new List<Issue> { new(rpcStatus.Detail) }
Lines changed: 205 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
using Grpc.Core;
22
using Moq;
33
using Xunit;
4+
using Ydb.Issue;
45
using Ydb.Sdk.Services.Topic;
56
using Ydb.Sdk.Services.Topic.Writer;
67
using Ydb.Topic;
8+
using Codec = Ydb.Sdk.Services.Topic.Codec;
79

810
namespace Ydb.Sdk.Tests.Topic;
911

10-
using WriterStream = BidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>;
12+
using WriterStream = IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>;
1113

1214
public class WriterMockTests
1315
{
@@ -20,24 +22,219 @@ public WriterMockTests()
2022
It.IsAny<Method<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>>(),
2123
It.IsAny<GrpcRequestSettings>())
2224
).Returns(_mockStream.Object);
25+
26+
_mockIDriver.Setup(driver => driver.LoggerFactory).Returns(Utils.GetLoggerFactory);
2327
}
2428

25-
// [Fact]
26-
public async Task NotStarted_Failed_Test()
29+
[Fact]
30+
public async Task Initialize_WhenStreamIsClosedByServer_ThrowWriterExceptionOnWriteAsyncAndTryNextInitialize()
2731
{
2832
var moveNextTry = new TaskCompletionSource<bool>();
33+
var taskNextComplete = new TaskCompletionSource();
2934

30-
_mockStream
31-
.Setup(stream => stream.Write(It.IsAny<StreamWriteMessage.Types.FromClient>()))
35+
_mockStream.Setup(stream => stream.Write(It.IsAny<StreamWriteMessage.Types.FromClient>()))
3236
.Returns(Task.CompletedTask);
33-
3437
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
3538
.ReturnsAsync(false)
36-
.Returns(new ValueTask<bool>(moveNextTry.Task)); // For retry
39+
.Returns(() =>
40+
{
41+
taskNextComplete.SetResult();
42+
return new ValueTask<bool>(moveNextTry.Task);
43+
});
3744

3845
using var writer = new WriterBuilder<int>(_mockIDriver.Object, new WriterConfig("/topic")
3946
{ ProducerId = "producerId" }).Build();
4047

41-
await Assert.ThrowsAsync<WriterException>(() => writer.WriteAsync(100));
48+
Assert.Equal("Stream unexpectedly closed by YDB server. " +
49+
"Current InitRequest: { \"path\": \"/topic\", \"producerId\": \"producerId\" }",
50+
(await Assert.ThrowsAsync<WriterException>(() => writer.WriteAsync(100))).Message);
51+
52+
await taskNextComplete.Task;
53+
// check attempt repeated!!!
54+
_mockStream.Verify(stream => stream.Write(It.IsAny<StreamWriteMessage.Types.FromClient>()), Times.Exactly(2));
55+
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(2));
56+
}
57+
58+
[Fact]
59+
public async Task Initialize_WhenFailWriteMessage_ThrowWriterExceptionOnWriteAsyncAndTryNextInitialize()
60+
{
61+
var taskSource = new TaskCompletionSource();
62+
var taskNextComplete = new TaskCompletionSource();
63+
_mockStream.SetupSequence(stream => stream.Write(It.IsAny<StreamWriteMessage.Types.FromClient>()))
64+
.ThrowsAsync(new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled)))
65+
.Returns(() =>
66+
{
67+
taskNextComplete.SetResult();
68+
return taskSource.Task;
69+
});
70+
71+
using var writer = new WriterBuilder<string>(_mockIDriver.Object, new WriterConfig("/topic")
72+
{ ProducerId = "producerId" }).Build();
73+
74+
var writerException = await Assert.ThrowsAsync<WriterException>(() => writer.WriteAsync("abacaba"));
75+
Assert.Equal("Transport error on creating write session", writerException.Message);
76+
Assert.Equal(StatusCode.Cancelled, writerException.Status.StatusCode);
77+
78+
await taskNextComplete.Task;
79+
// check attempt repeated!!!
80+
_mockStream.Verify(stream => stream.Write(It.IsAny<StreamWriteMessage.Types.FromClient>()), Times.Exactly(2));
81+
}
82+
83+
[Fact]
84+
public async Task Initialize_WhenFailMoveNextAsync_ThrowWriterExceptionOnWriteAsyncAndTryNextInitialize()
85+
{
86+
var taskSource = new TaskCompletionSource<bool>();
87+
var taskNextComplete = new TaskCompletionSource();
88+
_mockStream.Setup(stream => stream.Write(It.IsAny<StreamWriteMessage.Types.FromClient>()))
89+
.Returns(Task.CompletedTask);
90+
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
91+
.ThrowsAsync(new Driver.TransportException(
92+
new RpcException(new Grpc.Core.Status(Grpc.Core.StatusCode.DeadlineExceeded, "Some message"))))
93+
.Returns(() =>
94+
{
95+
taskNextComplete.SetResult();
96+
return new ValueTask<bool>(taskSource.Task);
97+
});
98+
99+
using var writer = new WriterBuilder<string>(_mockIDriver.Object, new WriterConfig("/topic")
100+
{ ProducerId = "producerId" }).Build();
101+
102+
var writerException = await Assert.ThrowsAsync<WriterException>(() => writer.WriteAsync("abacaba"));
103+
Assert.Equal("Transport error on creating write session", writerException.Message);
104+
Assert.Equal(StatusCode.ClientTransportTimeout, writerException.Status.StatusCode);
105+
106+
await taskNextComplete.Task;
107+
// check attempt repeated!!!
108+
_mockStream.Verify(stream => stream.Write(It.IsAny<StreamWriteMessage.Types.FromClient>()), Times.Exactly(2));
109+
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(2));
42110
}
111+
112+
[Fact]
113+
public async Task Initialize_WhenInitResponseNotSuccess_ThrowWriterExceptionOnWriteAsyncAndTryNextInitialize()
114+
{
115+
var taskSource = new TaskCompletionSource<bool>();
116+
var taskNextComplete = new TaskCompletionSource();
117+
_mockStream.Setup(stream => stream.Write(It.IsAny<StreamWriteMessage.Types.FromClient>()))
118+
.Returns(Task.CompletedTask);
119+
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
120+
.Returns(new ValueTask<bool>(true))
121+
.Returns(() =>
122+
{
123+
taskNextComplete.SetResult();
124+
return new ValueTask<bool>(taskSource.Task);
125+
});
126+
_mockStream.Setup(stream => stream.Current)
127+
.Returns(new StreamWriteMessage.Types.FromServer
128+
{
129+
Status = StatusIds.Types.StatusCode.BadSession,
130+
Issues = { new IssueMessage { Message = "Some message" } }
131+
});
132+
133+
using var writer = new WriterBuilder<long>(_mockIDriver.Object, new WriterConfig("/topic")
134+
{ ProducerId = "producerId" }).Build();
135+
136+
Assert.Equal("Initialization failed: Status: BadSession, Issues:\n[0] Fatal: Some message\n",
137+
(await Assert.ThrowsAsync<WriterException>(() => writer.WriteAsync(123L))).Message);
138+
139+
await taskNextComplete.Task;
140+
// check attempt repeated!!!
141+
_mockStream.Verify(stream => stream.Write(It.IsAny<StreamWriteMessage.Types.FromClient>()), Times.Exactly(2));
142+
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(2));
143+
}
144+
145+
[Fact]
146+
public async Task Initialize_WhenInitResponseIsSchemaError_ThrowWriterExceptionOnWriteAsyncAndStopInitializing()
147+
{
148+
_mockStream.Setup(stream => stream.Write(It.IsAny<StreamWriteMessage.Types.FromClient>()))
149+
.Returns(Task.CompletedTask);
150+
_mockStream.Setup(stream => stream.MoveNextAsync())
151+
.Returns(new ValueTask<bool>(true));
152+
_mockStream.Setup(stream => stream.Current)
153+
.Returns(new StreamWriteMessage.Types.FromServer
154+
{
155+
Status = StatusIds.Types.StatusCode.SchemeError,
156+
Issues = { new IssueMessage { Message = "Topic not found" } }
157+
});
158+
159+
using var writer = new WriterBuilder<long>(_mockIDriver.Object, new WriterConfig("/topic")
160+
{ ProducerId = "producerId" }).Build();
161+
162+
Assert.Equal("Initialization failed: Status: SchemeError, Issues:\n[0] Fatal: Topic not found\n",
163+
(await Assert.ThrowsAsync<WriterException>(() => writer.WriteAsync(123L))).Message);
164+
165+
// check not attempt repeated!!!
166+
_mockStream.Verify(stream => stream.Write(It.IsAny<StreamWriteMessage.Types.FromClient>()), Times.Once);
167+
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.Once);
168+
}
169+
170+
[Fact]
171+
public async Task Initialize_WhenNotSupportedCodec_ThrowWriterExceptionOnWriteAsyncAndStopInitializing()
172+
{
173+
_mockStream.Setup(stream => stream.Write(It.IsAny<StreamWriteMessage.Types.FromClient>()))
174+
.Returns(Task.CompletedTask);
175+
_mockStream.Setup(stream => stream.MoveNextAsync())
176+
.Returns(new ValueTask<bool>(true));
177+
_mockStream.Setup(stream => stream.Current)
178+
.Returns(new StreamWriteMessage.Types.FromServer
179+
{
180+
InitResponse = new StreamWriteMessage.Types.InitResponse
181+
{
182+
LastSeqNo = 1, PartitionId = 1, SessionId = "SessionId",
183+
SupportedCodecs = new SupportedCodecs { Codecs = { 2 /* Gzip */, 3 /* Lzop */ } }
184+
},
185+
Status = StatusIds.Types.StatusCode.Success,
186+
});
187+
188+
using var writer = new WriterBuilder<long>(_mockIDriver.Object, new WriterConfig("/topic")
189+
{ ProducerId = "producerId", Codec = Codec.Raw }).Build();
190+
191+
Assert.Equal("Topic[Path=\"/topic\"] is not supported codec: Raw",
192+
(await Assert.ThrowsAsync<WriterException>(() => writer.WriteAsync(123L))).Message);
193+
194+
// check not attempt repeated!!!
195+
_mockStream.Verify(stream => stream.Write(It.IsAny<StreamWriteMessage.Types.FromClient>()), Times.Once);
196+
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.Once);
197+
}
198+
199+
200+
201+
/*
202+
* _mockStream.Setup(stream => stream.Current)
203+
.Returns(new StreamWriteMessage.Types.FromServer
204+
{
205+
InitResponse = new StreamWriteMessage.Types.InitResponse
206+
{ LastSeqNo = 1, PartitionId = 1, SessionId = "SessionId" },
207+
Status = StatusIds.Types.StatusCode.Success,
208+
});
209+
moveNextTry.SetResult(true);
210+
await Task.Yield();
211+
212+
var writeTask = writer.WriteAsync(100);
213+
moveNextTryWriteAck.SetResult(true);
214+
215+
_mockStream.Setup(stream => stream.Current).Returns(
216+
new StreamWriteMessage.Types.FromServer
217+
{
218+
WriteResponse = new StreamWriteMessage.Types.WriteResponse
219+
{
220+
Acks =
221+
{
222+
new StreamWriteMessage.Types.WriteResponse.Types.WriteAck
223+
{
224+
SeqNo = 1, Written =
225+
new StreamWriteMessage.Types.WriteResponse.Types.WriteAck.Types.Written
226+
{ Offset = 2 }
227+
}
228+
}
229+
},
230+
Status = StatusIds.Types.StatusCode.Success
231+
});
232+
_mockStream.Setup(stream => stream.MoveNextAsync()).ReturnsAsync(true);
233+
234+
var writeResult = await writeTask;
235+
Assert.Equal(PersistenceStatus.Written, writeResult.Status);
236+
Assert.True(writeResult.TryGetOffset(out var offset));
237+
Assert.Equal(2, offset);
238+
_mockStream.Setup(stream => stream.MoveNextAsync()).ReturnsAsync(false);
239+
*/
43240
}

0 commit comments

Comments
 (0)