Skip to content

Commit 73b6a8d

Browse files
updating
1 parent 7dd5c88 commit 73b6a8d

File tree

10 files changed

+33
-29
lines changed

10 files changed

+33
-29
lines changed

src/Ydb.Sdk/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
- Optimization: On BadSession, do not invoke the `DeleteSession()` method.
2+
- Canceling AttachStream after calling the `DeleteSession` method.
3+
- Fixed bug: fixed issue where session was not deleted (`ClientTransportTimeout`).
14
- Fixed bug: Grpc.Core.StatusCode.Cancelled was mapped to server's Canceled status.
25
- ADO.NET: PoolingSessionSource 2.0 based on Npgsql pooling algorithm.
36
- Added new ADO.NET options:

src/Ydb.Sdk/src/Ado/Internal/SqlParser.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ StringBuilder yql
232232
return startToken; // rollback parse IN LIST
233233
}
234234

235-
yql.Append(listStartToken > startToken ? sql[startToken..listStartToken] : ' ');
235+
yql.Append(listStartToken > startToken ? sql[startToken .. listStartToken] : ' ');
236236
var paramListName = sqlParamsBuilder.AddListPrimitiveParams(findNameParams);
237237
yql.Append(paramListName);
238238

@@ -273,7 +273,7 @@ private static (string Name, int NextToken) ParseNameParam(string sql, int curTo
273273
throw new YdbException($"Have empty name parameter, invalid SQL [position: {prevToken}]");
274274
}
275275

276-
return ($"${sql[prevToken..curToken]}", curToken);
276+
return ($"${sql[prevToken .. curToken]}", curToken);
277277
}
278278

279279
private static bool IsSqlIdentifierChar(this char c) => char.IsLetterOrDigit(c) || c == '_';

src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,3 @@
1-
// This file contains session pooling algorithms adapted from Npgsql
2-
// Original source: https://github.com/npgsql/npgsql
3-
// Copyright (c) 2002-2025, Npgsql
4-
// Licence https://github.com/npgsql/npgsql?tab=PostgreSQL-1-ov-file
5-
61
using Microsoft.Extensions.Logging;
72
using Ydb.Query;
83
using Ydb.Query.V1;
@@ -25,6 +20,7 @@ internal class PoolingSession : IPoolingSession
2520
private readonly CancellationTokenSource _attachStreamLifecycleCts = new();
2621

2722
private volatile bool _isBroken = true;
23+
private volatile bool _isBadSession = false;
2824

2925
private readonly bool _disableServerBalancer;
3026

@@ -105,6 +101,8 @@ public async Task RollbackTransaction(
105101

106102
public void OnNotSuccessStatusCode(StatusCode statusCode)
107103
{
104+
_isBadSession = _isBadSession || statusCode is StatusCode.BadSession;
105+
108106
if (statusCode is
109107
StatusCode.BadSession or
110108
StatusCode.SessionBusy or
@@ -226,6 +224,12 @@ public async Task DeleteSession()
226224
_isBroken = true;
227225
_attachStreamLifecycleCts.CancelAfter(DeleteSessionTimeout);
228226

227+
if (_isBadSession)
228+
{
229+
return;
230+
}
231+
232+
_isBadSession = true;
229233
var deleteSessionResponse = await Driver.UnaryCall(
230234
QueryService.DeleteSessionMethod,
231235
new DeleteSessionRequest { SessionId = SessionId },

src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,3 @@
1-
// This file contains session pooling algorithms adapted from Npgsql
2-
// Original source: https://github.com/npgsql/npgsql
3-
// Copyright (c) 2002-2025, Npgsql
4-
// Licence https://github.com/npgsql/npgsql?tab=PostgreSQL-1-ov-file
5-
61
using System.Diagnostics.CodeAnalysis;
72
using System.Runtime.CompilerServices;
83
using System.Threading.Channels;

src/Ydb.Sdk/src/Ado/YdbCommand.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
using System.Diagnostics.CodeAnalysis;
44
using System.Text;
55
using Ydb.Sdk.Ado.Internal;
6-
using Ydb.Sdk.Services.Query;
76
using Ydb.Sdk.Value;
87

98
namespace Ydb.Sdk.Ado;

src/Ydb.Sdk/src/Services/Query/QueryClient.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ public Task<T> Stream<T>(string query, Func<ExecuteQueryStream, Task<T>> onStrea
6161
Dictionary<string, YdbValue>? parameters = null, TxMode txMode = TxMode.NoTx,
6262
ExecuteQuerySettings? settings = null) =>
6363
_sessionPool.ExecOnSession(async session => await onStream(new ExecuteQueryStream(
64-
await session.ExecuteQuery(query, parameters, settings, txMode.TransactionControl())))
64+
await session.ExecuteQuery(query, parameters ?? new Dictionary<string, YdbValue>(),
65+
settings ?? new GrpcRequestSettings(), txMode.TransactionControl())))
6566
);
6667

6768
public Task Stream(string query, Func<ExecuteQueryStream, Task> onStream,

src/Ydb.Sdk/src/Services/Query/QueryTx.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,13 @@ internal QueryTx(Session session, TxMode txMode)
2828
}
2929

3030
public async ValueTask<ExecuteQueryStream> Stream(string query, Dictionary<string, YdbValue>? parameters = null,
31-
bool commit = false, ExecuteQuerySettings? settings = null) =>
32-
new(
33-
await _session.ExecuteQuery(query, parameters, settings, TxControl(commit)), txId => TxId = txId
34-
);
31+
bool commit = false, ExecuteQuerySettings? settings = null) => new(await _session.ExecuteQuery(
32+
query,
33+
parameters ?? new Dictionary<string, YdbValue>(),
34+
settings ?? new GrpcRequestSettings(),
35+
TxControl(commit)
36+
), txId => TxId = txId
37+
);
3538

3639
public async Task<IReadOnlyList<Value.ResultSet.Row>> ReadAllRows(string query,
3740
Dictionary<string, YdbValue>? parameters = null, bool commit = false, ExecuteQuerySettings? settings = null)

src/Ydb.Sdk/src/Services/Query/SessionPool.cs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,14 @@ internal sealed class SessionPool : SessionPool<Session>, IAsyncDisposable
1818
{
1919
private static readonly CreateSessionRequest CreateSessionRequest = new();
2020

21+
private readonly IDriver _driver;
2122
private readonly bool _disposingDriver;
2223
private readonly ILogger<Session> _loggerSession;
2324

24-
internal readonly IDriver Driver;
25-
2625
internal SessionPool(IDriver driver, SessionPoolConfig sessionPoolConfig)
2726
: base(driver.LoggerFactory.CreateLogger<SessionPool>(), sessionPoolConfig)
2827
{
29-
Driver = driver;
28+
_driver = driver;
3029
_disposingDriver = sessionPoolConfig.DisposeDriver;
3130
_loggerSession = driver.LoggerFactory.CreateLogger<Session>();
3231
}
@@ -45,7 +44,7 @@ protected override async Task<Session> CreateSession(
4544
requestSettings.ClientCapabilities.Add("session-balancer");
4645
}
4746

48-
var response = await Driver.UnaryCall(
47+
var response = await _driver.UnaryCall(
4948
QueryService.CreateSessionMethod,
5049
CreateSessionRequest,
5150
requestSettings
@@ -58,13 +57,13 @@ protected override async Task<Session> CreateSession(
5857
var sessionId = response.SessionId;
5958
var nodeId = response.NodeId;
6059

61-
var session = new Session(Driver, this, sessionId, nodeId, _loggerSession);
60+
var session = new Session(_driver, this, sessionId, nodeId, _loggerSession);
6261

6362
_ = Task.Run(async () =>
6463
{
6564
try
6665
{
67-
using var stream = await Driver.ServerStreamCall(
66+
using var stream = await _driver.ServerStreamCall(
6867
QueryService.AttachSessionMethod,
6968
new AttachSessionRequest { SessionId = sessionId },
7069
new GrpcRequestSettings { NodeId = nodeId }
@@ -137,7 +136,7 @@ protected override async Task<Session> CreateSession(
137136
return session;
138137
}
139138

140-
protected override ValueTask DisposeDriver() => _disposingDriver ? Driver.DisposeAsync() : default;
139+
protected override ValueTask DisposeDriver() => _disposingDriver ? _driver.DisposeAsync() : default;
141140
}
142141

143142
internal class Session : SessionBase<Session>, ISession
@@ -156,8 +155,8 @@ ILogger<Session> logger
156155
public IDriver Driver { get; }
157156

158157
public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
159-
string query,
160-
Dictionary<string, YdbValue> parameters,
158+
string query,
159+
Dictionary<string, YdbValue> parameters,
161160
GrpcRequestSettings settings,
162161
TransactionControl? txControl)
163162
{

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/SessionSourceBenchmark.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ internal class MockSessionFactory : IPoolingSessionFactory
9393
internal class PoolingMockSession(PoolingSessionSource source) : IPoolingSession
9494
{
9595
public IDriver Driver => throw new NotImplementedException();
96-
96+
9797
public bool IsBroken => false;
9898

9999
public void Close() => source.Return(this);

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ private TaskCompletionSource<bool> SetupAttachStream()
269269
{
270270
var tcsSecondMoveAttachStream = new TaskCompletionSource<bool>();
271271

272-
_mockAttachStream.SetupSequence(attachStream => attachStream.MoveNextAsync(CancellationToken.None))
272+
_mockAttachStream.SetupSequence(attachStream => attachStream.MoveNextAsync(It.IsAny<CancellationToken>()))
273273
.ReturnsAsync(true)
274274
.Returns(new ValueTask<bool>(tcsSecondMoveAttachStream.Task));
275275
_mockAttachStream.SetupSequence(attachStream => attachStream.Current)

0 commit comments

Comments
 (0)