Skip to content

Commit 246edcb

Browse files
feat: PoolingSessionBase
1 parent 3aa680f commit 246edcb

File tree

6 files changed

+116
-74
lines changed

6 files changed

+116
-74
lines changed

src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs

Lines changed: 12 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -13,53 +13,40 @@
1313

1414
namespace Ydb.Sdk.Ado.Session;
1515

16-
internal class PoolingSession : ISession
16+
internal class PoolingSession : PoolingSessionBase
1717
{
1818
private const string SessionBalancer = "session-balancer";
1919

2020
private static readonly TimeSpan DeleteSessionTimeout = TimeSpan.FromSeconds(5);
2121
private static readonly CreateSessionRequest CreateSessionRequest = new();
2222

23-
private readonly PoolingSessionSource _poolingSessionSource;
2423
private readonly ILogger<PoolingSession> _logger;
24+
private readonly bool _disableServerBalancer;
25+
2526
private readonly CancellationTokenSource _attachStreamLifecycleCts = new();
2627

2728
private volatile bool _isBroken = true;
2829
private volatile bool _isBadSession;
2930

30-
private readonly bool _disableServerBalancer;
31-
3231
private string SessionId { get; set; } = string.Empty;
3332
private long NodeId { get; set; }
3433

35-
private int _state = (int)PoolingSessionState.In;
36-
37-
public IDriver Driver { get; }
38-
public bool IsBroken => _isBroken;
34+
public override IDriver Driver { get; }
35+
public override bool IsBroken => _isBroken;
3936

4037
internal PoolingSession(
4138
IDriver driver,
4239
PoolingSessionSource poolingSessionSource,
4340
bool disableServerBalancer,
4441
ILogger<PoolingSession> logger
45-
)
42+
) : base(poolingSessionSource)
4643
{
47-
_poolingSessionSource = poolingSessionSource;
4844
_disableServerBalancer = disableServerBalancer;
4945
_logger = logger;
5046
Driver = driver;
5147
}
5248

53-
internal bool CompareAndSet(PoolingSessionState expected, PoolingSessionState actual) =>
54-
Interlocked.CompareExchange(ref _state, (int)expected, (int)actual) == (int)expected;
55-
56-
internal void Set(PoolingSessionState state) => Interlocked.Exchange(ref _state, (int)state);
57-
58-
internal PoolingSessionState State => (PoolingSessionState)Volatile.Read(ref _state);
59-
60-
internal DateTime IdleStartTime { get; set; }
61-
62-
public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
49+
public override ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
6350
string query,
6451
Dictionary<string, YdbValue> parameters,
6552
GrpcRequestSettings settings,
@@ -81,10 +68,7 @@ public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
8168
return Driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings);
8269
}
8370

84-
public async Task CommitTransaction(
85-
string txId,
86-
CancellationToken cancellationToken = default
87-
)
71+
public override async Task CommitTransaction(string txId, CancellationToken cancellationToken = default)
8872
{
8973
var response = await Driver.UnaryCall(
9074
QueryService.CommitTransactionMethod,
@@ -98,10 +82,7 @@ public async Task CommitTransaction(
9882
}
9983
}
10084

101-
public async Task RollbackTransaction(
102-
string txId,
103-
CancellationToken cancellationToken = default
104-
)
85+
public override async Task RollbackTransaction(string txId, CancellationToken cancellationToken = default)
10586
{
10687
var response = await Driver.UnaryCall(
10788
QueryService.RollbackTransactionMethod,
@@ -115,7 +96,7 @@ public async Task RollbackTransaction(
11596
}
11697
}
11798

118-
public void OnNotSuccessStatusCode(StatusCode statusCode)
99+
public override void OnNotSuccessStatusCode(StatusCode statusCode)
119100
{
120101
_isBadSession = _isBadSession || statusCode is StatusCode.BadSession;
121102

@@ -132,7 +113,7 @@ StatusCode.ClientTransportTimeout or
132113
}
133114
}
134115

135-
public async Task Open(CancellationToken cancellationToken)
116+
internal override async Task Open(CancellationToken cancellationToken)
136117
{
137118
var requestSettings = new GrpcRequestSettings { CancellationToken = cancellationToken };
138119

@@ -233,7 +214,7 @@ public async Task Open(CancellationToken cancellationToken)
233214
await completeTask.Task;
234215
}
235216

236-
public async Task DeleteSession()
217+
internal override async Task DeleteSession()
237218
{
238219
try
239220
{
@@ -264,6 +245,4 @@ public async Task DeleteSession()
264245
SessionId, NodeId);
265246
}
266247
}
267-
268-
public void Close() => _poolingSessionSource.Return(this);
269248
}

src/Ydb.Sdk/src/Ado/Session/PoolingSessionFactory.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@ public PoolingSessionFactory(IDriver driver, YdbConnectionStringBuilder settings
1515
_logger = loggerFactory.CreateLogger<PoolingSession>();
1616
}
1717

18-
public PoolingSession NewSession(PoolingSessionSource source) =>
19-
new(_driver, source, _disableServerBalancer, _logger);
18+
public PoolingSessionBase NewSession(PoolingSessionSource source) =>
19+
new PoolingSession(_driver, source, _disableServerBalancer, _logger);
2020
}

src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs

Lines changed: 58 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
11
using System.Collections.Concurrent;
2-
using System.ComponentModel;
32
using System.Diagnostics.CodeAnalysis;
43
using System.Runtime.CompilerServices;
5-
using System.Threading.Channels;
4+
using Ydb.Query;
5+
using Ydb.Sdk.Value;
66

77
namespace Ydb.Sdk.Ado.Session;
88

9-
internal sealed class PoolingSessionSource : ISessionSource<PoolingSession>
9+
internal sealed class PoolingSessionSource : ISessionSource<PoolingSessionBase>
1010
{
11-
private readonly ConcurrentStack<PoolingSession> _idleSessions = new();
12-
private readonly ConcurrentQueue<TaskCompletionSource<PoolingSession?>> _waiters = new();
11+
private readonly ConcurrentStack<PoolingSessionBase> _idleSessions = new();
12+
private readonly ConcurrentQueue<TaskCompletionSource<PoolingSessionBase?>> _waiters = new();
1313

1414
private readonly IPoolingSessionFactory _sessionFactory;
1515

1616
private readonly int _minSessionSize;
1717
private readonly int _maxSessionSize;
1818

19-
private readonly PoolingSession?[] _sessions;
19+
private readonly PoolingSessionBase?[] _sessions;
2020

2121
private readonly int _createSessionTimeout;
2222
private readonly TimeSpan _sessionIdleTimeout;
@@ -39,19 +39,19 @@ YdbConnectionStringBuilder settings
3939
$"Connection can't have 'Max Session Pool' {_maxSessionSize} under 'Min Session Pool' {_minSessionSize}");
4040
}
4141

42-
_sessions = new PoolingSession?[_maxSessionSize];
42+
_sessions = new PoolingSessionBase?[_maxSessionSize];
4343
_createSessionTimeout = settings.CreateSessionTimeout;
4444
_sessionIdleTimeout = TimeSpan.FromSeconds(settings.SessionIdleTimeout);
4545
_cleanerTimer = new Timer(CleanIdleSessions, this, _sessionIdleTimeout, _sessionIdleTimeout);
4646
}
4747

48-
public ValueTask<PoolingSession> OpenSession(CancellationToken cancellationToken = default) =>
48+
public ValueTask<PoolingSessionBase> OpenSession(CancellationToken cancellationToken = default) =>
4949
TryGetIdleSession(out var session)
50-
? new ValueTask<PoolingSession>(session)
50+
? new ValueTask<PoolingSessionBase>(session)
5151
: RentAsync(cancellationToken);
5252

5353
[MethodImpl(MethodImplOptions.AggressiveInlining)]
54-
private bool TryGetIdleSession([NotNullWhen(true)] out PoolingSession? session)
54+
private bool TryGetIdleSession([NotNullWhen(true)] out PoolingSessionBase? session)
5555
{
5656
while (_idleSessions.TryPop(out session))
5757
{
@@ -65,7 +65,7 @@ private bool TryGetIdleSession([NotNullWhen(true)] out PoolingSession? session)
6565
}
6666

6767
[MethodImpl(MethodImplOptions.AggressiveInlining)]
68-
private bool CheckIdleSession([NotNullWhen(true)] PoolingSession? session)
68+
private bool CheckIdleSession([NotNullWhen(true)] PoolingSessionBase? session)
6969
{
7070
if (session == null || session.State == PoolingSessionState.Clean)
7171
{
@@ -82,7 +82,7 @@ private bool CheckIdleSession([NotNullWhen(true)] PoolingSession? session)
8282
return session.CompareAndSet(PoolingSessionState.In, PoolingSessionState.Out);
8383
}
8484

85-
private async ValueTask<PoolingSession> RentAsync(CancellationToken cancellationToken)
85+
private async ValueTask<PoolingSessionBase> RentAsync(CancellationToken cancellationToken)
8686
{
8787
using var ctsGetSession = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
8888
if (_createSessionTimeout > 0)
@@ -97,7 +97,7 @@ private async ValueTask<PoolingSession> RentAsync(CancellationToken cancellation
9797
while (true)
9898
{
9999
var waiterTcs =
100-
new TaskCompletionSource<PoolingSession?>(TaskCreationOptions.RunContinuationsAsynchronously);
100+
new TaskCompletionSource<PoolingSessionBase?>(TaskCreationOptions.RunContinuationsAsynchronously);
101101
_waiters.Enqueue(waiterTcs);
102102
await using var _ = finalToken.Register(() => waiterTcs.TrySetCanceled(), useSynchronizationContext: false);
103103
session = await waiterTcs.Task.ConfigureAwait(false);
@@ -111,7 +111,7 @@ private async ValueTask<PoolingSession> RentAsync(CancellationToken cancellation
111111
}
112112
}
113113

114-
private async ValueTask<PoolingSession?> OpenNewSession(CancellationToken cancellationToken)
114+
private async ValueTask<PoolingSessionBase?> OpenNewSession(CancellationToken cancellationToken)
115115
{
116116
// As long as we're under max capacity, attempt to increase the session count and open a new session.
117117
for (var numSessions = _numSessions; numSessions < _maxSessionSize; numSessions = _numSessions)
@@ -156,7 +156,7 @@ private void WakeUpWaiter()
156156
waiter.TrySetResult(null); // wake up waiter!
157157
}
158158

159-
public void Return(PoolingSession session)
159+
public void Return(PoolingSessionBase session)
160160
{
161161
if (session.IsBroken)
162162
{
@@ -181,7 +181,7 @@ public void Return(PoolingSession session)
181181
WakeUpWaiter();
182182
}
183183

184-
private void CloseSession(PoolingSession session)
184+
private void CloseSession(PoolingSessionBase session)
185185
{
186186
var i = 0;
187187
for (; i < _maxSessionSize; i++)
@@ -224,7 +224,7 @@ private static void CleanIdleSessions(object? state)
224224

225225
internal interface IPoolingSessionFactory
226226
{
227-
PoolingSession NewSession(PoolingSessionSource source);
227+
PoolingSessionBase NewSession(PoolingSessionSource source);
228228
}
229229

230230
internal enum PoolingSessionState
@@ -233,3 +233,44 @@ internal enum PoolingSessionState
233233
Out,
234234
Clean
235235
}
236+
237+
internal abstract class PoolingSessionBase : ISession
238+
{
239+
private readonly PoolingSessionSource _source;
240+
241+
private int _state = (int)PoolingSessionState.In;
242+
243+
protected PoolingSessionBase(PoolingSessionSource source)
244+
{
245+
_source = source;
246+
}
247+
248+
internal bool CompareAndSet(PoolingSessionState expected, PoolingSessionState actual) =>
249+
Interlocked.CompareExchange(ref _state, (int)expected, (int)actual) == (int)expected;
250+
251+
internal void Set(PoolingSessionState state) => Interlocked.Exchange(ref _state, (int)state);
252+
253+
internal PoolingSessionState State => (PoolingSessionState)Volatile.Read(ref _state);
254+
255+
internal DateTime IdleStartTime { get; set; }
256+
257+
public abstract IDriver Driver { get; }
258+
259+
public abstract bool IsBroken { get; }
260+
261+
internal abstract Task Open(CancellationToken cancellationToken);
262+
263+
internal abstract Task DeleteSession();
264+
265+
public abstract ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(string query,
266+
Dictionary<string, YdbValue> parameters, GrpcRequestSettings settings,
267+
TransactionControl? txControl);
268+
269+
public abstract Task CommitTransaction(string txId, CancellationToken cancellationToken = default);
270+
271+
public abstract Task RollbackTransaction(string txId, CancellationToken cancellationToken = default);
272+
273+
public abstract void OnNotSuccessStatusCode(StatusCode code);
274+
275+
public void Close() => _source.Return(this);
276+
}

src/Ydb.Sdk/src/Ado/YdbSchema.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -423,9 +423,7 @@ CancellationToken cancellationToken
423423
);
424424

425425
var operation = response.Operation;
426-
var status = Status.FromProto(operation.Status, operation.Issues);
427-
428-
if (status.IsNotSuccess)
426+
if (operation.Status.IsNotSuccess())
429427
{
430428
throw YdbException.FromServer(operation.Status, operation.Issues);
431429
}

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/SessionSourceBenchmark.cs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,5 +87,30 @@ public async Task SessionReuse_Pattern()
8787

8888
internal class MockSessionFactory : IPoolingSessionFactory
8989
{
90-
public PoolingSession NewSession(PoolingSessionSource source) => new PoolingSession(null, null, false, null);
90+
public PoolingSessionBase NewSession(PoolingSessionSource source) => new MockPoolingSession(source);
91+
}
92+
93+
internal class MockPoolingSession(PoolingSessionSource source) : PoolingSessionBase(source)
94+
{
95+
public override IDriver Driver => null!;
96+
public override bool IsBroken => false;
97+
98+
internal override Task Open(CancellationToken cancellationToken) => Task.CompletedTask;
99+
internal override Task DeleteSession() => Task.CompletedTask;
100+
101+
public override ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
102+
string query,
103+
Dictionary<string, YdbValue> parameters, GrpcRequestSettings settings,
104+
TransactionControl? txControl
105+
) => throw new NotImplementedException();
106+
107+
public override Task CommitTransaction(string txId, CancellationToken cancellationToken = default) =>
108+
throw new NotImplementedException();
109+
110+
public override Task RollbackTransaction(string txId, CancellationToken cancellationToken = default) =>
111+
throw new NotImplementedException();
112+
113+
public override void OnNotSuccessStatusCode(StatusCode code)
114+
{
115+
}
91116
}

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionSourceMockTests.cs

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,33 +29,32 @@ internal class MockPoolingSessionFactory : IPoolingSessionFactory
2929
{
3030
private int _sessionNum;
3131

32-
public PoolingSession NewSession(PoolingSessionSource source) =>
32+
public PoolingSessionBase NewSession(PoolingSessionSource source) =>
3333
new MockPoolingSession(source, Interlocked.Increment(ref _sessionNum));
3434
}
3535

36-
internal class MockPoolingSession(PoolingSessionSource source, int sessionNum) : PoolingSession(null, null, true, null)
36+
internal class MockPoolingSession(PoolingSessionSource source, int sessionNum) : PoolingSessionBase(source)
3737
{
38-
internal string SessionId { get; } = $"session_{sessionNum}";
38+
public string SessionId => $"session_{sessionNum}";
39+
public override IDriver Driver => null!;
40+
public override bool IsBroken => false;
3941

40-
public IDriver Driver => throw new NotImplementedException();
41-
public bool IsBroken { get; set; }
42+
internal override Task Open(CancellationToken cancellationToken) => Task.CompletedTask;
43+
internal override Task DeleteSession() => Task.CompletedTask;
4244

43-
public void Close() => source.Return(this);
44-
45-
public Task Open(CancellationToken cancellationToken) => Task.CompletedTask;
46-
47-
public Task DeleteSession() => Task.CompletedTask;
48-
49-
public Task CommitTransaction(string txId, CancellationToken cancellationToken = default) =>
50-
throw new NotImplementedException();
45+
public override ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
46+
string query,
47+
Dictionary<string, YdbValue> parameters, GrpcRequestSettings settings,
48+
TransactionControl? txControl
49+
) => throw new NotImplementedException();
5150

52-
public Task RollbackTransaction(string txId, CancellationToken cancellationToken = default) =>
51+
public override Task CommitTransaction(string txId, CancellationToken cancellationToken = default) =>
5352
throw new NotImplementedException();
5453

55-
public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(string query,
56-
Dictionary<string, YdbValue> parameters, GrpcRequestSettings settings,
57-
TransactionControl? txControl) =>
54+
public override Task RollbackTransaction(string txId, CancellationToken cancellationToken = default) =>
5855
throw new NotImplementedException();
5956

60-
public void OnNotSuccessStatusCode(StatusCode code) => throw new NotImplementedException();
57+
public override void OnNotSuccessStatusCode(StatusCode code)
58+
{
59+
}
6160
}

0 commit comments

Comments
 (0)