Skip to content

Commit 39b766d

Browse files
micro fixex
1 parent ce61a54 commit 39b766d

File tree

4 files changed

+67
-14
lines changed

4 files changed

+67
-14
lines changed

src/Ydb.Sdk/src/IDriver.cs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,21 @@ namespace Ydb.Sdk;
66

77
public interface IDriver : IAsyncDisposable, IDisposable
88
{
9-
internal Task<TResponse> UnaryCall<TRequest, TResponse>(
9+
public Task<TResponse> UnaryCall<TRequest, TResponse>(
1010
Method<TRequest, TResponse> method,
1111
TRequest request,
1212
GrpcRequestSettings settings)
1313
where TRequest : class
1414
where TResponse : class;
1515

16-
internal ServerStream<TResponse> ServerStreamCall<TRequest, TResponse>(
16+
public ServerStream<TResponse> ServerStreamCall<TRequest, TResponse>(
1717
Method<TRequest, TResponse> method,
1818
TRequest request,
1919
GrpcRequestSettings settings)
2020
where TRequest : class
2121
where TResponse : class;
2222

23-
internal BidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TRequest, TResponse>(
23+
public BidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TRequest, TResponse>(
2424
Method<TRequest, TResponse> method,
2525
GrpcRequestSettings settings)
2626
where TRequest : class
@@ -213,11 +213,16 @@ public async ValueTask<bool> MoveNextAsync()
213213
}
214214
}
215215

216-
public sealed class BidirectionalStream<TRequest, TResponse> : IDisposable
216+
public class BidirectionalStream<TRequest, TResponse> : IDisposable
217217
{
218218
private readonly AsyncDuplexStreamingCall<TRequest, TResponse> _stream;
219219
private readonly Action<RpcException> _rpcErrorAction;
220220

221+
public BidirectionalStream()
222+
{
223+
224+
}
225+
221226
internal BidirectionalStream(
222227
AsyncDuplexStreamingCall<TRequest, TResponse> stream,
223228
Action<RpcException> rpcErrorAction)

src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using System.Collections.Concurrent;
2-
using System.Transactions;
32
using Google.Protobuf;
43
using Google.Protobuf.WellKnownTypes;
54
using Microsoft.Extensions.Logging;
@@ -303,17 +302,14 @@ public async Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
303302
Volatile.Write(ref _seqNum, currentSeqNum);
304303
await Stream.Write(new MessageFromClient { WriteRequest = writeMessage });
305304
}
306-
catch (TransactionException e)
305+
catch (Driver.TransportException e)
307306
{
308307
Logger.LogError(e, "WriterSession[{SessionId}] have error on Write, last SeqNo={SeqNo}",
309308
SessionId, Volatile.Read(ref _seqNum));
310309

311-
ReconnectSession();
310+
ClearInFlightMessages(e);
312311

313-
while (_inFlightMessages.TryDequeue(out var sendData))
314-
{
315-
sendData.TaskCompletionSource.SetException(e);
316-
}
312+
ReconnectSession();
317313
}
318314
}
319315

@@ -377,13 +373,23 @@ Completing task on exception...
377373
}
378374
}
379375
}
380-
catch (Exception e)
376+
catch (Driver.TransportException e)
381377
{
382378
Logger.LogError(e, "WriterSession[{SessionId}] have error on processing writeAck", SessionId);
379+
380+
ClearInFlightMessages(e);
383381
}
384382
finally
385383
{
386384
ReconnectSession();
387385
}
388386
}
387+
388+
private void ClearInFlightMessages(Driver.TransportException e)
389+
{
390+
while (_inFlightMessages.TryDequeue(out var sendData))
391+
{
392+
sendData.TaskCompletionSource.SetException(e);
393+
}
394+
}
389395
}

src/Ydb.Sdk/tests/Topic/WriterIntegrationTests.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@ public async Task WriteAsync_WhenOneMessage_ReturnWritten()
4040
[Fact]
4141
public async Task WriteAsync_WhenTopicNotFound_ReturnNotFoundException()
4242
{
43-
using var writer = new WriterBuilder<string>(_driver,
44-
new WriterConfig(_topicName + "_not_found") { ProducerId = "producerId" })
43+
using var writer = new WriterBuilder<string>(_driver, new WriterConfig(_topicName + "_not_found") { ProducerId = "producerId" })
4544
.Build();
4645

4746
Assert.Equal(StatusCode.SchemeError, (await Assert.ThrowsAsync<WriterException>(
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
using Grpc.Core;
2+
using Moq;
3+
using Xunit;
4+
using Ydb.Sdk.Services.Topic;
5+
using Ydb.Sdk.Services.Topic.Writer;
6+
using Ydb.Topic;
7+
8+
namespace Ydb.Sdk.Tests.Topic;
9+
10+
using WriterStream = BidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>;
11+
12+
public class WriterMockTests
13+
{
14+
private readonly Mock<IDriver> _mockIDriver = new();
15+
private readonly Mock<WriterStream> _mockStream = new();
16+
17+
public WriterMockTests()
18+
{
19+
_mockIDriver.Setup(driver => driver.BidirectionalStreamCall(
20+
It.IsAny<Method<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>>(),
21+
It.IsAny<GrpcRequestSettings>())
22+
).Returns(_mockStream.Object);
23+
}
24+
25+
// [Fact]
26+
public async Task NotStarted_Failed_Test()
27+
{
28+
var moveNextTry = new TaskCompletionSource<bool>();
29+
30+
_mockStream
31+
.Setup(stream => stream.Write(It.IsAny<StreamWriteMessage.Types.FromClient>()))
32+
.Returns(Task.CompletedTask);
33+
34+
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
35+
.ReturnsAsync(false)
36+
.Returns(new ValueTask<bool>(moveNextTry.Task)); // For retry
37+
38+
using var writer = new WriterBuilder<int>(_mockIDriver.Object, new WriterConfig("/topic")
39+
{ ProducerId = "producerId" }).Build();
40+
41+
await Assert.ThrowsAsync<WriterException>(() => writer.WriteAsync(100));
42+
}
43+
}

0 commit comments

Comments
 (0)