Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/Ydb.Sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
- Added 'x-ydb-client-pid' header to any RPC calls
- Fixed bug `Reader`: unhandled exception in `TryReadRequestBytes(long bytes)`.
- Handle `YdbException` on `DeleteSession`.
- Do not invoke `DeleteSession` if the session is not active.
- `YdbException`: Added cancellation token propagation support in `CommitAsync` and `RollbackAsync`.
- Deleted legacy exceptions: Driver.TransportException, StatusUnsuccessfulException and InitializationFailureException.
- Fixed bug: Unhandled exception System.Net.Http.HttpIOException has now been converted to YdbException ([grpc-dotnet issue](https://github.com/grpc/grpc-dotnet/issues/2638)).
- Added 'x-ydb-client-pid' header to any RPC calls.
- Added DisableServerBalancer option to ADO.NET session creation; default false.

## v0.20.1
Expand Down
58 changes: 58 additions & 0 deletions src/Ydb.Sdk/src/Ado/Internal/IssueMessageUtils.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
using System.Text;
using Ydb.Issue;

namespace Ydb.Sdk.Ado.Internal;

internal static class IssueMessageUtils
{
internal static string IssuesToString(this IReadOnlyList<IssueMessage> issues) => IssuesToString(issues, 0, 4);

private static string IssuesToString(IEnumerable<IssueMessage> issueMessages, int currentIndent, int indent) =>
string.Join(Environment.NewLine, issueMessages.Select(message =>
{
var sb = new StringBuilder();
sb.Append(' ', currentIndent);
sb.Append($"[{message.IssueCode}] ");

if (message.Position != null)
{
sb.Append(message.Position.PositionToString());
}

sb.Append($"{message.Severity.SeverityToString()}: ");
sb.Append(message.Message);

if (message.Issues.Count > 0)
{
sb.AppendLine();
sb.Append(IssuesToString(message.Issues, currentIndent + indent, indent));
}

return sb.ToString();
}));

private static string SeverityToString(this uint severity) => severity switch
{
0 => "Fatal",
1 => "Error",
2 => "Warning",
3 => "Info",
_ => $"Unknown SeverityCode {severity}"
};

private static string PositionToString(this IssueMessage.Types.Position position)
{
var sb = new StringBuilder();
sb.Append('(');

if (!string.IsNullOrEmpty(position.File))
{
sb.Append(position.File);
sb.Append(':');
}

sb.Append($"{position.Row}:{position.Column}");
sb.Append(") ");
return sb.ToString();
}
}
27 changes: 27 additions & 0 deletions src/Ydb.Sdk/src/Ado/Internal/StatusCodeUtils.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using Ydb.Issue;

namespace Ydb.Sdk.Ado.Internal;

public static class StatusCodeUtils
{
internal static StatusCode Code(this Grpc.Core.Status rpcStatus) => rpcStatus.StatusCode switch
{
Grpc.Core.StatusCode.Unavailable => StatusCode.ClientTransportUnavailable,
Grpc.Core.StatusCode.DeadlineExceeded => StatusCode.ClientTransportTimeout,
Grpc.Core.StatusCode.ResourceExhausted => StatusCode.ClientTransportResourceExhausted,
Grpc.Core.StatusCode.Unimplemented => StatusCode.ClientTransportUnimplemented,
Grpc.Core.StatusCode.Cancelled => StatusCode.Cancelled,
_ => StatusCode.ClientTransportUnknown
};

internal static StatusCode Code(this StatusIds.Types.StatusCode statusCode) =>
Enum.IsDefined(typeof(StatusCode), (int)statusCode) ? (StatusCode)statusCode : StatusCode.Unavailable;

internal static bool IsNotSuccess(this StatusIds.Types.StatusCode code) =>
code != StatusIds.Types.StatusCode.Success;

internal static string ToMessage(this StatusCode statusCode, IReadOnlyList<IssueMessage> issueMessages) =>
issueMessages.Count == 0
? $"Status: {statusCode}"
: $"Status: {statusCode}, Issues:{Environment.NewLine}{issueMessages.IssuesToString()}";
}
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/src/Ado/YdbCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBeha
var ydbDataReader = await YdbDataReader.CreateYdbDataReader(
await YdbConnection.Session
.ExecuteQuery(preparedSql.ToString(), ydbParameters, execSettings, transaction?.TransactionControl),
YdbConnection.OnStatus, transaction, cancellationToken
YdbConnection.OnNotSuccessStatusCode, transaction, cancellationToken
);

YdbConnection.LastReader = ydbDataReader;
Expand Down
23 changes: 3 additions & 20 deletions src/Ydb.Sdk/src/Ado/YdbConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,24 +92,7 @@ public override async Task OpenAsync(CancellationToken cancellationToken)
{
ThrowIfConnectionOpen();

try
{
Session = await PoolManager.GetSession(ConnectionStringBuilder, cancellationToken);
}
catch (Exception e)
{
throw e switch
{
OperationCanceledException => throw new YdbException(StatusCode.Cancelled,
$"The connection pool has been exhausted, either raise 'MaxSessionPool' " +
$"(currently {ConnectionStringBuilder.MaxSessionPool}) or 'CreateSessionTimeout' " +
$"(currently {ConnectionStringBuilder.CreateSessionTimeout} seconds) in your connection string.", e
),
Driver.TransportException transportException => new YdbException(transportException),
StatusUnsuccessfulException unsuccessfulException => new YdbException(unsuccessfulException.Status),
_ => e
};
}
Session = await PoolManager.GetSession(ConnectionStringBuilder, cancellationToken);

OnStateChange(ClosedToOpenEventArgs);

Expand Down Expand Up @@ -165,9 +148,9 @@ public override string ConnectionString

private ConnectionState ConnectionState { get; set; } = ConnectionState.Closed; // Invoke AsyncOpen()

internal void OnStatus(Status status)
internal void OnNotSuccessStatusCode(StatusCode code)
{
_session.OnStatus(status);
_session.OnNotSuccessStatusCode(code);

if (!_session.IsActive)
{
Expand Down
29 changes: 12 additions & 17 deletions src/Ydb.Sdk/src/Ado/YdbDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Google.Protobuf.Collections;
using Ydb.Issue;
using Ydb.Query;
using Ydb.Sdk.Ado.Internal;
using Ydb.Sdk.Value;

namespace Ydb.Sdk.Ado;
Expand All @@ -11,7 +12,7 @@ public sealed class YdbDataReader : DbDataReader, IAsyncEnumerable<YdbDataRecord
private readonly IServerStream<ExecuteQueryResponsePart> _stream;
private readonly YdbTransaction? _ydbTransaction;
private readonly RepeatedField<IssueMessage> _issueMessagesInStream = new();
private readonly Action<Status> _onNotSuccessStatus;
private readonly Action<StatusCode> _onNotSuccessStatusCode;

private int _currentRowIndex = -1;
private long _resultSetIndex = -1;
Expand Down Expand Up @@ -54,22 +55,22 @@ private enum State

private YdbDataReader(
IServerStream<ExecuteQueryResponsePart> resultSetStream,
Action<Status> onNotSuccessStatus,
Action<StatusCode> onNotSuccessStatusCode,
YdbTransaction? ydbTransaction)
{
_stream = resultSetStream;
_onNotSuccessStatus = onNotSuccessStatus;
_onNotSuccessStatusCode = onNotSuccessStatusCode;
_ydbTransaction = ydbTransaction;
}

internal static async Task<YdbDataReader> CreateYdbDataReader(
IServerStream<ExecuteQueryResponsePart> resultSetStream,
Action<Status> onStatus,
Action<StatusCode> onNotSuccessStatusCode,
YdbTransaction? ydbTransaction = null,
CancellationToken cancellationToken = default
)
{
var ydbDataReader = new YdbDataReader(resultSetStream, onStatus, ydbTransaction);
var ydbDataReader = new YdbDataReader(resultSetStream, onNotSuccessStatusCode, ydbTransaction);
await ydbDataReader.Init(cancellationToken);

return ydbDataReader;
Expand Down Expand Up @@ -522,7 +523,7 @@ public override async Task CloseAsync()
return;
}

_onNotSuccessStatus(new Status(StatusCode.SessionBusy));
_onNotSuccessStatusCode(StatusCode.SessionBusy);
_stream.Dispose();

if (_ydbTransaction != null)
Expand Down Expand Up @@ -559,20 +560,14 @@ private async ValueTask<State> NextExecPart(CancellationToken cancellationToken)

_issueMessagesInStream.AddRange(part.Issues);

if (part.Status != StatusIds.Types.StatusCode.Success)
if (part.Status.IsNotSuccess())
{
OnFailReadStream();

while (await _stream.MoveNextAsync(cancellationToken))
{
_issueMessagesInStream.AddRange(_stream.Current.Issues);
}

var status = Status.FromProto(part.Status, _issueMessagesInStream);

_onNotSuccessStatus(status);

throw new YdbException(status);
throw YdbException.FromServer(part.Status, _issueMessagesInStream);
}

_currentResultSet = part.ResultSet?.FromProto();
Expand All @@ -592,13 +587,13 @@ private async ValueTask<State> NextExecPart(CancellationToken cancellationToken)

return State.NewResultSet;
}
catch (Driver.TransportException e)
catch (YdbException e)
{
OnFailReadStream();

_onNotSuccessStatus(e.Status);
_onNotSuccessStatusCode(e.Code);

throw new YdbException(e);
throw;
}
}

Expand Down
14 changes: 10 additions & 4 deletions src/Ydb.Sdk/src/Ado/YdbException.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
using System.Data.Common;
using Grpc.Core;
using Ydb.Issue;
using Ydb.Sdk.Ado.Internal;

namespace Ydb.Sdk.Ado;

Expand All @@ -8,14 +11,17 @@ internal YdbException(string message) : base(message)
{
}

internal YdbException(Driver.TransportException transportException)
: this(transportException.Status, transportException)
internal YdbException(RpcException e) : this(e.Status.Code(), "Transport RPC call error", e)
{
}

internal YdbException(Status status, Exception? innerException = null)
: this(status.StatusCode, status.ToString(), innerException)
internal static YdbException FromServer(StatusIds.Types.StatusCode statusCode, IReadOnlyList<IssueMessage> issues)
{
var code = statusCode.Code();

var message = code.ToMessage(issues);

return new YdbException(code, message);
}

internal YdbException(StatusCode statusCode, string message, Exception? innerException = null)
Expand Down
62 changes: 32 additions & 30 deletions src/Ydb.Sdk/src/Ado/YdbSchema.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using Ydb.Scheme.V1;
using Ydb.Sdk.Ado.Schema;
using Ydb.Sdk.Services.Table;
using Ydb.Table;

namespace Ydb.Sdk.Ado;

Expand Down Expand Up @@ -63,24 +62,13 @@ public static async Task<YdbTable> DescribeTable(
var describeResponse = await ydbConnection.Session
.DescribeTable(WithSuffix(ydbConnection.Database) + tableName, describeTableSettings);

var status = Status.FromProto(describeResponse.Operation.Status, describeResponse.Operation.Issues);

if (status.IsNotSuccess)
{
ydbConnection.OnStatus(status);

throw new YdbException(status);
}

var describeRes = describeResponse.Operation.Result.Unpack<DescribeTableResult>();

return new YdbTable(tableName, describeRes);
return new YdbTable(tableName, describeResponse);
}
catch (Driver.TransportException e)
catch (YdbException e)
{
ydbConnection.OnStatus(e.Status);
ydbConnection.OnNotSuccessStatusCode(e.Code);

throw new YdbException(e);
throw;
}
}

Expand Down Expand Up @@ -253,19 +241,31 @@ private static async Task AppendDescribeTable(
string? tableType,
Action<YdbTable, string> appendInTable)
{
var ydbTable = await DescribeTable(ydbConnection, tableName, describeTableSettings);
var type = ydbTable.IsSystem
? "SYSTEM_TABLE"
: ydbTable.Type switch
try
{
var describeResponse = await ydbConnection.Session
.DescribeTable(WithSuffix(ydbConnection.Database) + tableName, describeTableSettings);
var ydbTable = new YdbTable(tableName, describeResponse);

var type = ydbTable.IsSystem
? "SYSTEM_TABLE"
: ydbTable.Type switch
{
YdbTable.TableType.Table => "TABLE",
YdbTable.TableType.ColumnTable => "COLUMN_TABLE",
YdbTable.TableType.ExternalTable => "EXTERNAL_TABLE",
_ => throw new ArgumentOutOfRangeException(nameof(tableType))
};
if (type.IsPattern(tableType))
{
YdbTable.TableType.Table => "TABLE",
YdbTable.TableType.ColumnTable => "COLUMN_TABLE",
YdbTable.TableType.ExternalTable => "EXTERNAL_TABLE",
_ => throw new ArgumentOutOfRangeException(nameof(tableType))
};
if (type.IsPattern(tableType))
appendInTable(ydbTable, type);
}
}
catch (YdbException e)
{
appendInTable(ydbTable, type);
ydbConnection.OnNotSuccessStatusCode(e.Code);

throw;
}
}

Expand Down Expand Up @@ -417,7 +417,7 @@ CancellationToken cancellationToken

if (status.IsNotSuccess)
{
throw new YdbException(status);
throw YdbException.FromServer(operation.Status, operation.Issues);
}

foreach (var entry in operation.Result.Unpack<ListDirectoryResult>().Children)
Expand Down Expand Up @@ -461,9 +461,11 @@ await SchemaObjects(

return ydbSchemaObjects;
}
catch (Driver.TransportException e)
catch (YdbException e)
{
throw new YdbException(e);
ydbConnection.OnNotSuccessStatusCode(e.Code);

throw;
}
}

Expand Down
Loading
Loading