Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/Ydb.Sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
- Added 'x-ydb-client-pid' header to any RPC calls
- Fixed bug `Reader`: unhandled exception in `TryReadRequestBytes(long bytes)`.
- Handle `YdbException` on `DeleteSession`.
- Do not invoke `DeleteSession` if the session is not active.
- `YdbException`: Added cancellation token propagation support in `CommitAsync` and `RollbackAsync`.
- Deleted legacy exceptions: Driver.TransportException, StatusUnsuccessfulException and InitializationFailureException.
- Fixed bug: Unhandled exception System.Net.Http.HttpIOException has now been converted to YdbException ([grpc-dotnet issue](https://github.com/grpc/grpc-dotnet/issues/2638)).
- Added 'x-ydb-client-pid' header to any RPC calls.
- Added DisableServerBalancer option to ADO.NET session creation; default false.

## v0.20.1
Expand Down
58 changes: 58 additions & 0 deletions src/Ydb.Sdk/src/Ado/Internal/IssueMessageUtils.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
using System.Text;
using Ydb.Issue;

namespace Ydb.Sdk.Ado.Internal;

internal static class IssueMessageUtils
{
internal static string IssuesToString(this IReadOnlyList<IssueMessage> issues) => IssuesToString(issues, 0, 4);

private static string IssuesToString(IEnumerable<IssueMessage> issueMessages, int currentIndent, int indent) =>
string.Join(Environment.NewLine, issueMessages.Select(message =>
{
var sb = new StringBuilder();
sb.Append(' ', currentIndent);
sb.Append($"[{message.IssueCode}] ");

if (message.Position != null)
{
sb.Append(message.Position.PositionToString());
}

sb.Append($"{message.Severity.SeverityToString()}: ");
sb.Append(message.Message);

if (message.Issues.Count > 0)
{
sb.AppendLine();
sb.Append(IssuesToString(message.Issues, currentIndent + indent, indent));
}

return sb.ToString();
}));

private static string SeverityToString(this uint severity) => severity switch
{
0 => "Fatal",
1 => "Error",
2 => "Warning",
3 => "Info",
_ => $"Unknown SeverityCode {severity}"
};

private static string PositionToString(this IssueMessage.Types.Position position)
{
var sb = new StringBuilder();
sb.Append('(');

if (!string.IsNullOrEmpty(position.File))
{
sb.Append(position.File);
sb.Append(':');
}

sb.Append($"{position.Row}:{position.Column}");
sb.Append(") ");
return sb.ToString();
}
}
27 changes: 27 additions & 0 deletions src/Ydb.Sdk/src/Ado/Internal/StatusCodeUtils.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using Ydb.Issue;

namespace Ydb.Sdk.Ado.Internal;

public static class StatusCodeUtils
{
internal static StatusCode Code(this Grpc.Core.Status rpcStatus) => rpcStatus.StatusCode switch
{
Grpc.Core.StatusCode.Unavailable => StatusCode.ClientTransportUnavailable,
Grpc.Core.StatusCode.DeadlineExceeded => StatusCode.ClientTransportTimeout,
Grpc.Core.StatusCode.ResourceExhausted => StatusCode.ClientTransportResourceExhausted,
Grpc.Core.StatusCode.Unimplemented => StatusCode.ClientTransportUnimplemented,
Grpc.Core.StatusCode.Cancelled => StatusCode.Cancelled,
_ => StatusCode.ClientTransportUnknown
};

internal static StatusCode Code(this StatusIds.Types.StatusCode statusCode) =>
Enum.IsDefined(typeof(StatusCode), (int)statusCode) ? (StatusCode)statusCode : StatusCode.Unavailable;

internal static bool IsNotSuccess(this StatusIds.Types.StatusCode code) =>
code != StatusIds.Types.StatusCode.Success;

internal static string ToMessage(this StatusCode statusCode, IReadOnlyList<IssueMessage> issueMessages) =>
issueMessages.Count == 0
? $"Status: {statusCode}"
: $"Status: {statusCode}, Issues:{Environment.NewLine}{issueMessages.IssuesToString()}";
}
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/src/Ado/YdbCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBeha
var ydbDataReader = await YdbDataReader.CreateYdbDataReader(
await YdbConnection.Session
.ExecuteQuery(preparedSql.ToString(), ydbParameters, execSettings, transaction?.TransactionControl),
YdbConnection.OnStatus, transaction, cancellationToken
YdbConnection.OnNotSuccessStatusCode, transaction, cancellationToken
);

YdbConnection.LastReader = ydbDataReader;
Expand Down
23 changes: 3 additions & 20 deletions src/Ydb.Sdk/src/Ado/YdbConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,24 +92,7 @@ public override async Task OpenAsync(CancellationToken cancellationToken)
{
ThrowIfConnectionOpen();

try
{
Session = await PoolManager.GetSession(ConnectionStringBuilder, cancellationToken);
}
catch (Exception e)
{
throw e switch
{
OperationCanceledException => throw new YdbException(StatusCode.Cancelled,
$"The connection pool has been exhausted, either raise 'MaxSessionPool' " +
$"(currently {ConnectionStringBuilder.MaxSessionPool}) or 'CreateSessionTimeout' " +
$"(currently {ConnectionStringBuilder.CreateSessionTimeout} seconds) in your connection string.", e
),
Driver.TransportException transportException => new YdbException(transportException),
StatusUnsuccessfulException unsuccessfulException => new YdbException(unsuccessfulException.Status),
_ => e
};
}
Session = await PoolManager.GetSession(ConnectionStringBuilder, cancellationToken);

OnStateChange(ClosedToOpenEventArgs);

Expand Down Expand Up @@ -165,9 +148,9 @@ public override string ConnectionString

private ConnectionState ConnectionState { get; set; } = ConnectionState.Closed; // Invoke AsyncOpen()

internal void OnStatus(Status status)
internal void OnNotSuccessStatusCode(StatusCode code)
{
_session.OnStatus(status);
_session.OnNotSuccessStatusCode(code);

if (!_session.IsActive)
{
Expand Down
29 changes: 12 additions & 17 deletions src/Ydb.Sdk/src/Ado/YdbDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Google.Protobuf.Collections;
using Ydb.Issue;
using Ydb.Query;
using Ydb.Sdk.Ado.Internal;
using Ydb.Sdk.Value;

namespace Ydb.Sdk.Ado;
Expand All @@ -11,7 +12,7 @@ public sealed class YdbDataReader : DbDataReader, IAsyncEnumerable<YdbDataRecord
private readonly IServerStream<ExecuteQueryResponsePart> _stream;
private readonly YdbTransaction? _ydbTransaction;
private readonly RepeatedField<IssueMessage> _issueMessagesInStream = new();
private readonly Action<Status> _onNotSuccessStatus;
private readonly Action<StatusCode> _onNotSuccessStatusCode;

private int _currentRowIndex = -1;
private long _resultSetIndex = -1;
Expand Down Expand Up @@ -54,22 +55,22 @@ private enum State

private YdbDataReader(
IServerStream<ExecuteQueryResponsePart> resultSetStream,
Action<Status> onNotSuccessStatus,
Action<StatusCode> onNotSuccessStatusCode,
YdbTransaction? ydbTransaction)
{
_stream = resultSetStream;
_onNotSuccessStatus = onNotSuccessStatus;
_onNotSuccessStatusCode = onNotSuccessStatusCode;
_ydbTransaction = ydbTransaction;
}

internal static async Task<YdbDataReader> CreateYdbDataReader(
IServerStream<ExecuteQueryResponsePart> resultSetStream,
Action<Status> onStatus,
Action<StatusCode> onNotSuccessStatusCode,
YdbTransaction? ydbTransaction = null,
CancellationToken cancellationToken = default
)
{
var ydbDataReader = new YdbDataReader(resultSetStream, onStatus, ydbTransaction);
var ydbDataReader = new YdbDataReader(resultSetStream, onNotSuccessStatusCode, ydbTransaction);
await ydbDataReader.Init(cancellationToken);

return ydbDataReader;
Expand Down Expand Up @@ -522,7 +523,7 @@ public override async Task CloseAsync()
return;
}

_onNotSuccessStatus(new Status(StatusCode.SessionBusy));
_onNotSuccessStatusCode(StatusCode.SessionBusy);
_stream.Dispose();

if (_ydbTransaction != null)
Expand Down Expand Up @@ -559,20 +560,14 @@ private async ValueTask<State> NextExecPart(CancellationToken cancellationToken)

_issueMessagesInStream.AddRange(part.Issues);

if (part.Status != StatusIds.Types.StatusCode.Success)
if (part.Status.IsNotSuccess())
{
OnFailReadStream();

while (await _stream.MoveNextAsync(cancellationToken))
{
_issueMessagesInStream.AddRange(_stream.Current.Issues);
}

var status = Status.FromProto(part.Status, _issueMessagesInStream);

_onNotSuccessStatus(status);

throw new YdbException(status);
throw YdbException.FromServer(part.Status, _issueMessagesInStream);
}

_currentResultSet = part.ResultSet?.FromProto();
Expand All @@ -592,13 +587,13 @@ private async ValueTask<State> NextExecPart(CancellationToken cancellationToken)

return State.NewResultSet;
}
catch (Driver.TransportException e)
catch (YdbException e)
{
OnFailReadStream();

_onNotSuccessStatus(e.Status);
_onNotSuccessStatusCode(e.Code);

throw new YdbException(e);
throw;
}
}

Expand Down
14 changes: 10 additions & 4 deletions src/Ydb.Sdk/src/Ado/YdbException.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
using System.Data.Common;
using Grpc.Core;
using Ydb.Issue;
using Ydb.Sdk.Ado.Internal;

namespace Ydb.Sdk.Ado;

Expand All @@ -8,14 +11,17 @@ internal YdbException(string message) : base(message)
{
}

internal YdbException(Driver.TransportException transportException)
: this(transportException.Status, transportException)
internal YdbException(RpcException e) : this(e.Status.Code(), "Transport RPC call error", e)
{
}

internal YdbException(Status status, Exception? innerException = null)
: this(status.StatusCode, status.ToString(), innerException)
internal static YdbException FromServer(StatusIds.Types.StatusCode statusCode, IReadOnlyList<IssueMessage> issues)
{
var code = statusCode.Code();

var message = code.ToMessage(issues);

return new YdbException(code, message);
}

internal YdbException(StatusCode statusCode, string message, Exception? innerException = null)
Expand Down
62 changes: 32 additions & 30 deletions src/Ydb.Sdk/src/Ado/YdbSchema.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using Ydb.Scheme.V1;
using Ydb.Sdk.Ado.Schema;
using Ydb.Sdk.Services.Table;
using Ydb.Table;

namespace Ydb.Sdk.Ado;

Expand Down Expand Up @@ -63,24 +62,13 @@ public static async Task<YdbTable> DescribeTable(
var describeResponse = await ydbConnection.Session
.DescribeTable(WithSuffix(ydbConnection.Database) + tableName, describeTableSettings);

var status = Status.FromProto(describeResponse.Operation.Status, describeResponse.Operation.Issues);

if (status.IsNotSuccess)
{
ydbConnection.OnStatus(status);

throw new YdbException(status);
}

var describeRes = describeResponse.Operation.Result.Unpack<DescribeTableResult>();

return new YdbTable(tableName, describeRes);
return new YdbTable(tableName, describeResponse);
}
catch (Driver.TransportException e)
catch (YdbException e)
{
ydbConnection.OnStatus(e.Status);
ydbConnection.OnNotSuccessStatusCode(e.Code);

throw new YdbException(e);
throw;
}
}

Expand Down Expand Up @@ -253,19 +241,31 @@ private static async Task AppendDescribeTable(
string? tableType,
Action<YdbTable, string> appendInTable)
{
var ydbTable = await DescribeTable(ydbConnection, tableName, describeTableSettings);
var type = ydbTable.IsSystem
? "SYSTEM_TABLE"
: ydbTable.Type switch
try
{
var describeResponse = await ydbConnection.Session
.DescribeTable(WithSuffix(ydbConnection.Database) + tableName, describeTableSettings);
var ydbTable = new YdbTable(tableName, describeResponse);

var type = ydbTable.IsSystem
? "SYSTEM_TABLE"
: ydbTable.Type switch
{
YdbTable.TableType.Table => "TABLE",
YdbTable.TableType.ColumnTable => "COLUMN_TABLE",
YdbTable.TableType.ExternalTable => "EXTERNAL_TABLE",
_ => throw new ArgumentOutOfRangeException(nameof(tableType))
};
if (type.IsPattern(tableType))
{
YdbTable.TableType.Table => "TABLE",
YdbTable.TableType.ColumnTable => "COLUMN_TABLE",
YdbTable.TableType.ExternalTable => "EXTERNAL_TABLE",
_ => throw new ArgumentOutOfRangeException(nameof(tableType))
};
if (type.IsPattern(tableType))
appendInTable(ydbTable, type);
}
}
catch (YdbException e)
{
appendInTable(ydbTable, type);
ydbConnection.OnNotSuccessStatusCode(e.Code);

throw;
}
}

Expand Down Expand Up @@ -417,7 +417,7 @@ CancellationToken cancellationToken

if (status.IsNotSuccess)
{
throw new YdbException(status);
throw YdbException.FromServer(operation.Status, operation.Issues);
}

foreach (var entry in operation.Result.Unpack<ListDirectoryResult>().Children)
Expand Down Expand Up @@ -461,9 +461,11 @@ await SchemaObjects(

return ydbSchemaObjects;
}
catch (Driver.TransportException e)
catch (YdbException e)
{
throw new YdbException(e);
ydbConnection.OnNotSuccessStatusCode(e.Code);

throw;
}
}

Expand Down
Loading
Loading