diff --git a/CHANGELOG.md b/CHANGELOG.md index 5523b5e4..81dec7d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ +- Dev: added `ValueTask GetAuthInfoAsync()` in ICredentialProvider. - Feat: `Writer.DisposeAsync()` waits for all in-flight messages to complete. - Feat: `Reader.DisposeAsync()` waits for all pending commits to be completed. - **Breaking Change**: `IReader` now implements `IAsyncDisposable` instead of `IDisposable`. diff --git a/examples/src/BasicExample/ReadTable.cs b/examples/src/BasicExample/ReadTable.cs index dd1c4829..7433db5b 100644 --- a/examples/src/BasicExample/ReadTable.cs +++ b/examples/src/BasicExample/ReadTable.cs @@ -6,7 +6,7 @@ internal partial class BasicExample { private async Task ReadTable() { - var readStream = Client.ReadTable( + var readStream = await Client.ReadTable( FullTablePath("seasons"), new ReadTableSettings { diff --git a/examples/src/BasicExample/ScanQuery.cs b/examples/src/BasicExample/ScanQuery.cs index e8effbc2..4260bf6f 100644 --- a/examples/src/BasicExample/ScanQuery.cs +++ b/examples/src/BasicExample/ScanQuery.cs @@ -18,7 +18,7 @@ FROM episodes ORDER BY series_id, season_id; "; - var scanStream = Client.ExecuteScanQuery( + var scanStream = await Client.ExecuteScanQuery( query, new Dictionary { diff --git a/examples/src/QueryExample/QueryExample.cs b/examples/src/QueryExample/QueryExample.cs index 544223b7..796a2688 100644 --- a/examples/src/QueryExample/QueryExample.cs +++ b/examples/src/QueryExample/QueryExample.cs @@ -231,7 +231,7 @@ private async Task StreamSelect() await Client.DoTx(async tx => { - await foreach (var part in tx.Stream(query, commit: true)) + await foreach (var part in await tx.Stream(query, commit: true)) { foreach (var row in part.ResultSet!.Rows) { @@ -316,7 +316,7 @@ private async Task ReadAllResultSets() var resultSets = await Client.DoTx(async tx => { var resultSets = new List(); - await foreach (var resultSet in tx.Stream(query, commit: true)) + await foreach (var resultSet in await tx.Stream(query, commit: true)) { resultSets.Add(resultSet.ResultSet!); } diff --git a/src/Ydb.Sdk/src/Ado/YdbCommand.cs b/src/Ydb.Sdk/src/Ado/YdbCommand.cs index 01ffbca0..01cb989b 100644 --- a/src/Ydb.Sdk/src/Ado/YdbCommand.cs +++ b/src/Ydb.Sdk/src/Ado/YdbCommand.cs @@ -223,9 +223,12 @@ protected override async Task ExecuteDbDataReaderAsync(CommandBeha throw new InvalidOperationException("Transaction mismatched! (Maybe using another connection)"); } - var ydbDataReader = await YdbDataReader.CreateYdbDataReader(YdbConnection.Session.ExecuteQuery( - preparedSql.ToString(), ydbParameters, execSettings, transaction?.TransactionControl), - YdbConnection.Session.OnStatus, transaction); + var ydbDataReader = await YdbDataReader.CreateYdbDataReader( + await YdbConnection.Session.ExecuteQuery( + preparedSql.ToString(), ydbParameters, execSettings, transaction?.TransactionControl + ), + YdbConnection.Session.OnStatus, transaction + ); YdbConnection.LastReader = ydbDataReader; YdbConnection.LastCommand = CommandText; diff --git a/src/Ydb.Sdk/src/Auth/ICredentialsProvider.cs b/src/Ydb.Sdk/src/Auth/ICredentialsProvider.cs index 540b0a81..a3f7f8d0 100644 --- a/src/Ydb.Sdk/src/Auth/ICredentialsProvider.cs +++ b/src/Ydb.Sdk/src/Auth/ICredentialsProvider.cs @@ -4,8 +4,14 @@ namespace Ydb.Sdk.Auth; public interface ICredentialsProvider { + // For removal in 1.* string? GetAuthInfo(); + ValueTask GetAuthInfoAsync() + { + return ValueTask.FromResult(GetAuthInfo()); + } + Task ProvideAuthClient(AuthClient authClient) { return Task.CompletedTask; diff --git a/src/Ydb.Sdk/src/Driver.cs b/src/Ydb.Sdk/src/Driver.cs index 092b5117..f5b13724 100644 --- a/src/Ydb.Sdk/src/Driver.cs +++ b/src/Ydb.Sdk/src/Driver.cs @@ -129,7 +129,7 @@ private async Task DiscoverEndpoints() TransportTimeout = Config.EndpointDiscoveryTimeout }; - var options = GetCallOptions(requestSettings); + var options = await GetCallOptions(requestSettings); options.Headers?.Add(Metadata.RpcSdkInfoHeader, _sdkInfo); var response = await client.ListEndpointsAsync( diff --git a/src/Ydb.Sdk/src/IDriver.cs b/src/Ydb.Sdk/src/IDriver.cs index e83d1c12..cfa4a0ed 100644 --- a/src/Ydb.Sdk/src/IDriver.cs +++ b/src/Ydb.Sdk/src/IDriver.cs @@ -14,14 +14,14 @@ public Task UnaryCall( where TRequest : class where TResponse : class; - public ServerStream ServerStreamCall( + public ValueTask> ServerStreamCall( Method method, TRequest request, GrpcRequestSettings settings) where TRequest : class where TResponse : class; - public IBidirectionalStream BidirectionalStreamCall( + public ValueTask> BidirectionalStreamCall( Method method, GrpcRequestSettings settings) where TRequest : class @@ -38,7 +38,7 @@ public interface IBidirectionalStream : IDisposable public TResponse Current { get; } - public string? AuthToken { get; } + public ValueTask AuthToken { get; } public Task RequestStreamComplete(); } @@ -74,7 +74,7 @@ public async Task UnaryCall( using var call = callInvoker.AsyncUnaryCall( method: method, host: null, - options: GetCallOptions(settings), + options: await GetCallOptions(settings), request: request ); @@ -90,7 +90,7 @@ public async Task UnaryCall( } } - public ServerStream ServerStreamCall( + public async ValueTask> ServerStreamCall( Method method, TRequest request, GrpcRequestSettings settings) @@ -103,13 +103,13 @@ public ServerStream ServerStreamCall( var call = callInvoker.AsyncServerStreamingCall( method: method, host: null, - options: GetCallOptions(settings), + options: await GetCallOptions(settings), request: request); return new ServerStream(call, e => { OnRpcError(endpoint, e); }); } - public IBidirectionalStream BidirectionalStreamCall( + public async ValueTask> BidirectionalStreamCall( Method method, GrpcRequestSettings settings) where TRequest : class @@ -121,7 +121,7 @@ public IBidirectionalStream BidirectionalStreamCall( call, @@ -133,14 +133,14 @@ public IBidirectionalStream BidirectionalStreamCall GetCallOptions(GrpcRequestSettings settings) { var meta = new Grpc.Core.Metadata { { Metadata.RpcDatabaseHeader, Config.Database } }; - var authInfo = Config.Credentials.GetAuthInfo(); + var authInfo = await Config.Credentials.GetAuthInfoAsync(); if (authInfo != null) { meta.Add(Metadata.RpcAuthHeader, authInfo); @@ -268,7 +268,7 @@ public async ValueTask MoveNextAsync() public TResponse Current => _stream.ResponseStream.Current; - public string? AuthToken => _credentialsProvider.GetAuthInfo(); + public ValueTask AuthToken => _credentialsProvider.GetAuthInfoAsync(); public async Task RequestStreamComplete() { diff --git a/src/Ydb.Sdk/src/Services/Query/QueryClient.cs b/src/Ydb.Sdk/src/Services/Query/QueryClient.cs index df878c78..c6109daf 100644 --- a/src/Ydb.Sdk/src/Services/Query/QueryClient.cs +++ b/src/Ydb.Sdk/src/Services/Query/QueryClient.cs @@ -37,8 +37,9 @@ public Task Stream(string query, Func> onStrea Dictionary? parameters = null, TxMode txMode = TxMode.NoTx, ExecuteQuerySettings? settings = null) { - return _sessionPool.ExecOnSession(session => onStream(new ExecuteQueryStream( - session.ExecuteQuery(query, parameters, settings, txMode.TransactionControl())))); + return _sessionPool.ExecOnSession(async session => await onStream(new ExecuteQueryStream( + await session.ExecuteQuery(query, parameters, settings, txMode.TransactionControl()))) + ); } public Task Stream(string query, Func onStream, diff --git a/src/Ydb.Sdk/src/Services/Query/QueryTx.cs b/src/Ydb.Sdk/src/Services/Query/QueryTx.cs index 103cde41..962ec16a 100644 --- a/src/Ydb.Sdk/src/Services/Query/QueryTx.cs +++ b/src/Ydb.Sdk/src/Services/Query/QueryTx.cs @@ -27,17 +27,18 @@ internal QueryTx(Session session, TxMode txMode) _txMode = txMode; } - public ExecuteQueryStream Stream(string query, Dictionary? parameters = null, + public async ValueTask Stream(string query, Dictionary? parameters = null, bool commit = false, ExecuteQuerySettings? settings = null) { - return new ExecuteQueryStream(_session.ExecuteQuery(query, parameters, settings, TxControl(commit)), - txId => TxId = txId); + return new ExecuteQueryStream( + await _session.ExecuteQuery(query, parameters, settings, TxControl(commit)), txId => TxId = txId + ); } public async Task> ReadAllRows(string query, Dictionary? parameters = null, bool commit = false, ExecuteQuerySettings? settings = null) { - await using var stream = Stream(query, parameters, commit, settings); + await using var stream = await Stream(query, parameters, commit, settings); List rows = new(); await foreach (var part in stream) @@ -64,7 +65,7 @@ public ExecuteQueryStream Stream(string query, Dictionary? par public async Task Exec(string query, Dictionary? parameters = null, ExecuteQuerySettings? settings = null, bool commit = false) { - await using var stream = Stream(query, parameters, commit, settings); + await using var stream = await Stream(query, parameters, commit, settings); await stream.MoveNextAsync(); } diff --git a/src/Ydb.Sdk/src/Services/Query/SessionPool.cs b/src/Ydb.Sdk/src/Services/Query/SessionPool.cs index c70514d1..a04a8d14 100644 --- a/src/Ydb.Sdk/src/Services/Query/SessionPool.cs +++ b/src/Ydb.Sdk/src/Services/Query/SessionPool.cs @@ -57,7 +57,7 @@ protected override async Task CreateSession() { try { - await using var stream = _driver.ServerStreamCall( + await using var stream = await _driver.ServerStreamCall( QueryService.AttachSessionMethod, new AttachSessionRequest { SessionId = sessionId }, new GrpcRequestSettings { NodeId = nodeId } @@ -136,7 +136,7 @@ ILogger logger Driver = driver; } - internal ServerStream ExecuteQuery( + internal ValueTask> ExecuteQuery( string query, Dictionary? parameters, ExecuteQuerySettings? settings, diff --git a/src/Ydb.Sdk/src/Services/Table/ExecuteScanQuery.cs b/src/Ydb.Sdk/src/Services/Table/ExecuteScanQuery.cs index 9c6eb74b..eaf07294 100644 --- a/src/Ydb.Sdk/src/Services/Table/ExecuteScanQuery.cs +++ b/src/Ydb.Sdk/src/Services/Table/ExecuteScanQuery.cs @@ -57,7 +57,7 @@ protected override ExecuteScanQueryPart MakeResponse(ExecuteScanQueryPartialResp public partial class TableClient { - public ExecuteScanQueryStream ExecuteScanQuery( + public async ValueTask ExecuteScanQuery( string query, IReadOnlyDictionary parameters, ExecuteScanQuerySettings? settings = null) @@ -75,7 +75,7 @@ public ExecuteScanQueryStream ExecuteScanQuery( request.Parameters.Add(parameters.ToDictionary(p => p.Key, p => p.Value.GetProto())); - var streamIterator = _driver.ServerStreamCall( + var streamIterator = await _driver.ServerStreamCall( method: TableService.StreamExecuteScanQueryMethod, request: request, settings: settings @@ -84,7 +84,7 @@ public ExecuteScanQueryStream ExecuteScanQuery( return new ExecuteScanQueryStream(streamIterator); } - public ExecuteScanQueryStream ExecuteScanQuery( + public ValueTask ExecuteScanQuery( string query, ExecuteScanQuerySettings? settings = null) { diff --git a/src/Ydb.Sdk/src/Services/Table/ReadTable.cs b/src/Ydb.Sdk/src/Services/Table/ReadTable.cs index fcd7c457..155e6189 100644 --- a/src/Ydb.Sdk/src/Services/Table/ReadTable.cs +++ b/src/Ydb.Sdk/src/Services/Table/ReadTable.cs @@ -62,7 +62,7 @@ protected override ReadTablePart MakeResponse(ReadTableResponse protoResponse) public partial class TableClient { - public ReadTableStream ReadTable(string tablePath, ReadTableSettings? settings = null) + public async ValueTask ReadTable(string tablePath, ReadTableSettings? settings = null) { settings ??= new ReadTableSettings(); @@ -74,7 +74,7 @@ public ReadTableStream ReadTable(string tablePath, ReadTableSettings? settings = Ordered = settings.Ordered }; - var streamIterator = _driver.ServerStreamCall( + var streamIterator = await _driver.ServerStreamCall( method: TableService.StreamReadTableMethod, request: request, settings: settings diff --git a/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs b/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs index 0416a7eb..0ce47759 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs @@ -103,7 +103,8 @@ private async Task Initialize() _logger.LogInformation("Reader session initialization started. ReaderConfig: {ReaderConfig}", _config); - var stream = _driver.BidirectionalStreamCall(TopicService.StreamReadMethod, _readerGrpcRequestSettings); + var stream = + await _driver.BidirectionalStreamCall(TopicService.StreamReadMethod, _readerGrpcRequestSettings); var initRequest = new StreamReadMessage.Types.InitRequest(); if (_config.ConsumerName != null) @@ -191,6 +192,7 @@ await stream.Write(new MessageFromClient stream, initResponse.SessionId, Initialize, + await stream.AuthToken, _logger, _receivedMessagesChannel.Writer, _deserializer @@ -270,6 +272,7 @@ public ReaderSession( ReaderStream stream, string sessionId, Func initialize, + string? lastToken, ILogger logger, ChannelWriter> channelWriter, IDeserializer deserializer @@ -277,7 +280,8 @@ IDeserializer deserializer stream, logger, sessionId, - initialize + initialize, + lastToken ) { _readerConfig = config; diff --git a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs index c2dc5b9e..711f0404 100644 --- a/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs +++ b/src/Ydb.Sdk/src/Services/Topic/TopicSession.cs @@ -17,13 +17,14 @@ protected TopicSession( IBidirectionalStream stream, ILogger logger, string sessionId, - Func initialize) + Func initialize, + string? lastToken) { Stream = stream; Logger = logger; SessionId = sessionId; _initialize = initialize; - _lastToken = stream.AuthToken; + _lastToken = lastToken; } public bool IsActive => Volatile.Read(ref _isActive) == 1; @@ -44,7 +45,7 @@ protected async void ReconnectSession() protected async Task SendMessage(TFromClient fromClient) { - var curAuthToken = Stream.AuthToken; + var curAuthToken = await Stream.AuthToken; if (!string.Equals(_lastToken, curAuthToken) && curAuthToken != null) { diff --git a/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs index dd7dd3ae..714e1875 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs @@ -203,7 +203,8 @@ private async Task Initialize() _logger.LogInformation("Writer session initialization started. WriterConfig: {WriterConfig}", _config); - var stream = _driver.BidirectionalStreamCall(TopicService.StreamWriteMethod, _writerGrpcRequestSettings); + var stream = + await _driver.BidirectionalStreamCall(TopicService.StreamWriteMethod, _writerGrpcRequestSettings); var initRequest = new StreamWriteMessage.Types.InitRequest { Path = _config.TopicPath }; if (_config.ProducerId != null) @@ -301,6 +302,7 @@ private async Task Initialize() lastSeqNo: lastSeqNo, sessionId: initResponse.SessionId, initialize: Initialize, + await stream.AuthToken, logger: _logger, inFlightMessages: _inFlightMessages ); @@ -450,13 +452,15 @@ public WriterSession( long lastSeqNo, string sessionId, Func initialize, + string? lastToken, ILogger logger, ConcurrentQueue inFlightMessages ) : base( stream, logger, sessionId, - initialize + initialize, + lastToken ) { _config = config; diff --git a/src/Ydb.Sdk/tests/Topic/ReaderUnitTests.cs b/src/Ydb.Sdk/tests/Topic/ReaderUnitTests.cs index b4f69247..495c3ff9 100644 --- a/src/Ydb.Sdk/tests/Topic/ReaderUnitTests.cs +++ b/src/Ydb.Sdk/tests/Topic/ReaderUnitTests.cs @@ -33,7 +33,7 @@ public ReaderUnitTests() _mockIDriver.Setup(driver => driver.BidirectionalStreamCall( It.IsAny>(), It.IsAny()) - ).Returns(_mockStream.Object); + ).ReturnsAsync(_mockStream.Object); _mockIDriver.Setup(driver => driver.LoggerFactory).Returns(Utils.GetLoggerFactory); @@ -1431,10 +1431,10 @@ public async Task ReadAsync_WhenFailDeserializer_ThrowReaderExceptionAndInvokeRe public async Task ReadAsync_WhenTokenIsUpdatedOneTime_SuccessUpdateToken() { _mockStream.SetupSequence(stream => stream.AuthToken) - .Returns("Token1") - .Returns("Token1") - .Returns("Token2") - .Returns("Token2"); + .ReturnsAsync("Token1") + .ReturnsAsync("Token1") + .ReturnsAsync("Token2") + .ReturnsAsync("Token2"); var tcsMoveNext = new TaskCompletionSource(); var tcsCommitMessage = new TaskCompletionSource(); diff --git a/src/Ydb.Sdk/tests/Topic/WriterIntegrationTests.cs b/src/Ydb.Sdk/tests/Topic/WriterIntegrationTests.cs index f7b31bbc..2cb5018a 100644 --- a/src/Ydb.Sdk/tests/Topic/WriterIntegrationTests.cs +++ b/src/Ydb.Sdk/tests/Topic/WriterIntegrationTests.cs @@ -79,7 +79,8 @@ public async Task WriteAsync_When1000Messages_ReturnWriteResultIsPersisted() await Task.WhenAll(tasks); - var initStream = _driver.BidirectionalStreamCall(TopicService.StreamReadMethod, new GrpcRequestSettings()); + var initStream = + await _driver.BidirectionalStreamCall(TopicService.StreamReadMethod, new GrpcRequestSettings()); await initStream.Write(new StreamReadMessage.Types.FromClient { InitRequest = new StreamReadMessage.Types.InitRequest diff --git a/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs b/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs index bbf3c3ee..844c4bcc 100644 --- a/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs +++ b/src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs @@ -25,7 +25,7 @@ public WriterUnitTests() _mockIDriver.Setup(driver => driver.BidirectionalStreamCall( It.IsAny>(), It.IsAny()) - ).Returns(_mockStream.Object); + ).ReturnsAsync(_mockStream.Object); _mockIDriver.Setup(driver => driver.LoggerFactory).Returns(Utils.GetLoggerFactory); @@ -772,10 +772,10 @@ public async Task WriteAsync_WhenTokenIsUpdatedOneTime_SuccessUpdateToken() var writeTcs3 = new TaskCompletionSource(); _mockStream.SetupSequence(stream => stream.AuthToken) - .Returns("Token1") - .Returns("Token1") - .Returns("Token2") - .Returns("Token2"); + .ReturnsAsync("Token1") + .ReturnsAsync("Token1") + .ReturnsAsync("Token2") + .ReturnsAsync("Token2"); _mockStream.SetupSequence(stream => stream.Write(It.IsAny())) .Returns(Task.CompletedTask)