Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/src/Ado/PoolManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ internal static class PoolManager
private static readonly SemaphoreSlim SemaphoreSlim = new(1); // async mutex
private static readonly ConcurrentDictionary<string, SessionPool> Pools = new();

internal static async Task<Session> GetSession(
internal static async Task<Services.Query.Session> GetSession(
YdbConnectionStringBuilder connectionString,
CancellationToken cancellationToken
)
Expand Down
23 changes: 23 additions & 0 deletions src/Ydb.Sdk/src/Ado/Session/ISession.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Ydb.Query;
using Ydb.Sdk.Value;
using TransactionControl = Ydb.Query.TransactionControl;

namespace Ydb.Sdk.Ado.Session;

internal interface ISession
{
ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
string query,
Dictionary<string, YdbValue> parameters,
GrpcRequestSettings settings,
TransactionControl? txControl
);

Task CommitTransaction(string txId, CancellationToken cancellationToken = default);

Task RollbackTransaction(string txId, CancellationToken cancellationToken = default);

void OnNotSuccessStatusCode(StatusCode code);

void Close();
}
8 changes: 8 additions & 0 deletions src/Ydb.Sdk/src/Ado/Session/ISessionSource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Ydb.Sdk.Ado.Session;

internal interface ISessionSource<TSession> where TSession : ISession
{
ValueTask<TSession> OpenSession();

void Return(TSession session);
}
56 changes: 56 additions & 0 deletions src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using Ydb.Query;
using Ydb.Query.V1;
using Ydb.Sdk.Value;

namespace Ydb.Sdk.Ado.Session;

internal class ImplicitSession : ISession
{
private readonly IDriver _driver;

public ImplicitSession(IDriver driver)
{
_driver = driver;
}

public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
string query,
Dictionary<string, YdbValue> parameters,
GrpcRequestSettings settings,
TransactionControl? txControl
)
{
if (txControl is not null && !txControl.CommitTx)
{
throw NotSupportedTransaction;
}

var request = new ExecuteQueryRequest
{
ExecMode = ExecMode.Execute,
QueryContent = new QueryContent { Text = query, Syntax = Syntax.YqlV1 },
StatsMode = StatsMode.None,
TxControl = txControl
};
request.Parameters.Add(parameters.ToDictionary(p => p.Key, p => p.Value.GetProto()));

return _driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings);
}

public Task CommitTransaction(string txId, CancellationToken cancellationToken = default) =>
throw NotSupportedTransaction;

public Task RollbackTransaction(string txId, CancellationToken cancellationToken = default) =>
throw NotSupportedTransaction;

public void OnNotSuccessStatusCode(StatusCode code)
{
}

public void Close()
{
}

private static YdbException NotSupportedTransaction =>
new(StatusCode.BadRequest, "Transactions are not supported in implicit sessions");
}
17 changes: 17 additions & 0 deletions src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace Ydb.Sdk.Ado.Session;

internal class PoolingSessionSource : ISessionSource<IPoolingSession>
{
public ValueTask<IPoolingSession> OpenSession() => throw new NotImplementedException();

public void Return(IPoolingSession session) => throw new NotImplementedException();
}

internal interface IPoolingSession : ISession
{
bool IsActive { get; }

Task Open(CancellationToken cancellationToken);

Task DeleteSession();
}
240 changes: 240 additions & 0 deletions src/Ydb.Sdk/src/Ado/Session/Session.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
using Microsoft.Extensions.Logging;
using Ydb.Query;
using Ydb.Query.V1;
using Ydb.Sdk.Ado.Internal;
using Ydb.Sdk.Value;
using CommitTransactionRequest = Ydb.Query.CommitTransactionRequest;
using TransactionControl = Ydb.Query.TransactionControl;

namespace Ydb.Sdk.Ado.Session;

internal class Session : IPoolingSession
{
private const string SessionBalancer = "session-balancer";

private static readonly TimeSpan DeleteSessionTimeout = TimeSpan.FromSeconds(5);
private static readonly CreateSessionRequest CreateSessionRequest = new();

private readonly IDriver _driver;
private readonly PoolingSessionSource _poolingSessionSource;
private readonly YdbConnectionStringBuilder _settings;
private readonly ILogger<Session> _logger;

private volatile bool _isActive;

private string SessionId { get; set; } = string.Empty;
private long NodeId { get; set; }

public bool IsActive => _isActive;

internal Session(
IDriver driver,
PoolingSessionSource poolingSessionSource,
YdbConnectionStringBuilder settings,
ILogger<Session> logger
)
{
_driver = driver;
_poolingSessionSource = poolingSessionSource;
_settings = settings;
_logger = logger;
}

public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
string query,
Dictionary<string, YdbValue> parameters,
GrpcRequestSettings settings,
TransactionControl? txControl
)
{
settings.NodeId = NodeId;

var request = new ExecuteQueryRequest
{
SessionId = SessionId,
ExecMode = ExecMode.Execute,
QueryContent = new QueryContent { Text = query, Syntax = Syntax.YqlV1 },
StatsMode = StatsMode.None,
TxControl = txControl
};
request.Parameters.Add(parameters.ToDictionary(p => p.Key, p => p.Value.GetProto()));

return _driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings);
}

public async Task CommitTransaction(
string txId,
CancellationToken cancellationToken = default
)
{
var response = await _driver.UnaryCall(
QueryService.CommitTransactionMethod,
new CommitTransactionRequest { SessionId = SessionId, TxId = txId },
new GrpcRequestSettings { CancellationToken = cancellationToken, NodeId = NodeId }
);

if (response.Status.IsNotSuccess())
{
throw YdbException.FromServer(response.Status, response.Issues);
}
}

public async Task RollbackTransaction(
string txId,
CancellationToken cancellationToken = default
)
{
var response = await _driver.UnaryCall(
QueryService.RollbackTransactionMethod,
new RollbackTransactionRequest { SessionId = SessionId, TxId = txId },
new GrpcRequestSettings { CancellationToken = cancellationToken, NodeId = NodeId }
);

if (response.Status.IsNotSuccess())
{
throw YdbException.FromServer(response.Status, response.Issues);
}
}

public void OnNotSuccessStatusCode(StatusCode code)
{
if (code is
StatusCode.Cancelled or
StatusCode.BadSession or
StatusCode.SessionBusy or
StatusCode.InternalError or
StatusCode.ClientTransportTimeout or
StatusCode.Unavailable or
StatusCode.ClientTransportUnavailable)
{
_logger.LogWarning("Session[{SessionId}] is deactivated. Reason StatusCode: {Code}", SessionId, code);

_isActive = false;
}
}

public async Task Open(CancellationToken cancellationToken)
{
var requestSettings = new GrpcRequestSettings { CancellationToken = cancellationToken };

if (!_settings.DisableServerBalancer)
{
requestSettings.ClientCapabilities.Add(SessionBalancer);
}

var response = await _driver.UnaryCall(QueryService.CreateSessionMethod, CreateSessionRequest, requestSettings);

if (response.Status.IsNotSuccess())
{
throw YdbException.FromServer(response.Status, response.Issues);
}

TaskCompletionSource completeTask = new();

SessionId = response.SessionId;
NodeId = response.NodeId;

_ = Task.Run(async () =>
{
try
{
using var stream = await _driver.ServerStreamCall(
QueryService.AttachSessionMethod,
new AttachSessionRequest { SessionId = SessionId },
new GrpcRequestSettings { NodeId = NodeId }
);

if (!await stream.MoveNextAsync(cancellationToken))
{
// Session wasn't started!
completeTask.SetException(new YdbException(StatusCode.Cancelled, "Attach stream is not started!"));

return;
}

var initSessionState = stream.Current;

if (initSessionState.Status.IsNotSuccess())
{
throw YdbException.FromServer(initSessionState.Status, initSessionState.Issues);
}

completeTask.SetResult();

try
{
// ReSharper disable once MethodSupportsCancellation
while (await stream.MoveNextAsync())
{
var sessionState = stream.Current;

var statusCode = sessionState.Status.Code();

_logger.LogDebug(
"Session[{SessionId}] was received the status from the attach stream: {StatusMessage}",
SessionId, statusCode.ToMessage(sessionState.Issues));

OnNotSuccessStatusCode(statusCode);

if (!IsActive)
{
return;
}
}

_logger.LogDebug("Session[{SessionId}]: Attached stream is closed", SessionId);

// attach stream is closed
}
catch (YdbException e)
{
if (e.Code == StatusCode.Cancelled)
{
_logger.LogDebug("AttachStream is cancelled (possible grpcChannel is closing)");

return;
}

_logger.LogWarning(e, "Session[{SessionId}] is deactivated by transport error", SessionId);
}
}
catch (Exception e)
{
completeTask.SetException(e);
}
finally
{
_isActive = false;
}
}, cancellationToken);

await completeTask.Task;
}

public async Task DeleteSession()
{
try
{
_isActive = false;

var deleteSessionResponse = await _driver.UnaryCall(
QueryService.DeleteSessionMethod,
new DeleteSessionRequest { SessionId = SessionId },
new GrpcRequestSettings { TransportTimeout = DeleteSessionTimeout, NodeId = NodeId }
);

if (deleteSessionResponse.Status.IsNotSuccess())
{
_logger.LogWarning("Failed to delete session[{SessionId}], {StatusMessage}", SessionId,
deleteSessionResponse.Status.Code().ToMessage(deleteSessionResponse.Issues));
}
}
catch (Exception e)
{
_logger.LogWarning(e, "Error occurred while deleting session[{SessionId}] (NodeId = {NodeId})",
SessionId, NodeId);
}
}

public void Close() => _poolingSessionSource.Return(this);
}
4 changes: 2 additions & 2 deletions src/Ydb.Sdk/src/Ado/YdbConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ private YdbConnectionStringBuilder ConnectionStringBuilder
[param: AllowNull] init => _connectionStringBuilder = value;
}

internal Session Session
internal Services.Query.Session Session
{
get
{
Expand All @@ -35,7 +35,7 @@ internal Session Session
private set => _session = value;
}

private Session _session = null!;
private Services.Query.Session _session = null!;

public YdbConnection()
{
Expand Down
4 changes: 2 additions & 2 deletions src/Ydb.Sdk/src/Client/Response.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ public abstract class StreamResponse<TProtoResponse, TResponse>
where TProtoResponse : class
where TResponse : class
{
private readonly ServerStream<TProtoResponse> _iterator;
private readonly IServerStream<TProtoResponse> _iterator;
private TResponse? _response;
private bool _transportError;

internal StreamResponse(ServerStream<TProtoResponse> iterator)
internal StreamResponse(IServerStream<TProtoResponse> iterator)
{
_iterator = iterator;
}
Expand Down
Loading
Loading