Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -392,11 +392,6 @@ await Test(_ => { }, builder =>
}, model => Assert.Collection(
Assert.Single(model.Tables, t => t.Name == "Contacts").Columns,
// ReSharper disable once ParameterOnlyUsedForPreconditionCheck.Local
c =>
{
Assert.Equal("MyComplex_MyNestedComplex_Foo", c.Name);
Assert.True(c.IsNullable);
},
c => Assert.Equal("Id", c.Name),
c => Assert.Equal("Discriminator", c.Name),
c => Assert.Equal("Name", c.Name),
Expand All @@ -412,6 +407,11 @@ await Test(_ => { }, builder =>
{
Assert.Equal("MyComplex_MyNestedComplex_Bar", c.Name);
Assert.True(c.IsNullable);
},
c =>
{
Assert.Equal("MyComplex_MyNestedComplex_Foo", c.Name);
Assert.True(c.IsNullable);
}));

AssertSql(
Expand Down
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/src/Ado/PoolManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ internal static class PoolManager
private static readonly SemaphoreSlim SemaphoreSlim = new(1); // async mutex
private static readonly ConcurrentDictionary<string, SessionPool> Pools = new();

internal static async Task<Session> GetSession(
internal static async Task<Services.Query.Session> GetSession(
YdbConnectionStringBuilder connectionString,
CancellationToken cancellationToken
)
Expand Down
23 changes: 23 additions & 0 deletions src/Ydb.Sdk/src/Ado/Session/ISession.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Ydb.Query;
using Ydb.Sdk.Value;
using TransactionControl = Ydb.Query.TransactionControl;

namespace Ydb.Sdk.Ado.Session;

internal interface ISession
{
ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
string query,
Dictionary<string, YdbValue> parameters,
GrpcRequestSettings settings,
TransactionControl? txControl
);

Task CommitTransaction(string txId, CancellationToken cancellationToken = default);

Task RollbackTransaction(string txId, CancellationToken cancellationToken = default);

void OnNotSuccessStatusCode(StatusCode code);

void Close();
}
8 changes: 8 additions & 0 deletions src/Ydb.Sdk/src/Ado/Session/ISessionSource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Ydb.Sdk.Ado.Session;

internal interface ISessionSource<TSession> where TSession : ISession
{
ValueTask<TSession> OpenSession();

void Return(TSession session);
}
56 changes: 56 additions & 0 deletions src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using Ydb.Query;
using Ydb.Query.V1;
using Ydb.Sdk.Value;

namespace Ydb.Sdk.Ado.Session;

internal class ImplicitSession : ISession
{
private readonly IDriver _driver;

public ImplicitSession(IDriver driver)
{
_driver = driver;
}

public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
string query,
Dictionary<string, YdbValue> parameters,
GrpcRequestSettings settings,
TransactionControl? txControl
)
{
if (txControl is not null && !txControl.CommitTx)
{
throw NotSupportedTransaction;
}

var request = new ExecuteQueryRequest
{
ExecMode = ExecMode.Execute,
QueryContent = new QueryContent { Text = query, Syntax = Syntax.YqlV1 },
StatsMode = StatsMode.None,
TxControl = txControl
};
request.Parameters.Add(parameters.ToDictionary(p => p.Key, p => p.Value.GetProto()));

return _driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings);
}

public Task CommitTransaction(string txId, CancellationToken cancellationToken = default) =>
throw NotSupportedTransaction;

public Task RollbackTransaction(string txId, CancellationToken cancellationToken = default) =>
throw NotSupportedTransaction;

public void OnNotSuccessStatusCode(StatusCode code)
{
}

public void Close()
{
}

private static YdbException NotSupportedTransaction =>
new(StatusCode.BadRequest, "Transactions are not supported in implicit sessions");
}
17 changes: 17 additions & 0 deletions src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace Ydb.Sdk.Ado.Session;

internal class PoolingSessionSource : ISessionSource<IPoolingSession>
{
public ValueTask<IPoolingSession> OpenSession() => throw new NotImplementedException();

public void Return(IPoolingSession session) => throw new NotImplementedException();
}

internal interface IPoolingSession : ISession
{
bool IsActive { get; }

Task Open(CancellationToken cancellationToken);

Task DeleteSession();
}
240 changes: 240 additions & 0 deletions src/Ydb.Sdk/src/Ado/Session/Session.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
using Microsoft.Extensions.Logging;
using Ydb.Query;
using Ydb.Query.V1;
using Ydb.Sdk.Ado.Internal;
using Ydb.Sdk.Value;
using CommitTransactionRequest = Ydb.Query.CommitTransactionRequest;
using TransactionControl = Ydb.Query.TransactionControl;

namespace Ydb.Sdk.Ado.Session;

internal class Session : IPoolingSession
{
private const string SessionBalancer = "session-balancer";

private static readonly TimeSpan DeleteSessionTimeout = TimeSpan.FromSeconds(5);
private static readonly CreateSessionRequest CreateSessionRequest = new();

private readonly IDriver _driver;
private readonly PoolingSessionSource _poolingSessionSource;
private readonly YdbConnectionStringBuilder _settings;
private readonly ILogger<Session> _logger;

private volatile bool _isActive;

private string SessionId { get; set; } = string.Empty;
private long NodeId { get; set; }

public bool IsActive => _isActive;

internal Session(
IDriver driver,
PoolingSessionSource poolingSessionSource,
YdbConnectionStringBuilder settings,
ILogger<Session> logger
)
{
_driver = driver;
_poolingSessionSource = poolingSessionSource;
_settings = settings;
_logger = logger;
}

public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
string query,
Dictionary<string, YdbValue> parameters,
GrpcRequestSettings settings,
TransactionControl? txControl
)
{
settings.NodeId = NodeId;

var request = new ExecuteQueryRequest
{
SessionId = SessionId,
ExecMode = ExecMode.Execute,
QueryContent = new QueryContent { Text = query, Syntax = Syntax.YqlV1 },
StatsMode = StatsMode.None,
TxControl = txControl
};
request.Parameters.Add(parameters.ToDictionary(p => p.Key, p => p.Value.GetProto()));

return _driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings);
}

public async Task CommitTransaction(
string txId,
CancellationToken cancellationToken = default
)
{
var response = await _driver.UnaryCall(
QueryService.CommitTransactionMethod,
new CommitTransactionRequest { SessionId = SessionId, TxId = txId },
new GrpcRequestSettings { CancellationToken = cancellationToken, NodeId = NodeId }
);

if (response.Status.IsNotSuccess())
{
throw YdbException.FromServer(response.Status, response.Issues);
}
}

public async Task RollbackTransaction(
string txId,
CancellationToken cancellationToken = default
)
{
var response = await _driver.UnaryCall(
QueryService.RollbackTransactionMethod,
new RollbackTransactionRequest { SessionId = SessionId, TxId = txId },
new GrpcRequestSettings { CancellationToken = cancellationToken, NodeId = NodeId }
);

if (response.Status.IsNotSuccess())
{
throw YdbException.FromServer(response.Status, response.Issues);
}
}

public void OnNotSuccessStatusCode(StatusCode code)
{
if (code is
StatusCode.Cancelled or
StatusCode.BadSession or
StatusCode.SessionBusy or
StatusCode.InternalError or
StatusCode.ClientTransportTimeout or
StatusCode.Unavailable or
StatusCode.ClientTransportUnavailable)
{
_logger.LogWarning("Session[{SessionId}] is deactivated. Reason StatusCode: {Code}", SessionId, code);

_isActive = false;
}
}

public async Task Open(CancellationToken cancellationToken)
{
var requestSettings = new GrpcRequestSettings { CancellationToken = cancellationToken };

if (!_settings.DisableServerBalancer)
{
requestSettings.ClientCapabilities.Add(SessionBalancer);
}

var response = await _driver.UnaryCall(QueryService.CreateSessionMethod, CreateSessionRequest, requestSettings);

if (response.Status.IsNotSuccess())
{
throw YdbException.FromServer(response.Status, response.Issues);
}

TaskCompletionSource completeTask = new();

SessionId = response.SessionId;
NodeId = response.NodeId;

_ = Task.Run(async () =>
{
try
{
using var stream = await _driver.ServerStreamCall(
QueryService.AttachSessionMethod,
new AttachSessionRequest { SessionId = SessionId },
new GrpcRequestSettings { NodeId = NodeId }
);

if (!await stream.MoveNextAsync(cancellationToken))
{
// Session wasn't started!
completeTask.SetException(new YdbException(StatusCode.Cancelled, "Attach stream is not started!"));

return;
}

var initSessionState = stream.Current;

if (initSessionState.Status.IsNotSuccess())
{
throw YdbException.FromServer(initSessionState.Status, initSessionState.Issues);
}

completeTask.SetResult();

try
{
// ReSharper disable once MethodSupportsCancellation
while (await stream.MoveNextAsync())
{
var sessionState = stream.Current;

var statusCode = sessionState.Status.Code();

_logger.LogDebug(
"Session[{SessionId}] was received the status from the attach stream: {StatusMessage}",
SessionId, statusCode.ToMessage(sessionState.Issues));

OnNotSuccessStatusCode(statusCode);

if (!IsActive)
{
return;
}
}

_logger.LogDebug("Session[{SessionId}]: Attached stream is closed", SessionId);

// attach stream is closed
}
catch (YdbException e)
{
if (e.Code == StatusCode.Cancelled)
{
_logger.LogDebug("AttachStream is cancelled (possible grpcChannel is closing)");

return;
}

_logger.LogWarning(e, "Session[{SessionId}] is deactivated by transport error", SessionId);
}
}
catch (Exception e)
{
completeTask.SetException(e);
}
finally
{
_isActive = false;
}
}, cancellationToken);

await completeTask.Task;
}

public async Task DeleteSession()
{
try
{
_isActive = false;

var deleteSessionResponse = await _driver.UnaryCall(
QueryService.DeleteSessionMethod,
new DeleteSessionRequest { SessionId = SessionId },
new GrpcRequestSettings { TransportTimeout = DeleteSessionTimeout, NodeId = NodeId }
);

if (deleteSessionResponse.Status.IsNotSuccess())
{
_logger.LogWarning("Failed to delete session[{SessionId}], {StatusMessage}", SessionId,
deleteSessionResponse.Status.Code().ToMessage(deleteSessionResponse.Issues));
}
}
catch (Exception e)
{
_logger.LogWarning(e, "Error occurred while deleting session[{SessionId}] (NodeId = {NodeId})",
SessionId, NodeId);
}
}

public void Close() => _poolingSessionSource.Return(this);
}
4 changes: 2 additions & 2 deletions src/Ydb.Sdk/src/Ado/YdbConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ private YdbConnectionStringBuilder ConnectionStringBuilder
[param: AllowNull] init => _connectionStringBuilder = value;
}

internal Session Session
internal Services.Query.Session Session
{
get
{
Expand All @@ -35,7 +35,7 @@ internal Session Session
private set => _session = value;
}

private Session _session = null!;
private Services.Query.Session _session = null!;

public YdbConnection()
{
Expand Down
Loading
Loading