Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
- Dev: added `ValueTask<string?> GetAuthInfoAsync()` in ICredentialProvider.
- Feat: `Writer.DisposeAsync()` waits for all in-flight messages to complete.
- Feat: `Reader.DisposeAsync()` waits for all pending commits to be completed.
- **Breaking Change**: `IReader` now implements `IAsyncDisposable` instead of `IDisposable`.
Expand Down
2 changes: 1 addition & 1 deletion examples/src/BasicExample/ReadTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ internal partial class BasicExample
{
private async Task ReadTable()
{
var readStream = Client.ReadTable(
var readStream = await Client.ReadTable(
FullTablePath("seasons"),
new ReadTableSettings
{
Expand Down
2 changes: 1 addition & 1 deletion examples/src/BasicExample/ScanQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ FROM episodes
ORDER BY series_id, season_id;
";

var scanStream = Client.ExecuteScanQuery(
var scanStream = await Client.ExecuteScanQuery(
query,
new Dictionary<string, YdbValue>
{
Expand Down
4 changes: 2 additions & 2 deletions examples/src/QueryExample/QueryExample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ private async Task StreamSelect()

await Client.DoTx(async tx =>
{
await foreach (var part in tx.Stream(query, commit: true))
await foreach (var part in await tx.Stream(query, commit: true))
{
foreach (var row in part.ResultSet!.Rows)
{
Expand Down Expand Up @@ -316,7 +316,7 @@ private async Task ReadAllResultSets()
var resultSets = await Client.DoTx(async tx =>
{
var resultSets = new List<Value.ResultSet>();
await foreach (var resultSet in tx.Stream(query, commit: true))
await foreach (var resultSet in await tx.Stream(query, commit: true))
{
resultSets.Add(resultSet.ResultSet!);
}
Expand Down
9 changes: 6 additions & 3 deletions src/Ydb.Sdk/src/Ado/YdbCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,12 @@ protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBeha
throw new InvalidOperationException("Transaction mismatched! (Maybe using another connection)");
}

var ydbDataReader = await YdbDataReader.CreateYdbDataReader(YdbConnection.Session.ExecuteQuery(
preparedSql.ToString(), ydbParameters, execSettings, transaction?.TransactionControl),
YdbConnection.Session.OnStatus, transaction);
var ydbDataReader = await YdbDataReader.CreateYdbDataReader(
await YdbConnection.Session.ExecuteQuery(
preparedSql.ToString(), ydbParameters, execSettings, transaction?.TransactionControl
),
YdbConnection.Session.OnStatus, transaction
);

YdbConnection.LastReader = ydbDataReader;
YdbConnection.LastCommand = CommandText;
Expand Down
6 changes: 6 additions & 0 deletions src/Ydb.Sdk/src/Auth/ICredentialsProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@ namespace Ydb.Sdk.Auth;

public interface ICredentialsProvider
{
// For removal in 1.*
string? GetAuthInfo();

ValueTask<string?> GetAuthInfoAsync()
{
return ValueTask.FromResult(GetAuthInfo());
}

Task ProvideAuthClient(AuthClient authClient)
{
return Task.CompletedTask;
Expand Down
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/src/Driver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ private async Task<Status> DiscoverEndpoints()
TransportTimeout = Config.EndpointDiscoveryTimeout
};

var options = GetCallOptions(requestSettings);
var options = await GetCallOptions(requestSettings);
options.Headers?.Add(Metadata.RpcSdkInfoHeader, _sdkInfo);

var response = await client.ListEndpointsAsync(
Expand Down
22 changes: 11 additions & 11 deletions src/Ydb.Sdk/src/IDriver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ public Task<TResponse> UnaryCall<TRequest, TResponse>(
where TRequest : class
where TResponse : class;

public ServerStream<TResponse> ServerStreamCall<TRequest, TResponse>(
public ValueTask<ServerStream<TResponse>> ServerStreamCall<TRequest, TResponse>(
Method<TRequest, TResponse> method,
TRequest request,
GrpcRequestSettings settings)
where TRequest : class
where TResponse : class;

public IBidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TRequest, TResponse>(
public ValueTask<IBidirectionalStream<TRequest, TResponse>> BidirectionalStreamCall<TRequest, TResponse>(
Method<TRequest, TResponse> method,
GrpcRequestSettings settings)
where TRequest : class
Expand All @@ -38,7 +38,7 @@ public interface IBidirectionalStream<in TRequest, out TResponse> : IDisposable

public TResponse Current { get; }

public string? AuthToken { get; }
public ValueTask<string?> AuthToken { get; }

public Task RequestStreamComplete();
}
Expand Down Expand Up @@ -74,7 +74,7 @@ public async Task<TResponse> UnaryCall<TRequest, TResponse>(
using var call = callInvoker.AsyncUnaryCall(
method: method,
host: null,
options: GetCallOptions(settings),
options: await GetCallOptions(settings),
request: request
);

Expand All @@ -90,7 +90,7 @@ public async Task<TResponse> UnaryCall<TRequest, TResponse>(
}
}

public ServerStream<TResponse> ServerStreamCall<TRequest, TResponse>(
public async ValueTask<ServerStream<TResponse>> ServerStreamCall<TRequest, TResponse>(
Method<TRequest, TResponse> method,
TRequest request,
GrpcRequestSettings settings)
Expand All @@ -103,13 +103,13 @@ public ServerStream<TResponse> ServerStreamCall<TRequest, TResponse>(
var call = callInvoker.AsyncServerStreamingCall(
method: method,
host: null,
options: GetCallOptions(settings),
options: await GetCallOptions(settings),
request: request);

return new ServerStream<TResponse>(call, e => { OnRpcError(endpoint, e); });
}

public IBidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TRequest, TResponse>(
public async ValueTask<IBidirectionalStream<TRequest, TResponse>> BidirectionalStreamCall<TRequest, TResponse>(
Method<TRequest, TResponse> method,
GrpcRequestSettings settings)
where TRequest : class
Expand All @@ -121,7 +121,7 @@ public IBidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TReques
var call = callInvoker.AsyncDuplexStreamingCall(
method: method,
host: null,
options: GetCallOptions(settings));
options: await GetCallOptions(settings));

return new BidirectionalStream<TRequest, TResponse>(
call,
Expand All @@ -133,14 +133,14 @@ public IBidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TReques

protected abstract void OnRpcError(string endpoint, RpcException e);

protected CallOptions GetCallOptions(GrpcRequestSettings settings)
protected async ValueTask<CallOptions> GetCallOptions(GrpcRequestSettings settings)
{
var meta = new Grpc.Core.Metadata
{
{ Metadata.RpcDatabaseHeader, Config.Database }
};

var authInfo = Config.Credentials.GetAuthInfo();
var authInfo = await Config.Credentials.GetAuthInfoAsync();
if (authInfo != null)
{
meta.Add(Metadata.RpcAuthHeader, authInfo);
Expand Down Expand Up @@ -268,7 +268,7 @@ public async ValueTask<bool> MoveNextAsync()

public TResponse Current => _stream.ResponseStream.Current;

public string? AuthToken => _credentialsProvider.GetAuthInfo();
public ValueTask<string?> AuthToken => _credentialsProvider.GetAuthInfoAsync();

public async Task RequestStreamComplete()
{
Expand Down
5 changes: 3 additions & 2 deletions src/Ydb.Sdk/src/Services/Query/QueryClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ public Task<T> Stream<T>(string query, Func<ExecuteQueryStream, Task<T>> onStrea
Dictionary<string, YdbValue>? parameters = null, TxMode txMode = TxMode.NoTx,
ExecuteQuerySettings? settings = null)
{
return _sessionPool.ExecOnSession(session => onStream(new ExecuteQueryStream(
session.ExecuteQuery(query, parameters, settings, txMode.TransactionControl()))));
return _sessionPool.ExecOnSession(async session => await onStream(new ExecuteQueryStream(
await session.ExecuteQuery(query, parameters, settings, txMode.TransactionControl())))
);
}

public Task Stream(string query, Func<ExecuteQueryStream, Task> onStream,
Expand Down
11 changes: 6 additions & 5 deletions src/Ydb.Sdk/src/Services/Query/QueryTx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,18 @@ internal QueryTx(Session session, TxMode txMode)
_txMode = txMode;
}

public ExecuteQueryStream Stream(string query, Dictionary<string, YdbValue>? parameters = null,
public async ValueTask<ExecuteQueryStream> Stream(string query, Dictionary<string, YdbValue>? parameters = null,
bool commit = false, ExecuteQuerySettings? settings = null)
{
return new ExecuteQueryStream(_session.ExecuteQuery(query, parameters, settings, TxControl(commit)),
txId => TxId = txId);
return new ExecuteQueryStream(
await _session.ExecuteQuery(query, parameters, settings, TxControl(commit)), txId => TxId = txId
);
}

public async Task<IReadOnlyList<Value.ResultSet.Row>> ReadAllRows(string query,
Dictionary<string, YdbValue>? parameters = null, bool commit = false, ExecuteQuerySettings? settings = null)
{
await using var stream = Stream(query, parameters, commit, settings);
await using var stream = await Stream(query, parameters, commit, settings);
List<Value.ResultSet.Row> rows = new();

await foreach (var part in stream)
Expand All @@ -64,7 +65,7 @@ public ExecuteQueryStream Stream(string query, Dictionary<string, YdbValue>? par
public async Task Exec(string query, Dictionary<string, YdbValue>? parameters = null,
ExecuteQuerySettings? settings = null, bool commit = false)
{
await using var stream = Stream(query, parameters, commit, settings);
await using var stream = await Stream(query, parameters, commit, settings);
await stream.MoveNextAsync();
}

Expand Down
4 changes: 2 additions & 2 deletions src/Ydb.Sdk/src/Services/Query/SessionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ protected override async Task<Session> CreateSession()
{
try
{
await using var stream = _driver.ServerStreamCall(
await using var stream = await _driver.ServerStreamCall(
QueryService.AttachSessionMethod,
new AttachSessionRequest { SessionId = sessionId },
new GrpcRequestSettings { NodeId = nodeId }
Expand Down Expand Up @@ -136,7 +136,7 @@ ILogger<Session> logger
Driver = driver;
}

internal ServerStream<ExecuteQueryResponsePart> ExecuteQuery(
internal ValueTask<ServerStream<ExecuteQueryResponsePart>> ExecuteQuery(
string query,
Dictionary<string, YdbValue>? parameters,
ExecuteQuerySettings? settings,
Expand Down
6 changes: 3 additions & 3 deletions src/Ydb.Sdk/src/Services/Table/ExecuteScanQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ protected override ExecuteScanQueryPart MakeResponse(ExecuteScanQueryPartialResp

public partial class TableClient
{
public ExecuteScanQueryStream ExecuteScanQuery(
public async ValueTask<ExecuteScanQueryStream> ExecuteScanQuery(
string query,
IReadOnlyDictionary<string, YdbValue> parameters,
ExecuteScanQuerySettings? settings = null)
Expand All @@ -75,7 +75,7 @@ public ExecuteScanQueryStream ExecuteScanQuery(

request.Parameters.Add(parameters.ToDictionary(p => p.Key, p => p.Value.GetProto()));

var streamIterator = _driver.ServerStreamCall(
var streamIterator = await _driver.ServerStreamCall(
method: TableService.StreamExecuteScanQueryMethod,
request: request,
settings: settings
Expand All @@ -84,7 +84,7 @@ public ExecuteScanQueryStream ExecuteScanQuery(
return new ExecuteScanQueryStream(streamIterator);
}

public ExecuteScanQueryStream ExecuteScanQuery(
public ValueTask<ExecuteScanQueryStream> ExecuteScanQuery(
string query,
ExecuteScanQuerySettings? settings = null)
{
Expand Down
4 changes: 2 additions & 2 deletions src/Ydb.Sdk/src/Services/Table/ReadTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ protected override ReadTablePart MakeResponse(ReadTableResponse protoResponse)

public partial class TableClient
{
public ReadTableStream ReadTable(string tablePath, ReadTableSettings? settings = null)
public async ValueTask<ReadTableStream> ReadTable(string tablePath, ReadTableSettings? settings = null)
{
settings ??= new ReadTableSettings();

Expand All @@ -74,7 +74,7 @@ public ReadTableStream ReadTable(string tablePath, ReadTableSettings? settings =
Ordered = settings.Ordered
};

var streamIterator = _driver.ServerStreamCall(
var streamIterator = await _driver.ServerStreamCall(
method: TableService.StreamReadTableMethod,
request: request,
settings: settings
Expand Down
8 changes: 6 additions & 2 deletions src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ private async Task Initialize()

_logger.LogInformation("Reader session initialization started. ReaderConfig: {ReaderConfig}", _config);

var stream = _driver.BidirectionalStreamCall(TopicService.StreamReadMethod, _readerGrpcRequestSettings);
var stream =
await _driver.BidirectionalStreamCall(TopicService.StreamReadMethod, _readerGrpcRequestSettings);

var initRequest = new StreamReadMessage.Types.InitRequest();
if (_config.ConsumerName != null)
Expand Down Expand Up @@ -191,6 +192,7 @@ await stream.Write(new MessageFromClient
stream,
initResponse.SessionId,
Initialize,
await stream.AuthToken,
_logger,
_receivedMessagesChannel.Writer,
_deserializer
Expand Down Expand Up @@ -270,14 +272,16 @@ public ReaderSession(
ReaderStream stream,
string sessionId,
Func<Task> initialize,
string? lastToken,
ILogger logger,
ChannelWriter<InternalBatchMessages<TValue>> channelWriter,
IDeserializer<TValue> deserializer
) : base(
stream,
logger,
sessionId,
initialize
initialize,
lastToken
)
{
_readerConfig = config;
Expand Down
7 changes: 4 additions & 3 deletions src/Ydb.Sdk/src/Services/Topic/TopicSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ protected TopicSession(
IBidirectionalStream<TFromClient, TFromServer> stream,
ILogger logger,
string sessionId,
Func<Task> initialize)
Func<Task> initialize,
string? lastToken)
{
Stream = stream;
Logger = logger;
SessionId = sessionId;
_initialize = initialize;
_lastToken = stream.AuthToken;
_lastToken = lastToken;
}

public bool IsActive => Volatile.Read(ref _isActive) == 1;
Expand All @@ -44,7 +45,7 @@ protected async void ReconnectSession()

protected async Task SendMessage(TFromClient fromClient)
{
var curAuthToken = Stream.AuthToken;
var curAuthToken = await Stream.AuthToken;

if (!string.Equals(_lastToken, curAuthToken) && curAuthToken != null)
{
Expand Down
8 changes: 6 additions & 2 deletions src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ private async Task Initialize()

_logger.LogInformation("Writer session initialization started. WriterConfig: {WriterConfig}", _config);

var stream = _driver.BidirectionalStreamCall(TopicService.StreamWriteMethod, _writerGrpcRequestSettings);
var stream =
await _driver.BidirectionalStreamCall(TopicService.StreamWriteMethod, _writerGrpcRequestSettings);

var initRequest = new StreamWriteMessage.Types.InitRequest { Path = _config.TopicPath };
if (_config.ProducerId != null)
Expand Down Expand Up @@ -301,6 +302,7 @@ private async Task Initialize()
lastSeqNo: lastSeqNo,
sessionId: initResponse.SessionId,
initialize: Initialize,
await stream.AuthToken,
logger: _logger,
inFlightMessages: _inFlightMessages
);
Expand Down Expand Up @@ -450,13 +452,15 @@ public WriterSession(
long lastSeqNo,
string sessionId,
Func<Task> initialize,
string? lastToken,
ILogger logger,
ConcurrentQueue<MessageSending> inFlightMessages
) : base(
stream,
logger,
sessionId,
initialize
initialize,
lastToken
)
{
_config = config;
Expand Down
Loading
Loading