Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
- Feat: `Writer.DisposeAsync()` waits for all in-flight messages to complete.
- Feat: `Reader.DisposeAsync()` waits for all pending commits to be completed.
- **Breaking Change**: `IReader` now implements `IAsyncDisposable` instead of `IDisposable`.
This change requires updates to code that disposes `IReader` instances. Use `await using` instead of `using`.
- **Breaking Change**: `IWriter` now implements `IAsyncDisposable` instead of `IDisposable`.
This change requires updates to code that disposes `IWriter` instances. Use `await using` instead of `using`.
- Topic `Reader` & `Writer`: update auth token in bidirectional stream.

## v0.14.1

- Fixed bug: public key presented not for certificate signature.
- Fixed: YdbDataReader does not throw YdbException when CloseAsync is called for UPDATE/INSERT statements with no
result.
Expand Down
2 changes: 1 addition & 1 deletion slo/src/TopicService/SloTopicContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public async Task Run(RunConfig config)
{
try
{
using var reader = new ReaderBuilder<string>(driver)
await using var reader = new ReaderBuilder<string>(driver)
{
ConsumerName = ConsumerName,
SubscribeSettings =
Expand Down
11 changes: 7 additions & 4 deletions src/Ydb.Sdk/src/IDriver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public IBidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TReques
ILoggerFactory LoggerFactory { get; }
}

public interface IBidirectionalStream<in TRequest, out TResponse> : IAsyncDisposable
public interface IBidirectionalStream<in TRequest, out TResponse> : IDisposable
{
public Task Write(TRequest request);

Expand All @@ -39,6 +39,8 @@ public interface IBidirectionalStream<in TRequest, out TResponse> : IAsyncDispos
public TResponse Current { get; }

public string? AuthToken { get; }

public Task RequestStreamComplete();
}

public abstract class BaseDriver : IDriver
Expand Down Expand Up @@ -225,7 +227,6 @@ internal class BidirectionalStream<TRequest, TResponse> : IBidirectionalStream<T
private readonly AsyncDuplexStreamingCall<TRequest, TResponse> _stream;
private readonly Action<RpcException> _rpcErrorAction;
private readonly ICredentialsProvider _credentialsProvider;
private readonly TaskCompletionSource _closedResponseStreamTcs = new();

internal BidirectionalStream(
AsyncDuplexStreamingCall<TRequest, TResponse> stream,
Expand Down Expand Up @@ -259,7 +260,6 @@ public async ValueTask<bool> MoveNextAsync()
}
catch (RpcException e)
{
_closedResponseStreamTcs.SetResult();
_rpcErrorAction(e);

throw new Driver.TransportException(e);
Expand All @@ -270,7 +270,7 @@ public async ValueTask<bool> MoveNextAsync()

public string? AuthToken => _credentialsProvider.GetAuthInfo();

public async ValueTask DisposeAsync()
public async Task RequestStreamComplete()
{
try
{
Expand All @@ -280,7 +280,10 @@ public async ValueTask DisposeAsync()
{
_rpcErrorAction(e);
}
}

public void Dispose()
{
_stream.Dispose();
}
}
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/src/Services/Topic/IReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace Ydb.Sdk.Services.Topic;

public interface IReader<TValue> : IDisposable
public interface IReader<TValue> : IAsyncDisposable
{
public ValueTask<Message<TValue>> ReadAsync(CancellationToken cancellationToken = default);

Expand Down
126 changes: 82 additions & 44 deletions src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ internal class Reader<TValue> : IReader<TValue>
private readonly ReaderConfig _config;
private readonly IDeserializer<TValue> _deserializer;
private readonly ILogger _logger;
private readonly GrpcRequestSettings _readerGrpcRequestSettings;
private readonly GrpcRequestSettings _readerGrpcRequestSettings = new();

private ReaderSession<TValue>? _currentReaderSession;

private readonly Channel<InternalBatchMessages<TValue>> _receivedMessagesChannel =
Channel.CreateUnbounded<InternalBatchMessages<TValue>>(
Expand All @@ -41,7 +43,6 @@ internal Reader(IDriver driver, ReaderConfig config, IDeserializer<TValue> deser
_config = config;
_deserializer = deserializer;
_logger = driver.LoggerFactory.CreateLogger<Reader<TValue>>();
_readerGrpcRequestSettings = new GrpcRequestSettings { CancellationToken = _disposeCts.Token };

_ = Initialize();
}
Expand All @@ -68,7 +69,7 @@ public async ValueTask<Message<TValue>> ReadAsync(CancellationToken cancellation
}
}

throw new ObjectDisposedException("Reader");
throw new ReaderException("Reader is disposed");
}

public async ValueTask<BatchMessages<TValue>> ReadBatchAsync(CancellationToken cancellationToken = default)
Expand All @@ -86,7 +87,7 @@ public async ValueTask<BatchMessages<TValue>> ReadBatchAsync(CancellationToken c
}
}

throw new ObjectDisposedException("Reader");
throw new ReaderException("Reader is disposed");
}

private async Task Initialize()
Expand Down Expand Up @@ -185,15 +186,15 @@ await stream.Write(new MessageFromClient
ReadRequest = new StreamReadMessage.Types.ReadRequest { BytesSize = _config.MemoryUsageMaxBytes }
});

new ReaderSession<TValue>(
_currentReaderSession = new ReaderSession<TValue>(
_config,
stream,
initResponse.SessionId,
Initialize,
_logger,
_receivedMessagesChannel.Writer,
_deserializer
).RunProcessingTopic();
);
}
catch (Driver.TransportException e)
{
Expand All @@ -203,18 +204,19 @@ await stream.Write(new MessageFromClient
}
}

public void Dispose()
public async ValueTask DisposeAsync()
{
try
{
_receivedMessagesChannel.Writer.TryComplete();

_disposeCts.Cancel();
}
finally
if (_disposeCts.IsCancellationRequested)
{
_disposeCts.Dispose();
return;
}

_receivedMessagesChannel.Writer.TryComplete();
_disposeCts.Cancel();

await (_currentReaderSession?.DisposeAsync() ?? ValueTask.CompletedTask);

_logger.LogInformation("Reader[{WriterConfig}] is disposed", _config);
}
}

Expand Down Expand Up @@ -247,6 +249,8 @@ internal class ReaderSession<TValue> : TopicSession<MessageFromClient, MessageFr
private readonly ChannelWriter<InternalBatchMessages<TValue>> _channelWriter;
private readonly CancellationTokenSource _lifecycleReaderSessionCts = new();
private readonly IDeserializer<TValue> _deserializer;
private readonly Task _runProcessingStreamResponse;
private readonly Task _runProcessingStreamRequest;

private readonly Channel<MessageFromClient> _channelFromClientMessageSending =
Channel.CreateUnbounded<MessageFromClient>(
Expand Down Expand Up @@ -279,29 +283,13 @@ IDeserializer<TValue> deserializer
_readerConfig = config;
_channelWriter = channelWriter;
_deserializer = deserializer;

_runProcessingStreamResponse = RunProcessingStreamResponse();
_runProcessingStreamRequest = RunProcessingStreamRequest();
}

public async void RunProcessingTopic()
private async Task RunProcessingStreamResponse()
{
_ = Task.Run(async () =>
{
try
{
await foreach (var messageFromClient in _channelFromClientMessageSending.Reader.ReadAllAsync())
{
await SendMessage(messageFromClient);
}
}
catch (Driver.TransportException e)
{
Logger.LogError(e, "ReaderSession[{SessionId}] have transport error on Write", SessionId);

ReconnectSession();

_lifecycleReaderSessionCts.Cancel();
}
});

try
{
while (await Stream.MoveNextAsync())
Expand Down Expand Up @@ -357,6 +345,25 @@ public async void RunProcessingTopic()
}
}

private async Task RunProcessingStreamRequest()
{
try
{
await foreach (var messageFromClient in _channelFromClientMessageSending.Reader.ReadAllAsync())
{
await SendMessage(messageFromClient);
}
}
catch (Driver.TransportException e)
{
Logger.LogError(e, "ReaderSession[{SessionId}] have transport error on Write", SessionId);

ReconnectSession();

_lifecycleReaderSessionCts.Cancel();
}
}

internal async void TryReadRequestBytes(long bytes)
{
var readRequestBytes = Interlocked.Add(ref _readRequestBytes, bytes);
Expand Down Expand Up @@ -465,21 +472,28 @@ public async Task CommitOffsetRange(OffsetsRange offsetsRange, long partitionSes
{
partitionSession.RegisterCommitRequest(commitSending);

await _channelFromClientMessageSending.Writer.WriteAsync(new MessageFromClient
{
CommitOffsetRequest = new StreamReadMessage.Types.CommitOffsetRequest
try
{
await _channelFromClientMessageSending.Writer.WriteAsync(new MessageFromClient
{
CommitOffsets =
CommitOffsetRequest = new StreamReadMessage.Types.CommitOffsetRequest
{
new StreamReadMessage.Types.CommitOffsetRequest.Types.PartitionCommitOffset
CommitOffsets =
{
Offsets = { commitSending.OffsetsRange },
PartitionSessionId = partitionSessionId
new StreamReadMessage.Types.CommitOffsetRequest.Types.PartitionCommitOffset
{
Offsets = { commitSending.OffsetsRange },
PartitionSessionId = partitionSessionId
}
}
}
}
}
);
);
}
catch (ChannelClosedException)
{
throw new ReaderException("Reader is disposed");
}
}
else
{
Expand Down Expand Up @@ -550,4 +564,28 @@ protected override MessageFromClient GetSendUpdateTokenRequest(string token)
}
};
}

public override async ValueTask DisposeAsync()
{
Logger.LogDebug("ReaderSession[{SessionId}]: start dispose process", SessionId);

_channelFromClientMessageSending.Writer.Complete();

try
{
await _runProcessingStreamRequest;
await Stream.RequestStreamComplete();
await _runProcessingStreamResponse; // waiting all ack's commits

_lifecycleReaderSessionCts.Cancel();
}
catch (Exception e)
{
Logger.LogError(e, "ReaderSession[{SessionId}]: error on disposing", SessionId);
}
finally
{
Stream.Dispose();
}
}
}
7 changes: 1 addition & 6 deletions src/Ydb.Sdk/src/Services/Topic/TopicSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,5 @@ protected async Task SendMessage(TFromClient fromClient)

protected abstract TFromClient GetSendUpdateTokenRequest(string token);

public ValueTask DisposeAsync()
{
Logger.LogInformation("TopicSession[{SessionId}] is being deleted", SessionId);

return Stream.DisposeAsync();
}
public abstract ValueTask DisposeAsync();
}
17 changes: 12 additions & 5 deletions src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ private async Task Initialize()
_logger.LogError("Stream unexpectedly closed by YDB server. Current InitRequest: {initRequest}",
initRequest);

_ = Task.Run(Initialize, _disposeCts.Token);
_ = Task.Run(Initialize);

return;
}
Expand All @@ -239,7 +239,7 @@ private async Task Initialize()
{
_logger.LogError("Writer initialization failed to start. Reason: {Status}", status);

_ = Task.Run(Initialize, _disposeCts.Token);
_ = Task.Run(Initialize);
}
else
{
Expand Down Expand Up @@ -323,7 +323,7 @@ private async Task Initialize()
{
_logger.LogError(e, "Transport error on creating WriterSession");

_ = Task.Run(Initialize, _disposeCts.Token);
_ = Task.Run(Initialize);
}
catch (OperationCanceledException)
{
Expand All @@ -338,8 +338,6 @@ public async ValueTask DisposeAsync()
return;
}

_logger.LogInformation("Starting Writer[{WriterConfig}] disposal process", _config);

await _sendInFlightMessagesSemaphoreSlim.WaitAsync();
try
{
Expand Down Expand Up @@ -593,4 +591,13 @@ protected override MessageFromClient GetSendUpdateTokenRequest(string token)
}
};
}

public override async ValueTask DisposeAsync()
{
Logger.LogDebug("WriterSession[{SessionId}]: start dispose process", SessionId);

await Stream.RequestStreamComplete();

Stream.Dispose();
}
}
4 changes: 2 additions & 2 deletions src/Ydb.Sdk/tests/Topic/ReaderIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public async Task StressTest_WhenReadingThenCommiting_ReturnMessages()
await message.CommitAsync();
}

reader.Dispose();
await reader.DisposeAsync();

var readerNext = new ReaderBuilder<string>(_driver)
{
Expand All @@ -59,7 +59,7 @@ public async Task StressTest_WhenReadingThenCommiting_ReturnMessages()
await message.CommitAsync();
}

readerNext.Dispose();
await readerNext.DisposeAsync();

await topicClient.DropTopic(new DropTopicSettings { Path = _topicName });
}
Expand Down
Loading
Loading