Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/slo-topic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ jobs:
cd slo/src/TopicService
dotnet run create grpc://localhost:2135 /Root/testdb
- name: Run SLO Tests
continue-on-error: true
run: |
cd slo/src/TopicService
dotnet run run grpc://localhost:2135 /Root/testdb \
Expand Down
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
- Topic Reader & Writer: update auth token in bidirectional stream.
- **Breaking Change**: `IWriter` now implements `IAsyncDisposable` instead of `IDisposable`.
This change requires updates to code that disposes `IWriter` instances. Use `await using` instead of `using`.
- Topic `Reader` & `Writer`: update auth token in bidirectional stream.

## v0.14.1
- Fixed bug: public key presented not for certificate signature.
Expand Down
2 changes: 1 addition & 1 deletion slo/src/TopicService/SloTopicContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public async Task Run(RunConfig config)
{
try
{
using var writer = new WriterBuilder<string>(driver, PathTopic)
await using var writer = new WriterBuilder<string>(driver, PathTopic)
{
BufferMaxSize = 8 * 1024 * 1024,
ProducerId = "producer-" + partitionId,
Expand Down
15 changes: 13 additions & 2 deletions src/Ydb.Sdk/src/IDriver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public IBidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TReques
ILoggerFactory LoggerFactory { get; }
}

public interface IBidirectionalStream<in TRequest, out TResponse> : IDisposable
public interface IBidirectionalStream<in TRequest, out TResponse> : IAsyncDisposable
{
public Task Write(TRequest request);

Expand Down Expand Up @@ -225,6 +225,7 @@ internal class BidirectionalStream<TRequest, TResponse> : IBidirectionalStream<T
private readonly AsyncDuplexStreamingCall<TRequest, TResponse> _stream;
private readonly Action<RpcException> _rpcErrorAction;
private readonly ICredentialsProvider _credentialsProvider;
private readonly TaskCompletionSource _closedResponseStreamTcs = new();

internal BidirectionalStream(
AsyncDuplexStreamingCall<TRequest, TResponse> stream,
Expand Down Expand Up @@ -258,6 +259,7 @@ public async ValueTask<bool> MoveNextAsync()
}
catch (RpcException e)
{
_closedResponseStreamTcs.SetResult();
_rpcErrorAction(e);

throw new Driver.TransportException(e);
Expand All @@ -268,8 +270,17 @@ public async ValueTask<bool> MoveNextAsync()

public string? AuthToken => _credentialsProvider.GetAuthInfo();

public void Dispose()
public async ValueTask DisposeAsync()
{
try
{
await _stream.RequestStream.CompleteAsync();
}
catch (RpcException e)
{
_rpcErrorAction(e);
}

_stream.Dispose();
}
}
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/src/Services/Topic/IWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace Ydb.Sdk.Services.Topic;

public interface IWriter<TValue> : IDisposable
public interface IWriter<TValue> : IAsyncDisposable
{
/// <summary>
/// Asynchronously send a data to a YDB Topic.
Expand Down
12 changes: 7 additions & 5 deletions src/Ydb.Sdk/src/Services/Topic/TopicSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace Ydb.Sdk.Services.Topic;

internal abstract class TopicSession<TFromClient, TFromServer> : IDisposable
internal abstract class TopicSession<TFromClient, TFromServer> : IAsyncDisposable
{
private readonly Func<Task> _initialize;

Expand Down Expand Up @@ -58,10 +58,12 @@ protected async Task SendMessage(TFromClient fromClient)
await Stream.Write(fromClient);
}

public void Dispose()
protected abstract TFromClient GetSendUpdateTokenRequest(string token);

public ValueTask DisposeAsync()
{
Stream.Dispose();
}
Logger.LogInformation("TopicSession[{SessionId}] is being deleted", SessionId);

protected abstract TFromClient GetSendUpdateTokenRequest(string token);
return Stream.DisposeAsync();
}
}
79 changes: 65 additions & 14 deletions src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ internal class Writer<TValue> : IWriter<TValue>
private readonly WriterConfig _config;
private readonly ILogger<Writer<TValue>> _logger;
private readonly ISerializer<TValue> _serializer;
private readonly GrpcRequestSettings _writerGrpcRequestSettings;
private readonly GrpcRequestSettings _writerGrpcRequestSettings = new();
private readonly ConcurrentQueue<MessageSending> _toSendBuffer = new();
private readonly ConcurrentQueue<MessageSending> _inFlightMessages = new();
private readonly CancellationTokenSource _disposeCts = new();
private readonly SemaphoreSlim _clearInFlightMessagesSemaphoreSlim = new(1);
private readonly SemaphoreSlim _sendInFlightMessagesSemaphoreSlim = new(1);

private volatile TaskCompletionSource _tcsWakeUp = new();
private volatile TaskCompletionSource _tcsBufferAvailableEvent = new();
Expand All @@ -39,7 +39,6 @@ internal Writer(IDriver driver, WriterConfig config, ISerializer<TValue> seriali
_logger = driver.LoggerFactory.CreateLogger<Writer<TValue>>();
_serializer = serializer;
_limitBufferMaxSize = config.BufferMaxSize;
_writerGrpcRequestSettings = new GrpcRequestSettings { CancellationToken = _disposeCts.Token };

StartWriteWorker();
}
Expand Down Expand Up @@ -95,7 +94,9 @@ public async Task<WriteResult> WriteAsync(Message<TValue> message, CancellationT
if (Interlocked.CompareExchange(ref _limitBufferMaxSize,
curLimitBufferSize - data.Length, curLimitBufferSize) == curLimitBufferSize)
{
_toSendBuffer.Enqueue(new MessageSending(messageData, tcs));
_toSendBuffer.Enqueue(
new MessageSending(messageData, tcs, writerDisposedCancellationTokenRegistration)
);
WakeUpWorker();

break;
Expand Down Expand Up @@ -162,7 +163,7 @@ private async void StartWriteWorker()
continue;
}

await _clearInFlightMessagesSemaphoreSlim.WaitAsync(_disposeCts.Token);
await _sendInFlightMessagesSemaphoreSlim.WaitAsync(_disposeCts.Token);
try
{
if (_session.IsActive)
Expand All @@ -172,7 +173,7 @@ private async void StartWriteWorker()
}
finally
{
_clearInFlightMessagesSemaphoreSlim.Release();
_sendInFlightMessagesSemaphoreSlim.Release();
}
}
}
Expand All @@ -193,7 +194,7 @@ private async Task Initialize()

try
{
if (_disposeCts.IsCancellationRequested)
if (_disposeCts.IsCancellationRequested && _inFlightMessages.IsEmpty)
{
_logger.LogWarning("Initialize writer is canceled because it has been disposed");

Expand Down Expand Up @@ -267,7 +268,7 @@ private async Task Initialize()
return;
}

await _clearInFlightMessagesSemaphoreSlim.WaitAsync(_disposeCts.Token);
await _sendInFlightMessagesSemaphoreSlim.WaitAsync();
try
{
var copyInFlightMessages = new ConcurrentQueue<MessageSending>();
Expand Down Expand Up @@ -315,7 +316,7 @@ private async Task Initialize()
}
finally
{
_clearInFlightMessagesSemaphoreSlim.Release();
_sendInFlightMessagesSemaphoreSlim.Release();
}
}
catch (Driver.TransportException e)
Expand All @@ -330,17 +331,55 @@ private async Task Initialize()
}
}

public void Dispose()
public async ValueTask DisposeAsync()
{
_disposeCts.Cancel();
if (_disposeCts.IsCancellationRequested)
{
return;
}

_logger.LogInformation("Starting Writer[{WriterConfig}] disposal process", _config);

await _sendInFlightMessagesSemaphoreSlim.WaitAsync();
try
{
_logger.LogDebug("Signaling cancellation token to stop writing new messages");

_disposeCts.Cancel();
}
finally
{
_sendInFlightMessagesSemaphoreSlim.Release();
}

_logger.LogDebug("Writer[{WriterConfig}] is waiting for all in-flight messages to complete...", _config);

foreach (var inFlightMessage in _inFlightMessages)
{
try
{
await inFlightMessage.Tcs.Task;
}
catch (Exception e)
{
_logger.LogError(e, "Error occurred while waiting for in-flight message SeqNo: {SeqNo}",
inFlightMessage.MessageData.SeqNo);
}
}

await _session.DisposeAsync();

_session = new NotStartedWriterSession("Writer is disposed");
_logger.LogInformation("Writer[{WriterConfig}] is disposed", _config);
}
}

internal record MessageSending(MessageData MessageData, TaskCompletionSource<WriteResult> Tcs);
internal record MessageSending(
MessageData MessageData,
TaskCompletionSource<WriteResult> Tcs,
CancellationTokenRegistration DisposedCtr
);

internal interface IWriteSession
internal interface IWriteSession : IAsyncDisposable
{
Task Write(ConcurrentQueue<MessageSending> toSendBuffer);

Expand Down Expand Up @@ -372,6 +411,11 @@ public Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
}

public bool IsActive => true;

public ValueTask DisposeAsync()
{
return ValueTask.CompletedTask;
}
}

internal class DummyWriterSession : IWriteSession
Expand All @@ -388,6 +432,11 @@ public Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
}

public bool IsActive => false;

public ValueTask DisposeAsync()
{
return ValueTask.CompletedTask;
}
}

internal class WriterSession : TopicSession<MessageFromClient, MessageFromServer>, IWriteSession
Expand Down Expand Up @@ -437,6 +486,8 @@ public async Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
continue;
}

sendData.DisposedCtr.Unregister();

var messageData = sendData.MessageData;

if (messageData.SeqNo == default)
Expand Down
5 changes: 5 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/Writer/WriterConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ public override string ToString()
toString.Append(", ProducerId: ").Append(ProducerId);
}

if (PartitionId != null)
{
toString.Append(", PartitionId: ").Append(PartitionId);
}

return toString.Append(", Codec: ").Append(Codec).ToString();
}
}
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/tests/Topic/ReaderIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public async Task StressTest_WhenReadingThenCommiting_ReturnMessages()
topicSettings.Consumers.Add(new Consumer("Consumer"));
await topicClient.CreateTopic(topicSettings);

using var writer = new WriterBuilder<string>(_driver, _topicName)
await using var writer = new WriterBuilder<string>(_driver, _topicName)
{ ProducerId = "producerId" }.Build();
var reader = new ReaderBuilder<string>(_driver)
{
Expand Down
6 changes: 3 additions & 3 deletions src/Ydb.Sdk/tests/Topic/WriterIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public async Task WriteAsync_WhenOneMessage_ReturnWritten()
};
await topicClient.CreateTopic(topicSettings);

using var writer = new WriterBuilder<string>(_driver, _topicName) { ProducerId = "producerId" }.Build();
await using var writer = new WriterBuilder<string>(_driver, _topicName) { ProducerId = "producerId" }.Build();

var result = await writer.WriteAsync("abacaba");

Expand All @@ -42,7 +42,7 @@ public async Task WriteAsync_WhenOneMessage_ReturnWritten()
[Fact]
public async Task WriteAsync_WhenTopicNotFound_ReturnNotFoundException()
{
using var writer = new WriterBuilder<string>(_driver, _topicName + "_not_found")
await using var writer = new WriterBuilder<string>(_driver, _topicName + "_not_found")
{ ProducerId = "producerId" }.Build();

Assert.Contains(
Expand All @@ -61,7 +61,7 @@ public async Task WriteAsync_When1000Messages_ReturnWriteResultIsPersisted()
topicSettings.Consumers.Add(new Consumer("Consumer"));
await topicClient.CreateTopic(topicSettings);

using var writer = new WriterBuilder<int>(_driver, topicName)
await using var writer = new WriterBuilder<int>(_driver, topicName)
{ ProducerId = "producerId" }.Build();

var tasks = new List<Task>();
Expand Down
Loading
Loading