diff --git a/src/Ydb.Sdk/CHANGELOG.md b/src/Ydb.Sdk/CHANGELOG.md index d61d16d7..a4d7e72a 100644 --- a/src/Ydb.Sdk/CHANGELOG.md +++ b/src/Ydb.Sdk/CHANGELOG.md @@ -1,4 +1,10 @@ -- Added 'x-ydb-client-pid' header to any RPC calls +- Fixed bug `Reader`: unhandled exception in `TryReadRequestBytes(long bytes)`. +- Handle `YdbException` on `DeleteSession`. +- Do not invoke `DeleteSession` if the session is not active. +- `YdbException`: Added cancellation token propagation support in `CommitAsync` and `RollbackAsync`. +- Deleted legacy exceptions: Driver.TransportException, StatusUnsuccessfulException and InitializationFailureException. +- Fixed bug: Unhandled exception System.Net.Http.HttpIOException has now been converted to YdbException ([grpc-dotnet issue](https://github.com/grpc/grpc-dotnet/issues/2638)). +- Added 'x-ydb-client-pid' header to any RPC calls. - Added DisableServerBalancer option to ADO.NET session creation; default false. ## v0.20.1 diff --git a/src/Ydb.Sdk/src/Ado/Internal/IssueMessageUtils.cs b/src/Ydb.Sdk/src/Ado/Internal/IssueMessageUtils.cs new file mode 100644 index 00000000..281b815f --- /dev/null +++ b/src/Ydb.Sdk/src/Ado/Internal/IssueMessageUtils.cs @@ -0,0 +1,58 @@ +using System.Text; +using Ydb.Issue; + +namespace Ydb.Sdk.Ado.Internal; + +internal static class IssueMessageUtils +{ + internal static string IssuesToString(this IReadOnlyList issues) => IssuesToString(issues, 0, 4); + + private static string IssuesToString(IEnumerable issueMessages, int currentIndent, int indent) => + string.Join(Environment.NewLine, issueMessages.Select(message => + { + var sb = new StringBuilder(); + sb.Append(' ', currentIndent); + sb.Append($"[{message.IssueCode}] "); + + if (message.Position != null) + { + sb.Append(message.Position.PositionToString()); + } + + sb.Append($"{message.Severity.SeverityToString()}: "); + sb.Append(message.Message); + + if (message.Issues.Count > 0) + { + sb.AppendLine(); + sb.Append(IssuesToString(message.Issues, currentIndent + indent, indent)); + } + + return sb.ToString(); + })); + + private static string SeverityToString(this uint severity) => severity switch + { + 0 => "Fatal", + 1 => "Error", + 2 => "Warning", + 3 => "Info", + _ => $"Unknown SeverityCode {severity}" + }; + + private static string PositionToString(this IssueMessage.Types.Position position) + { + var sb = new StringBuilder(); + sb.Append('('); + + if (!string.IsNullOrEmpty(position.File)) + { + sb.Append(position.File); + sb.Append(':'); + } + + sb.Append($"{position.Row}:{position.Column}"); + sb.Append(") "); + return sb.ToString(); + } +} diff --git a/src/Ydb.Sdk/src/Ado/Internal/StatusCodeUtils.cs b/src/Ydb.Sdk/src/Ado/Internal/StatusCodeUtils.cs new file mode 100644 index 00000000..8bfb9792 --- /dev/null +++ b/src/Ydb.Sdk/src/Ado/Internal/StatusCodeUtils.cs @@ -0,0 +1,27 @@ +using Ydb.Issue; + +namespace Ydb.Sdk.Ado.Internal; + +public static class StatusCodeUtils +{ + internal static StatusCode Code(this Grpc.Core.Status rpcStatus) => rpcStatus.StatusCode switch + { + Grpc.Core.StatusCode.Unavailable => StatusCode.ClientTransportUnavailable, + Grpc.Core.StatusCode.DeadlineExceeded => StatusCode.ClientTransportTimeout, + Grpc.Core.StatusCode.ResourceExhausted => StatusCode.ClientTransportResourceExhausted, + Grpc.Core.StatusCode.Unimplemented => StatusCode.ClientTransportUnimplemented, + Grpc.Core.StatusCode.Cancelled => StatusCode.Cancelled, + _ => StatusCode.ClientTransportUnknown + }; + + internal static StatusCode Code(this StatusIds.Types.StatusCode statusCode) => + Enum.IsDefined(typeof(StatusCode), (int)statusCode) ? (StatusCode)statusCode : StatusCode.Unavailable; + + internal static bool IsNotSuccess(this StatusIds.Types.StatusCode code) => + code != StatusIds.Types.StatusCode.Success; + + internal static string ToMessage(this StatusCode statusCode, IReadOnlyList issueMessages) => + issueMessages.Count == 0 + ? $"Status: {statusCode}" + : $"Status: {statusCode}, Issues:{Environment.NewLine}{issueMessages.IssuesToString()}"; +} diff --git a/src/Ydb.Sdk/src/Ado/YdbCommand.cs b/src/Ydb.Sdk/src/Ado/YdbCommand.cs index 864a8147..844343b5 100644 --- a/src/Ydb.Sdk/src/Ado/YdbCommand.cs +++ b/src/Ydb.Sdk/src/Ado/YdbCommand.cs @@ -219,7 +219,7 @@ protected override async Task ExecuteDbDataReaderAsync(CommandBeha var ydbDataReader = await YdbDataReader.CreateYdbDataReader( await YdbConnection.Session .ExecuteQuery(preparedSql.ToString(), ydbParameters, execSettings, transaction?.TransactionControl), - YdbConnection.OnStatus, transaction, cancellationToken + YdbConnection.OnNotSuccessStatusCode, transaction, cancellationToken ); YdbConnection.LastReader = ydbDataReader; diff --git a/src/Ydb.Sdk/src/Ado/YdbConnection.cs b/src/Ydb.Sdk/src/Ado/YdbConnection.cs index afadcf1b..63ebbbe0 100644 --- a/src/Ydb.Sdk/src/Ado/YdbConnection.cs +++ b/src/Ydb.Sdk/src/Ado/YdbConnection.cs @@ -92,24 +92,7 @@ public override async Task OpenAsync(CancellationToken cancellationToken) { ThrowIfConnectionOpen(); - try - { - Session = await PoolManager.GetSession(ConnectionStringBuilder, cancellationToken); - } - catch (Exception e) - { - throw e switch - { - OperationCanceledException => throw new YdbException(StatusCode.Cancelled, - $"The connection pool has been exhausted, either raise 'MaxSessionPool' " + - $"(currently {ConnectionStringBuilder.MaxSessionPool}) or 'CreateSessionTimeout' " + - $"(currently {ConnectionStringBuilder.CreateSessionTimeout} seconds) in your connection string.", e - ), - Driver.TransportException transportException => new YdbException(transportException), - StatusUnsuccessfulException unsuccessfulException => new YdbException(unsuccessfulException.Status), - _ => e - }; - } + Session = await PoolManager.GetSession(ConnectionStringBuilder, cancellationToken); OnStateChange(ClosedToOpenEventArgs); @@ -165,9 +148,9 @@ public override string ConnectionString private ConnectionState ConnectionState { get; set; } = ConnectionState.Closed; // Invoke AsyncOpen() - internal void OnStatus(Status status) + internal void OnNotSuccessStatusCode(StatusCode code) { - _session.OnStatus(status); + _session.OnNotSuccessStatusCode(code); if (!_session.IsActive) { diff --git a/src/Ydb.Sdk/src/Ado/YdbDataReader.cs b/src/Ydb.Sdk/src/Ado/YdbDataReader.cs index ac30b459..5bcd0e8f 100644 --- a/src/Ydb.Sdk/src/Ado/YdbDataReader.cs +++ b/src/Ydb.Sdk/src/Ado/YdbDataReader.cs @@ -2,6 +2,7 @@ using Google.Protobuf.Collections; using Ydb.Issue; using Ydb.Query; +using Ydb.Sdk.Ado.Internal; using Ydb.Sdk.Value; namespace Ydb.Sdk.Ado; @@ -11,7 +12,7 @@ public sealed class YdbDataReader : DbDataReader, IAsyncEnumerable _stream; private readonly YdbTransaction? _ydbTransaction; private readonly RepeatedField _issueMessagesInStream = new(); - private readonly Action _onNotSuccessStatus; + private readonly Action _onNotSuccessStatusCode; private int _currentRowIndex = -1; private long _resultSetIndex = -1; @@ -54,22 +55,22 @@ private enum State private YdbDataReader( IServerStream resultSetStream, - Action onNotSuccessStatus, + Action onNotSuccessStatusCode, YdbTransaction? ydbTransaction) { _stream = resultSetStream; - _onNotSuccessStatus = onNotSuccessStatus; + _onNotSuccessStatusCode = onNotSuccessStatusCode; _ydbTransaction = ydbTransaction; } internal static async Task CreateYdbDataReader( IServerStream resultSetStream, - Action onStatus, + Action onNotSuccessStatusCode, YdbTransaction? ydbTransaction = null, CancellationToken cancellationToken = default ) { - var ydbDataReader = new YdbDataReader(resultSetStream, onStatus, ydbTransaction); + var ydbDataReader = new YdbDataReader(resultSetStream, onNotSuccessStatusCode, ydbTransaction); await ydbDataReader.Init(cancellationToken); return ydbDataReader; @@ -522,7 +523,7 @@ public override async Task CloseAsync() return; } - _onNotSuccessStatus(new Status(StatusCode.SessionBusy)); + _onNotSuccessStatusCode(StatusCode.SessionBusy); _stream.Dispose(); if (_ydbTransaction != null) @@ -559,20 +560,14 @@ private async ValueTask NextExecPart(CancellationToken cancellationToken) _issueMessagesInStream.AddRange(part.Issues); - if (part.Status != StatusIds.Types.StatusCode.Success) + if (part.Status.IsNotSuccess()) { - OnFailReadStream(); - while (await _stream.MoveNextAsync(cancellationToken)) { _issueMessagesInStream.AddRange(_stream.Current.Issues); } - var status = Status.FromProto(part.Status, _issueMessagesInStream); - - _onNotSuccessStatus(status); - - throw new YdbException(status); + throw YdbException.FromServer(part.Status, _issueMessagesInStream); } _currentResultSet = part.ResultSet?.FromProto(); @@ -592,13 +587,13 @@ private async ValueTask NextExecPart(CancellationToken cancellationToken) return State.NewResultSet; } - catch (Driver.TransportException e) + catch (YdbException e) { OnFailReadStream(); - _onNotSuccessStatus(e.Status); + _onNotSuccessStatusCode(e.Code); - throw new YdbException(e); + throw; } } diff --git a/src/Ydb.Sdk/src/Ado/YdbException.cs b/src/Ydb.Sdk/src/Ado/YdbException.cs index dbfc360f..a419ad34 100644 --- a/src/Ydb.Sdk/src/Ado/YdbException.cs +++ b/src/Ydb.Sdk/src/Ado/YdbException.cs @@ -1,4 +1,7 @@ using System.Data.Common; +using Grpc.Core; +using Ydb.Issue; +using Ydb.Sdk.Ado.Internal; namespace Ydb.Sdk.Ado; @@ -8,14 +11,17 @@ internal YdbException(string message) : base(message) { } - internal YdbException(Driver.TransportException transportException) - : this(transportException.Status, transportException) + internal YdbException(RpcException e) : this(e.Status.Code(), "Transport RPC call error", e) { } - internal YdbException(Status status, Exception? innerException = null) - : this(status.StatusCode, status.ToString(), innerException) + internal static YdbException FromServer(StatusIds.Types.StatusCode statusCode, IReadOnlyList issues) { + var code = statusCode.Code(); + + var message = code.ToMessage(issues); + + return new YdbException(code, message); } internal YdbException(StatusCode statusCode, string message, Exception? innerException = null) diff --git a/src/Ydb.Sdk/src/Ado/YdbSchema.cs b/src/Ydb.Sdk/src/Ado/YdbSchema.cs index a54bb40e..4fb1d901 100644 --- a/src/Ydb.Sdk/src/Ado/YdbSchema.cs +++ b/src/Ydb.Sdk/src/Ado/YdbSchema.cs @@ -5,7 +5,6 @@ using Ydb.Scheme.V1; using Ydb.Sdk.Ado.Schema; using Ydb.Sdk.Services.Table; -using Ydb.Table; namespace Ydb.Sdk.Ado; @@ -63,24 +62,13 @@ public static async Task DescribeTable( var describeResponse = await ydbConnection.Session .DescribeTable(WithSuffix(ydbConnection.Database) + tableName, describeTableSettings); - var status = Status.FromProto(describeResponse.Operation.Status, describeResponse.Operation.Issues); - - if (status.IsNotSuccess) - { - ydbConnection.OnStatus(status); - - throw new YdbException(status); - } - - var describeRes = describeResponse.Operation.Result.Unpack(); - - return new YdbTable(tableName, describeRes); + return new YdbTable(tableName, describeResponse); } - catch (Driver.TransportException e) + catch (YdbException e) { - ydbConnection.OnStatus(e.Status); + ydbConnection.OnNotSuccessStatusCode(e.Code); - throw new YdbException(e); + throw; } } @@ -253,19 +241,31 @@ private static async Task AppendDescribeTable( string? tableType, Action appendInTable) { - var ydbTable = await DescribeTable(ydbConnection, tableName, describeTableSettings); - var type = ydbTable.IsSystem - ? "SYSTEM_TABLE" - : ydbTable.Type switch + try + { + var describeResponse = await ydbConnection.Session + .DescribeTable(WithSuffix(ydbConnection.Database) + tableName, describeTableSettings); + var ydbTable = new YdbTable(tableName, describeResponse); + + var type = ydbTable.IsSystem + ? "SYSTEM_TABLE" + : ydbTable.Type switch + { + YdbTable.TableType.Table => "TABLE", + YdbTable.TableType.ColumnTable => "COLUMN_TABLE", + YdbTable.TableType.ExternalTable => "EXTERNAL_TABLE", + _ => throw new ArgumentOutOfRangeException(nameof(tableType)) + }; + if (type.IsPattern(tableType)) { - YdbTable.TableType.Table => "TABLE", - YdbTable.TableType.ColumnTable => "COLUMN_TABLE", - YdbTable.TableType.ExternalTable => "EXTERNAL_TABLE", - _ => throw new ArgumentOutOfRangeException(nameof(tableType)) - }; - if (type.IsPattern(tableType)) + appendInTable(ydbTable, type); + } + } + catch (YdbException e) { - appendInTable(ydbTable, type); + ydbConnection.OnNotSuccessStatusCode(e.Code); + + throw; } } @@ -417,7 +417,7 @@ CancellationToken cancellationToken if (status.IsNotSuccess) { - throw new YdbException(status); + throw YdbException.FromServer(operation.Status, operation.Issues); } foreach (var entry in operation.Result.Unpack().Children) @@ -461,9 +461,11 @@ await SchemaObjects( return ydbSchemaObjects; } - catch (Driver.TransportException e) + catch (YdbException e) { - throw new YdbException(e); + ydbConnection.OnNotSuccessStatusCode(e.Code); + + throw; } } diff --git a/src/Ydb.Sdk/src/Ado/YdbTransaction.cs b/src/Ydb.Sdk/src/Ado/YdbTransaction.cs index d7162c55..4355c3d3 100644 --- a/src/Ydb.Sdk/src/Ado/YdbTransaction.cs +++ b/src/Ydb.Sdk/src/Ado/YdbTransaction.cs @@ -40,13 +40,11 @@ internal YdbTransaction(YdbConnection ydbConnection, TxMode txMode) public override void Commit() => CommitAsync().GetAwaiter().GetResult(); - // TODO propagate cancellation token public override async Task CommitAsync(CancellationToken cancellationToken = new()) => - await FinishTransaction(txId => DbConnection!.Session.CommitTransaction(txId)); + await FinishTransaction(txId => DbConnection!.Session.CommitTransaction(txId, cancellationToken)); public override void Rollback() => RollbackAsync().GetAwaiter().GetResult(); - // TODO propagate cancellation token public override async Task RollbackAsync(CancellationToken cancellationToken = new()) { if (Failed) @@ -56,7 +54,7 @@ internal YdbTransaction(YdbConnection ydbConnection, TxMode txMode) return; } - await FinishTransaction(txId => DbConnection!.Session.RollbackTransaction(txId)); + await FinishTransaction(txId => DbConnection!.Session.RollbackTransaction(txId, cancellationToken)); } protected override YdbConnection? DbConnection @@ -72,7 +70,7 @@ protected override YdbConnection? DbConnection ? IsolationLevel.Serializable : IsolationLevel.Unspecified; - private async Task FinishTransaction(Func> finishMethod) + private async Task FinishTransaction(Func finishMethod) { if (DbConnection?.State == ConnectionState.Closed || Completed) { @@ -93,24 +91,15 @@ private async Task FinishTransaction(Func> finishMethod) return; // transaction isn't started } - var status = await finishMethod(TxId); // Commit or Rollback - - if (status.IsNotSuccess) - { - Failed = true; - - DbConnection.OnStatus(status); - - throw new YdbException(status); - } + await finishMethod(TxId); // Commit or Rollback } - catch (Driver.TransportException e) + catch (YdbException e) { Failed = true; - DbConnection.OnStatus(e.Status); + DbConnection.OnNotSuccessStatusCode(e.Code); - throw new YdbException(e); + throw; } finally { diff --git a/src/Ydb.Sdk/src/Client/Response.cs b/src/Ydb.Sdk/src/Client/Response.cs index 9f0a297a..22051085 100644 --- a/src/Ydb.Sdk/src/Client/Response.cs +++ b/src/Ydb.Sdk/src/Client/Response.cs @@ -1,4 +1,5 @@ using System.Diagnostics; +using Ydb.Sdk.Ado; namespace Ydb.Sdk.Client; @@ -99,9 +100,9 @@ public async Task Next() return result; } - catch (Driver.TransportException e) + catch (YdbException e) { - _response = MakeResponse(e.Status); + _response = MakeResponse(new Status(e.Code, e.Message)); _transportError = true; return true; } diff --git a/src/Ydb.Sdk/src/Driver.cs b/src/Ydb.Sdk/src/Driver.cs index 4af6609b..de842a76 100644 --- a/src/Ydb.Sdk/src/Driver.cs +++ b/src/Ydb.Sdk/src/Driver.cs @@ -4,6 +4,8 @@ using Microsoft.Extensions.Logging.Abstractions; using Ydb.Discovery; using Ydb.Discovery.V1; +using Ydb.Sdk.Ado; +using Ydb.Sdk.Ado.Internal; using Ydb.Sdk.Pool; namespace Ydb.Sdk; @@ -39,19 +41,15 @@ public async Task Initialize() { try { - var status = await DiscoverEndpoints(); - if (status.IsSuccess) - { - _ = Task.Run(PeriodicDiscovery); + await DiscoverEndpoints(); - return; - } + _ = Task.Run(PeriodicDiscovery); - Logger.LogCritical("Error during initial endpoint discovery: {status}", status); + return; } - catch (RpcException e) + catch (YdbException e) { - Logger.LogCritical("RPC error during initial endpoint discovery: {e.Status}", e.Status); + Logger.LogError(e, "Error during initial endpoint discovery: {e.Status}", e.Code); if (i == AttemptDiscovery - 1) { @@ -62,7 +60,7 @@ public async Task Initialize() await Task.Delay(TimeSpan.FromMilliseconds(i * 200)); // await 0 ms, 200 ms, 400ms, ... 1.8 sec } - throw new InitializationFailureException("Error during initial endpoint discovery"); + throw new YdbException("Error initial endpoint discovery"); } protected override string GetEndpoint(long nodeId) => _endpointPool.GetEndpoint(nodeId); @@ -90,7 +88,7 @@ Grpc.Core.StatusCode.DeadlineExceeded or _ = Task.Run(DiscoverEndpoints); } - private async Task DiscoverEndpoints() + private async Task DiscoverEndpoints() { using var channel = GrpcChannelFactory.CreateChannel(Config.Endpoint); @@ -111,30 +109,13 @@ private async Task DiscoverEndpoints() options: await GetCallOptions(requestSettings) ); - if (!response.Operation.Ready) + var operation = response.Operation; + if (operation.Status.IsNotSuccess()) { - const string error = "Unexpected non-ready endpoint discovery operation."; - Logger.LogError($"Endpoint discovery internal error: {error}"); - - return new Status(StatusCode.ClientInternalError, error); + throw YdbException.FromServer(operation.Status, operation.Issues); } - var status = Status.FromProto(response.Operation.Status, response.Operation.Issues); - if (status.IsNotSuccess) - { - Logger.LogWarning("Unsuccessful endpoint discovery: {Status}", status); - return status; - } - - if (response.Operation.Result is null) - { - const string error = "Unexpected empty endpoint discovery result."; - Logger.LogError($"Endpoint discovery internal error: {error}"); - - return new Status(StatusCode.ClientInternalError, error); - } - - var resultProto = response.Operation.Result.Unpack(); + var resultProto = operation.Result.Unpack(); Logger.LogDebug( "Successfully discovered endpoints: {EndpointsCount}, self location: {SelfLocation}, sdk info: {SdkInfo}", @@ -151,8 +132,6 @@ await ChannelPool.RemoveChannels( .ToImmutableArray() ) ); - - return new Status(StatusCode.Success); } private async Task PeriodicDiscovery() @@ -163,11 +142,11 @@ private async Task PeriodicDiscovery() { await Task.Delay(Config.EndpointDiscoveryInterval); - _ = await DiscoverEndpoints(); + await DiscoverEndpoints(); } - catch (RpcException e) + catch (YdbException e) { - Logger.LogWarning("RPC error during endpoint discovery: {Status}", e.Status); + Logger.LogWarning(e, "Error during endpoint discovery"); } catch (Exception e) { @@ -175,21 +154,4 @@ private async Task PeriodicDiscovery() } } } - - public class InitializationFailureException : Exception - { - internal InitializationFailureException(string message) : base(message) - { - } - } - - public class TransportException : IOException - { - internal TransportException(RpcException e) : base("Transport RPC call error", e) - { - Status = e.Status.ConvertStatus(); - } - - public Status Status { get; } - } } diff --git a/src/Ydb.Sdk/src/IDriver.cs b/src/Ydb.Sdk/src/IDriver.cs index 7259a3c2..7461b704 100644 --- a/src/Ydb.Sdk/src/IDriver.cs +++ b/src/Ydb.Sdk/src/IDriver.cs @@ -1,6 +1,7 @@ using Grpc.Core; using Grpc.Net.Client; using Microsoft.Extensions.Logging; +using Ydb.Sdk.Ado; using Ydb.Sdk.Auth; using Ydb.Sdk.Pool; using Ydb.Sdk.Services.Auth; @@ -116,7 +117,12 @@ public async Task UnaryCall( catch (RpcException e) { OnRpcError(endpoint, e); - throw new Driver.TransportException(e); + + throw new YdbException(e); + } + catch (Exception e) + { + throw new YdbException(StatusCode.ClientTransportUnknown, "Unexpected transport exception", e); } } @@ -233,7 +239,11 @@ public async ValueTask MoveNextAsync(CancellationToken cancellationToken = { _rpcErrorAction(e); - throw new Driver.TransportException(e); + throw new YdbException(e); + } + catch (Exception e) + { + throw new YdbException(StatusCode.ClientTransportUnknown, "Unexpected transport exception", e); } } @@ -268,7 +278,11 @@ public async Task Write(TRequest request) { _rpcErrorAction(e); - throw new Driver.TransportException(e); + throw new YdbException(e); + } + catch (Exception e) + { + throw new YdbException(StatusCode.ClientTransportUnknown, "Unexpected transport exception", e); } } @@ -282,7 +296,11 @@ public async ValueTask MoveNextAsync() { _rpcErrorAction(e); - throw new Driver.TransportException(e); + throw new YdbException(e); + } + catch (Exception e) + { + throw new YdbException(StatusCode.ClientTransportUnknown, "Unexpected transport exception", e); } } diff --git a/src/Ydb.Sdk/src/Pool/SessionPool.cs b/src/Ydb.Sdk/src/Pool/SessionPool.cs index d7466dc8..d38592e1 100644 --- a/src/Ydb.Sdk/src/Pool/SessionPool.cs +++ b/src/Ydb.Sdk/src/Pool/SessionPool.cs @@ -9,7 +9,7 @@ internal abstract class SessionPool where TSession : SessionBase _idleSessions = new(); - private readonly int _createSessionTimeoutMs; + private readonly int _createSessionTimeout; private readonly int _size; protected readonly SessionPoolConfig Config; @@ -23,50 +23,61 @@ protected SessionPool(ILogger> logger, SessionPoolConfig c Logger = logger; Config = config; _size = config.MaxSessionPool; - _createSessionTimeoutMs = config.CreateSessionTimeout * 1000; + _createSessionTimeout = config.CreateSessionTimeout; _semaphore = new SemaphoreSlim(_size); } internal async Task GetSession(CancellationToken cancellationToken = default) { - using var ctsGetSession = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - if (_createSessionTimeoutMs > 0) + try { - ctsGetSession.CancelAfter(_createSessionTimeoutMs); - } + using var ctsGetSession = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + if (_createSessionTimeout > 0) + { + ctsGetSession.CancelAfter(TimeSpan.FromSeconds(_createSessionTimeout)); + } - var finalCancellationToken = ctsGetSession.Token; + var finalCancellationToken = ctsGetSession.Token; - Interlocked.Increment(ref _waitingCount); + Interlocked.Increment(ref _waitingCount); - if (_disposed) - { - throw new YdbException("Session pool is closed"); - } + if (_disposed) + { + throw new YdbException("Session pool is closed"); + } - await _semaphore.WaitAsync(finalCancellationToken); - Interlocked.Decrement(ref _waitingCount); + await _semaphore.WaitAsync(finalCancellationToken); + Interlocked.Decrement(ref _waitingCount); - if (_idleSessions.TryDequeue(out var session) && session.IsActive) - { - return session; - } + if (_idleSessions.TryDequeue(out var session) && session.IsActive) + { + return session; + } - if (session != null) // not active - { - Logger.LogDebug("Session[{Id}] isn't active, creating new session", session.SessionId); - } + if (session != null) // not active + { + Logger.LogDebug("Session[{Id}] isn't active, creating new session", session.SessionId); + } - try - { - return await CreateSession(finalCancellationToken); + try + { + return await CreateSession(finalCancellationToken); + } + catch (Exception e) + { + Release(); + + Logger.LogError(e, "Failed to create a session"); + throw; + } } - catch (Exception e) + catch (OperationCanceledException e) { - Release(); - - Logger.LogError(e, "Failed to create a session"); - throw; + throw new YdbException(StatusCode.Cancelled, + $"The connection pool has been exhausted, either raise 'MaxSessionPool' " + + $"(currently {_size}) or 'CreateSessionTimeout' " + + $"(currently {_createSessionTimeout} seconds) in your connection string.", e + ); } } @@ -86,47 +97,29 @@ internal async Task ExecOnSession(Func> onSession, Retry return await onSession(session); } - catch (Exception e) + catch (YdbException e) { - var statusErr = e switch - { - Driver.TransportException transportException => transportException.Status, - StatusUnsuccessfulException unsuccessfulException => unsuccessfulException.Status, - _ => null - }; - if (attempt == retrySettings.MaxAttempts - 1) { - if (statusErr != null) - { - session?.OnStatus(statusErr); - } + session?.OnNotSuccessStatusCode(e.Code); throw; } - if (statusErr != null) - { - session?.OnStatus(statusErr); - var retryRule = retrySettings.GetRetryRule(statusErr.StatusCode); + session?.OnNotSuccessStatusCode(e.Code); + var retryRule = retrySettings.GetRetryRule(e.Code); - if (retryRule.Policy == RetryPolicy.None || - (retryRule.Policy == RetryPolicy.IdempotentOnly && !retrySettings.IsIdempotent)) - { - throw; - } - - Logger.LogTrace( - "Retry: attempt {attempt}, Session ${session.SessionId}, idempotent error {status} retrying", - attempt, session?.SessionId, statusErr); - - - await Task.Delay(retryRule.BackoffSettings.CalcBackoff(attempt)); - } - else + if (retryRule.Policy == RetryPolicy.None || + (retryRule.Policy == RetryPolicy.IdempotentOnly && !retrySettings.IsIdempotent)) { throw; } + + Logger.LogTrace(e, "Retry: attempt {attempt}, Session ${session.SessionId}, idempotent error retrying", + attempt, session?.SessionId); + + + await Task.Delay(retryRule.BackoffSettings.CalcBackoff(attempt)); } finally { @@ -156,10 +149,6 @@ internal async ValueTask ReleaseSession(TSession session) { _idleSessions.Enqueue(session); } - else - { - _ = DeleteSession(session); - } } finally { @@ -169,11 +158,20 @@ internal async ValueTask ReleaseSession(TSession session) private void Release() => _semaphore.Release(); - private Task DeleteSession(TSession session) => - session.DeleteSession().ContinueWith(s => + private async Task DeleteSession(TSession session) + { + try + { + if (session.IsActive) + { + await session.DeleteSession(); + } + } + catch (YdbException e) { - Logger.LogDebug("Session[{id}] removed with status {status}", session.SessionId, s.Result); - }); + Logger.LogError(e, "Failed to delete session"); + } + } public async ValueTask DisposeAsync() { @@ -223,10 +221,10 @@ internal SessionBase(SessionPool sessionPool, string sessionId, long nodeId, _logger = logger; } - internal void OnStatus(Status status) + internal void OnNotSuccessStatusCode(StatusCode code) { // ReSharper disable once InvertIf - if (status.StatusCode is + if (code is StatusCode.Cancelled or StatusCode.BadSession or StatusCode.SessionBusy or @@ -235,7 +233,7 @@ StatusCode.ClientTransportTimeout or StatusCode.Unavailable or StatusCode.ClientTransportUnavailable) { - _logger.LogWarning("Session[{SessionId}] is deactivated. Reason: {Status}", SessionId, status); + _logger.LogWarning("Session[{SessionId}] is deactivated. Reason StatusCode: {Code}", SessionId, code); IsActive = false; } @@ -249,7 +247,7 @@ internal TS MakeGrpcRequestSettings(TS settings) where TS : GrpcRequestSetti return settings; } - internal abstract Task DeleteSession(); + internal abstract Task DeleteSession(); } internal record SessionPoolConfig( diff --git a/src/Ydb.Sdk/src/Services/Auth/StaticCredentialsAuthClient.cs b/src/Ydb.Sdk/src/Services/Auth/StaticCredentialsAuthClient.cs index afc5e632..e93fa298 100644 --- a/src/Ydb.Sdk/src/Services/Auth/StaticCredentialsAuthClient.cs +++ b/src/Ydb.Sdk/src/Services/Auth/StaticCredentialsAuthClient.cs @@ -3,10 +3,10 @@ using Microsoft.Extensions.Logging; using Ydb.Auth; using Ydb.Auth.V1; +using Ydb.Sdk.Ado; +using Ydb.Sdk.Ado.Internal; using Ydb.Sdk.Auth; -using Ydb.Sdk.Client; using Ydb.Sdk.Pool; -using Ydb.Sdk.Services.Operations; namespace Ydb.Sdk.Services.Auth; @@ -34,72 +34,48 @@ public async Task FetchToken() uint attempt = 0; while (true) { - var loginResponse = await Login(); - var status = loginResponse.Status; - - if (status.IsSuccess) + try { - return new TokenResponse(loginResponse.Result, new JwtSecurityToken(loginResponse.Result).ValidTo); - } + var token = await Login(); - _logger.LogError("Login request get wrong status {Status}", status); + return new TokenResponse(token, new JwtSecurityToken(token).ValidTo); + } + catch (YdbException e) + { + _logger.LogError(e, "Login request get wrong status"); - var retryRule = _retrySettings.GetRetryRule(status.StatusCode); + var retryRule = _retrySettings.GetRetryRule(e.Code); - if (retryRule.Policy == RetryPolicy.None) - { - throw new StatusUnsuccessfulException(status); - } + if (retryRule.Policy == RetryPolicy.None || ++attempt >= _retrySettings.MaxAttempts) + { + throw; + } - if (++attempt >= _retrySettings.MaxAttempts) - { - throw new StatusUnsuccessfulException(status); + await Task.Delay(retryRule.BackoffSettings.CalcBackoff(attempt)); } - - await Task.Delay(retryRule.BackoffSettings.CalcBackoff(attempt)); } } - private async Task Login() + private async Task Login() { - var request = new LoginRequest - { - User = _config.User - }; + var request = new LoginRequest { User = _config.User }; if (_config.Password is not null) { request.Password = _config.Password; } - try - { - using var channel = _grpcChannelFactory.CreateChannel(_config.Endpoint); - - var response = await new AuthService.AuthServiceClient(channel) - .LoginAsync(request, new CallOptions(_config.GetCallMetadata)); - - var status = response.Operation.TryUnpack(out LoginResult? resultProto); - - string? result = null; + using var channel = _grpcChannelFactory.CreateChannel(_config.Endpoint); - if (status.IsSuccess && resultProto is not null) - { - result = resultProto.Token; - } + var response = await new AuthService.AuthServiceClient(channel) + .LoginAsync(request, new CallOptions(_config.GetCallMetadata)); - return new LoginResponse(status, result); - } - catch (Driver.TransportException e) + var operation = response.Operation; + if (operation.Status.IsNotSuccess()) { - return new LoginResponse(e.Status); + throw YdbException.FromServer(operation.Status, operation.Issues); } - } - private class LoginResponse : ResponseWithResultBase - { - internal LoginResponse(Status status, string? token = null) : base(status, token) - { - } + return operation.Result.Unpack().Token; } } diff --git a/src/Ydb.Sdk/src/Services/Operations/GetOperation.cs b/src/Ydb.Sdk/src/Services/Operations/GetOperation.cs index 9270da10..1fdb1887 100644 --- a/src/Ydb.Sdk/src/Services/Operations/GetOperation.cs +++ b/src/Ydb.Sdk/src/Services/Operations/GetOperation.cs @@ -15,18 +15,11 @@ public async Task GetOperation(string id, GrpcRequestSettings? Id = id }; - try - { - var response = await _driver.UnaryCall( - method: OperationService.GetOperationMethod, - request: request, - settings: settings); + var response = await _driver.UnaryCall( + method: OperationService.GetOperationMethod, + request: request, + settings: settings); - return ClientOperation.FromProto(response.Operation); - } - catch (Driver.TransportException e) - { - return new ClientOperation(e.Status); - } + return ClientOperation.FromProto(response.Operation); } } diff --git a/src/Ydb.Sdk/src/Services/Operations/Poll.cs b/src/Ydb.Sdk/src/Services/Operations/Poll.cs index a6b64fe3..1af82988 100644 --- a/src/Ydb.Sdk/src/Services/Operations/Poll.cs +++ b/src/Ydb.Sdk/src/Services/Operations/Poll.cs @@ -6,7 +6,7 @@ public partial class OperationsClient { public async Task PollReady( string id, - TimeSpan? delay = default, + TimeSpan? delay = null, CancellationToken cancellationToken = default) { delay ??= TimeSpan.FromSeconds(10); diff --git a/src/Ydb.Sdk/src/Services/Query/QueryClient.cs b/src/Ydb.Sdk/src/Services/Query/QueryClient.cs index d7999624..443d3b02 100644 --- a/src/Ydb.Sdk/src/Services/Query/QueryClient.cs +++ b/src/Ydb.Sdk/src/Services/Query/QueryClient.cs @@ -1,4 +1,5 @@ using System.Collections.Immutable; +using Ydb.Sdk.Ado; using Ydb.Sdk.Pool; using Ydb.Sdk.Value; @@ -130,11 +131,7 @@ public Task DoTx(Func> queryTx, TxMode txMode = TxMode.Se return result; } - catch (StatusUnsuccessfulException) - { - throw; - } - catch (Driver.TransportException) + catch (YdbException) { throw; } diff --git a/src/Ydb.Sdk/src/Services/Query/QueryTx.cs b/src/Ydb.Sdk/src/Services/Query/QueryTx.cs index ddb9b320..c9aadf3e 100644 --- a/src/Ydb.Sdk/src/Services/Query/QueryTx.cs +++ b/src/Ydb.Sdk/src/Services/Query/QueryTx.cs @@ -76,8 +76,7 @@ public async Task Rollback() Commited = true; - var status = await _session.RollbackTransaction(TxId!); - status.EnsureSuccess(); + await _session.RollbackTransaction(TxId!); } internal async Task Commit() diff --git a/src/Ydb.Sdk/src/Services/Query/SessionPool.cs b/src/Ydb.Sdk/src/Services/Query/SessionPool.cs index 07914f4b..15ff1060 100644 --- a/src/Ydb.Sdk/src/Services/Query/SessionPool.cs +++ b/src/Ydb.Sdk/src/Services/Query/SessionPool.cs @@ -1,6 +1,8 @@ using Microsoft.Extensions.Logging; using Ydb.Query; using Ydb.Query.V1; +using Ydb.Sdk.Ado; +using Ydb.Sdk.Ado.Internal; using Ydb.Sdk.Pool; using Ydb.Sdk.Services.Table; using Ydb.Sdk.Value; @@ -9,7 +11,6 @@ using CommitTransactionRequest = Ydb.Query.CommitTransactionRequest; using CreateSessionRequest = Ydb.Query.CreateSessionRequest; using DeleteSessionRequest = Ydb.Query.DeleteSessionRequest; -using DescribeTableResponse = Ydb.Table.DescribeTableResponse; using RollbackTransactionRequest = Ydb.Query.RollbackTransactionRequest; using TransactionControl = Ydb.Query.TransactionControl; @@ -86,12 +87,13 @@ protected override async Task CreateSession( while (await stream.MoveNextAsync()) { var sessionState = stream.Current; - var sessionStateStatus = Status.FromProto(sessionState.Status, sessionState.Issues); - Logger.LogDebug("Session[{SessionId}] was received the status from the attach stream: {Status}", - sessionId, sessionStateStatus); + var statusCode = sessionState.Status.Code(); - session.OnStatus(sessionStateStatus); + Logger.LogDebug("Session[{SessionId}] was received the status from the attach stream: {Code}", + sessionId, statusCode); + + session.OnNotSuccessStatusCode(statusCode); // ReSharper disable once InvertIf if (!session.IsActive) @@ -104,9 +106,9 @@ protected override async Task CreateSession( // attach stream is closed } - catch (Driver.TransportException e) + catch (YdbException e) { - if (e.Status.StatusCode == StatusCode.Cancelled) + if (e.Code == StatusCode.Cancelled) { Logger.LogDebug("AttachStream is cancelled (possible grpcChannel is closing)"); @@ -172,31 +174,37 @@ internal ValueTask> ExecuteQuery( return Driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings); } - internal async Task CommitTransaction(string txId, GrpcRequestSettings? settings = null) + internal async Task CommitTransaction(string txId, CancellationToken cancellationToken = default) { - settings = MakeGrpcRequestSettings(settings ?? new GrpcRequestSettings()); + var settings = MakeGrpcRequestSettings(new GrpcRequestSettings { CancellationToken = cancellationToken }); var response = await Driver.UnaryCall(QueryService.CommitTransactionMethod, new CommitTransactionRequest { SessionId = SessionId, TxId = txId }, settings); - return Status.FromProto(response.Status, response.Issues); + if (response.Status.IsNotSuccess()) + { + throw YdbException.FromServer(response.Status, response.Issues); + } } - internal async Task RollbackTransaction(string txId, GrpcRequestSettings? settings = null) + internal async Task RollbackTransaction(string txId, CancellationToken cancellationToken = default) { - settings = MakeGrpcRequestSettings(settings ?? new GrpcRequestSettings()); + var settings = MakeGrpcRequestSettings(new GrpcRequestSettings { CancellationToken = cancellationToken }); var response = await Driver.UnaryCall(QueryService.RollbackTransactionMethod, new RollbackTransactionRequest { SessionId = SessionId, TxId = txId }, settings); - return Status.FromProto(response.Status, response.Issues); + if (response.Status.IsNotSuccess()) + { + throw YdbException.FromServer(response.Status, response.Issues); + } } - internal async Task DescribeTable(string path, DescribeTableSettings? settings = null) + internal async Task DescribeTable(string path, DescribeTableSettings? settings = null) { settings = MakeGrpcRequestSettings(settings ?? new DescribeTableSettings()); - return await Driver.UnaryCall( + var response = await Driver.UnaryCall( TableService.DescribeTableMethod, new DescribeTableRequest { @@ -207,28 +215,31 @@ internal async Task DescribeTable(string path, DescribeTa }, settings ); + + if (response.Operation.Status.IsNotSuccess()) + { + throw YdbException.FromServer(response.Operation.Status, response.Operation.Issues); + } + + return response.Operation.Result.Unpack(); } - internal override async Task DeleteSession() + internal override async Task DeleteSession() { - try - { - IsActive = false; + IsActive = false; - var settings = MakeGrpcRequestSettings(new GrpcRequestSettings - { TransportTimeout = TimeSpan.FromSeconds(5) }); + var settings = MakeGrpcRequestSettings(new GrpcRequestSettings + { TransportTimeout = TimeSpan.FromSeconds(5) }); - var deleteSessionResponse = await Driver.UnaryCall( - QueryService.DeleteSessionMethod, - new DeleteSessionRequest { SessionId = SessionId }, - settings - ); + var deleteSessionResponse = await Driver.UnaryCall( + QueryService.DeleteSessionMethod, + new DeleteSessionRequest { SessionId = SessionId }, + settings + ); - return Status.FromProto(deleteSessionResponse.Status, deleteSessionResponse.Issues); - } - catch (Driver.TransportException e) + if (deleteSessionResponse.Status.IsNotSuccess()) { - return e.Status; + throw YdbException.FromServer(deleteSessionResponse.Status, deleteSessionResponse.Issues); } } } diff --git a/src/Ydb.Sdk/src/Services/Scheme/SchemeClient.cs b/src/Ydb.Sdk/src/Services/Scheme/SchemeClient.cs index 1c2c4f71..01122b27 100644 --- a/src/Ydb.Sdk/src/Services/Scheme/SchemeClient.cs +++ b/src/Ydb.Sdk/src/Services/Scheme/SchemeClient.cs @@ -133,27 +133,20 @@ public async Task ListDirectory(string path, ListDirector Path = path }; - try - { - var response = await _driver.UnaryCall( - method: SchemeService.ListDirectoryMethod, - request: request, - settings: settings - ); - - var status = response.Operation.TryUnpack(out ListDirectoryResult? resultProto); + var response = await _driver.UnaryCall( + method: SchemeService.ListDirectoryMethod, + request: request, + settings: settings + ); - ListDirectoryResponse.ResultData? result = null; - if (status.IsSuccess && resultProto != null) - { - result = ListDirectoryResponse.ResultData.FromProto(resultProto); - } + var status = response.Operation.TryUnpack(out ListDirectoryResult? resultProto); - return new ListDirectoryResponse(status, result); - } - catch (Driver.TransportException e) + ListDirectoryResponse.ResultData? result = null; + if (status.IsSuccess && resultProto != null) { - return new ListDirectoryResponse(e.Status); + result = ListDirectoryResponse.ResultData.FromProto(resultProto); } + + return new ListDirectoryResponse(status, result); } } diff --git a/src/Ydb.Sdk/src/Services/Table/AlterTable.cs b/src/Ydb.Sdk/src/Services/Table/AlterTable.cs index c5d28e54..47f8bf8e 100644 --- a/src/Ydb.Sdk/src/Services/Table/AlterTable.cs +++ b/src/Ydb.Sdk/src/Services/Table/AlterTable.cs @@ -52,10 +52,9 @@ protected override AlterTableMetadata UnpackMetadata(ClientOperation operation) public async Task Poll() => new(_operationsClient, await _operationsClient.GetOperation(Id)); - public async Task PollReady(TimeSpan? delay = default, + public async Task PollReady(TimeSpan? delay = null, CancellationToken cancellationToken = default) => - new(_operationsClient, - await _operationsClient.PollReady(Id, delay, cancellationToken)); + new(_operationsClient, await _operationsClient.PollReady(Id, delay, cancellationToken)); } public class AddIndexSettings : OperationSettings @@ -85,22 +84,15 @@ public async Task AddIndex(string tablePath, AddIndexSettin } }; - try - { - var response = await _driver.UnaryCall( - method: TableService.AlterTableMethod, - request: request, - settings: settings - ); - - return new AlterTableOperation( - new OperationsClient(_driver), - ClientOperation.FromProto(response.Operation) - ); - } - catch (Driver.TransportException e) - { - return new AlterTableOperation(new OperationsClient(_driver), e.Status); - } + var response = await _driver.UnaryCall( + method: TableService.AlterTableMethod, + request: request, + settings: settings + ); + + return new AlterTableOperation( + new OperationsClient(_driver), + ClientOperation.FromProto(response.Operation) + ); } } diff --git a/src/Ydb.Sdk/src/Services/Table/CopyTable.cs b/src/Ydb.Sdk/src/Services/Table/CopyTable.cs index b06d1583..7a078a32 100644 --- a/src/Ydb.Sdk/src/Services/Table/CopyTable.cs +++ b/src/Ydb.Sdk/src/Services/Table/CopyTable.cs @@ -62,21 +62,14 @@ public async Task CopyTable(string sourcePath, string destina DestinationPath = MakeTablePath(destinationPath) }; - try - { - var response = await _driver.UnaryCall( - method: TableService.CopyTableMethod, - request: request, - settings: settings - ); + var response = await _driver.UnaryCall( + method: TableService.CopyTableMethod, + request: request, + settings: settings + ); - var status = response.Operation.Unpack(); - return new CopyTableResponse(status); - } - catch (Driver.TransportException e) - { - return new CopyTableResponse(e.Status); - } + var status = response.Operation.Unpack(); + return new CopyTableResponse(status); } public async Task CopyTables(List tableItems, @@ -89,19 +82,12 @@ public async Task CopyTables(List tableItems, }; request.Tables.AddRange(tableItems.Select(item => item.GetProto(this))); - try - { - var response = await _driver.UnaryCall( - method: TableService.CopyTablesMethod, - request: request, - settings: settings); + var response = await _driver.UnaryCall( + method: TableService.CopyTablesMethod, + request: request, + settings: settings); - var status = response.Operation.Unpack(); - return new CopyTablesResponse(status); - } - catch (Driver.TransportException e) - { - return new CopyTablesResponse(e.Status); - } + var status = response.Operation.Unpack(); + return new CopyTablesResponse(status); } } diff --git a/src/Ydb.Sdk/src/Services/Table/CreateSession.cs b/src/Ydb.Sdk/src/Services/Table/CreateSession.cs index 145f1ac8..052ea728 100644 --- a/src/Ydb.Sdk/src/Services/Table/CreateSession.cs +++ b/src/Ydb.Sdk/src/Services/Table/CreateSession.cs @@ -53,27 +53,20 @@ public async Task CreateSession(CreateSessionSettings? se OperationParams = settings.MakeOperationParams() }; - try - { - var response = await _driver.UnaryCall( - method: TableService.CreateSessionMethod, - request: request, - settings: settings - ); - - var status = response.Operation.TryUnpack(out CreateSessionResult? resultProto); + var response = await _driver.UnaryCall( + method: TableService.CreateSessionMethod, + request: request, + settings: settings + ); - CreateSessionResponse.ResultData? result = null; - if (status.IsSuccess && resultProto != null) - { - result = CreateSessionResponse.ResultData.FromProto(resultProto, _driver); - } + var status = response.Operation.TryUnpack(out CreateSessionResult? resultProto); - return new CreateSessionResponse(status, result); - } - catch (Driver.TransportException e) + CreateSessionResponse.ResultData? result = null; + if (status.IsSuccess && resultProto != null) { - return new CreateSessionResponse(e.Status); + result = CreateSessionResponse.ResultData.FromProto(resultProto, _driver); } + + return new CreateSessionResponse(status, result); } } diff --git a/src/Ydb.Sdk/src/Services/Table/DeleteSession.cs b/src/Ydb.Sdk/src/Services/Table/DeleteSession.cs index 867ec732..9cd0f38f 100644 --- a/src/Ydb.Sdk/src/Services/Table/DeleteSession.cs +++ b/src/Ydb.Sdk/src/Services/Table/DeleteSession.cs @@ -28,20 +28,13 @@ public async Task DeleteSession(string sessionId, DeleteS SessionId = sessionId }; - try - { - var response = await _driver.UnaryCall( - method: TableService.DeleteSessionMethod, - request: request, - settings: settings); + var response = await _driver.UnaryCall( + method: TableService.DeleteSessionMethod, + request: request, + settings: settings); - var status = response.Operation.Unpack(); + var status = response.Operation.Unpack(); - return new DeleteSessionResponse(status); - } - catch (Driver.TransportException e) - { - return new DeleteSessionResponse(e.Status); - } + return new DeleteSessionResponse(status); } } diff --git a/src/Ydb.Sdk/src/Services/Table/DescribeTable.cs b/src/Ydb.Sdk/src/Services/Table/DescribeTable.cs index da8d5211..7c45f87a 100644 --- a/src/Ydb.Sdk/src/Services/Table/DescribeTable.cs +++ b/src/Ydb.Sdk/src/Services/Table/DescribeTable.cs @@ -530,27 +530,20 @@ public async Task DescribeTable(string tablePath, Describ IncludePartitionStats = settings.IncludePartitionStats }; - try - { - var response = await _driver.UnaryCall( - method: TableService.DescribeTableMethod, - request: request, - settings: settings - ); - - var status = response.Operation.TryUnpack(out DescribeTableResult? resultProto); - DescribeTableResponse.ResultData? result = null; + var response = await _driver.UnaryCall( + method: TableService.DescribeTableMethod, + request: request, + settings: settings + ); - if (status.IsSuccess && resultProto is not null) - { - result = DescribeTableResponse.ResultData.FromProto(resultProto); - } + var status = response.Operation.TryUnpack(out DescribeTableResult? resultProto); + DescribeTableResponse.ResultData? result = null; - return new DescribeTableResponse(status, result); - } - catch (Driver.TransportException e) + if (status.IsSuccess && resultProto is not null) { - return new DescribeTableResponse(e.Status); + result = DescribeTableResponse.ResultData.FromProto(resultProto); } + + return new DescribeTableResponse(status, result); } } diff --git a/src/Ydb.Sdk/src/Services/Table/ExecuteDataQuery.cs b/src/Ydb.Sdk/src/Services/Table/ExecuteDataQuery.cs index b775b4dd..cd7b3ad2 100644 --- a/src/Ydb.Sdk/src/Services/Table/ExecuteDataQuery.cs +++ b/src/Ydb.Sdk/src/Services/Table/ExecuteDataQuery.cs @@ -79,44 +79,37 @@ public async Task ExecuteDataQuery( request.Parameters.Add(parameters.ToDictionary(p => p.Key, p => p.Value.GetProto())); - try + var response = await UnaryCall( + method: TableService.ExecuteDataQueryMethod, + request: request, + settings: settings + ); + + var status = response.Operation.TryUnpack(out ExecuteQueryResult? resultProto); + OnResponseStatus(status); + + var txState = TransactionState.Unknown; + Transaction? tx = null; + if (resultProto?.TxMeta != null) { - var response = await UnaryCall( - method: TableService.ExecuteDataQueryMethod, - request: request, - settings: settings - ); - - var status = response.Operation.TryUnpack(out ExecuteQueryResult? resultProto); - OnResponseStatus(status); - - var txState = TransactionState.Unknown; - Transaction? tx = null; - if (resultProto?.TxMeta != null) - { - txState = resultProto.TxMeta.Id.Length > 0 - ? TransactionState.Active - : TransactionState.Void; + txState = resultProto.TxMeta.Id.Length > 0 + ? TransactionState.Active + : TransactionState.Void; - tx = Transaction.FromProto(resultProto.TxMeta, Logger); - } + tx = Transaction.FromProto(resultProto.TxMeta, Logger); + } - ExecuteDataQueryResponse.ResultData? result = null; - if (status.IsSuccess && resultProto != null) + ExecuteDataQueryResponse.ResultData? result = null; + if (status.IsSuccess && resultProto != null) + { + result = ExecuteDataQueryResponse.ResultData.FromProto(resultProto); + if (!settings.AllowTruncated && result.ResultSets.Any(set => set.Truncated)) { - result = ExecuteDataQueryResponse.ResultData.FromProto(resultProto); - if (!settings.AllowTruncated && result.ResultSets.Any(set => set.Truncated)) - { - throw new TruncateException(); - } + throw new TruncateException(); } - - return new ExecuteDataQueryResponse(status, txState, tx, result); - } - catch (Driver.TransportException e) - { - return new ExecuteDataQueryResponse(e.Status, TransactionState.Unknown); } + + return new ExecuteDataQueryResponse(status, txState, tx, result); } public async Task ExecuteDataQuery( diff --git a/src/Ydb.Sdk/src/Services/Table/ExecuteSchemeQuery.cs b/src/Ydb.Sdk/src/Services/Table/ExecuteSchemeQuery.cs index c4711d5a..a7277051 100644 --- a/src/Ydb.Sdk/src/Services/Table/ExecuteSchemeQuery.cs +++ b/src/Ydb.Sdk/src/Services/Table/ExecuteSchemeQuery.cs @@ -33,22 +33,15 @@ public async Task ExecuteSchemeQuery( YqlText = query }; - try - { - var response = await UnaryCall( - method: TableService.ExecuteSchemeQueryMethod, - request: request, - settings: settings - ); - - var status = response.Operation.Unpack(); - OnResponseStatus(status); - - return new ExecuteSchemeQueryResponse(status); - } - catch (Driver.TransportException e) - { - return new ExecuteSchemeQueryResponse(e.Status); - } + var response = await UnaryCall( + method: TableService.ExecuteSchemeQueryMethod, + request: request, + settings: settings + ); + + var status = response.Operation.Unpack(); + OnResponseStatus(status); + + return new ExecuteSchemeQueryResponse(status); } } diff --git a/src/Ydb.Sdk/src/Services/Table/KeepAlive.cs b/src/Ydb.Sdk/src/Services/Table/KeepAlive.cs index a760feaa..ec3d9bc2 100644 --- a/src/Ydb.Sdk/src/Services/Table/KeepAlive.cs +++ b/src/Ydb.Sdk/src/Services/Table/KeepAlive.cs @@ -61,27 +61,20 @@ public async Task KeepAlive(string sessionId, KeepAliveSettin SessionId = sessionId }; - try - { - var response = await _driver.UnaryCall( - method: TableService.KeepAliveMethod, - request: request, - settings: settings - ); - - var status = response.Operation.TryUnpack(out KeepAliveResult? resultProto); + var response = await _driver.UnaryCall( + method: TableService.KeepAliveMethod, + request: request, + settings: settings + ); - KeepAliveResponse.ResultData? result = null; - if (status.IsSuccess && resultProto != null) - { - result = KeepAliveResponse.ResultData.FromProto(resultProto); - } + var status = response.Operation.TryUnpack(out KeepAliveResult? resultProto); - return new KeepAliveResponse(status, result); - } - catch (Driver.TransportException e) + KeepAliveResponse.ResultData? result = null; + if (status.IsSuccess && resultProto != null) { - return new KeepAliveResponse(e.Status); + result = KeepAliveResponse.ResultData.FromProto(resultProto); } + + return new KeepAliveResponse(status, result); } } diff --git a/src/Ydb.Sdk/src/Services/Table/RenameTables.cs b/src/Ydb.Sdk/src/Services/Table/RenameTables.cs index 5a074666..5bf426b7 100644 --- a/src/Ydb.Sdk/src/Services/Table/RenameTables.cs +++ b/src/Ydb.Sdk/src/Services/Table/RenameTables.cs @@ -50,20 +50,13 @@ public async Task RenameTables(IEnumerable item.GetProto(this))); - try - { - var response = await _driver.UnaryCall( - method: TableService.RenameTablesMethod, - request: request, - settings: settings - ); - - var status = response.Operation.Unpack(); - return new RenameTablesResponse(status); - } - catch (Driver.TransportException e) - { - return new RenameTablesResponse(e.Status); - } + var response = await _driver.UnaryCall( + method: TableService.RenameTablesMethod, + request: request, + settings: settings + ); + + var status = response.Operation.Unpack(); + return new RenameTablesResponse(status); } } diff --git a/src/Ydb.Sdk/src/Services/Table/RollbackTransaction.cs b/src/Ydb.Sdk/src/Services/Table/RollbackTransaction.cs index 32aaeb8b..d99ca1a8 100644 --- a/src/Ydb.Sdk/src/Services/Table/RollbackTransaction.cs +++ b/src/Ydb.Sdk/src/Services/Table/RollbackTransaction.cs @@ -32,18 +32,11 @@ public async Task RollbackTransaction( OperationParams = settings.MakeOperationParams() }; - try - { - var response = await UnaryCall(TableService.RollbackTransactionMethod, request, settings); + var response = await UnaryCall(TableService.RollbackTransactionMethod, request, settings); - var status = response.Operation.Unpack(); - OnResponseStatus(status); + var status = response.Operation.Unpack(); + OnResponseStatus(status); - return new RollbackTransactionResponse(status); - } - catch (Driver.TransportException e) - { - return new RollbackTransactionResponse(e.Status); - } + return new RollbackTransactionResponse(status); } } diff --git a/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs b/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs index 4a99c99e..6e18e5ea 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs @@ -6,10 +6,6 @@ public WriterException(string message) : base(message) { } - public WriterException(string message, Status status) : base(message + ": " + status) - { - } - public WriterException(string message, Exception inner) : base(message, inner) { } @@ -21,10 +17,6 @@ public ReaderException(string message) : base(message) { } - public ReaderException(string message, Status status) : base(message + ": " + status) - { - } - public ReaderException(string message, Exception inner) : base(message, inner) { } diff --git a/src/Ydb.Sdk/src/Services/Topic/Reader/InternalBatchMessages.cs b/src/Ydb.Sdk/src/Services/Topic/Reader/InternalBatchMessages.cs index 31e50a3f..36a2f3f2 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Reader/InternalBatchMessages.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Reader/InternalBatchMessages.cs @@ -38,14 +38,14 @@ internal bool TryDequeueMessage([MaybeNullWhen(false)] out Message messa { if (!IsActive) { - message = default; + message = null; return false; } var index = _startMessageDataIndex++; var messageData = _batch.MessageData[index]; - _readerSession.TryReadRequestBytes(Utils - .CalculateApproximatelyBytesSize(_approximatelyBatchSize, OriginalMessageCount, index)); + _ = _readerSession.TryReadRequestBytes( + Utils.CalculateApproximatelyBytesSize(_approximatelyBatchSize, OriginalMessageCount, index)); TValue value; try @@ -82,7 +82,7 @@ internal bool TryPublicBatch([MaybeNullWhen(false)] out BatchMessages ba { if (!IsActive) { - batchMessages = default; + batchMessages = null; return false; } diff --git a/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs b/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs index 9df59891..eea1a7a1 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs @@ -2,6 +2,7 @@ using System.Threading.Channels; using Google.Protobuf.WellKnownTypes; using Microsoft.Extensions.Logging; +using Ydb.Sdk.Ado.Internal; using Ydb.Topic; using Ydb.Topic.V1; using static Ydb.Topic.StreamReadMessage.Types.FromServer; @@ -157,21 +158,23 @@ private async Task Initialize() var receivedInitMessage = stream.Current; - var status = Status.FromProto(receivedInitMessage.Status, receivedInitMessage.Issues); - - if (status.IsNotSuccess) + if (receivedInitMessage.Status.IsNotSuccess()) { - if (RetrySettings.DefaultInstance.GetRetryRule(status.StatusCode).Policy != RetryPolicy.None) + var statusCode = receivedInitMessage.Status.Code(); + var statusMessage = statusCode.ToMessage(receivedInitMessage.Issues); + + if (RetrySettings.DefaultInstance.GetRetryRule(statusCode).Policy != RetryPolicy.None) { - _logger.LogError("Reader initialization failed to start. Reason: {Status}", status); + _logger.LogError("Reader initialization failed to start. {StatusMessage}", statusMessage); _ = Task.Run(Initialize, _disposeCts.Token); } else { - _logger.LogCritical("Reader initialization failed to start. Reason: {Status}", status); + _logger.LogCritical("Reader initialization failed to start. {StatusMessage}", statusMessage); - _receivedMessagesChannel.Writer.Complete(new ReaderException("Initialization failed", status)); + _receivedMessagesChannel.Writer.Complete( + new ReaderException($"Initialization failed! {statusMessage}")); } return; @@ -300,13 +303,11 @@ private async Task RunProcessingStreamResponse() { var messageFromServer = Stream.Current; - var status = Status.FromProto(messageFromServer.Status, messageFromServer.Issues); - - if (status.IsNotSuccess) + if (messageFromServer.Status.IsNotSuccess()) { Logger.LogError( "ReaderSession[{SessionId}] received unsuccessful status while processing readAck: {Status}", - SessionId, status); + SessionId, messageFromServer.Status.Code().ToMessage(messageFromServer.Issues)); return; } @@ -340,8 +341,7 @@ private async Task RunProcessingStreamResponse() } catch (Exception e) { - Logger.LogError(e, "ReaderSession[{SessionId}] have error on processing server messages", - SessionId); + Logger.LogError(e, "ReaderSession[{SessionId}] have error on processing server messages", SessionId); } finally { @@ -370,7 +370,9 @@ private async Task RunProcessingStreamRequest() } } - internal async void TryReadRequestBytes(long bytes) + // Avoid using 'async' for method with the 'void' return type or catch all exceptions in it: + // any exceptions unhandled by the method might lead to the process crash + internal async Task TryReadRequestBytes(long bytes) { var readRequestBytes = Interlocked.Add(ref _readRequestBytes, bytes); @@ -582,8 +584,6 @@ public override async ValueTask DisposeAsync() Logger.LogInformation("ReaderSession[{SessionId}]: RequestStream is closed", SessionId); await _runProcessingStreamResponse; // waiting all ack's commits - - _lifecycleReaderSessionCts.Cancel(); } catch (Exception e) { @@ -591,6 +591,7 @@ public override async ValueTask DisposeAsync() } finally { + _lifecycleReaderSessionCts.Cancel(); Stream.Dispose(); } } diff --git a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs index 3e3565ad..3739c47c 100644 --- a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs +++ b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs @@ -29,7 +29,7 @@ protected TopicSession( public bool IsActive => Volatile.Read(ref _isActive) == 1; - protected async void ReconnectSession() + protected void ReconnectSession() { if (Interlocked.CompareExchange(ref _isActive, 0, 1) == 0) { @@ -40,7 +40,7 @@ protected async void ReconnectSession() Logger.LogDebug("TopicSession[{SessionId}] has been deactivated, starting to reconnect", SessionId); - await _initialize(); + _ = Task.Run(() => _initialize()); } protected async Task SendMessage(TFromClient fromClient) diff --git a/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs index a05a4ca0..44af6fab 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs @@ -2,6 +2,7 @@ using Google.Protobuf; using Google.Protobuf.WellKnownTypes; using Microsoft.Extensions.Logging; +using Ydb.Sdk.Ado.Internal; using Ydb.Topic; using Ydb.Topic.V1; @@ -146,37 +147,44 @@ private async Task WaitBufferAvailable(CancellationToken cancellationToken) private async void StartWriteWorker() { - await Initialize(); - try { - while (!_disposeCts.Token.IsCancellationRequested) - { - await _tcsWakeUp.Task.WaitAsync(_disposeCts.Token); - _tcsWakeUp = new TaskCompletionSource(); + await Initialize(); - if (_toSendBuffer.IsEmpty) + try + { + while (!_disposeCts.Token.IsCancellationRequested) { - continue; - } + await _tcsWakeUp.Task.WaitAsync(_disposeCts.Token); + _tcsWakeUp = new TaskCompletionSource(); - await _sendInFlightMessagesSemaphoreSlim.WaitAsync(_disposeCts.Token); - try - { - if (_session.IsActive) + if (_toSendBuffer.IsEmpty) { - await _session.Write(_toSendBuffer); + continue; + } + + await _sendInFlightMessagesSemaphoreSlim.WaitAsync(_disposeCts.Token); + try + { + if (_session.IsActive) + { + await _session.Write(_toSendBuffer); + } + } + finally + { + _sendInFlightMessagesSemaphoreSlim.Release(); } } - finally - { - _sendInFlightMessagesSemaphoreSlim.Release(); - } + } + catch (OperationCanceledException) + { + _logger.LogInformation("WriteWorker[{WriterConfig}] is disposed", _config); } } - catch (OperationCanceledException) + catch (Exception e) { - _logger.LogInformation("WriteWorker[{WriterConfig}] is disposed", _config); + _logger.LogCritical(e, "WriteWorker[{WriterConfig}] has unhandled exception! Bug report!", _config); } } @@ -226,21 +234,22 @@ private async Task Initialize() var receivedInitMessage = stream.Current; - var status = Status.FromProto(receivedInitMessage.Status, receivedInitMessage.Issues); - - if (status.IsNotSuccess) + if (receivedInitMessage.Status.IsNotSuccess()) { - if (RetrySettings.DefaultInstance.GetRetryRule(status.StatusCode).Policy != RetryPolicy.None) + var statusCode = receivedInitMessage.Status.Code(); + var statusMessage = statusCode.ToMessage(receivedInitMessage.Issues); + + if (RetrySettings.DefaultInstance.GetRetryRule(statusCode).Policy != RetryPolicy.None) { - _logger.LogError("Writer initialization failed to start. Reason: {Status}", status); + _logger.LogError("Writer initialization failed to start. {StatusMessage}", statusMessage); _ = Task.Run(Initialize); } else { - _logger.LogCritical("Writer initialization failed to start. Reason: {Status}", status); + _logger.LogCritical("Writer initialization failed to start. {StatusMessage}", statusMessage); - _session = new NotStartedWriterSession("Initialization failed", status); + _session = new NotStartedWriterSession($"Initialization failed! {statusMessage}"); } return; @@ -390,11 +399,6 @@ public NotStartedWriterSession(string reasonExceptionMessage) _reasonException = new WriterException(reasonExceptionMessage); } - public NotStartedWriterSession(string reasonExceptionMessage, Status status) - { - _reasonException = new WriterException(reasonExceptionMessage, status); - } - public Task Write(ConcurrentQueue toSendBuffer) { while (toSendBuffer.TryDequeue(out var messageSending)) @@ -481,7 +485,7 @@ public async Task Write(ConcurrentQueue toSendBuffer) var messageData = sendData.MessageData; - if (messageData.SeqNo == default) + if (messageData.SeqNo == 0) { messageData.SeqNo = ++currentSeqNum; } @@ -511,13 +515,12 @@ private async Task RunProcessingWriteAck() while (await Stream.MoveNextAsync()) { var messageFromServer = Stream.Current; - var status = Status.FromProto(messageFromServer.Status, messageFromServer.Issues); - if (status.IsNotSuccess) + if (messageFromServer.Status.IsNotSuccess()) { Logger.LogError( "WriterSession[{SessionId}] received unsuccessful status while processing writeAck: {Status}", - SessionId, status); + SessionId, messageFromServer.Status.Code().ToMessage(messageFromServer.Issues)); return; } @@ -585,11 +588,20 @@ protected override MessageFromClient GetSendUpdateTokenRequest(string token) => public override async ValueTask DisposeAsync() { - Logger.LogDebug("WriterSession[{SessionId}]: start dispose process", SessionId); - - await Stream.RequestStreamComplete(); - await _processingResponseStream; + try + { + Logger.LogDebug("WriterSession[{SessionId}]: start dispose process", SessionId); - Stream.Dispose(); + await Stream.RequestStreamComplete(); + await _processingResponseStream; + } + catch (Exception e) + { + Logger.LogError(e, "WriterSession[{SessionId}]: error on disposing", SessionId); + } + finally + { + Stream.Dispose(); + } } } diff --git a/src/Ydb.Sdk/src/Status.cs b/src/Ydb.Sdk/src/Status.cs index 4d9ba932..e5d64c79 100644 --- a/src/Ydb.Sdk/src/Status.cs +++ b/src/Ydb.Sdk/src/Status.cs @@ -1,16 +1,17 @@ using System.Text; using Google.Protobuf.Collections; using Ydb.Issue; +using Ydb.Sdk.Ado; namespace Ydb.Sdk; internal static class StatusRanges { - public const uint ClientFirst = 500000; - public const uint ClientTransportFirst = 600000; + public const int ClientFirst = 500000; + public const int ClientTransportFirst = 600000; } -public enum StatusCode : uint +public enum StatusCode { Unspecified = 0, Success = 400000, @@ -148,7 +149,7 @@ private static string IssuesToString(IReadOnlyList issues, int currentInd foreach (var issue in issues) { sb.Append(issue.ToString(currentIndent, indent)); - sb.Append(Environment.NewLine); + sb.AppendLine(); } return sb.ToString(); @@ -183,7 +184,7 @@ public void EnsureSuccess() { if (!IsSuccess) { - throw new StatusUnsuccessfulException(this); + throw new YdbException(StatusCode, Issue.IssuesToString(Issues)); } } @@ -198,7 +199,7 @@ public override string ToString() } sb.Append(", Issues:"); - sb.Append(Environment.NewLine); + sb.AppendLine(); sb.Append(Issue.IssuesToString(Issues)); return sb.ToString(); @@ -206,10 +207,9 @@ public override string ToString() private static StatusCode ConvertStatusCode(StatusIds.Types.StatusCode statusCode) { - var value = (uint)statusCode; - if (Enum.IsDefined(typeof(StatusCode), value)) + if (Enum.IsDefined(typeof(StatusCode), (int)statusCode)) { - return (StatusCode)value; + return (StatusCode)statusCode; } return StatusCode.Unspecified; @@ -218,30 +218,3 @@ private static StatusCode ConvertStatusCode(StatusIds.Types.StatusCode statusCod public static Status FromProto(StatusIds.Types.StatusCode statusCode, RepeatedField issues) => new(ConvertStatusCode(statusCode), issues.Select(i => new Issue(i)).ToList()); } - -public class StatusUnsuccessfulException : Exception -{ - public StatusUnsuccessfulException(Status status) : base(status.ToString()) - { - Status = status; - } - - public Status Status { get; } -} - -internal static class StatusExtensions -{ - internal static Status ConvertStatus(this Grpc.Core.Status rpcStatus) => - new( - rpcStatus.StatusCode switch - { - Grpc.Core.StatusCode.Unavailable => StatusCode.ClientTransportUnavailable, - Grpc.Core.StatusCode.DeadlineExceeded => StatusCode.ClientTransportTimeout, - Grpc.Core.StatusCode.ResourceExhausted => StatusCode.ClientTransportResourceExhausted, - Grpc.Core.StatusCode.Unimplemented => StatusCode.ClientTransportUnimplemented, - Grpc.Core.StatusCode.Cancelled => StatusCode.Cancelled, - _ => StatusCode.ClientTransportUnknown - }, - new List { new(rpcStatus.Detail) } - ); -} diff --git a/src/Ydb.Sdk/src/Value/YdbValue.cs b/src/Ydb.Sdk/src/Value/YdbValue.cs index 1b0a923b..287017fa 100644 --- a/src/Ydb.Sdk/src/Value/YdbValue.cs +++ b/src/Ydb.Sdk/src/Value/YdbValue.cs @@ -1,6 +1,6 @@ namespace Ydb.Sdk.Value; -public enum YdbTypeId : uint +public enum YdbTypeId { Unknown = 0, @@ -45,7 +45,7 @@ public enum YdbTypeId : uint internal static class YdbTypeIdRanges { - public const uint ComplexTypesFirst = 0xffff; + public const int ComplexTypesFirst = 0xffff; } public sealed partial class YdbValue @@ -85,7 +85,7 @@ private static string ToYql(Type type) => internal static YdbTypeId GetYdbTypeId(Type protoType) => protoType.TypeCase switch { - Type.TypeOneofCase.TypeId => Enum.IsDefined(typeof(YdbTypeId), (uint)protoType.TypeId) + Type.TypeOneofCase.TypeId => Enum.IsDefined(typeof(YdbTypeId), (int)protoType.TypeId) ? (YdbTypeId)protoType.TypeId : YdbTypeId.Unknown, Type.TypeOneofCase.DecimalType => YdbTypeId.DecimalType, diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Auth/CachedCredentialsProviderTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Auth/CachedCredentialsProviderTests.cs index 26e02761..a0802ed4 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Auth/CachedCredentialsProviderTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Auth/CachedCredentialsProviderTests.cs @@ -11,21 +11,18 @@ public class CachedCredentialsProviderTests private readonly Mock _mockAuthClient = new(); private readonly Mock _mockClock = new(); - [Fact] public async Task SyncState_To_ErrorState_To_SyncState_To_ActiveState() { var now = DateTime.UtcNow; _mockAuthClient.SetupSequence(authClient => authClient.FetchToken()) - .ThrowsAsync(new StatusUnsuccessfulException( - new Status(StatusCode.Unavailable, new List { new(":(") })) - ) + .ThrowsAsync(new YdbException(StatusCode.Unavailable, "Mock Unavailable")) .ReturnsAsync(new TokenResponse(Token, now.Add(TimeSpan.FromSeconds(2)))); _mockClock.Setup(clock => clock.UtcNow).Returns(now); var credentialsProvider = new CachedCredentialsProvider(_mockAuthClient.Object, _mockClock.Object); - await Assert.ThrowsAsync(() => credentialsProvider.GetAuthInfoAsync().AsTask()); + await Assert.ThrowsAsync(() => credentialsProvider.GetAuthInfoAsync().AsTask()); Assert.Equal(Token, await credentialsProvider.GetAuthInfoAsync()); Assert.Equal(Token, await credentialsProvider.GetAuthInfoAsync()); Assert.Equal(Token, await credentialsProvider.GetAuthInfoAsync()); @@ -136,8 +133,8 @@ public async Task Assert.Equal(Token, await credentialsProvider.GetAuthInfoAsync()); Assert.Equal(Token, await credentialsProvider.GetAuthInfoAsync()); var taskOnError = credentialsProvider.GetAuthInfoAsync(); - tcsTokenResponse.SetException(new StatusUnsuccessfulException(new Status(StatusCode.Unavailable))); - await Assert.ThrowsAsync(async () => await taskOnError); + tcsTokenResponse.SetException(new YdbException(StatusCode.Unavailable, "Mock Unavailable")); + await Assert.ThrowsAsync(async () => await taskOnError); Assert.Equal(Token + Token, await credentialsProvider.GetAuthInfoAsync()); Assert.Equal(Token + Token, await credentialsProvider.GetAuthInfoAsync()); _mockAuthClient.Verify(authClient => authClient.FetchToken(), Times.Exactly(3)); @@ -192,7 +189,7 @@ public async Task Assert.Equal(Token, await credentialsProvider.GetAuthInfoAsync()); Assert.Equal(Token, await credentialsProvider.GetAuthInfoAsync()); - tcsTokenResponse.SetException(new StatusUnsuccessfulException(new Status(StatusCode.Unavailable))); + tcsTokenResponse.SetException(new YdbException(StatusCode.Unavailable, "Mock Unavailable")); var taskOnBackground = credentialsProvider.GetAuthInfoAsync(); Assert.Equal(Token + Token, await taskOnBackground); Assert.Equal(Token + Token, await credentialsProvider.GetAuthInfoAsync()); @@ -248,7 +245,7 @@ public async Task Assert.Equal(Token, await credentialsProvider.GetAuthInfoAsync()); Assert.Equal(Token, await credentialsProvider.GetAuthInfoAsync()); - tcsTokenResponse.SetException(new StatusUnsuccessfulException(new Status(StatusCode.Unavailable))); + tcsTokenResponse.SetException(new YdbException(StatusCode.Unavailable, "Mock Unavailable")); var taskOnBackground = credentialsProvider.GetAuthInfoAsync(); Assert.Equal(Token, await taskOnBackground); Assert.Equal(Token + Token, await credentialsProvider.GetAuthInfoAsync()); diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Internal/StatusCodeTestUtils.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Internal/StatusCodeTestUtils.cs new file mode 100644 index 00000000..cdc27c79 --- /dev/null +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Internal/StatusCodeTestUtils.cs @@ -0,0 +1,100 @@ +using Xunit; +using Ydb.Issue; +using Ydb.Sdk.Ado.Internal; + +namespace Ydb.Sdk.Ado.Tests.Internal; + +public class StatusCodeTestUtils +{ + [Theory] + [InlineData(Grpc.Core.StatusCode.Unavailable, StatusCode.ClientTransportUnavailable)] + [InlineData(Grpc.Core.StatusCode.DeadlineExceeded, StatusCode.ClientTransportTimeout)] + [InlineData(Grpc.Core.StatusCode.ResourceExhausted, StatusCode.ClientTransportResourceExhausted)] + [InlineData(Grpc.Core.StatusCode.Unimplemented, StatusCode.ClientTransportUnimplemented)] + [InlineData(Grpc.Core.StatusCode.Cancelled, StatusCode.Cancelled)] + public void Code_GrpcCoreStatusCodeConvertToStatusCode_Assert( + Grpc.Core.StatusCode statusCode, + StatusCode expectedStatusCode + ) => Assert.Equal(expectedStatusCode, new Grpc.Core.Status(statusCode, "Mock status").Code()); + + + [Fact] + public void ServerMessage_WhenServerSendsEmptyIssues_Assert() => + Assert.Equal("Status: Aborted", StatusIds.Types.StatusCode.Aborted.Code().ToMessage([])); + + [Fact] + public void ServerMessage_WhenServerSendsListIssues_Assert() => Assert.Equal( + """ + Status: BadSession, Issues: + [0] Fatal: Session is bad :( + [1] Error: Session is very bad :( + [2] (good.txt:2:2) Warning: Session is very bad :(( + [1000] (1:1) Info: Session is very bad :) + [2000] Unknown SeverityCode 10: Unknown severity test :) + """, + StatusIds.Types.StatusCode.BadSession.Code().ToMessage(new List + { + new() { IssueCode = 0, Severity = 0, Message = "Session is bad :(" }, + new() { IssueCode = 1, Severity = 1, Message = "Session is very bad :(" }, + new() + { + IssueCode = 2, + Position = new IssueMessage.Types.Position { File = "good.txt", Column = 2, Row = 2 }, + Severity = 2, + Message = "Session is very bad :((" + }, + new() + { + IssueCode = 1000, + Position = new IssueMessage.Types.Position { Column = 1, Row = 1 }, + Severity = 3, + Message = "Session is very bad :)" + }, + new() { IssueCode = 2000, Severity = 10, Message = "Unknown severity test :)" } + })); + + [Fact] + public void ServerMessage_WhenServerSendsRecursiveListIssues_Assert() + { + var listIssues = new List(); + var recursiveIssue = new IssueMessage + { + IssueCode = 0, Severity = 0, Message = "Overloaded is bad :(" + }; + var recursiveIssueInRecIssue = new IssueMessage + { + IssueCode = 1, Severity = 1, Message = "Overloaded is very bad :(" + }; + recursiveIssueInRecIssue.Issues.Add(new IssueMessage + { + IssueCode = 2, + Position = new IssueMessage.Types.Position { File = "good.txt", Column = 2, Row = 2 }, + Severity = 2, + Message = "Overloaded is very bad :((" + }); + recursiveIssue.Issues.Add(recursiveIssueInRecIssue); + recursiveIssue.Issues.Add(new IssueMessage + { + IssueCode = 1000, + Position = new IssueMessage.Types.Position { Column = 1, Row = 1 }, + Severity = 3, + Message = "Overloaded is very bad :)" + }); + recursiveIssue.Issues.Add(new IssueMessage + { IssueCode = 2000, Severity = 10, Message = "Unknown severity test :)" }); + listIssues.Add(recursiveIssue); + + + Assert.Equal( + """ + Status: Overloaded, Issues: + [0] Fatal: Overloaded is bad :( + [1] Error: Overloaded is very bad :( + [2] (good.txt:2:2) Warning: Overloaded is very bad :(( + [1000] (1:1) Info: Overloaded is very bad :) + [2000] Unknown SeverityCode 10: Unknown severity test :) + """, + StatusIds.Types.StatusCode.Overloaded.Code().ToMessage(listIssues) + ); + } +} diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Pool/SessionPoolTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Pool/SessionPoolTests.cs index 974e541b..01984d46 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Pool/SessionPoolTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Pool/SessionPoolTests.cs @@ -24,9 +24,9 @@ public async Task GetSession_WhenCreateSessionReturnUnavailable_ExpectedStatusAn _testSessionPool.CreatedStatus = Status .FromProto(StatusIds.Types.StatusCode.Unavailable, new RepeatedField()); - var e = await Assert.ThrowsAsync(async () => await _testSessionPool.GetSession()); + var e = await Assert.ThrowsAsync(async () => await _testSessionPool.GetSession()); - Assert.Equal(StatusCode.Unavailable, e.Status.StatusCode); + Assert.Equal(StatusCode.Unavailable, e.Code); _testSessionPool.CreatedStatus = Status.Success; @@ -34,18 +34,15 @@ public async Task GetSession_WhenCreateSessionReturnUnavailable_ExpectedStatusAn } [Theory] - [InlineData(StatusIds.Types.StatusCode.Unavailable)] - [InlineData(StatusIds.Types.StatusCode.BadSession)] - [InlineData(StatusIds.Types.StatusCode.SessionBusy)] - [InlineData(StatusIds.Types.StatusCode.InternalError)] + [InlineData(StatusCode.Unavailable)] + [InlineData(StatusCode.BadSession)] + [InlineData(StatusCode.SessionBusy)] + [InlineData(StatusCode.InternalError)] public void GetSession_WhenCreatedSessionIsInvalidated_ExpectedRecreatedSession( - StatusIds.Types.StatusCode statusCode) + StatusCode statusCode) { StressTestSessionPoolAndCheckCreatedSessions(100, TestSessionPoolSize); - - StressTestSessionPoolAndCheckCreatedSessions(70, 70, - session => session.OnStatus(Status.FromProto(statusCode, new RepeatedField()))); - + StressTestSessionPoolAndCheckCreatedSessions(70, 70, session => session.OnNotSuccessStatusCode(statusCode)); StressTestSessionPoolAndCheckCreatedSessions(100, 120); } diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbDataReaderTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbDataReaderTests.cs index e21c4dd6..71b344b5 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbDataReaderTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbDataReaderTests.cs @@ -10,8 +10,8 @@ public class YdbDataReaderTests [Fact] public async Task BasedIteration_WhenNotCallMethodRead_ThrowException() { - var statuses = new List(); - var reader = await YdbDataReader.CreateYdbDataReader(EnumeratorSuccess(), statuses.Add); + var codes = new List(); + var reader = await YdbDataReader.CreateYdbDataReader(EnumeratorSuccess(), codes.Add); // Read first metadata Assert.True(reader.HasRows); @@ -34,7 +34,7 @@ public async Task BasedIteration_WhenNotCallMethodRead_ThrowException() Assert.Equal("No row is available", Assert.Throws(() => reader.GetValue(0)).Message); - Assert.Empty(statuses); + Assert.Empty(codes); await reader.CloseAsync(); Assert.True(reader.IsClosed); @@ -47,18 +47,18 @@ public async Task BasedIteration_WhenNotCallMethodRead_ThrowException() [Fact] public async Task CreateYdbDataReader_WhenAbortedStatus_ThrowException() { - var statuses = new List(); + var codes = new List(); Assert.Equal("Status: Aborted", (await Assert.ThrowsAsync(() => - YdbDataReader.CreateYdbDataReader(SingleEnumeratorFailed, statuses.Add))).Message); - Assert.Single(statuses); - Assert.Equal(StatusCode.Aborted, statuses[0].StatusCode); + YdbDataReader.CreateYdbDataReader(SingleEnumeratorFailed, codes.Add))).Message); + Assert.Single(codes); + Assert.Equal(StatusCode.Aborted, codes[0]); } [Fact] public async Task NextResult_WhenNextResultSkipResultSet_ReturnNextResultSet() { - var statuses = new List(); - var reader = await YdbDataReader.CreateYdbDataReader(EnumeratorSuccess(2), statuses.Add); + var codes = new List(); + var reader = await YdbDataReader.CreateYdbDataReader(EnumeratorSuccess(2), codes.Add); Assert.True(reader.NextResult()); Assert.True(reader.Read()); @@ -66,14 +66,14 @@ public async Task NextResult_WhenNextResultSkipResultSet_ReturnNextResultSet() Assert.False(reader.Read()); Assert.False(reader.NextResult()); - Assert.Empty(statuses); + Assert.Empty(codes); } [Fact] public async Task NextResult_WhenFirstRead_ReturnResultSet() { - var statuses = new List(); - var reader = await YdbDataReader.CreateYdbDataReader(EnumeratorSuccess(2), statuses.Add); + var codes = new List(); + var reader = await YdbDataReader.CreateYdbDataReader(EnumeratorSuccess(2), codes.Add); Assert.True(reader.Read()); Assert.True((bool)reader.GetValue(0)); @@ -85,14 +85,14 @@ public async Task NextResult_WhenFirstRead_ReturnResultSet() Assert.False(reader.NextResult()); Assert.False(reader.Read()); - Assert.Empty(statuses); + Assert.Empty(codes); } [Fact] public async Task NextResult_WhenLongResultSet_ReturnResultSet() { - var statuses = new List(); - var reader = await YdbDataReader.CreateYdbDataReader(EnumeratorSuccess(2, true), statuses.Add); + var codes = new List(); + var reader = await YdbDataReader.CreateYdbDataReader(EnumeratorSuccess(2, true), codes.Add); Assert.True(reader.Read()); Assert.True((bool)reader.GetValue(0)); @@ -106,13 +106,13 @@ public async Task NextResult_WhenLongResultSet_ReturnResultSet() Assert.False(reader.NextResult()); Assert.False(reader.Read()); - Assert.Empty(statuses); + Assert.Empty(codes); } [Fact] public async Task Read_WhenReadAsyncThrowException_AggregateIssuesBeforeErrorAndAfter() { - var statuses = new List(); + var codes = new List(); var result = ResultSet.Parser.ParseJson( "{ \"columns\": [ { \"name\": \"column0\", " + "\"type\": { \"typeId\": \"BOOL\" } } ], " + @@ -131,17 +131,18 @@ public async Task Read_WhenReadAsyncThrowException_AggregateIssuesBeforeErrorAnd nextFailPart.Issues.Add(new IssueMessage { Message = "Some message 3" }); var reader = await YdbDataReader.CreateYdbDataReader(new MockAsyncEnumerator( - new List { successPart, failPart, nextFailPart }), statuses.Add); + new List { successPart, failPart, nextFailPart }), codes.Add); Assert.True(reader.Read()); - Assert.Equal(@"Status: Aborted, Issues: -[0] Fatal: Some message 1 -[0] Fatal: Some message 2 -[0] Fatal: Some message 2 -[0] Fatal: Some message 3 -", Assert.Throws(() => reader.Read()).Message); - Assert.Single(statuses); - Assert.Equal(StatusCode.Aborted, statuses[0].StatusCode); + Assert.Equal(""" + Status: Aborted, Issues: + [0] Fatal: Some message 1 + [0] Fatal: Some message 2 + [0] Fatal: Some message 2 + [0] Fatal: Some message 3 + """, Assert.Throws(() => reader.Read()).Message); + Assert.Single(codes); + Assert.Equal(StatusCode.Aborted, codes[0]); } private static MockAsyncEnumerator EnumeratorSuccess(int size = 1, diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Topic.Tests/ReaderUnitTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Topic.Tests/ReaderUnitTests.cs index 6c7ec2e9..4052e84c 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Topic.Tests/ReaderUnitTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Topic.Tests/ReaderUnitTests.cs @@ -5,6 +5,7 @@ using Moq; using Xunit; using Ydb.Issue; +using Ydb.Sdk.Ado; using Ydb.Sdk.Services.Topic; using Ydb.Sdk.Services.Topic.Reader; using Ydb.Topic; @@ -76,7 +77,7 @@ public async Task Initialize_WhenFailWriteMessage_ShouldRetryInitializeAndReadTh var tcsCommitMessage = new TaskCompletionSource(); _mockStream.SetupSequence(stream => stream.Write(It.IsAny())) - .ThrowsAsync(new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled))) + .ThrowsAsync(new YdbException(new RpcException(Grpc.Core.Status.DefaultCancelled))) .Returns(Task.CompletedTask) .Returns(Task.CompletedTask) .Returns(() => @@ -186,7 +187,7 @@ public async Task Initialize_WhenFailMoveNextAsync_ShouldRetryInitializeAndReadT }); _mockStream.SetupSequence(stream => stream.MoveNextAsync()) - .ThrowsAsync(new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled))) + .ThrowsAsync(new YdbException(new RpcException(Grpc.Core.Status.DefaultCancelled))) .ReturnsAsync(true) .ReturnsAsync(true) .Returns(new ValueTask(tcsMoveNext.Task)) @@ -374,9 +375,9 @@ public async Task Initialize_WhenInitResponseStatusIsNotRetryable_ShouldThrowRea SubscribeSettings = { new SubscribeSettings("/topic") } }.Build(); - Assert.Equal("Initialization failed: Status: SchemeError, Issues:\n[0] Fatal: Topic not found\n", + Assert.Equal("Initialization failed! Status: SchemeError, Issues:\n[0] Fatal: Topic not found", (await Assert.ThrowsAsync(async () => await reader.ReadAsync())).Message); - Assert.Equal("Initialization failed: Status: SchemeError, Issues:\n[0] Fatal: Topic not found\n", + Assert.Equal("Initialization failed! Status: SchemeError, Issues:\n[0] Fatal: Topic not found", (await Assert.ThrowsAsync(async () => await reader.ReadBatchAsync())).Message); _mockStream.Verify(stream => stream.Write(It.Is(msg => @@ -441,7 +442,7 @@ public async Task _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(true) - .ThrowsAsync(new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled))) + .ThrowsAsync(new YdbException(new RpcException(Grpc.Core.Status.DefaultCancelled))) .ReturnsAsync(true) .ReturnsAsync(true) .Returns(new ValueTask(tcsMoveNext.Task)) @@ -550,7 +551,7 @@ public async Task { tcsMoveNextFirst.SetResult(false); - return new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled)); + return new YdbException(new RpcException(Grpc.Core.Status.DefaultCancelled)); }) .Returns(Task.CompletedTask) .Returns(Task.CompletedTask) @@ -715,7 +716,7 @@ public async Task }) .Throws(() => { - var error = new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled)); + var error = new YdbException(new RpcException(Grpc.Core.Status.DefaultCancelled)); tcsMoveNextSecond.TrySetException(error); return error; @@ -850,7 +851,7 @@ public async Task .Returns(Task.CompletedTask) .Throws(() => { - var error = new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled)); + var error = new YdbException(new RpcException(Grpc.Core.Status.DefaultCancelled)); tcsMoveNextSecond.TrySetException(error); return error; @@ -1027,7 +1028,7 @@ public async Task .Returns(() => { tcsMoveNextSecond.TrySetException( - new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled))); + new YdbException(new RpcException(Grpc.Core.Status.DefaultCancelled))); return Task.CompletedTask; }) @@ -1162,7 +1163,7 @@ public async Task .Returns(() => { tcsMoveNextSecond.TrySetException( - new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled))); + new YdbException(new RpcException(Grpc.Core.Status.DefaultCancelled))); return Task.CompletedTask; }) @@ -1369,7 +1370,7 @@ public async Task ReadAsync_WhenFailDeserializer_ThrowReaderExceptionAndInvokeRe var tcsMoveNext = new TaskCompletionSource(); _mockStream.SetupSequence(stream => stream.Write(It.IsAny())) - .ThrowsAsync(new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled))) + .ThrowsAsync(new YdbException(new RpcException(Grpc.Core.Status.DefaultCancelled))) .Returns(Task.CompletedTask) .Returns(Task.CompletedTask) .Returns(() => diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Topic.Tests/WriterIntegrationTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Topic.Tests/WriterIntegrationTests.cs index 84987be8..1feeb78e 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Topic.Tests/WriterIntegrationTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Topic.Tests/WriterIntegrationTests.cs @@ -39,7 +39,7 @@ public async Task WriteAsync_WhenTopicNotFound_ReturnNotFoundException() { ProducerId = "producerId" }.Build(); Assert.Contains( - $"Initialization failed: Status: SchemeError, Issues:\n[500017] Error: no path 'local/{_topicName + "_not_found"}'", + $"Initialization failed! Status: SchemeError, Issues:\n[500017] Error: no path 'local/{_topicName + "_not_found"}'", (await Assert.ThrowsAsync(() => writer.WriteAsync("hello world"))).Message ); } diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Topic.Tests/WriterUnitTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Topic.Tests/WriterUnitTests.cs index 85a72c3e..eb5e7fab 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Topic.Tests/WriterUnitTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Topic.Tests/WriterUnitTests.cs @@ -3,6 +3,7 @@ using Moq.Language; using Xunit; using Ydb.Issue; +using Ydb.Sdk.Ado; using Ydb.Sdk.Services.Topic; using Ydb.Sdk.Services.Topic.Writer; using Ydb.Topic; @@ -112,7 +113,7 @@ public async Task Initialize_WhenStreamClosedByServer_ShouldRetryInitializeAndRe Mock:1> (stream): - IBidirectionalStream.Write({ "initRequest": { "path": "/topic-3", "producerId": "producerId" } }) <- Driver.TransportException + IBidirectionalStream.Write({ "initRequest": { "path": "/topic-3", "producerId": "producerId" } }) <- YdbException IBidirectionalStream.Write({ "initRequest": { "path": "/topic-3", "producerId": "producerId" } }) IBidirectionalStream.MoveNextAsync() <- return true IBidirectionalStream.Current @@ -126,7 +127,7 @@ public async Task Initialize_WhenFailWriteMessage_ShouldRetryInitializeAndReturn { var taskNextComplete = new TaskCompletionSource(); _mockStream.SetupSequence(stream => stream.Write(It.IsAny())) - .ThrowsAsync(new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled))) + .ThrowsAsync(new YdbException(new RpcException(Grpc.Core.Status.DefaultCancelled))) .Returns(Task.CompletedTask) .Returns(() => { @@ -180,7 +181,7 @@ public async Task Initialize_WhenFailMoveNextAsync_ShouldRetryInitializeAndRetur return Task.CompletedTask; }); _mockStream.SetupSequence(stream => stream.MoveNextAsync()) - .ThrowsAsync(new Driver.TransportException( + .ThrowsAsync(new YdbException( new RpcException(new Grpc.Core.Status(Grpc.Core.StatusCode.DeadlineExceeded, "Some message")))) .ReturnsAsync(true) .Returns(() => new ValueTask(taskNextComplete.Task)) @@ -294,9 +295,9 @@ public async Task await using var writer = new WriterBuilder(_mockIDriver.Object, "/topic-6") { ProducerId = "producerId" }.Build(); - Assert.Equal("Initialization failed: Status: SchemeError, Issues:\n[0] Fatal: Topic not found\n", + Assert.Equal("Initialization failed! Status: SchemeError, Issues:\n[0] Fatal: Topic not found", (await Assert.ThrowsAsync(() => writer.WriteAsync(123L))).Message); - Assert.Equal("Initialization failed: Status: SchemeError, Issues:\n[0] Fatal: Topic not found\n", + Assert.Equal("Initialization failed! Status: SchemeError, Issues:\n[0] Fatal: Topic not found", (await Assert.ThrowsAsync(() => writer.WriteAsync(1L))).Message); // check not attempt repeated!!! @@ -362,7 +363,7 @@ public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSession_Should .Throws(() => { moveTcs.SetResult(false); - return new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled)); + return new YdbException(new RpcException(Grpc.Core.Status.DefaultCancelled)); }) .Returns(Task.CompletedTask) .Returns(() => @@ -447,7 +448,7 @@ public async Task WriteAsync_WhenTransportExceptionOnProcessingWriteAck_ShouldRe }); _mockStream.SetupSequence(stream => stream.MoveNextAsync()) .ReturnsAsync(true) - .ThrowsAsync(new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled))) + .ThrowsAsync(new YdbException(new RpcException(Grpc.Core.Status.DefaultCancelled))) .ReturnsAsync(true) .Returns(() => new ValueTask(moveTcs.Task)) // retry init writer session .Returns(_lastMoveNext); @@ -930,7 +931,7 @@ public async Task DisposeAsync_WhenInFlightMessages_WaitingInFlightMessages() Assert.False(writeTask1.IsCompleted); Assert.False(disposedTask.IsCompleted); - writeTcs1.TrySetException(new Driver.TransportException( + writeTcs1.TrySetException(new YdbException( new RpcException(new Grpc.Core.Status(Grpc.Core.StatusCode.DeadlineExceeded, "Some message")))); Assert.Equal("Writer[TopicPath: /topic-16, ProducerId: producerId, Codec: Raw] is disposed", (await Assert.ThrowsAsync(() => writer.WriteAsync(12))).Message);