diff --git a/CHANGELOG.md b/CHANGELOG.md index 26ad8081..28fa216a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,16 @@ +- Supported in ADO.NET GetSchema(Async). CollectionNames: + * Tables + * TablesWithCollections + * DataSourceInformation + * MetaDataCollections + * Restrictions +- Rename field _onStatus -> _onNotSuccessStatus in YdbDataReader +- If session is not active, do not invoke DeleteNotActiveSession(session) +- AttachStream: connect stream using NodeId +- PoolManager: change pool properties on field +- Delete *Settings.DefaultInstance because it's a singleton object that's changed by tasks when NodeId is set +- DbConnection.Session.OnStatus(status) in YdbTransaction + ## v0.9.4 - Do not pessimize the node on Grpc.Core.StatusCode.Cancelled and Grpc.Core.StatusCode.DeadlineExceeded. - Dispose of WriterSession using dispose CancellationToken. diff --git a/src/Ydb.Sdk/src/Ado/PoolManager.cs b/src/Ydb.Sdk/src/Ado/PoolManager.cs index 36337772..406beb1f 100644 --- a/src/Ydb.Sdk/src/Ado/PoolManager.cs +++ b/src/Ydb.Sdk/src/Ado/PoolManager.cs @@ -6,8 +6,7 @@ namespace Ydb.Sdk.Ado; internal static class PoolManager { private static readonly SemaphoreSlim SemaphoreSlim = new(1); // async mutex - - private static ConcurrentDictionary Pools { get; } = new(); + private static readonly ConcurrentDictionary Pools = new(); internal static async Task GetSession(YdbConnectionStringBuilder connectionString) { diff --git a/src/Ydb.Sdk/src/Ado/YdbCommand.cs b/src/Ydb.Sdk/src/Ado/YdbCommand.cs index cec8d5db..2242435e 100644 --- a/src/Ydb.Sdk/src/Ado/YdbCommand.cs +++ b/src/Ydb.Sdk/src/Ado/YdbCommand.cs @@ -181,7 +181,7 @@ protected override async Task ExecuteDbDataReaderAsync(CommandBeha var execSettings = CommandTimeout > 0 ? new ExecuteQuerySettings { TransportTimeout = TimeSpan.FromSeconds(CommandTimeout) } - : ExecuteQuerySettings.DefaultInstance; + : new ExecuteQuerySettings(); var ydbDataReader = await YdbDataReader.CreateYdbDataReader(YdbConnection.Session.ExecuteQuery( preparedSql.ToString(), ydbParameters, execSettings, Transaction?.TransactionControl), diff --git a/src/Ydb.Sdk/src/Ado/YdbConnection.cs b/src/Ydb.Sdk/src/Ado/YdbConnection.cs index c14187ce..dc0e9826 100644 --- a/src/Ydb.Sdk/src/Ado/YdbConnection.cs +++ b/src/Ydb.Sdk/src/Ado/YdbConnection.cs @@ -164,6 +164,39 @@ protected override YdbCommand CreateDbCommand() return CreateDbCommand(); } + public override DataTable GetSchema() + { + return GetSchemaAsync().GetAwaiter().GetResult(); + } + + public override DataTable GetSchema(string collectionName) + { + return GetSchemaAsync(collectionName).GetAwaiter().GetResult(); + } + + public override DataTable GetSchema(string collectionName, string?[] restrictionValues) + { + return GetSchemaAsync(collectionName, restrictionValues).GetAwaiter().GetResult(); + } + + public override Task GetSchemaAsync(CancellationToken cancellationToken = default) + { + return GetSchemaAsync("MetaDataCollections", cancellationToken); + } + + public override Task GetSchemaAsync(string collectionName, CancellationToken cancellationToken = default) + { + return GetSchemaAsync(collectionName, new string[4], cancellationToken); + } + + public override Task GetSchemaAsync( + string collectionName, + string?[] restrictionValues, + CancellationToken cancellationToken = default) + { + return YdbSchema.GetSchemaAsync(this, collectionName, restrictionValues, cancellationToken); + } + internal void EnsureConnectionOpen() { if (ConnectionState == ConnectionState.Closed) diff --git a/src/Ydb.Sdk/src/Ado/YdbDataReader.cs b/src/Ydb.Sdk/src/Ado/YdbDataReader.cs index b0fea6c8..cf76ae72 100644 --- a/src/Ydb.Sdk/src/Ado/YdbDataReader.cs +++ b/src/Ydb.Sdk/src/Ado/YdbDataReader.cs @@ -11,7 +11,7 @@ public sealed class YdbDataReader : DbDataReader, IAsyncEnumerable _stream; private readonly YdbTransaction? _ydbTransaction; private readonly RepeatedField _issueMessagesInStream = new(); - private readonly Action _onStatus; + private readonly Action _onNotSuccessStatus; private int _currentRowIndex = -1; private long _resultSetIndex = -1; @@ -51,11 +51,11 @@ private interface IMetadata private YdbDataReader( IAsyncEnumerator resultSetStream, - Action onStatus, + Action onNotSuccessStatus, YdbTransaction? ydbTransaction) { _stream = resultSetStream; - _onStatus = onStatus; + _onNotSuccessStatus = onNotSuccessStatus; _ydbTransaction = ydbTransaction; } @@ -489,7 +489,7 @@ private async Task NextExecPart() var status = Status.FromProto(part.Status, _issueMessagesInStream); - _onStatus(status); + _onNotSuccessStatus(status); throw new YdbException(status); } @@ -515,7 +515,7 @@ private async Task NextExecPart() { OnFailReadStream(); - _onStatus(e.Status); + _onNotSuccessStatus(e.Status); throw new YdbException(e.Status); } diff --git a/src/Ydb.Sdk/src/Ado/YdbSchema.cs b/src/Ydb.Sdk/src/Ado/YdbSchema.cs new file mode 100644 index 00000000..19061c10 --- /dev/null +++ b/src/Ydb.Sdk/src/Ado/YdbSchema.cs @@ -0,0 +1,373 @@ +using System.Data; +using System.Data.Common; +using Ydb.Scheme; +using Ydb.Scheme.V1; +using Ydb.Sdk.Services.Table; +using Ydb.Table; + +namespace Ydb.Sdk.Ado; + +internal static class YdbSchema +{ + public static Task GetSchemaAsync( + YdbConnection ydbConnection, + string? collectionName, + string?[] restrictions, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(collectionName); + if (collectionName.Length == 0) + throw new ArgumentException("Collection name cannot be empty.", nameof(collectionName)); + + return collectionName.ToUpperInvariant() switch + { + // Common Schema Collections + "METADATACOLLECTIONS" => Task.FromResult(GetMetaDataCollections()), + "DATASOURCEINFORMATION" => GetDataSourceInformation(ydbConnection), + "RESTRICTIONS" => Task.FromResult(GetRestrictions()), + + // Ydb specific Schema Collections + "TABLES" => GetTables(ydbConnection, restrictions, cancellationToken), + "TABLESWITHSTATS" => GetTablesWithStats(ydbConnection, restrictions, cancellationToken), + + _ => throw new ArgumentOutOfRangeException(nameof(collectionName), collectionName, + "Invalid collection name.") + }; + } + + private static async Task GetTables( + YdbConnection ydbConnection, + string?[] restrictions, + CancellationToken cancellationToken) + { + var table = new DataTable("Tables"); + table.Columns.Add("table_name", typeof(string)); + table.Columns.Add("table_type", typeof(string)); + + var tableName = restrictions[0]; + var tableType = restrictions[1]; + var database = ydbConnection.Database; + + if (tableName == null) // tableName isn't set + { + foreach (var tupleTable in await ListTables(ydbConnection.Session.Driver, + WithSuffix(database), database, tableType, cancellationToken)) + { + table.Rows.Add(tupleTable.TableName, tupleTable.TableType); + } + } + else + { + await AppendDescribeTable( + ydbConnection: ydbConnection, + describeTableSettings: new DescribeTableSettings { CancellationToken = cancellationToken }, + database: database, + tableName: tableName, + tableType: tableType, + (_, type) => { table.Rows.Add(tableName, type); }); + } + + return table; + } + + private static async Task GetTablesWithStats( + YdbConnection ydbConnection, + string?[] restrictions, + CancellationToken cancellationToken) + { + var table = new DataTable("TablesWithStats"); + table.Columns.Add("table_name", typeof(string)); + table.Columns.Add("table_type", typeof(string)); + table.Columns.Add("rows_estimate", typeof(ulong)); + table.Columns.Add("creation_time", typeof(DateTime)); + table.Columns.Add("modification_time", typeof(DateTime)); + + var tableName = restrictions[0]; + var tableType = restrictions[1]; + var database = ydbConnection.Database; + + if (tableName == null) // tableName isn't set + { + foreach (var tupleTable in await ListTables(ydbConnection.Session.Driver, + WithSuffix(database), database, tableType, cancellationToken)) + { + await AppendDescribeTable( + ydbConnection: ydbConnection, + describeTableSettings: new DescribeTableSettings { CancellationToken = cancellationToken } + .WithTableStats(), + database: database, + tableName: tupleTable.TableName, + tableType: tableType, + (describeTableResult, type) => + { + var row = table.Rows.Add(); + var tableStats = describeTableResult.TableStats; + + row["table_name"] = tupleTable.TableName; + row["table_type"] = type; + row["rows_estimate"] = tableStats.RowsEstimate; + row["creation_time"] = tableStats.CreationTime.ToDateTime(); + row["modification_time"] = (object?)tableStats.ModificationTime?.ToDateTime() ?? DBNull.Value; + }); + } + } + else + { + await AppendDescribeTable( + ydbConnection: ydbConnection, + describeTableSettings: new DescribeTableSettings { CancellationToken = cancellationToken } + .WithTableStats(), + database: database, + tableName: tableName, + tableType: tableType, + (describeTableResult, type) => + { + var row = table.Rows.Add(); + var tableStats = describeTableResult.TableStats; + + row["table_name"] = tableName; + row["table_type"] = type; + row["rows_estimate"] = tableStats.RowsEstimate; + row["creation_time"] = tableStats.CreationTime.ToDateTime(); + row["modification_time"] = (object?)tableStats.ModificationTime?.ToDateTime() ?? DBNull.Value; + }); + } + + return table; + } + + private static async Task AppendDescribeTable( + YdbConnection ydbConnection, + DescribeTableSettings describeTableSettings, + string database, + string tableName, + string? tableType, + Action appendInTable) + { + try + { + var describeResponse = await ydbConnection.Session + .DescribeTable(WithSuffix(database) + tableName, describeTableSettings); + + if (describeResponse.Operation.Status == StatusIds.Types.StatusCode.SchemeError) + { + // ignore scheme errors like path not found + return; + } + + var status = Status.FromProto(describeResponse.Operation.Status, describeResponse.Operation.Issues); + + if (status.IsNotSuccess) + { + ydbConnection.Session.OnStatus(status); + + throw new YdbException(status); + } + + var describeRes = describeResponse.Operation.Result.Unpack(); + + // ReSharper disable once SwitchExpressionHandlesSomeKnownEnumValuesWithExceptionInDefault + var type = describeRes.Self.Type switch + { + Entry.Types.Type.Table => tableName.IsSystem() ? "SYSTEM_TABLE" : "TABLE", + Entry.Types.Type.ColumnTable => "COLUMN_TABLE", + _ => throw new YdbException($"Unexpected entry type for Table: {describeRes.Self.Type}") + }; + + if (type.IsTableType(tableType)) + { + appendInTable(describeRes, type); + } + } + catch (Driver.TransportException e) + { + ydbConnection.Session.OnStatus(e.Status); + + throw new YdbException("Transport error on DescribeTable", e); + } + } + + private static async Task> ListTables( + Driver driver, + string databasePath, + string path, + string? tableType, + CancellationToken cancellationToken) + { + try + { + var fullPath = WithSuffix(path); + var tables = new List<(string, string)>(); + var response = await driver.UnaryCall( + SchemeService.ListDirectoryMethod, + new ListDirectoryRequest { Path = fullPath }, + new GrpcRequestSettings { CancellationToken = cancellationToken } + ); + + var operation = response.Operation; + var status = Status.FromProto(operation.Status, operation.Issues); + + if (status.IsNotSuccess) + { + throw new YdbException(status); + } + + foreach (var entry in operation.Result.Unpack().Children) + { + var tablePath = fullPath[databasePath.Length..] + entry.Name; + + switch (entry.Type) + { + case Entry.Types.Type.Table: + var type = tablePath.IsSystem() ? "SYSTEM_TABLE" : "TABLE"; + if (type.IsTableType(tableType)) + { + tables.Add((tablePath, type)); + } + + break; + case Entry.Types.Type.ColumnTable: + if ("COLUMN_TABLE".IsTableType(tableType)) + { + tables.Add((tablePath, "COLUMN_TABLE")); + } + + break; + case Entry.Types.Type.Directory: + tables.AddRange( + await ListTables(driver, databasePath, fullPath + entry.Name, tableType, cancellationToken) + ); + break; + case Entry.Types.Type.Unspecified: + case Entry.Types.Type.PersQueueGroup: + case Entry.Types.Type.Database: + case Entry.Types.Type.RtmrVolume: + case Entry.Types.Type.BlockStoreVolume: + case Entry.Types.Type.CoordinationNode: + case Entry.Types.Type.ColumnStore: + case Entry.Types.Type.Sequence: + case Entry.Types.Type.Replication: + case Entry.Types.Type.Topic: + default: + continue; + } + } + + return tables; + } + catch (Driver.TransportException e) + { + throw new YdbException("Transport error on ListDirectory", e); + } + } + + private static async Task GetDataSourceInformation(YdbConnection ydbConnection) + { + var ydbVersion = + (await new YdbCommand(ydbConnection) { CommandText = "SELECT Version();" }.ExecuteScalarAsync())! + .ToString(); + + var table = new DataTable("DataSourceInformation"); + var row = table.Rows.Add(); + + table.Columns.Add("CompositeIdentifierSeparatorPattern", typeof(string)); + table.Columns.Add("DataSourceProductName", typeof(string)); + table.Columns.Add("DataSourceProductVersion", typeof(string)); + table.Columns.Add("DataSourceProductVersionNormalized", typeof(string)); + table.Columns.Add("GroupByBehavior", typeof(GroupByBehavior)); + table.Columns.Add("IdentifierPattern", typeof(string)); + table.Columns.Add("IdentifierCase", typeof(IdentifierCase)); + table.Columns.Add("OrderByColumnsInSelect", typeof(bool)); + table.Columns.Add("ParameterMarkerFormat", typeof(string)); + table.Columns.Add("ParameterMarkerPattern", typeof(string)); + table.Columns.Add("ParameterNameMaxLength", typeof(int)); + table.Columns.Add("QuotedIdentifierPattern", typeof(string)); + table.Columns.Add("QuotedIdentifierCase", typeof(IdentifierCase)); + table.Columns.Add("ParameterNamePattern", typeof(string)); + table.Columns.Add("StatementSeparatorPattern", typeof(string)); + table.Columns.Add("StringLiteralPattern", typeof(string)); + table.Columns.Add("SupportedJoinOperators", typeof(SupportedJoinOperators)); + + row["CompositeIdentifierSeparatorPattern"] = "\\/"; + row["DataSourceProductName"] = "YDB"; + row["DataSourceProductVersion"] = row["DataSourceProductVersionNormalized"] = ydbVersion; + row["GroupByBehavior"] = GroupByBehavior.Unrelated; + row["IdentifierPattern"] = // copy-paste from MySQL and PostgreSQL + @"(^\`\p{Lo}\p{Lu}\p{Ll}_@#][\p{Lo}\p{Lu}\p{Ll}\p{Nd}@$#_]*$)|(^\`[^\`\0]|\`\`+\`$)|(^\"" + [^\""\0]|\""\""+\""$)"; + row["IdentifierCase"] = IdentifierCase.Insensitive; + row["OrderByColumnsInSelect"] = false; + row["QuotedIdentifierPattern"] = @"(([^\`]|\`\`)*)"; + ; + row["QuotedIdentifierCase"] = IdentifierCase.Sensitive; + row["StatementSeparatorPattern"] = ";"; + row["StringLiteralPattern"] = "'(([^']|'')*)'|'(([^\"]|\"\")*)'"; + row["SupportedJoinOperators"] = + SupportedJoinOperators.FullOuter | + SupportedJoinOperators.Inner | + SupportedJoinOperators.LeftOuter | + SupportedJoinOperators.RightOuter; + + row["ParameterMarkerFormat"] = "{0}"; + row["ParameterMarkerPattern"] = "(@[A-Za-z0-9_$#]*)"; + row["ParameterNameMaxLength"] = int.MaxValue; + row["ParameterNamePattern"] = + @"^[\p{Lo}\p{Lu}\p{Ll}\p{Lm}_@#][\p{Lo}\p{Lu}\p{Ll}\p{Lm}\p{Nd}\uff3f_@#\$]*(?=\s+|$)"; + + return table; + } + + private static DataTable GetMetaDataCollections() + { + var table = new DataTable("MetaDataCollections"); + table.Columns.Add("CollectionName", typeof(string)); + table.Columns.Add("NumberOfRestrictions", typeof(int)); + table.Columns.Add("NumberOfIdentifierParts", typeof(int)); + + // Common Schema Collections + table.Rows.Add("MetaDataCollections", 0, 0); + table.Rows.Add("DataSourceInformation", 0, 0); + table.Rows.Add("Restrictions", 0, 0); + table.Rows.Add("DataTypes", 0, 0); + table.Rows.Add("ReservedWords", 0, 0); + + // Ydb Specific Schema Collections + table.Rows.Add("Tables", 2, 1); + table.Rows.Add("TablesWithStats", 2, 1); + + return table; + } + + private static DataTable GetRestrictions() + { + var table = new DataTable("Restrictions"); + + table.Columns.Add("CollectionName", typeof(string)); + table.Columns.Add("RestrictionName", typeof(string)); + table.Columns.Add("RestrictionDefault", typeof(string)); + table.Columns.Add("RestrictionNumber", typeof(int)); + + table.Rows.Add("Tables", "Table", "TABLE_NAME", 1); + table.Rows.Add("Tables", "TableType", "TABLE_TYPE", 2); + table.Rows.Add("TablesWithStats", "Table", "TABLE_NAME", 1); + table.Rows.Add("TablesWithStats", "TableType", "TABLE_TYPE", 2); + + return table; + } + + private static string WithSuffix(string path) + { + return path.EndsWith('/') ? path : path + '/'; + } + + private static bool IsSystem(this string tablePath) + { + return tablePath.StartsWith(".sys/") + || tablePath.StartsWith(".sys_health/") + || tablePath.StartsWith(".sys_health_dev/"); + } + + private static bool IsTableType(this string tableType, string? expectedTableType) + { + return expectedTableType == null || expectedTableType.Equals(tableType, StringComparison.OrdinalIgnoreCase); + } +} diff --git a/src/Ydb.Sdk/src/Ado/YdbTransaction.cs b/src/Ydb.Sdk/src/Ado/YdbTransaction.cs index 7283fa75..8478e255 100644 --- a/src/Ydb.Sdk/src/Ado/YdbTransaction.cs +++ b/src/Ydb.Sdk/src/Ado/YdbTransaction.cs @@ -98,6 +98,8 @@ private async Task FinishTransaction(Func> finishMethod) { Failed = true; + DbConnection.Session.OnStatus(status); + throw new YdbException(status); } } @@ -105,6 +107,8 @@ private async Task FinishTransaction(Func> finishMethod) { Failed = true; + DbConnection.Session.OnStatus(e.Status); + throw new YdbException(e.Status); } } diff --git a/src/Ydb.Sdk/src/GrpcRequestSettings.cs b/src/Ydb.Sdk/src/GrpcRequestSettings.cs index 9931cca1..993bf41e 100644 --- a/src/Ydb.Sdk/src/GrpcRequestSettings.cs +++ b/src/Ydb.Sdk/src/GrpcRequestSettings.cs @@ -6,8 +6,6 @@ namespace Ydb.Sdk; public class GrpcRequestSettings { - internal static readonly GrpcRequestSettings DefaultInstance = new(); - public string TraceId { get; set; } = string.Empty; public TimeSpan TransportTimeout { get; set; } = TimeSpan.Zero; public ImmutableArray CustomClientHeaders { get; } = new(); diff --git a/src/Ydb.Sdk/src/Pool/SessionPool.cs b/src/Ydb.Sdk/src/Pool/SessionPool.cs index 271d99b3..48e4c054 100644 --- a/src/Ydb.Sdk/src/Pool/SessionPool.cs +++ b/src/Ydb.Sdk/src/Pool/SessionPool.cs @@ -43,8 +43,6 @@ internal async Task GetSession() if (session != null) // not active { Logger.LogDebug("Session[{Id}] isn't active, creating new session", session.SessionId); - - _ = DeleteNotActiveSession(session); } try @@ -62,8 +60,6 @@ internal async Task GetSession() protected abstract Task CreateSession(); - protected abstract Task DeleteSession(TSession session); - // TODO Retry policy and may be move to SessionPool method internal async Task ExecOnSession(Func> onSession, RetrySettings? retrySettings = null) { @@ -138,7 +134,7 @@ internal async ValueTask ReleaseSession(TSession session) { if (_disposed) { - await DeleteNotActiveSession(session); + await DeleteSession(session); await TryDriverDispose(_size - 1); return; @@ -150,7 +146,7 @@ internal async ValueTask ReleaseSession(TSession session) } else { - _ = DeleteNotActiveSession(session); + _ = DeleteSession(session); } } finally @@ -164,9 +160,9 @@ private void Release() _semaphore.Release(); } - private Task DeleteNotActiveSession(TSession session) + private Task DeleteSession(TSession session) { - return DeleteSession(session).ContinueWith(s => + return session.DeleteSession().ContinueWith(s => { Logger.LogDebug("Session[{id}] removed with status {status}", session.SessionId, s.Result); }); @@ -184,7 +180,7 @@ public async ValueTask DisposeAsync() var tasks = new List(); while (_idleSessions.TryDequeue(out var session)) // thread safe iteration { - tasks.Add(DeleteNotActiveSession(session)); + tasks.Add(DeleteSession(session)); } await Task.WhenAll(tasks); @@ -209,18 +205,17 @@ public abstract class SessionBase where T : SessionBase { private readonly SessionPool _sessionPool; private readonly ILogger _logger; + private readonly long _nodeId; public string SessionId { get; } - internal long NodeId { get; } - internal volatile bool IsActive = true; internal SessionBase(SessionPool sessionPool, string sessionId, long nodeId, ILogger logger) { _sessionPool = sessionPool; SessionId = sessionId; - NodeId = nodeId; + _nodeId = nodeId; _logger = logger; } @@ -245,4 +240,12 @@ internal ValueTask Release() { return _sessionPool.ReleaseSession((T)this); } + + internal TS MakeGrpcRequestSettings(TS settings) where TS : GrpcRequestSettings + { + settings.NodeId = _nodeId; + return settings; + } + + internal abstract Task DeleteSession(); } diff --git a/src/Ydb.Sdk/src/Services/Query/SessionPool.cs b/src/Ydb.Sdk/src/Services/Query/SessionPool.cs index 65815633..c70514d1 100644 --- a/src/Ydb.Sdk/src/Services/Query/SessionPool.cs +++ b/src/Ydb.Sdk/src/Services/Query/SessionPool.cs @@ -2,7 +2,16 @@ using Ydb.Query; using Ydb.Query.V1; using Ydb.Sdk.Pool; +using Ydb.Sdk.Services.Table; using Ydb.Sdk.Value; +using Ydb.Table; +using Ydb.Table.V1; +using CommitTransactionRequest = Ydb.Query.CommitTransactionRequest; +using CreateSessionRequest = Ydb.Query.CreateSessionRequest; +using DeleteSessionRequest = Ydb.Query.DeleteSessionRequest; +using DescribeTableResponse = Ydb.Table.DescribeTableResponse; +using RollbackTransactionRequest = Ydb.Query.RollbackTransactionRequest; +using TransactionControl = Ydb.Query.TransactionControl; namespace Ydb.Sdk.Services.Query; @@ -15,8 +24,6 @@ internal sealed class SessionPool : SessionPool, IAsyncDisposable TransportTimeout = TimeSpan.FromMinutes(2) }; - private static readonly GrpcRequestSettings AttachSessionSettings = new(); - private readonly Driver _driver; private readonly bool _disposingDriver; private readonly ILogger _loggerSession; @@ -31,21 +38,30 @@ internal SessionPool(Driver driver, int? maxSessionPool = null, bool disposingDr protected override async Task CreateSession() { - var response = await _driver.UnaryCall(QueryService.CreateSessionMethod, CreateSessionRequest, - CreateSessionSettings); + var response = await _driver.UnaryCall( + QueryService.CreateSessionMethod, + CreateSessionRequest, + CreateSessionSettings + ); Status.FromProto(response.Status, response.Issues).EnsureSuccess(); TaskCompletionSource completeTask = new(); - var session = new Session(_driver, this, response.SessionId, response.NodeId, _loggerSession); + var sessionId = response.SessionId; + var nodeId = response.NodeId; + + var session = new Session(_driver, this, sessionId, nodeId, _loggerSession); _ = Task.Run(async () => { try { - await using var stream = _driver.ServerStreamCall(QueryService.AttachSessionMethod, - new AttachSessionRequest { SessionId = session.SessionId }, AttachSessionSettings); + await using var stream = _driver.ServerStreamCall( + QueryService.AttachSessionMethod, + new AttachSessionRequest { SessionId = sessionId }, + new GrpcRequestSettings { NodeId = nodeId } + ); if (!await stream.MoveNextAsync()) { @@ -64,7 +80,7 @@ protected override async Task CreateSession() var sessionStateStatus = Status.FromProto(sessionState.Status, sessionState.Issues); Logger.LogDebug("Session[{SessionId}] was received the status from the attach stream: {Status}", - session.SessionId, sessionStateStatus); + sessionId, sessionStateStatus); session.OnStatus(sessionStateStatus); @@ -75,13 +91,13 @@ protected override async Task CreateSession() } } - Logger.LogDebug("Session[{SessionId}]: Attached stream is closed", session.SessionId); + Logger.LogDebug("Session[{SessionId}]: Attached stream is closed", sessionId); // attach stream is closed } catch (Driver.TransportException e) { - Logger.LogWarning(e, "Session[{SessionId}] is deactivated by transport error", session.SessionId); + Logger.LogWarning(e, "Session[{SessionId}] is deactivated by transport error", sessionId); } } catch (Driver.TransportException e) @@ -99,27 +115,6 @@ protected override async Task CreateSession() return session; } - protected override async Task DeleteSession(Session session) - { - try - { - var settings = new GrpcRequestSettings - { - TransportTimeout = TimeSpan.FromSeconds(5), - NodeId = session.NodeId - }; - - var deleteSessionResponse = await _driver.UnaryCall(QueryService.DeleteSessionMethod, - new DeleteSessionRequest { SessionId = session.SessionId }, settings); - - return Status.FromProto(deleteSessionResponse.Status, deleteSessionResponse.Issues); - } - catch (Driver.TransportException e) - { - return e.Status; - } - } - protected override ValueTask DisposeDriver() { return _disposingDriver ? _driver.DisposeAsync() : default; @@ -128,7 +123,7 @@ protected override ValueTask DisposeDriver() internal class Session : SessionBase { - private readonly Driver _driver; + internal Driver Driver { get; } internal Session( Driver driver, @@ -138,7 +133,7 @@ internal Session( ILogger logger ) : base(sessionPool, sessionId, nodeId, logger) { - _driver = driver; + Driver = driver; } internal ServerStream ExecuteQuery( @@ -148,8 +143,7 @@ internal ServerStream ExecuteQuery( TransactionControl? txControl) { parameters ??= new Dictionary(); - settings ??= ExecuteQuerySettings.DefaultInstance; - settings.NodeId = NodeId; + settings = MakeGrpcRequestSettings(settings ?? new ExecuteQuerySettings()); var request = new ExecuteQueryRequest { @@ -162,36 +156,66 @@ internal ServerStream ExecuteQuery( request.Parameters.Add(parameters.ToDictionary(p => p.Key, p => p.Value.GetProto())); - return _driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings); + return Driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings); } internal async Task CommitTransaction(string txId, GrpcRequestSettings? settings = null) { - settings ??= GrpcRequestSettings.DefaultInstance; - settings.NodeId = NodeId; + settings = MakeGrpcRequestSettings(settings ?? new GrpcRequestSettings()); - var response = await _driver.UnaryCall(QueryService.CommitTransactionMethod, new CommitTransactionRequest - { SessionId = SessionId, TxId = txId }, settings); + var response = await Driver.UnaryCall(QueryService.CommitTransactionMethod, + new CommitTransactionRequest { SessionId = SessionId, TxId = txId }, settings); - var status = Status.FromProto(response.Status, response.Issues); + return Status.FromProto(response.Status, response.Issues); + } - OnStatus(status); + internal async Task RollbackTransaction(string txId, GrpcRequestSettings? settings = null) + { + settings = MakeGrpcRequestSettings(settings ?? new GrpcRequestSettings()); - return status; + var response = await Driver.UnaryCall(QueryService.RollbackTransactionMethod, + new RollbackTransactionRequest { SessionId = SessionId, TxId = txId }, settings); + + return Status.FromProto(response.Status, response.Issues); } - internal async Task RollbackTransaction(string txId, GrpcRequestSettings? settings = null) + internal async Task DescribeTable(string path, DescribeTableSettings? settings = null) { - settings ??= GrpcRequestSettings.DefaultInstance; - settings.NodeId = NodeId; + settings = MakeGrpcRequestSettings(settings ?? new DescribeTableSettings()); - var response = await _driver.UnaryCall(QueryService.RollbackTransactionMethod, new RollbackTransactionRequest - { SessionId = SessionId, TxId = txId }, settings); + return await Driver.UnaryCall( + TableService.DescribeTableMethod, + new DescribeTableRequest + { + Path = path, + IncludeTableStats = settings.IncludeTableStats, + IncludePartitionStats = settings.IncludePartitionStats, + IncludeShardKeyBounds = settings.IncludeShardKeyBounds + }, + settings + ); + } - var status = Status.FromProto(response.Status, response.Issues); + internal override async Task DeleteSession() + { + try + { + IsActive = false; + + var settings = MakeGrpcRequestSettings(new GrpcRequestSettings + { TransportTimeout = TimeSpan.FromSeconds(5) }); - OnStatus(status); + var deleteSessionResponse = await Driver.UnaryCall( + QueryService.DeleteSessionMethod, + new DeleteSessionRequest { SessionId = SessionId }, + settings + ); - return status; + return Status.FromProto(deleteSessionResponse.Status, deleteSessionResponse.Issues); + } + catch (Driver.TransportException e) + { + return e.Status; + } } } diff --git a/src/Ydb.Sdk/src/Services/Query/Settings.cs b/src/Ydb.Sdk/src/Services/Query/Settings.cs index fe493c1c..70beb138 100644 --- a/src/Ydb.Sdk/src/Services/Query/Settings.cs +++ b/src/Ydb.Sdk/src/Services/Query/Settings.cs @@ -20,8 +20,6 @@ public enum Syntax public class ExecuteQuerySettings : GrpcRequestSettings { - internal new static readonly ExecuteQuerySettings DefaultInstance = new(); - public Syntax Syntax { get; set; } = Syntax.YqlV1; public bool ConcurrentResultSets { get; set; } } diff --git a/src/Ydb.Sdk/tests/Ado/YdbSchemaTests.cs b/src/Ydb.Sdk/tests/Ado/YdbSchemaTests.cs new file mode 100644 index 00000000..cf0a5155 --- /dev/null +++ b/src/Ydb.Sdk/tests/Ado/YdbSchemaTests.cs @@ -0,0 +1,124 @@ +using System.Data; +using Xunit; +using Ydb.Sdk.Ado; + +namespace Ydb.Sdk.Tests.Ado; + +public class YdbSchemaTests +{ + [Fact] + public async Task GetSchema_WhenTablesCollection_ReturnAllTables() + { + await using var ydbConnection = new YdbConnection(); + await ydbConnection.OpenAsync(); + + var table1 = $"a/b/{Utils.Net}"; + var table2 = $"a/{Utils.Net}"; + var table3 = $"{Utils.Net}"; + + var tableNames = new HashSet { table1, table2, table3 }; + + await new YdbCommand(ydbConnection) + { + CommandText = $@" +CREATE TABLE `{table1}` (a Int32, b Int32, PRIMARY KEY(a)); +CREATE TABLE `{table2}` (a Int32, b Int32, PRIMARY KEY(a)); +CREATE TABLE `{table3}` (a Int32, b Int32, PRIMARY KEY(a)); +" + }.ExecuteNonQueryAsync(); + + var table = await ydbConnection.GetSchemaAsync("Tables", new[] { null, "TABLE" }); + + foreach (DataRow row in table.Rows) + { + tableNames.Remove(row["table_name"].ToString()!); + } + + Assert.Empty(tableNames); + + var singleTable1 = await ydbConnection.GetSchemaAsync("Tables", new[] { table1, "TABLE" }); + Assert.Equal(1, singleTable1.Rows.Count); + Assert.Equal(table1, singleTable1.Rows[0]["table_name"].ToString()); + Assert.Equal("TABLE", singleTable1.Rows[0]["table_type"].ToString()); + + var singleTable2 = await ydbConnection.GetSchemaAsync("Tables", new[] { table2, null }); + Assert.Equal(1, singleTable2.Rows.Count); + Assert.Equal(table2, singleTable2.Rows[0]["table_name"].ToString()); + Assert.Equal("TABLE", singleTable2.Rows[0]["table_type"].ToString()); + + // not found case + var notFound = await ydbConnection.GetSchemaAsync("Tables", new[] { "not_found", null }); + Assert.Equal(0, notFound.Rows.Count); + + await new YdbCommand(ydbConnection) + { + CommandText = $@" +DROP TABLE `{table1}`; +DROP TABLE `{table2}`; +DROP TABLE `{table3}`;" + }.ExecuteNonQueryAsync(); + } + + [Fact] + public async Task GetSchema_WhenTablesWithStatsCollection_ReturnAllTables() + { + await using var ydbConnection = new YdbConnection(); + await ydbConnection.OpenAsync(); + + var table1 = $"a/b/{Utils.Net}_for_stats"; + var table2 = $"a/{Utils.Net}_for_stats"; + var table3 = $"{Utils.Net}_for_stats"; + + var tableNames = new HashSet { table1, table2, table3 }; + + await new YdbCommand(ydbConnection) + { + CommandText = $@" +CREATE TABLE `{table1}` (a Int32, b Int32, PRIMARY KEY(a)); +CREATE TABLE `{table2}` (a Int32, b Int32, PRIMARY KEY(a)); +CREATE TABLE `{table3}` (a Int32, b Int32, PRIMARY KEY(a)); +" + }.ExecuteNonQueryAsync(); + + var table = await ydbConnection.GetSchemaAsync("TablesWithStats", new[] { null, "TABLE" }); + + foreach (DataRow row in table.Rows) + { + tableNames.Remove(row["table_name"].ToString()!); + + Assert.Equal(0UL, row["rows_estimate"]); + Assert.NotNull(row["creation_time"]); + Assert.Equal(DBNull.Value, row["modification_time"]); + } + + Assert.Empty(tableNames); + + var singleTable1 = await ydbConnection.GetSchemaAsync("TablesWithStats", new[] { table1, "TABLE" }); + Assert.Equal(1, singleTable1.Rows.Count); + Assert.Equal(table1, singleTable1.Rows[0]["table_name"].ToString()); + Assert.Equal("TABLE", singleTable1.Rows[0]["table_type"].ToString()); + Assert.Equal(0UL, singleTable1.Rows[0]["rows_estimate"]); + Assert.NotNull(singleTable1.Rows[0]["creation_time"]); + Assert.Equal(DBNull.Value, singleTable1.Rows[0]["modification_time"]); + + var singleTable2 = await ydbConnection.GetSchemaAsync("TablesWithStats", new[] { table2, null }); + Assert.Equal(1, singleTable2.Rows.Count); + Assert.Equal(table2, singleTable2.Rows[0]["table_name"].ToString()); + Assert.Equal("TABLE", singleTable2.Rows[0]["table_type"].ToString()); + Assert.Equal(0UL, singleTable2.Rows[0]["rows_estimate"]); + Assert.NotNull(singleTable2.Rows[0]["creation_time"]); + Assert.Equal(DBNull.Value, singleTable2.Rows[0]["modification_time"]); + + // not found case + var notFound = await ydbConnection.GetSchemaAsync("Tables", new[] { "not_found", null }); + Assert.Equal(0, notFound.Rows.Count); + + await new YdbCommand(ydbConnection) + { + CommandText = $@" +DROP TABLE `{table1}`; +DROP TABLE `{table2}`; +DROP TABLE `{table3}`;" + }.ExecuteNonQueryAsync(); + } +} diff --git a/src/Ydb.Sdk/tests/Pool/SessionPoolTests.cs b/src/Ydb.Sdk/tests/Pool/SessionPoolTests.cs index 7dfcb1d2..d9ef7779 100644 --- a/src/Ydb.Sdk/tests/Pool/SessionPoolTests.cs +++ b/src/Ydb.Sdk/tests/Pool/SessionPoolTests.cs @@ -98,11 +98,6 @@ protected override Task CreateSession() return new TestSession(this); }))()); } - - protected override Task DeleteSession(TestSession session) - { - return Task.FromResult(Status.Success); - } } public class TestSession : SessionBase @@ -111,4 +106,9 @@ internal TestSession(SessionPool sessionPool) : base(sessionPool, "0", 0, Utils.GetLoggerFactory().CreateLogger()) { } + + internal override Task DeleteSession() + { + return Task.FromResult(new Status(StatusCode.Success)); + } }