Skip to content

Commit af631d7

Browse files
feat: impl IWriter (#213)
* Added mock / integration stress tests. * Updated CAS semantics for enqueuing in the buffer. * Processed buffer overflow on WriteAsync. * Setting NotStartedWriterSession with a fail reason on RPC and more errors. * New initialization strategy for WriterSession (background task). * Supported cancellation token for sending tasks. * Renamed TaskCompletionSource -> Tcs. * WriterConfig has become an internal class. * Fixed setting the SeqNo field in the message (in-flight buffer already has a seqNo) and added a check on canceled TCS. * Using BitConverter for Serializer / Deserializer.
1 parent 523be8e commit af631d7

File tree

19 files changed

+1269
-230
lines changed

19 files changed

+1269
-230
lines changed

.github/workflows/tests.yml

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ jobs:
3333
run: |
3434
cd src
3535
dotnet test --filter "Category=Unit" -f ${{ matrix.dotnet-target-framework }}
36-
3736
ado-net-tests:
3837
runs-on: ubuntu-22.04
3938
strategy:
@@ -74,8 +73,47 @@ jobs:
7473
docker cp ydb-local:/ydb_certs/ca.pem ~/
7574
cd src
7675
dotnet test --filter "(FullyQualifiedName~Ado) | (FullyQualifiedName~Dapper)" -l "console;verbosity=normal"
77-
78-
core-integration-tests:
76+
topic-tests:
77+
runs-on: ubuntu-22.04
78+
strategy:
79+
fail-fast: false
80+
matrix:
81+
ydb-version: [ 'trunk' ]
82+
dotnet-version: [ 6.0.x, 7.0.x ]
83+
include:
84+
- dotnet-version: 6.0.x
85+
dotnet-target-framework: net6.0
86+
- dotnet-version: 7.0.x
87+
dotnet-target-framework: net7.0
88+
services:
89+
ydb:
90+
image: cr.yandex/yc/yandex-docker-local-ydb:${{ matrix.ydb-version }}
91+
ports:
92+
- 2135:2135
93+
- 2136:2136
94+
- 8765:8765
95+
env:
96+
YDB_LOCAL_SURVIVE_RESTART: true
97+
YDB_USE_IN_MEMORY_PDISKS: true
98+
options: '--name ydb-local -h localhost'
99+
env:
100+
OS: ubuntu-22.04
101+
YDB_VERSION: ${{ matrix.ydb-version }}
102+
YDB_CONNECTION_STRING: grpc://localhost:2136/local
103+
YDB_CONNECTION_STRING_SECURE: grpcs://localhost:2135/local
104+
steps:
105+
- name: Checkout code
106+
uses: actions/checkout@v4
107+
- name: Install Dotnet
108+
uses: actions/setup-dotnet@v4
109+
with:
110+
dotnet-version: ${{ matrix.dotnet-version }}
111+
- name: Run Topic tests
112+
run: |
113+
docker cp ydb-local:/ydb_certs/ca.pem ~/
114+
cd src
115+
dotnet test --filter "FullyQualifiedName~Topic" -l "console;verbosity=normal"
116+
integration-tests:
79117
runs-on: ubuntu-22.04
80118
strategy:
81119
fail-fast: false

src/Ydb.Sdk/src/IDriver.cs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,21 @@ namespace Ydb.Sdk;
66

77
public interface IDriver : IAsyncDisposable, IDisposable
88
{
9-
internal Task<TResponse> UnaryCall<TRequest, TResponse>(
9+
public Task<TResponse> UnaryCall<TRequest, TResponse>(
1010
Method<TRequest, TResponse> method,
1111
TRequest request,
1212
GrpcRequestSettings settings)
1313
where TRequest : class
1414
where TResponse : class;
1515

16-
internal ServerStream<TResponse> ServerStreamCall<TRequest, TResponse>(
16+
public ServerStream<TResponse> ServerStreamCall<TRequest, TResponse>(
1717
Method<TRequest, TResponse> method,
1818
TRequest request,
1919
GrpcRequestSettings settings)
2020
where TRequest : class
2121
where TResponse : class;
2222

23-
internal BidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TRequest, TResponse>(
23+
public IBidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TRequest, TResponse>(
2424
Method<TRequest, TResponse> method,
2525
GrpcRequestSettings settings)
2626
where TRequest : class
@@ -29,6 +29,15 @@ internal BidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TReque
2929
ILoggerFactory LoggerFactory { get; }
3030
}
3131

32+
public interface IBidirectionalStream<in TRequest, out TResponse> : IDisposable
33+
{
34+
public Task Write(TRequest request);
35+
36+
public ValueTask<bool> MoveNextAsync();
37+
38+
public TResponse Current { get; }
39+
}
40+
3241
public abstract class BaseDriver : IDriver
3342
{
3443
protected readonly DriverConfig Config;
@@ -95,7 +104,7 @@ public ServerStream<TResponse> ServerStreamCall<TRequest, TResponse>(
95104
return new ServerStream<TResponse>(call, e => { OnRpcError(endpoint, e); });
96105
}
97106

98-
public BidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TRequest, TResponse>(
107+
public IBidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TRequest, TResponse>(
99108
Method<TRequest, TResponse> method,
100109
GrpcRequestSettings settings)
101110
where TRequest : class
@@ -213,7 +222,7 @@ public async ValueTask<bool> MoveNextAsync()
213222
}
214223
}
215224

216-
public sealed class BidirectionalStream<TRequest, TResponse> : IDisposable
225+
public class BidirectionalStream<TRequest, TResponse> : IBidirectionalStream<TRequest, TResponse>
217226
{
218227
private readonly AsyncDuplexStreamingCall<TRequest, TResponse> _stream;
219228
private readonly Action<RpcException> _rpcErrorAction;

src/Ydb.Sdk/src/Services/Topic/Deserializer.cs

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -55,28 +55,15 @@ private class Int64Deserializer : IDeserializer<long>
5555
{
5656
public long Deserialize(byte[] data)
5757
{
58-
if (data.Length != 8)
59-
{
60-
throw new ArgumentException(
61-
$"Deserializer<Long> encountered data of length ${data.Length}. Expecting data length to be 8");
62-
}
63-
64-
return ((long)data[0] << 56) | ((long)data[1] << 48) | ((long)data[2] << 40) | ((long)data[3] << 32) |
65-
((long)data[4] << 24) | ((long)data[5] << 16) | ((long)data[6] << 8) | data[7];
58+
return BitConverter.ToInt64(data);
6659
}
6760
}
6861

6962
private class Int32Deserializer : IDeserializer<int>
7063
{
7164
public int Deserialize(byte[] data)
7265
{
73-
if (data.Length != 4)
74-
{
75-
throw new ArgumentException(
76-
$"Deserializer<Int32> encountered data of length ${data.Length}. Expecting data length to be 4");
77-
}
78-
79-
return (data[0] << 24) | (data[1] << 16) | (data[2] << 8) | data[3];
66+
return BitConverter.ToInt32(data);
8067
}
8168
}
8269

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,28 @@
11
namespace Ydb.Sdk.Services.Topic;
22

3-
public class YdbTopicException : Exception
3+
public class WriterException : Exception
44
{
5-
protected YdbTopicException(string message) : base(message)
5+
public WriterException(string message) : base(message)
66
{
7+
Status = new Status(StatusCode.Unspecified);
78
}
8-
}
99

10-
public class YdbWriterException : YdbTopicException
11-
{
12-
public YdbWriterException(string message) : base(message)
10+
public WriterException(string message, Status status) : base(message + ": " + status)
11+
{
12+
Status = status;
13+
}
14+
15+
public WriterException(string message, Driver.TransportException e) : base(message, e)
1316
{
17+
Status = e.Status;
1418
}
19+
20+
public Status Status { get; }
1521
}
1622

17-
public class YdbReaderException : YdbTopicException
23+
public class ReaderException : Exception
1824
{
19-
protected YdbReaderException(string message) : base(message)
25+
protected ReaderException(string message) : base(message)
2026
{
2127
}
2228
}

src/Ydb.Sdk/src/Services/Topic/IWriter.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
namespace Ydb.Sdk.Services.Topic;
44

5-
public interface IWriter<TValue>
5+
public interface IWriter<TValue> : IDisposable
66
{
7-
public Task<WriteResult> WriteAsync(TValue data);
7+
public Task<WriteResult> WriteAsync(TValue data, CancellationToken cancellationToken = default);
88

9-
public Task<WriteResult> WriteAsync(Message<TValue> message);
9+
public Task<WriteResult> WriteAsync(Message<TValue> message, CancellationToken cancellationToken = default);
1010
}

src/Ydb.Sdk/src/Services/Topic/Reader/ReaderBuilder.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public Task<IReader<TValue>> Build()
2222
// Deserializer ?? (IDeserializer<TValue>)(
2323
// Deserializers.DefaultDeserializers.TryGetValue(typeof(TValue), out var deserializer)
2424
// ? deserializer
25-
// : throw new YdbWriterException("The serializer is not set")
25+
// : throw new WriterException("The serializer is not set")
2626
// )
2727
// );
2828
//

src/Ydb.Sdk/src/Services/Topic/Serializer.cs

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -49,31 +49,15 @@ private class Int64Serializer : ISerializer<long>
4949
{
5050
public byte[] Serialize(long data)
5151
{
52-
return new[]
53-
{
54-
(byte)(data >> 56),
55-
(byte)(data >> 48),
56-
(byte)(data >> 40),
57-
(byte)(data >> 32),
58-
(byte)(data >> 24),
59-
(byte)(data >> 16),
60-
(byte)(data >> 8),
61-
(byte)data
62-
};
52+
return BitConverter.GetBytes(data);
6353
}
6454
}
6555

6656
private class Int32Serializer : ISerializer<int>
6757
{
6858
public byte[] Serialize(int data)
6959
{
70-
return new[]
71-
{
72-
(byte)(data >> 24),
73-
(byte)(data >> 16),
74-
(byte)(data >> 8),
75-
(byte)data
76-
};
60+
return BitConverter.GetBytes(data);
7761
}
7862
}
7963

src/Ydb.Sdk/src/Services/Topic/TopicClient.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ namespace Ydb.Sdk.Services.Topic;
66

77
public class TopicClient
88
{
9-
private readonly Driver _driver;
9+
private readonly IDriver _driver;
1010

11-
public TopicClient(Driver driver)
11+
public TopicClient(IDriver driver)
1212
{
1313
_driver = driver;
1414
}

src/Ydb.Sdk/src/Services/Topic/TopicSession.cs

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,55 +5,46 @@ namespace Ydb.Sdk.Services.Topic;
55
internal abstract class TopicSession<TFromClient, TFromServer> : IDisposable
66
{
77
private readonly Func<Task> _initialize;
8+
private readonly Action<WriterException> _resetSessionOnTransportError;
89

9-
protected readonly BidirectionalStream<TFromClient, TFromServer> Stream;
10+
protected readonly IBidirectionalStream<TFromClient, TFromServer> Stream;
1011
protected readonly ILogger Logger;
1112
protected readonly string SessionId;
1213

1314
private int _isActive = 1;
14-
private bool _disposed;
1515

16-
protected TopicSession(BidirectionalStream<TFromClient, TFromServer> stream, ILogger logger,
17-
string sessionId, Func<Task> initialize)
16+
protected TopicSession(
17+
IBidirectionalStream<TFromClient, TFromServer> stream,
18+
ILogger logger,
19+
string sessionId,
20+
Func<Task> initialize,
21+
Action<WriterException> resetSessionOnTransportError)
1822
{
1923
Stream = stream;
2024
Logger = logger;
2125
SessionId = sessionId;
2226
_initialize = initialize;
27+
_resetSessionOnTransportError = resetSessionOnTransportError;
2328
}
2429

25-
protected async void ReconnectSession()
30+
protected async void ReconnectSession(WriterException exception)
2631
{
2732
if (Interlocked.CompareExchange(ref _isActive, 0, 1) == 0)
2833
{
29-
Logger.LogWarning("Skipping reconnect. A reconnect session has already been initiated");
34+
Logger.LogDebug("Skipping reconnect. A reconnect session has already been initiated");
3035

3136
return;
3237
}
3338

39+
_resetSessionOnTransportError(exception);
40+
3441
Logger.LogInformation("WriterSession[{SessionId}] has been deactivated, starting to reconnect", SessionId);
3542

36-
while (!_disposed)
37-
{
38-
try
39-
{
40-
await _initialize();
41-
break;
42-
}
43-
catch (Exception e)
44-
{
45-
Logger.LogError(e, "Unable to reconnect the session due to the following error");
46-
}
47-
}
43+
await _initialize();
4844
}
4945

5046
public void Dispose()
5147
{
52-
lock (this)
53-
{
54-
_disposed = true;
55-
}
56-
5748
Stream.Dispose();
5849
}
5950
}

src/Ydb.Sdk/src/Services/Topic/Writer/WriteResult.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ internal WriteResult(StreamWriteMessage.Types.WriteResponse.Types.WriteAck ack)
1919
break;
2020
case StreamWriteMessage.Types.WriteResponse.Types.WriteAck.MessageWriteStatusOneofCase.None:
2121
default:
22-
throw new YdbWriterException($"Unexpected WriteAck status: {ack.MessageWriteStatusCase}");
22+
throw new WriterException($"Unexpected WriteAck status: {ack.MessageWriteStatusCase}");
2323
}
2424
}
2525

0 commit comments

Comments
 (0)