Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
253b08c
feat: initial commit
KirillKurdyukov Nov 13, 2024
b24c825
feat: added first integration tests.yml
KirillKurdyukov Nov 15, 2024
0d318d2
feat: update CAS on buffer
KirillKurdyukov Nov 17, 2024
ce61a54
fix linter
KirillKurdyukov Nov 17, 2024
39b766d
micro fixex
KirillKurdyukov Nov 19, 2024
cb32d5e
feat: mock tests on init WriterSession
KirillKurdyukov Nov 20, 2024
bf97959
BufferOverflow processing: WriteAsync
KirillKurdyukov Nov 21, 2024
014bc38
fix linter
KirillKurdyukov Nov 21, 2024
7322f04
added
KirillKurdyukov Nov 21, 2024
df674fc
commit reconnection work
KirillKurdyukov Nov 22, 2024
6df57bf
new init WriterSession strategy
KirillKurdyukov Nov 22, 2024
2b27665
IWriter added cancelToken
KirillKurdyukov Nov 22, 2024
dd6ab64
feat: make WriterConfig internal
KirillKurdyukov Nov 22, 2024
8093a7f
feat: supported cancellation token
KirillKurdyukov Nov 25, 2024
6507193
fix test
KirillKurdyukov Nov 25, 2024
a95c8d8
feat: renaming tcs
KirillKurdyukov Nov 25, 2024
bbfb57c
rename
KirillKurdyukov Nov 25, 2024
4aa0f35
make internal WriterConfig
KirillKurdyukov Nov 25, 2024
eb71c99
fix setting SeqNo in message (inFlightBuffer already has seqNo) and a…
KirillKurdyukov Nov 26, 2024
3019062
delete SeqNo Field
KirillKurdyukov Nov 26, 2024
98f097c
fix setting SeqNo
KirillKurdyukov Nov 26, 2024
2a87cb1
fix linter
KirillKurdyukov Nov 26, 2024
5372f75
feat: updating Serializer / Deserializer, added stress Integration Test
KirillKurdyukov Nov 27, 2024
e1ed7bb
fix linter
KirillKurdyukov Nov 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 41 additions & 3 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ jobs:
run: |
cd src
dotnet test --filter "Category=Unit" -f ${{ matrix.dotnet-target-framework }}

ado-net-tests:
runs-on: ubuntu-22.04
strategy:
Expand Down Expand Up @@ -74,8 +73,47 @@ jobs:
docker cp ydb-local:/ydb_certs/ca.pem ~/
cd src
dotnet test --filter "(FullyQualifiedName~Ado) | (FullyQualifiedName~Dapper)" -l "console;verbosity=normal"

core-integration-tests:
topic-tests:
runs-on: ubuntu-22.04
strategy:
fail-fast: false
matrix:
ydb-version: [ 'trunk' ]
dotnet-version: [ 6.0.x, 7.0.x ]
include:
- dotnet-version: 6.0.x
dotnet-target-framework: net6.0
- dotnet-version: 7.0.x
dotnet-target-framework: net7.0
services:
ydb:
image: cr.yandex/yc/yandex-docker-local-ydb:${{ matrix.ydb-version }}
ports:
- 2135:2135
- 2136:2136
- 8765:8765
env:
YDB_LOCAL_SURVIVE_RESTART: true
YDB_USE_IN_MEMORY_PDISKS: true
options: '--name ydb-local -h localhost'
env:
OS: ubuntu-22.04
YDB_VERSION: ${{ matrix.ydb-version }}
YDB_CONNECTION_STRING: grpc://localhost:2136/local
YDB_CONNECTION_STRING_SECURE: grpcs://localhost:2135/local
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Install Dotnet
uses: actions/setup-dotnet@v4
with:
dotnet-version: ${{ matrix.dotnet-version }}
- name: Run Topic tests
run: |
docker cp ydb-local:/ydb_certs/ca.pem ~/
cd src
dotnet test --filter "FullyQualifiedName~Topic" -l "console;verbosity=normal"
integration-tests:
runs-on: ubuntu-22.04
strategy:
fail-fast: false
Expand Down
19 changes: 14 additions & 5 deletions src/Ydb.Sdk/src/IDriver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,21 @@ namespace Ydb.Sdk;

public interface IDriver : IAsyncDisposable, IDisposable
{
internal Task<TResponse> UnaryCall<TRequest, TResponse>(
public Task<TResponse> UnaryCall<TRequest, TResponse>(
Method<TRequest, TResponse> method,
TRequest request,
GrpcRequestSettings settings)
where TRequest : class
where TResponse : class;

internal ServerStream<TResponse> ServerStreamCall<TRequest, TResponse>(
public ServerStream<TResponse> ServerStreamCall<TRequest, TResponse>(
Method<TRequest, TResponse> method,
TRequest request,
GrpcRequestSettings settings)
where TRequest : class
where TResponse : class;

internal BidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TRequest, TResponse>(
public IBidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TRequest, TResponse>(
Method<TRequest, TResponse> method,
GrpcRequestSettings settings)
where TRequest : class
Expand All @@ -29,6 +29,15 @@ internal BidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TReque
ILoggerFactory LoggerFactory { get; }
}

public interface IBidirectionalStream<in TRequest, out TResponse> : IDisposable
{
public Task Write(TRequest request);

public ValueTask<bool> MoveNextAsync();

public TResponse Current { get; }
}

public abstract class BaseDriver : IDriver
{
protected readonly DriverConfig Config;
Expand Down Expand Up @@ -95,7 +104,7 @@ public ServerStream<TResponse> ServerStreamCall<TRequest, TResponse>(
return new ServerStream<TResponse>(call, e => { OnRpcError(endpoint, e); });
}

public BidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TRequest, TResponse>(
public IBidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TRequest, TResponse>(
Method<TRequest, TResponse> method,
GrpcRequestSettings settings)
where TRequest : class
Expand Down Expand Up @@ -213,7 +222,7 @@ public async ValueTask<bool> MoveNextAsync()
}
}

public sealed class BidirectionalStream<TRequest, TResponse> : IDisposable
public class BidirectionalStream<TRequest, TResponse> : IBidirectionalStream<TRequest, TResponse>
{
private readonly AsyncDuplexStreamingCall<TRequest, TResponse> _stream;
private readonly Action<RpcException> _rpcErrorAction;
Expand Down
17 changes: 2 additions & 15 deletions src/Ydb.Sdk/src/Services/Topic/Deserializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,28 +55,15 @@ private class Int64Deserializer : IDeserializer<long>
{
public long Deserialize(byte[] data)
{
if (data.Length != 8)
{
throw new ArgumentException(
$"Deserializer<Long> encountered data of length ${data.Length}. Expecting data length to be 8");
}

return ((long)data[0] << 56) | ((long)data[1] << 48) | ((long)data[2] << 40) | ((long)data[3] << 32) |
((long)data[4] << 24) | ((long)data[5] << 16) | ((long)data[6] << 8) | data[7];
return BitConverter.ToInt64(data);
}
}

private class Int32Deserializer : IDeserializer<int>
{
public int Deserialize(byte[] data)
{
if (data.Length != 4)
{
throw new ArgumentException(
$"Deserializer<Int32> encountered data of length ${data.Length}. Expecting data length to be 4");
}

return (data[0] << 24) | (data[1] << 16) | (data[2] << 8) | data[3];
return BitConverter.ToInt32(data);
}
}

Expand Down
22 changes: 14 additions & 8 deletions src/Ydb.Sdk/src/Services/Topic/Exceptions.cs
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
namespace Ydb.Sdk.Services.Topic;

public class YdbTopicException : Exception
public class WriterException : Exception
{
protected YdbTopicException(string message) : base(message)
public WriterException(string message) : base(message)
{
Status = new Status(StatusCode.Unspecified);
}
}

public class YdbWriterException : YdbTopicException
{
public YdbWriterException(string message) : base(message)
public WriterException(string message, Status status) : base(message + ": " + status)
{
Status = status;
}

public WriterException(string message, Driver.TransportException e) : base(message, e)
{
Status = e.Status;
}

public Status Status { get; }
}

public class YdbReaderException : YdbTopicException
public class ReaderException : Exception
{
protected YdbReaderException(string message) : base(message)
protected ReaderException(string message) : base(message)
{
}
}
6 changes: 3 additions & 3 deletions src/Ydb.Sdk/src/Services/Topic/IWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

namespace Ydb.Sdk.Services.Topic;

public interface IWriter<TValue>
public interface IWriter<TValue> : IDisposable
{
public Task<WriteResult> WriteAsync(TValue data);
public Task<WriteResult> WriteAsync(TValue data, CancellationToken cancellationToken = default);

public Task<WriteResult> WriteAsync(Message<TValue> message);
public Task<WriteResult> WriteAsync(Message<TValue> message, CancellationToken cancellationToken = default);
}
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/src/Services/Topic/Reader/ReaderBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public Task<IReader<TValue>> Build()
// Deserializer ?? (IDeserializer<TValue>)(
// Deserializers.DefaultDeserializers.TryGetValue(typeof(TValue), out var deserializer)
// ? deserializer
// : throw new YdbWriterException("The serializer is not set")
// : throw new WriterException("The serializer is not set")
// )
// );
//
Expand Down
20 changes: 2 additions & 18 deletions src/Ydb.Sdk/src/Services/Topic/Serializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,31 +49,15 @@ private class Int64Serializer : ISerializer<long>
{
public byte[] Serialize(long data)
{
return new[]
{
(byte)(data >> 56),
(byte)(data >> 48),
(byte)(data >> 40),
(byte)(data >> 32),
(byte)(data >> 24),
(byte)(data >> 16),
(byte)(data >> 8),
(byte)data
};
return BitConverter.GetBytes(data);
}
}

private class Int32Serializer : ISerializer<int>
{
public byte[] Serialize(int data)
{
return new[]
{
(byte)(data >> 24),
(byte)(data >> 16),
(byte)(data >> 8),
(byte)data
};
return BitConverter.GetBytes(data);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/Ydb.Sdk/src/Services/Topic/TopicClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ namespace Ydb.Sdk.Services.Topic;

public class TopicClient
{
private readonly Driver _driver;
private readonly IDriver _driver;

public TopicClient(Driver driver)
public TopicClient(IDriver driver)
{
_driver = driver;
}
Expand Down
37 changes: 14 additions & 23 deletions src/Ydb.Sdk/src/Services/Topic/TopicSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,55 +5,46 @@ namespace Ydb.Sdk.Services.Topic;
internal abstract class TopicSession<TFromClient, TFromServer> : IDisposable
{
private readonly Func<Task> _initialize;
private readonly Action<WriterException> _resetSessionOnTransportError;

protected readonly BidirectionalStream<TFromClient, TFromServer> Stream;
protected readonly IBidirectionalStream<TFromClient, TFromServer> Stream;
protected readonly ILogger Logger;
protected readonly string SessionId;

private int _isActive = 1;
private bool _disposed;

protected TopicSession(BidirectionalStream<TFromClient, TFromServer> stream, ILogger logger,
string sessionId, Func<Task> initialize)
protected TopicSession(
IBidirectionalStream<TFromClient, TFromServer> stream,
ILogger logger,
string sessionId,
Func<Task> initialize,
Action<WriterException> resetSessionOnTransportError)
{
Stream = stream;
Logger = logger;
SessionId = sessionId;
_initialize = initialize;
_resetSessionOnTransportError = resetSessionOnTransportError;
}

protected async void ReconnectSession()
protected async void ReconnectSession(WriterException exception)
{
if (Interlocked.CompareExchange(ref _isActive, 0, 1) == 0)
{
Logger.LogWarning("Skipping reconnect. A reconnect session has already been initiated");
Logger.LogDebug("Skipping reconnect. A reconnect session has already been initiated");

return;
}

_resetSessionOnTransportError(exception);

Logger.LogInformation("WriterSession[{SessionId}] has been deactivated, starting to reconnect", SessionId);

while (!_disposed)
{
try
{
await _initialize();
break;
}
catch (Exception e)
{
Logger.LogError(e, "Unable to reconnect the session due to the following error");
}
}
await _initialize();
}

public void Dispose()
{
lock (this)
{
_disposed = true;
}

Stream.Dispose();
}
}
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/src/Services/Topic/Writer/WriteResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ internal WriteResult(StreamWriteMessage.Types.WriteResponse.Types.WriteAck ack)
break;
case StreamWriteMessage.Types.WriteResponse.Types.WriteAck.MessageWriteStatusOneofCase.None:
default:
throw new YdbWriterException($"Unexpected WriteAck status: {ack.MessageWriteStatusCase}");
throw new WriterException($"Unexpected WriteAck status: {ack.MessageWriteStatusCase}");
}
}

Expand Down
Loading
Loading