Skip to content

Commit 08e82c5

Browse files
feat: FIFO PoolingSessionSource (#487)
1 parent 2d1f392 commit 08e82c5

16 files changed

+734
-305
lines changed

src/Ydb.Sdk/src/Ado/PoolManager.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,11 @@ internal static class PoolManager
1111
private static readonly ConcurrentDictionary<string, SessionPool> Pools = new();
1212

1313
internal static async Task<ISession> GetSession(
14-
YdbConnectionStringBuilder connectionString,
14+
YdbConnectionStringBuilder settings,
1515
CancellationToken cancellationToken
1616
)
1717
{
18-
if (Pools.TryGetValue(connectionString.ConnectionString, out var sessionPool))
18+
if (Pools.TryGetValue(settings.ConnectionString, out var sessionPool))
1919
{
2020
return await sessionPool.GetSession(cancellationToken);
2121
}
@@ -24,21 +24,21 @@ CancellationToken cancellationToken
2424
{
2525
await SemaphoreSlim.WaitAsync(cancellationToken);
2626

27-
if (Pools.TryGetValue(connectionString.ConnectionString, out var pool))
27+
if (Pools.TryGetValue(settings.ConnectionString, out var pool))
2828
{
2929
return await pool.GetSession(cancellationToken);
3030
}
3131

3232
var newSessionPool = new SessionPool(
33-
await connectionString.BuildDriver(),
33+
await settings.BuildDriver(),
3434
new SessionPoolConfig(
35-
MaxSessionPool: connectionString.MaxSessionPool,
36-
CreateSessionTimeout: connectionString.CreateSessionTimeout,
35+
MaxSessionPool: settings.MaxSessionPool,
36+
CreateSessionTimeout: settings.CreateSessionTimeout,
3737
DisposeDriver: true
3838
)
3939
);
4040

41-
Pools[connectionString.ConnectionString] = newSessionPool;
41+
Pools[settings.ConnectionString] = newSessionPool;
4242

4343
return await newSessionPool.GetSession(cancellationToken);
4444
}
Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
namespace Ydb.Sdk.Ado.Session;
22

3-
internal interface ISessionSource<TSession> where TSession : ISession
3+
internal interface ISessionSource : IAsyncDisposable
44
{
5-
ValueTask<TSession> OpenSession(CancellationToken cancellationToken);
6-
7-
void Return(TSession session);
5+
ValueTask<ISession> OpenSession(CancellationToken cancellationToken);
86
}

src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,42 +8,40 @@
88

99
namespace Ydb.Sdk.Ado.Session;
1010

11-
internal class PoolingSession : IPoolingSession
11+
internal class PoolingSession : PoolingSessionBase<PoolingSession>
1212
{
1313
private const string SessionBalancer = "session-balancer";
1414

1515
private static readonly TimeSpan DeleteSessionTimeout = TimeSpan.FromSeconds(5);
1616
private static readonly CreateSessionRequest CreateSessionRequest = new();
1717

18-
private readonly PoolingSessionSource _poolingSessionSource;
1918
private readonly ILogger<PoolingSession> _logger;
19+
private readonly bool _disableServerBalancer;
20+
2021
private readonly CancellationTokenSource _attachStreamLifecycleCts = new();
2122

2223
private volatile bool _isBroken = true;
2324
private volatile bool _isBadSession;
2425

25-
private readonly bool _disableServerBalancer;
26-
2726
private string SessionId { get; set; } = string.Empty;
2827
private long NodeId { get; set; }
2928

30-
public IDriver Driver { get; }
31-
public bool IsBroken => _isBroken;
29+
public override IDriver Driver { get; }
30+
public override bool IsBroken => _isBroken;
3231

3332
internal PoolingSession(
3433
IDriver driver,
35-
PoolingSessionSource poolingSessionSource,
34+
PoolingSessionSource<PoolingSession> poolingSessionSource,
3635
bool disableServerBalancer,
3736
ILogger<PoolingSession> logger
38-
)
37+
) : base(poolingSessionSource)
3938
{
40-
_poolingSessionSource = poolingSessionSource;
4139
_disableServerBalancer = disableServerBalancer;
4240
_logger = logger;
4341
Driver = driver;
4442
}
4543

46-
public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
44+
public override ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
4745
string query,
4846
Dictionary<string, YdbValue> parameters,
4947
GrpcRequestSettings settings,
@@ -65,10 +63,7 @@ public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
6563
return Driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings);
6664
}
6765

68-
public async Task CommitTransaction(
69-
string txId,
70-
CancellationToken cancellationToken = default
71-
)
66+
public override async Task CommitTransaction(string txId, CancellationToken cancellationToken = default)
7267
{
7368
var response = await Driver.UnaryCall(
7469
QueryService.CommitTransactionMethod,
@@ -82,10 +77,7 @@ public async Task CommitTransaction(
8277
}
8378
}
8479

85-
public async Task RollbackTransaction(
86-
string txId,
87-
CancellationToken cancellationToken = default
88-
)
80+
public override async Task RollbackTransaction(string txId, CancellationToken cancellationToken = default)
8981
{
9082
var response = await Driver.UnaryCall(
9183
QueryService.RollbackTransactionMethod,
@@ -99,7 +91,7 @@ public async Task RollbackTransaction(
9991
}
10092
}
10193

102-
public void OnNotSuccessStatusCode(StatusCode statusCode)
94+
public override void OnNotSuccessStatusCode(StatusCode statusCode)
10395
{
10496
_isBadSession = _isBadSession || statusCode is StatusCode.BadSession;
10597

@@ -116,7 +108,7 @@ StatusCode.ClientTransportTimeout or
116108
}
117109
}
118110

119-
public async Task Open(CancellationToken cancellationToken)
111+
internal override async Task Open(CancellationToken cancellationToken)
120112
{
121113
var requestSettings = new GrpcRequestSettings { CancellationToken = cancellationToken };
122114

@@ -217,7 +209,7 @@ public async Task Open(CancellationToken cancellationToken)
217209
await completeTask.Task;
218210
}
219211

220-
public async Task DeleteSession()
212+
internal override async Task DeleteSession()
221213
{
222214
try
223215
{
@@ -248,6 +240,4 @@ public async Task DeleteSession()
248240
SessionId, NodeId);
249241
}
250242
}
251-
252-
public void Close() => _poolingSessionSource.Return(this);
253243
}
Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,26 @@
11
using Microsoft.Extensions.Logging;
2+
using Microsoft.Extensions.Logging.Abstractions;
23

34
namespace Ydb.Sdk.Ado.Session;
45

5-
internal class PoolingSessionFactory : IPoolingSessionFactory
6+
internal class PoolingSessionFactory : IPoolingSessionFactory<PoolingSession>
67
{
78
private readonly IDriver _driver;
89
private readonly bool _disableServerBalancer;
910
private readonly ILogger<PoolingSession> _logger;
1011

11-
public PoolingSessionFactory(IDriver driver, YdbConnectionStringBuilder settings, ILoggerFactory loggerFactory)
12+
internal PoolingSessionFactory(IDriver driver, YdbConnectionStringBuilder settings, ILoggerFactory loggerFactory)
1213
{
1314
_driver = driver;
1415
_disableServerBalancer = settings.DisableServerBalancer;
1516
_logger = loggerFactory.CreateLogger<PoolingSession>();
1617
}
1718

18-
public IPoolingSession NewSession(PoolingSessionSource source) =>
19-
new PoolingSession(_driver, source, _disableServerBalancer, _logger);
19+
public static async Task<PoolingSessionFactory> Create(YdbConnectionStringBuilder settings) =>
20+
new(await settings.BuildDriver(), settings, settings.LoggerFactory ?? NullLoggerFactory.Instance);
21+
22+
public PoolingSession NewSession(PoolingSessionSource<PoolingSession> source) =>
23+
new(_driver, source, _disableServerBalancer, _logger);
24+
25+
public ValueTask DisposeAsync() => _driver.DisposeAsync();
2026
}

0 commit comments

Comments
 (0)