Skip to content

Commit 81b1a86

Browse files
dev: Ydb.Sdk.Ado.Benchmarks & prepared support for new pooling sessions (#475)
1 parent 44be7d0 commit 81b1a86

File tree

23 files changed

+564
-50
lines changed

23 files changed

+564
-50
lines changed

src/EFCore.Ydb/test/EntityFrameworkCore.Ydb.FunctionalTests/Migrations/YdbMigrationsTest.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -392,11 +392,6 @@ await Test(_ => { }, builder =>
392392
}, model => Assert.Collection(
393393
Assert.Single(model.Tables, t => t.Name == "Contacts").Columns,
394394
// ReSharper disable once ParameterOnlyUsedForPreconditionCheck.Local
395-
c =>
396-
{
397-
Assert.Equal("MyComplex_MyNestedComplex_Foo", c.Name);
398-
Assert.True(c.IsNullable);
399-
},
400395
c => Assert.Equal("Id", c.Name),
401396
c => Assert.Equal("Discriminator", c.Name),
402397
c => Assert.Equal("Name", c.Name),
@@ -412,6 +407,11 @@ await Test(_ => { }, builder =>
412407
{
413408
Assert.Equal("MyComplex_MyNestedComplex_Bar", c.Name);
414409
Assert.True(c.IsNullable);
410+
},
411+
c =>
412+
{
413+
Assert.Equal("MyComplex_MyNestedComplex_Foo", c.Name);
414+
Assert.True(c.IsNullable);
415415
}));
416416

417417
AssertSql(

src/Ydb.Sdk/src/Ado/PoolManager.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ internal static class PoolManager
99
private static readonly SemaphoreSlim SemaphoreSlim = new(1); // async mutex
1010
private static readonly ConcurrentDictionary<string, SessionPool> Pools = new();
1111

12-
internal static async Task<Session> GetSession(
12+
internal static async Task<Services.Query.Session> GetSession(
1313
YdbConnectionStringBuilder connectionString,
1414
CancellationToken cancellationToken
1515
)
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
using Ydb.Query;
2+
using Ydb.Sdk.Value;
3+
using TransactionControl = Ydb.Query.TransactionControl;
4+
5+
namespace Ydb.Sdk.Ado.Session;
6+
7+
internal interface ISession
8+
{
9+
ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
10+
string query,
11+
Dictionary<string, YdbValue> parameters,
12+
GrpcRequestSettings settings,
13+
TransactionControl? txControl
14+
);
15+
16+
Task CommitTransaction(string txId, CancellationToken cancellationToken = default);
17+
18+
Task RollbackTransaction(string txId, CancellationToken cancellationToken = default);
19+
20+
void OnNotSuccessStatusCode(StatusCode code);
21+
22+
void Close();
23+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
namespace Ydb.Sdk.Ado.Session;
2+
3+
internal interface ISessionSource<TSession> where TSession : ISession
4+
{
5+
ValueTask<TSession> OpenSession();
6+
7+
void Return(TSession session);
8+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
using Ydb.Query;
2+
using Ydb.Query.V1;
3+
using Ydb.Sdk.Value;
4+
5+
namespace Ydb.Sdk.Ado.Session;
6+
7+
internal class ImplicitSession : ISession
8+
{
9+
private readonly IDriver _driver;
10+
11+
public ImplicitSession(IDriver driver)
12+
{
13+
_driver = driver;
14+
}
15+
16+
public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
17+
string query,
18+
Dictionary<string, YdbValue> parameters,
19+
GrpcRequestSettings settings,
20+
TransactionControl? txControl
21+
)
22+
{
23+
if (txControl is not null && !txControl.CommitTx)
24+
{
25+
throw NotSupportedTransaction;
26+
}
27+
28+
var request = new ExecuteQueryRequest
29+
{
30+
ExecMode = ExecMode.Execute,
31+
QueryContent = new QueryContent { Text = query, Syntax = Syntax.YqlV1 },
32+
StatsMode = StatsMode.None,
33+
TxControl = txControl
34+
};
35+
request.Parameters.Add(parameters.ToDictionary(p => p.Key, p => p.Value.GetProto()));
36+
37+
return _driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings);
38+
}
39+
40+
public Task CommitTransaction(string txId, CancellationToken cancellationToken = default) =>
41+
throw NotSupportedTransaction;
42+
43+
public Task RollbackTransaction(string txId, CancellationToken cancellationToken = default) =>
44+
throw NotSupportedTransaction;
45+
46+
public void OnNotSuccessStatusCode(StatusCode code)
47+
{
48+
}
49+
50+
public void Close()
51+
{
52+
}
53+
54+
private static YdbException NotSupportedTransaction =>
55+
new(StatusCode.BadRequest, "Transactions are not supported in implicit sessions");
56+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
namespace Ydb.Sdk.Ado.Session;
2+
3+
internal class PoolingSessionSource : ISessionSource<IPoolingSession>
4+
{
5+
public ValueTask<IPoolingSession> OpenSession() => throw new NotImplementedException();
6+
7+
public void Return(IPoolingSession session) => throw new NotImplementedException();
8+
}
9+
10+
internal interface IPoolingSession : ISession
11+
{
12+
bool IsActive { get; }
13+
14+
Task Open(CancellationToken cancellationToken);
15+
16+
Task DeleteSession();
17+
}
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
using Microsoft.Extensions.Logging;
2+
using Ydb.Query;
3+
using Ydb.Query.V1;
4+
using Ydb.Sdk.Ado.Internal;
5+
using Ydb.Sdk.Value;
6+
using CommitTransactionRequest = Ydb.Query.CommitTransactionRequest;
7+
using TransactionControl = Ydb.Query.TransactionControl;
8+
9+
namespace Ydb.Sdk.Ado.Session;
10+
11+
internal class Session : IPoolingSession
12+
{
13+
private const string SessionBalancer = "session-balancer";
14+
15+
private static readonly TimeSpan DeleteSessionTimeout = TimeSpan.FromSeconds(5);
16+
private static readonly CreateSessionRequest CreateSessionRequest = new();
17+
18+
private readonly IDriver _driver;
19+
private readonly PoolingSessionSource _poolingSessionSource;
20+
private readonly YdbConnectionStringBuilder _settings;
21+
private readonly ILogger<Session> _logger;
22+
23+
private volatile bool _isActive;
24+
25+
private string SessionId { get; set; } = string.Empty;
26+
private long NodeId { get; set; }
27+
28+
public bool IsActive => _isActive;
29+
30+
internal Session(
31+
IDriver driver,
32+
PoolingSessionSource poolingSessionSource,
33+
YdbConnectionStringBuilder settings,
34+
ILogger<Session> logger
35+
)
36+
{
37+
_driver = driver;
38+
_poolingSessionSource = poolingSessionSource;
39+
_settings = settings;
40+
_logger = logger;
41+
}
42+
43+
public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
44+
string query,
45+
Dictionary<string, YdbValue> parameters,
46+
GrpcRequestSettings settings,
47+
TransactionControl? txControl
48+
)
49+
{
50+
settings.NodeId = NodeId;
51+
52+
var request = new ExecuteQueryRequest
53+
{
54+
SessionId = SessionId,
55+
ExecMode = ExecMode.Execute,
56+
QueryContent = new QueryContent { Text = query, Syntax = Syntax.YqlV1 },
57+
StatsMode = StatsMode.None,
58+
TxControl = txControl
59+
};
60+
request.Parameters.Add(parameters.ToDictionary(p => p.Key, p => p.Value.GetProto()));
61+
62+
return _driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings);
63+
}
64+
65+
public async Task CommitTransaction(
66+
string txId,
67+
CancellationToken cancellationToken = default
68+
)
69+
{
70+
var response = await _driver.UnaryCall(
71+
QueryService.CommitTransactionMethod,
72+
new CommitTransactionRequest { SessionId = SessionId, TxId = txId },
73+
new GrpcRequestSettings { CancellationToken = cancellationToken, NodeId = NodeId }
74+
);
75+
76+
if (response.Status.IsNotSuccess())
77+
{
78+
throw YdbException.FromServer(response.Status, response.Issues);
79+
}
80+
}
81+
82+
public async Task RollbackTransaction(
83+
string txId,
84+
CancellationToken cancellationToken = default
85+
)
86+
{
87+
var response = await _driver.UnaryCall(
88+
QueryService.RollbackTransactionMethod,
89+
new RollbackTransactionRequest { SessionId = SessionId, TxId = txId },
90+
new GrpcRequestSettings { CancellationToken = cancellationToken, NodeId = NodeId }
91+
);
92+
93+
if (response.Status.IsNotSuccess())
94+
{
95+
throw YdbException.FromServer(response.Status, response.Issues);
96+
}
97+
}
98+
99+
public void OnNotSuccessStatusCode(StatusCode code)
100+
{
101+
if (code is
102+
StatusCode.Cancelled or
103+
StatusCode.BadSession or
104+
StatusCode.SessionBusy or
105+
StatusCode.InternalError or
106+
StatusCode.ClientTransportTimeout or
107+
StatusCode.Unavailable or
108+
StatusCode.ClientTransportUnavailable)
109+
{
110+
_logger.LogWarning("Session[{SessionId}] is deactivated. Reason StatusCode: {Code}", SessionId, code);
111+
112+
_isActive = false;
113+
}
114+
}
115+
116+
public async Task Open(CancellationToken cancellationToken)
117+
{
118+
var requestSettings = new GrpcRequestSettings { CancellationToken = cancellationToken };
119+
120+
if (!_settings.DisableServerBalancer)
121+
{
122+
requestSettings.ClientCapabilities.Add(SessionBalancer);
123+
}
124+
125+
var response = await _driver.UnaryCall(QueryService.CreateSessionMethod, CreateSessionRequest, requestSettings);
126+
127+
if (response.Status.IsNotSuccess())
128+
{
129+
throw YdbException.FromServer(response.Status, response.Issues);
130+
}
131+
132+
TaskCompletionSource completeTask = new();
133+
134+
SessionId = response.SessionId;
135+
NodeId = response.NodeId;
136+
137+
_ = Task.Run(async () =>
138+
{
139+
try
140+
{
141+
using var stream = await _driver.ServerStreamCall(
142+
QueryService.AttachSessionMethod,
143+
new AttachSessionRequest { SessionId = SessionId },
144+
new GrpcRequestSettings { NodeId = NodeId }
145+
);
146+
147+
if (!await stream.MoveNextAsync(cancellationToken))
148+
{
149+
// Session wasn't started!
150+
completeTask.SetException(new YdbException(StatusCode.Cancelled, "Attach stream is not started!"));
151+
152+
return;
153+
}
154+
155+
var initSessionState = stream.Current;
156+
157+
if (initSessionState.Status.IsNotSuccess())
158+
{
159+
throw YdbException.FromServer(initSessionState.Status, initSessionState.Issues);
160+
}
161+
162+
completeTask.SetResult();
163+
164+
try
165+
{
166+
// ReSharper disable once MethodSupportsCancellation
167+
while (await stream.MoveNextAsync())
168+
{
169+
var sessionState = stream.Current;
170+
171+
var statusCode = sessionState.Status.Code();
172+
173+
_logger.LogDebug(
174+
"Session[{SessionId}] was received the status from the attach stream: {StatusMessage}",
175+
SessionId, statusCode.ToMessage(sessionState.Issues));
176+
177+
OnNotSuccessStatusCode(statusCode);
178+
179+
if (!IsActive)
180+
{
181+
return;
182+
}
183+
}
184+
185+
_logger.LogDebug("Session[{SessionId}]: Attached stream is closed", SessionId);
186+
187+
// attach stream is closed
188+
}
189+
catch (YdbException e)
190+
{
191+
if (e.Code == StatusCode.Cancelled)
192+
{
193+
_logger.LogDebug("AttachStream is cancelled (possible grpcChannel is closing)");
194+
195+
return;
196+
}
197+
198+
_logger.LogWarning(e, "Session[{SessionId}] is deactivated by transport error", SessionId);
199+
}
200+
}
201+
catch (Exception e)
202+
{
203+
completeTask.SetException(e);
204+
}
205+
finally
206+
{
207+
_isActive = false;
208+
}
209+
}, cancellationToken);
210+
211+
await completeTask.Task;
212+
}
213+
214+
public async Task DeleteSession()
215+
{
216+
try
217+
{
218+
_isActive = false;
219+
220+
var deleteSessionResponse = await _driver.UnaryCall(
221+
QueryService.DeleteSessionMethod,
222+
new DeleteSessionRequest { SessionId = SessionId },
223+
new GrpcRequestSettings { TransportTimeout = DeleteSessionTimeout, NodeId = NodeId }
224+
);
225+
226+
if (deleteSessionResponse.Status.IsNotSuccess())
227+
{
228+
_logger.LogWarning("Failed to delete session[{SessionId}], {StatusMessage}", SessionId,
229+
deleteSessionResponse.Status.Code().ToMessage(deleteSessionResponse.Issues));
230+
}
231+
}
232+
catch (Exception e)
233+
{
234+
_logger.LogWarning(e, "Error occurred while deleting session[{SessionId}] (NodeId = {NodeId})",
235+
SessionId, NodeId);
236+
}
237+
}
238+
239+
public void Close() => _poolingSessionSource.Return(this);
240+
}

src/Ydb.Sdk/src/Ado/YdbConnection.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ private YdbConnectionStringBuilder ConnectionStringBuilder
2424
[param: AllowNull] init => _connectionStringBuilder = value;
2525
}
2626

27-
internal Session Session
27+
internal Services.Query.Session Session
2828
{
2929
get
3030
{
@@ -35,7 +35,7 @@ internal Session Session
3535
private set => _session = value;
3636
}
3737

38-
private Session _session = null!;
38+
private Services.Query.Session _session = null!;
3939

4040
public YdbConnection()
4141
{

0 commit comments

Comments
 (0)