Skip to content

Commit 7dd5c88

Browse files
dev: Prepare ISession for switching implementation.
1 parent c9ed850 commit 7dd5c88

File tree

14 files changed

+109
-128
lines changed

14 files changed

+109
-128
lines changed

src/Ydb.Sdk/src/Ado/PoolManager.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System.Collections.Concurrent;
2+
using Ydb.Sdk.Ado.Session;
23
using Ydb.Sdk.Pool;
34
using Ydb.Sdk.Services.Query;
45

@@ -9,7 +10,7 @@ internal static class PoolManager
910
private static readonly SemaphoreSlim SemaphoreSlim = new(1); // async mutex
1011
private static readonly ConcurrentDictionary<string, SessionPool> Pools = new();
1112

12-
internal static async Task<Services.Query.Session> GetSession(
13+
internal static async Task<ISession> GetSession(
1314
YdbConnectionStringBuilder connectionString,
1415
CancellationToken cancellationToken
1516
)

src/Ydb.Sdk/src/Ado/Session/ISession.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ namespace Ydb.Sdk.Ado.Session;
66

77
internal interface ISession
88
{
9+
IDriver Driver { get; }
10+
911
bool IsBroken { get; }
1012

1113
ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(

src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,12 @@ namespace Ydb.Sdk.Ado.Session;
66

77
internal class ImplicitSession : ISession
88
{
9-
private readonly IDriver _driver;
10-
119
public ImplicitSession(IDriver driver)
1210
{
13-
_driver = driver;
11+
Driver = driver;
1412
}
1513

14+
public IDriver Driver { get; }
1615
public bool IsBroken => false;
1716

1817
public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
@@ -36,7 +35,7 @@ public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
3635
};
3736
request.Parameters.Add(parameters.ToDictionary(p => p.Key, p => p.Value.GetProto()));
3837

39-
return _driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings);
38+
return Driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings);
4039
}
4140

4241
public Task CommitTransaction(string txId, CancellationToken cancellationToken = default) =>

src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ internal class PoolingSession : IPoolingSession
2020
private static readonly TimeSpan DeleteSessionTimeout = TimeSpan.FromSeconds(5);
2121
private static readonly CreateSessionRequest CreateSessionRequest = new();
2222

23-
private readonly IDriver _driver;
2423
private readonly PoolingSessionSource _poolingSessionSource;
2524
private readonly ILogger<PoolingSession> _logger;
25+
private readonly CancellationTokenSource _attachStreamLifecycleCts = new();
2626

2727
private volatile bool _isBroken = true;
2828

@@ -31,6 +31,7 @@ internal class PoolingSession : IPoolingSession
3131
private string SessionId { get; set; } = string.Empty;
3232
private long NodeId { get; set; }
3333

34+
public IDriver Driver { get; }
3435
public bool IsBroken => _isBroken;
3536

3637
internal PoolingSession(
@@ -40,10 +41,10 @@ internal PoolingSession(
4041
ILogger<PoolingSession> logger
4142
)
4243
{
43-
_driver = driver;
4444
_poolingSessionSource = poolingSessionSource;
4545
_disableServerBalancer = disableServerBalancer;
4646
_logger = logger;
47+
Driver = driver;
4748
}
4849

4950
public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
@@ -65,15 +66,15 @@ public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
6566
};
6667
request.Parameters.Add(parameters.ToDictionary(p => p.Key, p => p.Value.GetProto()));
6768

68-
return _driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings);
69+
return Driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings);
6970
}
7071

7172
public async Task CommitTransaction(
7273
string txId,
7374
CancellationToken cancellationToken = default
7475
)
7576
{
76-
var response = await _driver.UnaryCall(
77+
var response = await Driver.UnaryCall(
7778
QueryService.CommitTransactionMethod,
7879
new CommitTransactionRequest { SessionId = SessionId, TxId = txId },
7980
new GrpcRequestSettings { CancellationToken = cancellationToken, NodeId = NodeId }
@@ -90,7 +91,7 @@ public async Task RollbackTransaction(
9091
CancellationToken cancellationToken = default
9192
)
9293
{
93-
var response = await _driver.UnaryCall(
94+
var response = await Driver.UnaryCall(
9495
QueryService.RollbackTransactionMethod,
9596
new RollbackTransactionRequest { SessionId = SessionId, TxId = txId },
9697
new GrpcRequestSettings { CancellationToken = cancellationToken, NodeId = NodeId }
@@ -126,7 +127,7 @@ public async Task Open(CancellationToken cancellationToken)
126127
requestSettings.ClientCapabilities.Add(SessionBalancer);
127128
}
128129

129-
var response = await _driver.UnaryCall(QueryService.CreateSessionMethod, CreateSessionRequest, requestSettings);
130+
var response = await Driver.UnaryCall(QueryService.CreateSessionMethod, CreateSessionRequest, requestSettings);
130131

131132
if (response.Status.IsNotSuccess())
132133
{
@@ -143,7 +144,7 @@ public async Task Open(CancellationToken cancellationToken)
143144
{
144145
try
145146
{
146-
using var stream = await _driver.ServerStreamCall(
147+
using var stream = await Driver.ServerStreamCall(
147148
QueryService.AttachSessionMethod,
148149
new AttachSessionRequest { SessionId = SessionId },
149150
new GrpcRequestSettings { NodeId = NodeId }
@@ -166,10 +167,12 @@ public async Task Open(CancellationToken cancellationToken)
166167

167168
completeTask.SetResult();
168169

170+
var lifecycleAttachToken = _attachStreamLifecycleCts.Token;
171+
169172
try
170173
{
171174
// ReSharper disable once MethodSupportsCancellation
172-
while (await stream.MoveNextAsync())
175+
while (await stream.MoveNextAsync(lifecycleAttachToken))
173176
{
174177
var sessionState = stream.Current;
175178

@@ -220,14 +223,10 @@ public async Task DeleteSession()
220223
{
221224
try
222225
{
223-
if (_isBroken)
224-
{
225-
return;
226-
}
227-
228226
_isBroken = true;
227+
_attachStreamLifecycleCts.CancelAfter(DeleteSessionTimeout);
229228

230-
var deleteSessionResponse = await _driver.UnaryCall(
229+
var deleteSessionResponse = await Driver.UnaryCall(
231230
QueryService.DeleteSessionMethod,
232231
new DeleteSessionRequest { SessionId = SessionId },
233232
new GrpcRequestSettings { TransportTimeout = DeleteSessionTimeout, NodeId = NodeId }

src/Ydb.Sdk/src/Ado/YdbAdoDefaultSettings.cs

Lines changed: 0 additions & 12 deletions
This file was deleted.

src/Ydb.Sdk/src/Ado/YdbCommand.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,8 +206,8 @@ protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBeha
206206
preparedSql.Append(sql);
207207

208208
var execSettings = CommandTimeout > 0
209-
? new ExecuteQuerySettings { TransportTimeout = TimeSpan.FromSeconds(CommandTimeout) }
210-
: new ExecuteQuerySettings();
209+
? new GrpcRequestSettings { TransportTimeout = TimeSpan.FromSeconds(CommandTimeout) }
210+
: new GrpcRequestSettings();
211211

212212
var transaction = YdbConnection.CurrentTransaction;
213213

src/Ydb.Sdk/src/Ado/YdbConnection.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System.Data;
22
using System.Data.Common;
33
using System.Diagnostics.CodeAnalysis;
4+
using Ydb.Sdk.Ado.Session;
45
using Ydb.Sdk.Services.Query;
56
using static System.Data.IsolationLevel;
67

@@ -24,7 +25,7 @@ private YdbConnectionStringBuilder ConnectionStringBuilder
2425
[param: AllowNull] init => _connectionStringBuilder = value;
2526
}
2627

27-
internal Services.Query.Session Session
28+
internal ISession Session
2829
{
2930
get
3031
{
@@ -35,7 +36,7 @@ internal Services.Query.Session Session
3536
private set => _session = value;
3637
}
3738

38-
private Services.Query.Session _session = null!;
39+
private ISession _session = null!;
3940

4041
public YdbConnection()
4142
{
@@ -124,7 +125,7 @@ public override async Task CloseAsync()
124125
}
125126
finally
126127
{
127-
await _session.Release();
128+
_session.Close();
128129
}
129130
}
130131

@@ -152,7 +153,7 @@ internal void OnNotSuccessStatusCode(StatusCode code)
152153
{
153154
_session.OnNotSuccessStatusCode(code);
154155

155-
if (!_session.IsActive)
156+
if (_session.IsBroken)
156157
{
157158
ConnectionState = ConnectionState.Broken;
158159
}

src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,15 @@ public YdbConnectionStringBuilder(string connectionString)
2525
// Init default connection string
2626
private void InitDefaultValues()
2727
{
28-
_host = YdbAdoDefaultSettings.Host;
29-
_port = YdbAdoDefaultSettings.Port;
30-
_database = YdbAdoDefaultSettings.Database;
28+
_host = "localhost";
29+
_port = 2136;
30+
_database = "/local";
3131
_minSessionPool = 0;
3232
_maxSessionPool = SessionPoolDefaultSettings.MaxSessionPool;
3333
_createSessionTimeout = SessionPoolDefaultSettings.CreateSessionTimeoutSeconds;
3434
_sessionIdleTimeout = 300;
3535
_sessionPruningInterval = 10;
36-
_useTls = YdbAdoDefaultSettings.UseTls;
36+
_useTls = false;
3737
_connectTimeout = GrpcDefaultSettings.ConnectTimeoutSeconds;
3838
_keepAlivePingDelay = GrpcDefaultSettings.KeepAlivePingSeconds;
3939
_keepAlivePingTimeout = GrpcDefaultSettings.KeepAlivePingTimeoutSeconds;

src/Ydb.Sdk/src/Ado/YdbSchema.cs

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@
33
using System.Globalization;
44
using Ydb.Scheme;
55
using Ydb.Scheme.V1;
6+
using Ydb.Sdk.Ado.Internal;
67
using Ydb.Sdk.Ado.Schema;
78
using Ydb.Sdk.Services.Table;
9+
using Ydb.Table;
10+
using Ydb.Table.V1;
811

912
namespace Ydb.Sdk.Ado;
1013

@@ -59,10 +62,28 @@ public static async Task<YdbTable> DescribeTable(
5962
{
6063
try
6164
{
62-
var describeResponse = await ydbConnection.Session
63-
.DescribeTable(WithSuffix(ydbConnection.Database) + tableName, describeTableSettings);
65+
describeTableSettings ??= new DescribeTableSettings();
6466

65-
return new YdbTable(tableName, describeResponse);
67+
var describeResponse = await ydbConnection.Session.Driver.UnaryCall(
68+
TableService.DescribeTableMethod,
69+
new DescribeTableRequest
70+
{
71+
Path = WithSuffix(ydbConnection.Database) + tableName,
72+
IncludeTableStats = describeTableSettings.IncludeTableStats,
73+
IncludePartitionStats = describeTableSettings.IncludePartitionStats,
74+
IncludeShardKeyBounds = describeTableSettings.IncludeShardKeyBounds
75+
},
76+
describeTableSettings
77+
);
78+
79+
if (describeResponse.Operation.Status.IsNotSuccess())
80+
{
81+
throw YdbException.FromServer(describeResponse.Operation.Status, describeResponse.Operation.Issues);
82+
}
83+
84+
var describeResult = describeResponse.Operation.Result.Unpack<DescribeTableResult>();
85+
86+
return new YdbTable(tableName, describeResult);
6687
}
6788
catch (YdbException e)
6889
{
@@ -241,31 +262,20 @@ private static async Task AppendDescribeTable(
241262
string? tableType,
242263
Action<YdbTable, string> appendInTable)
243264
{
244-
try
245-
{
246-
var describeResponse = await ydbConnection.Session
247-
.DescribeTable(WithSuffix(ydbConnection.Database) + tableName, describeTableSettings);
248-
var ydbTable = new YdbTable(tableName, describeResponse);
265+
var ydbTable = await DescribeTable(ydbConnection, tableName, describeTableSettings);
249266

250-
var type = ydbTable.IsSystem
251-
? "SYSTEM_TABLE"
252-
: ydbTable.Type switch
253-
{
254-
YdbTable.TableType.Table => "TABLE",
255-
YdbTable.TableType.ColumnTable => "COLUMN_TABLE",
256-
YdbTable.TableType.ExternalTable => "EXTERNAL_TABLE",
257-
_ => throw new ArgumentOutOfRangeException(nameof(tableType))
258-
};
259-
if (type.IsPattern(tableType))
267+
var type = ydbTable.IsSystem
268+
? "SYSTEM_TABLE"
269+
: ydbTable.Type switch
260270
{
261-
appendInTable(ydbTable, type);
262-
}
263-
}
264-
catch (YdbException e)
271+
YdbTable.TableType.Table => "TABLE",
272+
YdbTable.TableType.ColumnTable => "COLUMN_TABLE",
273+
YdbTable.TableType.ExternalTable => "EXTERNAL_TABLE",
274+
_ => throw new ArgumentOutOfRangeException(nameof(tableType))
275+
};
276+
if (type.IsPattern(tableType))
265277
{
266-
ydbConnection.OnNotSuccessStatusCode(e.Code);
267-
268-
throw;
278+
appendInTable(ydbTable, type);
269279
}
270280
}
271281

src/Ydb.Sdk/src/Pool/SessionPool.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ internal async Task<TSession> GetSession(CancellationToken cancellationToken = d
5757
if (session != null) // not active
5858
{
5959
Logger.LogDebug("Session[{Id}] isn't active, creating new session", session.SessionId);
60+
61+
_ = DeleteSession(session);
6062
}
6163

6264
try
@@ -149,6 +151,10 @@ internal async ValueTask ReleaseSession(TSession session)
149151
{
150152
_idleSessions.Enqueue(session);
151153
}
154+
else
155+
{
156+
_ = DeleteSession(session);
157+
}
152158
}
153159
finally
154160
{
@@ -162,10 +168,7 @@ private async Task DeleteSession(TSession session)
162168
{
163169
try
164170
{
165-
if (session.IsActive)
166-
{
167-
await session.DeleteSession();
168-
}
171+
await session.DeleteSession();
169172
}
170173
catch (YdbException e)
171174
{

0 commit comments

Comments
 (0)