Skip to content

Commit 31fd7e7

Browse files
Topic Reader & Writer: update auth token in bidirectional stream.
1 parent b1e141e commit 31fd7e7

File tree

7 files changed

+284
-8
lines changed

7 files changed

+284
-8
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
- Topic Reader & Writer: update auth token in bidirectional stream.
2+
13
## v0.14.1
24
- Fixed bug: public key presented not for certificate signature.
35
- Fixed: YdbDataReader does not throw YdbException when CloseAsync is called for UPDATE/INSERT statements with no

src/Ydb.Sdk/src/IDriver.cs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using Grpc.Core;
22
using Grpc.Net.Client;
33
using Microsoft.Extensions.Logging;
4+
using Ydb.Sdk.Auth;
45

56
namespace Ydb.Sdk;
67

@@ -36,6 +37,8 @@ public interface IBidirectionalStream<in TRequest, out TResponse> : IDisposable
3637
public ValueTask<bool> MoveNextAsync();
3738

3839
public TResponse Current { get; }
40+
41+
public string? AuthToken { get; }
3942
}
4043

4144
public abstract class BaseDriver : IDriver
@@ -118,7 +121,10 @@ public IBidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TReques
118121
host: null,
119122
options: GetCallOptions(settings));
120123

121-
return new BidirectionalStream<TRequest, TResponse>(call, e => { OnRpcError(endpoint, e); });
124+
return new BidirectionalStream<TRequest, TResponse>(
125+
call,
126+
e => { OnRpcError(endpoint, e); },
127+
Config.Credentials);
122128
}
123129

124130
protected abstract (string, GrpcChannel) GetChannel(long nodeId);
@@ -218,13 +224,16 @@ internal class BidirectionalStream<TRequest, TResponse> : IBidirectionalStream<T
218224
{
219225
private readonly AsyncDuplexStreamingCall<TRequest, TResponse> _stream;
220226
private readonly Action<RpcException> _rpcErrorAction;
227+
private readonly ICredentialsProvider _credentialsProvider;
221228

222229
internal BidirectionalStream(
223230
AsyncDuplexStreamingCall<TRequest, TResponse> stream,
224-
Action<RpcException> rpcErrorAction)
231+
Action<RpcException> rpcErrorAction,
232+
ICredentialsProvider credentialsProvider)
225233
{
226234
_stream = stream;
227235
_rpcErrorAction = rpcErrorAction;
236+
_credentialsProvider = credentialsProvider;
228237
}
229238

230239
public async Task Write(TRequest request)
@@ -257,6 +266,8 @@ public async ValueTask<bool> MoveNextAsync()
257266

258267
public TResponse Current => _stream.ResponseStream.Current;
259268

269+
public string? AuthToken => _credentialsProvider.GetAuthInfo();
270+
260271
public void Dispose()
261272
{
262273
_stream.Dispose();

src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ public async void RunProcessingTopic()
289289
{
290290
await foreach (var messageFromClient in _channelFromClientMessageSending.Reader.ReadAllAsync())
291291
{
292-
await Stream.Write(messageFromClient);
292+
await SendMessage(messageFromClient);
293293
}
294294
}
295295
catch (Driver.TransportException e)
@@ -539,4 +539,15 @@ await _channelWriter.WriteAsync(
539539
}
540540
}
541541
}
542+
543+
protected override MessageFromClient GetSendUpdateTokenRequest(string token)
544+
{
545+
return new MessageFromClient
546+
{
547+
UpdateTokenRequest = new UpdateTokenRequest
548+
{
549+
Token = token
550+
}
551+
};
552+
}
542553
}

src/Ydb.Sdk/src/Services/Topic/TopicSession.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ internal abstract class TopicSession<TFromClient, TFromServer> : IDisposable
1111
protected readonly string SessionId;
1212

1313
private int _isActive = 1;
14+
private string? _lastToken;
1415

1516
protected TopicSession(
1617
IBidirectionalStream<TFromClient, TFromServer> stream,
@@ -22,6 +23,7 @@ protected TopicSession(
2223
Logger = logger;
2324
SessionId = sessionId;
2425
_initialize = initialize;
26+
_lastToken = stream.AuthToken;
2527
}
2628

2729
public bool IsActive => Volatile.Read(ref _isActive) == 1;
@@ -40,8 +42,26 @@ protected async void ReconnectSession()
4042
await _initialize();
4143
}
4244

45+
protected async Task SendMessage(TFromClient fromClient)
46+
{
47+
var curAuthToken = Stream.AuthToken;
48+
49+
if (!string.Equals(_lastToken, curAuthToken) && curAuthToken != null)
50+
{
51+
var updateTokenRequest = GetSendUpdateTokenRequest(curAuthToken);
52+
53+
_lastToken = curAuthToken;
54+
55+
await Stream.Write(updateTokenRequest);
56+
}
57+
58+
await Stream.Write(fromClient);
59+
}
60+
4361
public void Dispose()
4462
{
4563
Stream.Dispose();
4664
}
65+
66+
protected abstract TFromClient GetSendUpdateTokenRequest(string token);
4767
}

src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ private async Task Initialize()
326326
}
327327
catch (OperationCanceledException)
328328
{
329-
_logger.LogWarning("Initialize writer is canceled because it has been disposed");
329+
_logger.LogInformation("Initialize writer is canceled because it has been disposed");
330330
}
331331
}
332332

@@ -449,7 +449,7 @@ public async Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
449449
}
450450

451451
Volatile.Write(ref _seqNum, currentSeqNum);
452-
await Stream.Write(new MessageFromClient { WriteRequest = writeMessage });
452+
await SendMessage(new MessageFromClient { WriteRequest = writeMessage });
453453
}
454454
catch (Driver.TransportException e)
455455
{
@@ -531,4 +531,15 @@ Completing task on exception...
531531
ReconnectSession();
532532
}
533533
}
534+
535+
protected override MessageFromClient GetSendUpdateTokenRequest(string token)
536+
{
537+
return new MessageFromClient
538+
{
539+
UpdateTokenRequest = new UpdateTokenRequest
540+
{
541+
Token = token
542+
}
543+
};
544+
}
534545
}

src/Ydb.Sdk/tests/Topic/ReaderUnitTests.cs

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1395,6 +1395,107 @@ public async Task ReadAsync_WhenFailDeserializer_ThrowReaderExceptionAndInvokeRe
13951395
(await Assert.ThrowsAsync<ReaderException>(() => reader.ReadAsync().AsTask())).Message);
13961396
}
13971397

1398+
/*
1399+
*
1400+
Performed invocations:
1401+
1402+
Mock<IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>:1> (stream):
1403+
1404+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.Write({ "initRequest": { "topicsReadSettings": [ { "path": "/topic" } ], "consumer": "Consumer Tester" } })
1405+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.MoveNextAsync()
1406+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.Current
1407+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.Write({ "readRequest": { "bytesSize": "1000" } })
1408+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.AuthToken
1409+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.MoveNextAsync()
1410+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.Current
1411+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.MoveNextAsync()
1412+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.AuthToken
1413+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.Write({ "startPartitionSessionResponse": { "partitionSessionId": "1" } })
1414+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.Current
1415+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.MoveNextAsync()
1416+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.AuthToken
1417+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.Write({ "updateTokenRequest": { "token": "Token2" } })
1418+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.Write({ "commitOffsetRequest": { "commitOffsets": [ { "partitionSessionId": "1", "offsets": [ { "end": "1" } ] } ] } })
1419+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.Current
1420+
*/
1421+
[Fact]
1422+
public async Task ReadAsync_WhenTokenIsUpdatedOneTime_SuccessUpdateToken()
1423+
{
1424+
_mockStream.SetupSequence(stream => stream.AuthToken)
1425+
.Returns("Token1")
1426+
.Returns("Token1")
1427+
.Returns("Token2")
1428+
.Returns("Token2");
1429+
1430+
var tcsMoveNext = new TaskCompletionSource<bool>();
1431+
var tcsCommitMessage = new TaskCompletionSource<bool>();
1432+
1433+
_mockStream.SetupSequence(stream => stream.Write(It.IsAny<FromClient>()))
1434+
.Returns(Task.CompletedTask)
1435+
.Returns(Task.CompletedTask)
1436+
.Returns(() =>
1437+
{
1438+
tcsMoveNext.SetResult(true);
1439+
1440+
return Task.CompletedTask;
1441+
})
1442+
.Returns(Task.CompletedTask)
1443+
.Returns(() =>
1444+
{
1445+
tcsCommitMessage.SetResult(true);
1446+
1447+
return Task.CompletedTask;
1448+
});
1449+
1450+
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
1451+
.ReturnsAsync(true)
1452+
.ReturnsAsync(true)
1453+
.Returns(new ValueTask<bool>(tcsMoveNext.Task))
1454+
.Returns(new ValueTask<bool>(tcsCommitMessage.Task))
1455+
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
1456+
1457+
_mockStream.SetupSequence(stream => stream.Current)
1458+
.Returns(InitResponseFromServer)
1459+
.Returns(StartPartitionSessionRequest())
1460+
.Returns(ReadResponse(0, BitConverter.GetBytes(100)))
1461+
.Returns(CommitOffsetResponse());
1462+
1463+
using var reader = new ReaderBuilder<int>(_mockIDriver.Object)
1464+
{
1465+
ConsumerName = "Consumer Tester",
1466+
MemoryUsageMaxBytes = 1000,
1467+
SubscribeSettings = { new SubscribeSettings("/topic") }
1468+
}.Build();
1469+
1470+
var message = await reader.ReadAsync();
1471+
await message.CommitAsync();
1472+
Assert.Equal(100, message.Data);
1473+
1474+
_mockStream.Verify(stream => stream.Write(It.IsAny<FromClient>()), Times.Exactly(5));
1475+
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.Between(4, 5, Range.Inclusive));
1476+
_mockStream.Verify(stream => stream.Current, Times.Exactly(4));
1477+
1478+
_mockStream.Verify(stream => stream.Write(It.Is<FromClient>(msg =>
1479+
msg.InitRequest != null &&
1480+
msg.InitRequest.Consumer == "Consumer Tester" &&
1481+
msg.InitRequest.TopicsReadSettings[0].Path == "/topic")));
1482+
_mockStream.Verify(stream => stream.Write(It.Is<FromClient>(msg =>
1483+
msg.ReadRequest != null &&
1484+
msg.ReadRequest.BytesSize == 1000)));
1485+
_mockStream.Verify(stream => stream.Write(It.Is<FromClient>(msg =>
1486+
msg.StartPartitionSessionResponse != null &&
1487+
msg.StartPartitionSessionResponse.PartitionSessionId == 1)));
1488+
_mockStream.Verify(stream => stream.Write(It.Is<FromClient>(msg =>
1489+
msg.ReadRequest != null)));
1490+
_mockStream.Verify(stream => stream.Write(It.Is<FromClient>(msg =>
1491+
msg.CommitOffsetRequest != null &&
1492+
msg.CommitOffsetRequest.CommitOffsets[0].PartitionSessionId == 1 &&
1493+
msg.CommitOffsetRequest.CommitOffsets[0].Offsets[0].End == 1)));
1494+
_mockStream.Verify(stream => stream.Write(It.Is<FromClient>(msg =>
1495+
msg.UpdateTokenRequest != null &&
1496+
msg.UpdateTokenRequest.Token == "Token2")));
1497+
}
1498+
13981499
private class FailDeserializer : IDeserializer<int>
13991500
{
14001501
public int Deserialize(byte[] data)

0 commit comments

Comments
 (0)