diff --git a/src/Ydb.Sdk/src/Client/Response.cs b/src/Ydb.Sdk/src/Client/Response.cs index c71cbf7d..3e433fe7 100644 --- a/src/Ydb.Sdk/src/Client/Response.cs +++ b/src/Ydb.Sdk/src/Client/Response.cs @@ -63,11 +63,11 @@ public abstract class StreamResponse where TProtoResponse : class where TResponse : class { - private readonly Driver.ServerStream _iterator; + private readonly ServerStream _iterator; private TResponse? _response; private bool _transportError; - internal StreamResponse(Driver.ServerStream iterator) + internal StreamResponse(ServerStream iterator) { _iterator = iterator; } diff --git a/src/Ydb.Sdk/src/Driver.cs b/src/Ydb.Sdk/src/Driver.cs index 5abe3ae9..2ee81f77 100644 --- a/src/Ydb.Sdk/src/Driver.cs +++ b/src/Ydb.Sdk/src/Driver.cs @@ -11,27 +11,22 @@ namespace Ydb.Sdk; -public sealed class Driver : IDisposable, IAsyncDisposable +public sealed class Driver : BaseDriver { private const int AttemptDiscovery = 10; - private readonly DriverConfig _config; - private readonly ILogger _logger; private readonly string _sdkInfo; private readonly GrpcChannelFactory _grpcChannelFactory; private readonly EndpointPool _endpointPool; private readonly ChannelPool _channelPool; - private volatile bool _disposed; + internal string Database => Config.Database; - internal ILoggerFactory LoggerFactory { get; } - internal string Database => _config.Database; - - public Driver(DriverConfig config, ILoggerFactory? loggerFactory = null) + public Driver(DriverConfig config, ILoggerFactory? loggerFactory = null) : base( + config, loggerFactory ?? NullLoggerFactory.Instance, + (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger() + ) { - LoggerFactory = loggerFactory ?? NullLoggerFactory.Instance; - _logger = LoggerFactory.CreateLogger(); - _config = config; _grpcChannelFactory = new GrpcChannelFactory(LoggerFactory, config); _endpointPool = new EndpointPool(LoggerFactory.CreateLogger()); _channelPool = new ChannelPool( @@ -51,39 +46,16 @@ public static async Task CreateInitialized(DriverConfig config, ILoggerF return driver; } - ~Driver() - { - Dispose(_disposed); - } - - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - private void Dispose(bool disposing) - { - _disposed = true; - - if (disposing) - { - _channelPool.DisposeAsync().AsTask().Wait(); - } - } - - public ValueTask DisposeAsync() + protected override ValueTask InternalDispose() { - Dispose(true); - GC.SuppressFinalize(this); - return default; + return _channelPool.DisposeAsync(); } public async Task Initialize() { - await _config.Credentials.ProvideAuthClient(new AuthClient(_config, _grpcChannelFactory, LoggerFactory)); + await Config.Credentials.ProvideAuthClient(new AuthClient(Config, _grpcChannelFactory, LoggerFactory)); - _logger.LogInformation("Started initial endpoint discovery"); + Logger.LogInformation("Started initial endpoint discovery"); for (var i = 0; i < AttemptDiscovery; i++) { @@ -97,11 +69,11 @@ public async Task Initialize() return; } - _logger.LogCritical("Error during initial endpoint discovery: {status}", status); + Logger.LogCritical("Error during initial endpoint discovery: {status}", status); } catch (RpcException e) { - _logger.LogCritical("RPC error during initial endpoint discovery: {e.Status}", e.Status); + Logger.LogCritical("RPC error during initial endpoint discovery: {e.Status}", e.Status); if (i == AttemptDiscovery - 1) { @@ -115,98 +87,40 @@ public async Task Initialize() throw new InitializationFailureException("Error during initial endpoint discovery"); } - internal async Task UnaryCall( - Method method, - TRequest request, - GrpcRequestSettings settings) - where TRequest : class - where TResponse : class + protected override (string, GrpcChannel) GetChannel(long nodeId) { - var (endpoint, channel) = GetChannel(settings.NodeId); - var callInvoker = channel.CreateCallInvoker(); - - _logger.LogTrace($"Unary call" + - $", method: {method.Name}" + - $", endpoint: {endpoint}"); - - try - { - using var call = callInvoker.AsyncUnaryCall( - method: method, - host: null, - options: GetCallOptions(settings, false), - request: request); - - var data = await call.ResponseAsync; - settings.TrailersHandler(call.GetTrailers()); - - return data; - } - catch (RpcException e) - { - PessimizeEndpoint(endpoint); - - throw new TransportException(e); - } - } - - internal ServerStream ServerStreamCall( - Method method, - TRequest request, - GrpcRequestSettings settings) - where TRequest : class - where TResponse : class - { - var (endpoint, channel) = GetChannel(settings.NodeId); - var callInvoker = channel.CreateCallInvoker(); - - var call = callInvoker.AsyncServerStreamingCall( - method: method, - host: null, - options: GetCallOptions(settings, true), - request: request); + var endpoint = _endpointPool.GetEndpoint(nodeId); - return new ServerStream(call, () => { PessimizeEndpoint(endpoint); }); + return (endpoint, _channelPool.GetChannel(endpoint)); } - internal BidirectionalStream BidirectionalStreamCall( - Method method, - GrpcRequestSettings settings) - where TRequest : class - where TResponse : class + protected override void OnRpcError(string endpoint, RpcException e) { - var (endpoint, channel) = GetChannel(settings.NodeId); - var callInvoker = channel.CreateCallInvoker(); - - var call = callInvoker.AsyncDuplexStreamingCall( - method: method, - host: null, - options: GetCallOptions(settings, true)); - - return new BidirectionalStream(call, () => { PessimizeEndpoint(endpoint); }); - } + Logger.LogWarning("gRPC error [{Status}] on channel {Endpoint}", e.Status, endpoint); + if (!_endpointPool.PessimizeEndpoint(endpoint)) + { + return; + } - private (string, GrpcChannel) GetChannel(long nodeId) - { - var endpoint = _endpointPool.GetEndpoint(nodeId); + Logger.LogInformation("Too many pessimized endpoints, initiated endpoint rediscovery."); - return (endpoint, _channelPool.GetChannel(endpoint)); + _ = Task.Run(DiscoverEndpoints); } private async Task DiscoverEndpoints() { - using var channel = _grpcChannelFactory.CreateChannel(_config.Endpoint); + using var channel = _grpcChannelFactory.CreateChannel(Config.Endpoint); var client = new DiscoveryService.DiscoveryServiceClient(channel); var request = new ListEndpointsRequest { - Database = _config.Database + Database = Config.Database }; var requestSettings = new GrpcRequestSettings { - TransportTimeout = _config.EndpointDiscoveryTimeout + TransportTimeout = Config.EndpointDiscoveryTimeout }; var options = GetCallOptions(requestSettings, false); @@ -218,8 +132,8 @@ private async Task DiscoverEndpoints() if (!response.Operation.Ready) { - var error = "Unexpected non-ready endpoint discovery operation."; - _logger.LogError($"Endpoint discovery internal error: {error}"); + const string error = "Unexpected non-ready endpoint discovery operation."; + Logger.LogError($"Endpoint discovery internal error: {error}"); return new Status(StatusCode.ClientInternalError, error); } @@ -227,21 +141,21 @@ private async Task DiscoverEndpoints() var status = Status.FromProto(response.Operation.Status, response.Operation.Issues); if (status.IsNotSuccess) { - _logger.LogWarning($"Unsuccessful endpoint discovery: {status}"); + Logger.LogWarning("Unsuccessful endpoint discovery: {Status}", status); return status; } if (response.Operation.Result is null) { - var error = "Unexpected empty endpoint discovery result."; - _logger.LogError($"Endpoint discovery internal error: {error}"); + const string error = "Unexpected empty endpoint discovery result."; + Logger.LogError($"Endpoint discovery internal error: {error}"); return new Status(StatusCode.ClientInternalError, error); } var resultProto = response.Operation.Result.Unpack(); - _logger.LogInformation( + Logger.LogInformation( "Successfully discovered endpoints: {EndpointsCount}, self location: {SelfLocation}, sdk info: {SdkInfo}", resultProto.Endpoints.Count, resultProto.SelfLocation, _sdkInfo); @@ -259,162 +173,24 @@ private async Task DiscoverEndpoints() private async Task PeriodicDiscovery() { - while (!_disposed) + while (Disposed == 0) { try { - await Task.Delay(_config.EndpointDiscoveryInterval); + await Task.Delay(Config.EndpointDiscoveryInterval); _ = await DiscoverEndpoints(); } catch (RpcException e) { - _logger.LogWarning($"RPC error during endpoint discovery: {e.Status}"); + Logger.LogWarning($"RPC error during endpoint discovery: {e.Status}"); } catch (Exception e) { - _logger.LogError($"Unexpected exception during session pool periodic check: {e}"); + Logger.LogError($"Unexpected exception during session pool periodic check: {e}"); } } } - private void PessimizeEndpoint(string endpoint) - { - if (!_endpointPool.PessimizeEndpoint(endpoint)) - { - return; - } - - _logger.LogInformation("Too many pessimized endpoints, initiated endpoint rediscovery."); - _ = Task.Run(DiscoverEndpoints); - } - - private CallOptions GetCallOptions(GrpcRequestSettings settings, bool streaming) - { - var meta = new Grpc.Core.Metadata - { - { Metadata.RpcDatabaseHeader, _config.Database } - }; - - var authInfo = _config.Credentials.GetAuthInfo(); - if (authInfo != null) - { - meta.Add(Metadata.RpcAuthHeader, authInfo); - } - - if (settings.TraceId.Length > 0) - { - meta.Add(Metadata.RpcTraceIdHeader, settings.TraceId); - } - - var transportTimeout = streaming - ? _config.DefaultStreamingTransportTimeout - : _config.DefaultTransportTimeout; - - if (settings.TransportTimeout != null) - { - transportTimeout = settings.TransportTimeout.Value; - } - - var options = new CallOptions( - headers: meta - ); - - if (transportTimeout != TimeSpan.Zero) - { - options = options.WithDeadline(DateTime.UtcNow + transportTimeout); - } - - return options; - } - - internal sealed class ServerStream : IAsyncEnumerator, IAsyncEnumerable - { - private readonly AsyncServerStreamingCall _responseStream; - private readonly Action _rpcErrorAction; - - internal ServerStream(AsyncServerStreamingCall responseStream, Action rpcErrorAction) - { - _responseStream = responseStream; - _rpcErrorAction = rpcErrorAction; - } - - public ValueTask DisposeAsync() - { - _responseStream.Dispose(); - - return default; - } - - public async ValueTask MoveNextAsync() - { - try - { - return await _responseStream.ResponseStream.MoveNext(CancellationToken.None); - } - catch (RpcException e) - { - _rpcErrorAction(); - - throw new TransportException(e); - } - } - - public TResponse Current => _responseStream.ResponseStream.Current; - - public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = new()) - { - return this; - } - } - - internal sealed class BidirectionalStream : IDisposable - { - private readonly AsyncDuplexStreamingCall _bidirectionalStream; - private readonly Action _rpcErrorAction; - - public BidirectionalStream(AsyncDuplexStreamingCall bidirectionalStream, - Action rpcErrorAction) - { - _bidirectionalStream = bidirectionalStream; - _rpcErrorAction = rpcErrorAction; - } - - public async Task Write(TRequest request) - { - try - { - await _bidirectionalStream.RequestStream.WriteAsync(request); - } - catch (RpcException e) - { - _rpcErrorAction(); - - throw new TransportException(e); - } - } - - public async ValueTask MoveNextAsync() - { - try - { - return await _bidirectionalStream.ResponseStream.MoveNext(CancellationToken.None); - } - catch (RpcException e) - { - _rpcErrorAction(); - - throw new TransportException(e); - } - } - - public TResponse Current => _bidirectionalStream.ResponseStream.Current; - - public void Dispose() - { - _bidirectionalStream.Dispose(); - } - } - public class InitializationFailureException : Exception { internal InitializationFailureException(string message) : base(message) diff --git a/src/Ydb.Sdk/src/IDriver.cs b/src/Ydb.Sdk/src/IDriver.cs new file mode 100644 index 00000000..97d1a080 --- /dev/null +++ b/src/Ydb.Sdk/src/IDriver.cs @@ -0,0 +1,263 @@ +using Grpc.Core; +using Grpc.Net.Client; +using Microsoft.Extensions.Logging; + +namespace Ydb.Sdk; + +public interface IDriver : IAsyncDisposable, IDisposable +{ + internal Task UnaryCall( + Method method, + TRequest request, + GrpcRequestSettings settings) + where TRequest : class + where TResponse : class; + + internal ServerStream ServerStreamCall( + Method method, + TRequest request, + GrpcRequestSettings settings) + where TRequest : class + where TResponse : class; + + internal BidirectionalStream BidirectionalStreamCall( + Method method, + GrpcRequestSettings settings) + where TRequest : class + where TResponse : class; + + ILoggerFactory LoggerFactory { get; } +} + +public abstract class BaseDriver : IDriver +{ + protected readonly DriverConfig Config; + protected readonly ILogger Logger; + + protected int Disposed; + + protected BaseDriver(DriverConfig config, ILoggerFactory loggerFactory, ILogger logger) + { + Config = config; + Logger = logger; + LoggerFactory = loggerFactory; + } + + public async Task UnaryCall( + Method method, + TRequest request, + GrpcRequestSettings settings) + where TRequest : class + where TResponse : class + { + var (endpoint, channel) = GetChannel(settings.NodeId); + var callInvoker = channel.CreateCallInvoker(); + + Logger.LogTrace("Unary call, method: {MethodName}, endpoint: {Endpoint}", method.Name, endpoint); + + try + { + using var call = callInvoker.AsyncUnaryCall( + method: method, + host: null, + options: GetCallOptions(settings, false), + request: request + ); + + var response = await call.ResponseAsync; + settings.TrailersHandler(call.GetTrailers()); + + return response; + } + catch (RpcException e) + { + OnRpcError(endpoint, e); + throw new Driver.TransportException(e); + } + } + + public ServerStream ServerStreamCall( + Method method, + TRequest request, + GrpcRequestSettings settings) + where TRequest : class + where TResponse : class + { + var (endpoint, channel) = GetChannel(settings.NodeId); + var callInvoker = channel.CreateCallInvoker(); + + var call = callInvoker.AsyncServerStreamingCall( + method: method, + host: null, + options: GetCallOptions(settings, true), + request: request); + + return new ServerStream(call, e => { OnRpcError(endpoint, e); }); + } + + public BidirectionalStream BidirectionalStreamCall( + Method method, + GrpcRequestSettings settings) + where TRequest : class + where TResponse : class + { + var (endpoint, channel) = GetChannel(settings.NodeId); + var callInvoker = channel.CreateCallInvoker(); + + var call = callInvoker.AsyncDuplexStreamingCall( + method: method, + host: null, + options: GetCallOptions(settings, true)); + + return new BidirectionalStream(call, e => { OnRpcError(endpoint, e); }); + } + + protected abstract (string, GrpcChannel) GetChannel(long nodeId); + + protected abstract void OnRpcError(string endpoint, RpcException e); + + protected CallOptions GetCallOptions(GrpcRequestSettings settings, bool streaming) + { + var meta = new Grpc.Core.Metadata + { + { Metadata.RpcDatabaseHeader, Config.Database } + }; + + var authInfo = Config.Credentials.GetAuthInfo(); + if (authInfo != null) + { + meta.Add(Metadata.RpcAuthHeader, authInfo); + } + + if (settings.TraceId.Length > 0) + { + meta.Add(Metadata.RpcTraceIdHeader, settings.TraceId); + } + + var transportTimeout = streaming + ? Config.DefaultStreamingTransportTimeout + : Config.DefaultTransportTimeout; + + if (settings.TransportTimeout != null) + { + transportTimeout = settings.TransportTimeout.Value; + } + + var options = new CallOptions( + headers: meta + ); + + if (transportTimeout != TimeSpan.Zero) + { + options = options.WithDeadline(DateTime.UtcNow + transportTimeout); + } + + return options; + } + + public ILoggerFactory LoggerFactory { get; } + + public void Dispose() + { + DisposeAsync().AsTask().GetAwaiter().GetResult(); + } + + public async ValueTask DisposeAsync() + { + if (Interlocked.CompareExchange(ref Disposed, 1, 0) == 0) + { + await InternalDispose(); + } + } + + protected abstract ValueTask InternalDispose(); +} + +public sealed class ServerStream : IAsyncEnumerator, IAsyncEnumerable +{ + private readonly AsyncServerStreamingCall _stream; + private readonly Action _rpcErrorAction; + + internal ServerStream(AsyncServerStreamingCall stream, Action rpcErrorAction) + { + _stream = stream; + _rpcErrorAction = rpcErrorAction; + } + + public ValueTask DisposeAsync() + { + _stream.Dispose(); + + return default; + } + + public async ValueTask MoveNextAsync() + { + try + { + return await _stream.ResponseStream.MoveNext(CancellationToken.None); + } + catch (RpcException e) + { + _rpcErrorAction(e); + + throw new Driver.TransportException(e); + } + } + + public TResponse Current => _stream.ResponseStream.Current; + + public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = new()) + { + return this; + } +} + +public sealed class BidirectionalStream : IDisposable +{ + private readonly AsyncDuplexStreamingCall _stream; + private readonly Action _rpcErrorAction; + + internal BidirectionalStream( + AsyncDuplexStreamingCall stream, + Action rpcErrorAction) + { + _stream = stream; + _rpcErrorAction = rpcErrorAction; + } + + public async Task Write(TRequest request) + { + try + { + await _stream.RequestStream.WriteAsync(request); + } + catch (RpcException e) + { + _rpcErrorAction(e); + + throw new Driver.TransportException(e); + } + } + + public async ValueTask MoveNextAsync() + { + try + { + return await _stream.ResponseStream.MoveNext(CancellationToken.None); + } + catch (RpcException e) + { + _rpcErrorAction(e); + + throw new Driver.TransportException(e); + } + } + + public TResponse Current => _stream.ResponseStream.Current; + + public void Dispose() + { + _stream.Dispose(); + } +} diff --git a/src/Ydb.Sdk/src/Services/Auth/AuthClient.cs b/src/Ydb.Sdk/src/Services/Auth/AuthClient.cs index 7715727c..36352383 100644 --- a/src/Ydb.Sdk/src/Services/Auth/AuthClient.cs +++ b/src/Ydb.Sdk/src/Services/Auth/AuthClient.cs @@ -37,7 +37,7 @@ public async Task Login(string user, string? password, LoginSetti try { - await using var transport = new AuthGrpcChannelTransport(_config, _grpcChannelFactory, _loggerFactory); + await using var transport = new AuthGrpcChannelDriver(_config, _grpcChannelFactory, _loggerFactory); var response = await transport.UnaryCall( method: AuthService.LoginMethod, diff --git a/src/Ydb.Sdk/src/Services/Query/SessionPool.cs b/src/Ydb.Sdk/src/Services/Query/SessionPool.cs index 10609fab..09e88a31 100644 --- a/src/Ydb.Sdk/src/Services/Query/SessionPool.cs +++ b/src/Ydb.Sdk/src/Services/Query/SessionPool.cs @@ -140,7 +140,7 @@ internal Session(Driver driver, SessionPool sessionPool, string session _driver = driver; } - internal Driver.ServerStream ExecuteQuery( + internal ServerStream ExecuteQuery( string query, Dictionary? parameters, ExecuteQuerySettings? settings, diff --git a/src/Ydb.Sdk/src/Services/Table/ExecuteScanQuery.cs b/src/Ydb.Sdk/src/Services/Table/ExecuteScanQuery.cs index f23cb909..9c6eb74b 100644 --- a/src/Ydb.Sdk/src/Services/Table/ExecuteScanQuery.cs +++ b/src/Ydb.Sdk/src/Services/Table/ExecuteScanQuery.cs @@ -34,7 +34,7 @@ internal static ResultData FromProto(ExecuteScanQueryPartialResult resultProto) public class ExecuteScanQueryStream : StreamResponse { - internal ExecuteScanQueryStream(Driver.ServerStream iterator) + internal ExecuteScanQueryStream(ServerStream iterator) : base(iterator) { } diff --git a/src/Ydb.Sdk/src/Services/Table/ReadTable.cs b/src/Ydb.Sdk/src/Services/Table/ReadTable.cs index 879e8edd..fcd7c457 100644 --- a/src/Ydb.Sdk/src/Services/Table/ReadTable.cs +++ b/src/Ydb.Sdk/src/Services/Table/ReadTable.cs @@ -39,7 +39,7 @@ internal static ResultData FromProto(ReadTableResult resultProto) public class ReadTableStream : StreamResponse { - internal ReadTableStream(Driver.ServerStream iterator) + internal ReadTableStream(ServerStream iterator) : base(iterator) { } diff --git a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs index e7a1fa6f..0d425331 100644 --- a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs +++ b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs @@ -6,14 +6,14 @@ internal abstract class TopicSession : IDisposable { private readonly Func _initialize; - protected readonly Driver.BidirectionalStream Stream; + protected readonly BidirectionalStream Stream; protected readonly ILogger Logger; protected readonly string SessionId; private int _isActive = 1; private bool _disposed; - protected TopicSession(Driver.BidirectionalStream stream, ILogger logger, + protected TopicSession(BidirectionalStream stream, ILogger logger, string sessionId, Func initialize) { Stream = stream; diff --git a/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs index 7e1553a9..8e09e6f0 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs @@ -12,14 +12,14 @@ namespace Ydb.Sdk.Services.Topic.Writer; using MessageData = StreamWriteMessage.Types.WriteRequest.Types.MessageData; using MessageFromClient = StreamWriteMessage.Types.FromClient; using MessageFromServer = StreamWriteMessage.Types.FromServer; -using WriterStream = Driver.BidirectionalStream< +using WriterStream = BidirectionalStream< StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer >; internal class Writer : IWriter { - private readonly Driver _driver; + private readonly IDriver _driver; private readonly WriterConfig _config; private readonly ILogger> _logger; private readonly ISerializer _serializer; @@ -30,7 +30,7 @@ internal class Writer : IWriter private volatile WriterSession _session = null!; - internal Writer(Driver driver, WriterConfig config, ISerializer serializer) + internal Writer(IDriver driver, WriterConfig config, ISerializer serializer) { _driver = driver; _config = config; diff --git a/src/Ydb.Sdk/src/Transport/AuthGrpcChannelTransport.cs b/src/Ydb.Sdk/src/Transport/AuthGrpcChannelDriver.cs similarity index 78% rename from src/Ydb.Sdk/src/Transport/AuthGrpcChannelTransport.cs rename to src/Ydb.Sdk/src/Transport/AuthGrpcChannelDriver.cs index 5043ee81..f76274bd 100644 --- a/src/Ydb.Sdk/src/Transport/AuthGrpcChannelTransport.cs +++ b/src/Ydb.Sdk/src/Transport/AuthGrpcChannelDriver.cs @@ -5,11 +5,11 @@ namespace Ydb.Sdk.Transport; -internal class AuthGrpcChannelTransport : GrpcTransport +internal class AuthGrpcChannelDriver : BaseDriver { private readonly GrpcChannel _channel; - public AuthGrpcChannelTransport( + public AuthGrpcChannelDriver( DriverConfig driverConfig, GrpcChannelFactory grpcChannelFactory, ILoggerFactory loggerFactory @@ -18,20 +18,11 @@ ILoggerFactory loggerFactory endpoint: driverConfig.Endpoint, database: driverConfig.Database, customServerCertificate: driverConfig.CustomServerCertificate - ), loggerFactory.CreateLogger() - ) + ), loggerFactory, loggerFactory.CreateLogger()) { _channel = grpcChannelFactory.CreateChannel(Config.Endpoint); } - protected override void Dispose(bool disposing) - { - if (disposing) - { - _channel.Dispose(); - } - } - protected override (string, GrpcChannel) GetChannel(long nodeId) { return (Config.Endpoint, _channel); @@ -46,4 +37,11 @@ protected override void OnRpcError(string endpoint, RpcException e) status.StatusCode, status.Detail, endpoint); } } + + protected override async ValueTask InternalDispose() + { + await _channel.ShutdownAsync(); + + _channel.Dispose(); + } } diff --git a/src/Ydb.Sdk/src/Transport/GrpcTransport.cs b/src/Ydb.Sdk/src/Transport/GrpcTransport.cs deleted file mode 100644 index 63c4a4e7..00000000 --- a/src/Ydb.Sdk/src/Transport/GrpcTransport.cs +++ /dev/null @@ -1,123 +0,0 @@ -using Grpc.Core; -using Grpc.Net.Client; -using Microsoft.Extensions.Logging; - -namespace Ydb.Sdk.Transport; - -// TODO Experimental [for Driver with fix call options] -public abstract class GrpcTransport : IDisposable, IAsyncDisposable -{ - protected readonly DriverConfig Config; - protected readonly ILogger Logger; - - internal GrpcTransport(DriverConfig driverConfig, ILogger logger) - { - Logger = logger; - Config = driverConfig; - } - - ~GrpcTransport() - { - Dispose(false); - } - - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - public ValueTask DisposeAsync() - { - Dispose(true); - GC.SuppressFinalize(this); - return default; - } - - protected abstract void Dispose(bool disposing); - - internal async Task UnaryCall( - Method method, - GrpcRequestSettings settings, - TRequest request - ) where TRequest : class where TResponse : class - { - var (endpoint, channel) = GetChannel(settings.NodeId); - var callInvoker = channel.CreateCallInvoker(); - - Logger.LogTrace("Unary call, method: {MethodName}, endpoint: {Endpoint}", method.Name, endpoint); - - try - { - using var call = callInvoker.AsyncUnaryCall( - method: method, - host: null, - options: GetCallOptions(settings, false), - request: request - ); - - var response = await call.ResponseAsync; - settings.TrailersHandler(call.GetTrailers()); - - return response; - } - catch (RpcException e) - { - OnRpcError(endpoint, e); - throw new TransportException(e); - } - } - - protected abstract (string, GrpcChannel) GetChannel(long nodeId); - - protected abstract void OnRpcError(string endpoint, RpcException e); - - private CallOptions GetCallOptions(GrpcRequestSettings settings, bool streaming) - { - var meta = new Grpc.Core.Metadata - { - { Metadata.RpcDatabaseHeader, Config.Database } - }; - - var authInfo = Config.Credentials.GetAuthInfo(); - if (authInfo != null) - { - meta.Add(Metadata.RpcAuthHeader, authInfo); - } - - if (settings.TraceId.Length > 0) - { - meta.Add(Metadata.RpcTraceIdHeader, settings.TraceId); - } - - var transportTimeout = streaming - ? Config.DefaultStreamingTransportTimeout - : Config.DefaultTransportTimeout; - - if (settings.TransportTimeout != null) - { - transportTimeout = settings.TransportTimeout.Value; - } - - var options = new CallOptions( - headers: meta - ); - - if (transportTimeout != TimeSpan.Zero) - { - options = options.WithDeadline(DateTime.UtcNow + transportTimeout); - } - - return options; - } -} - -public class TransportException : Exception -{ - internal TransportException(RpcException e) : base($"Transport exception: {e.Message}", e) - { - Status = e.Status.ConvertStatus(); - } - - public Status Status { get; } -}