Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ jobs:
strategy:
fail-fast: false
matrix:
ydb-version: [ 'latest', '24.1' ]
ydb-version: [ 'latest', 'trunk' ]
dotnet-version: [ 6.0.x, 7.0.x ]
include:
- dotnet-version: 6.0.x
Expand All @@ -181,7 +181,7 @@ jobs:
uses: actions/setup-dotnet@v4
with:
dotnet-version: ${{ matrix.dotnet-version }}
- name: Run ADO.NET examples
- name: Run ADO.NET example
run: |
docker cp ydb-local:/ydb_certs/ca.pem ~/
cd ./examples/src/AdoNet
Expand All @@ -190,3 +190,7 @@ jobs:
run: |
cd ./examples/src/DapperExample
dotnet run
- name: YDB Topic example
run: |
cd ./examples/src/Topic
dotnet run
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
- Fixed Writer: possible creation of a session after `DisposeAsync()`, which this could happen when there are canceled tasks in `InFlightMessages`.
- Dev: `Writer.MoveNext()` changed exception on cancelToken from `WriterException` to `TaskCanceledException`.
- Dev: changed log level from `Warning` to `Information` in `(Reader / Writer).Initialize()` when it is disposed.

## v0.15.0
- Dev: added `ValueTask<string?> GetAuthInfoAsync()` in ICredentialProvider.
- Feat: `Writer.DisposeAsync()` waits for all in-flight messages to complete.
Expand Down
84 changes: 84 additions & 0 deletions examples/src/Topic/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
using Microsoft.Extensions.Logging;
using Ydb.Sdk;
using Ydb.Sdk.Services.Topic;
using Ydb.Sdk.Services.Topic.Reader;
using Ydb.Sdk.Services.Topic.Writer;

const int countMessages = 100;
const string topicName = "topic_name";

var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole().SetMinimumLevel(LogLevel.Information));

var logger = loggerFactory.CreateLogger<Program>();

var config = new DriverConfig(
endpoint: "grpc://localhost:2136",
database: "/local"
);

await using var driver = await Driver.CreateInitialized(
config: config,
loggerFactory: loggerFactory
);

var topicClient = new TopicClient(driver);

await topicClient.CreateTopic(new CreateTopicSettings
{
Path = topicName,
Consumers = { new Consumer("Consumer_Example") }
});

var readerCts = new CancellationTokenSource();

var writerJob = Task.Run(async () =>
{
// ReSharper disable once AccessToDisposedClosure
await using var writer = new WriterBuilder<string>(driver, topicName)
{
ProducerId = "ProducerId_Example"
}.Build();

for (var i = 0; i < countMessages; i++)
{
await writer.WriteAsync($"Message num {i}: Hello Example YDB Topics!");
}

readerCts.CancelAfter(TimeSpan.FromSeconds(3));
});

var readerJob = Task.Run(async () =>
{
// ReSharper disable once AccessToDisposedClosure
await using var reader = new ReaderBuilder<string>(driver)
{
ConsumerName = "Consumer_Example",
SubscribeSettings = { new SubscribeSettings(topicName) }
}.Build();

try
{
while (!readerCts.IsCancellationRequested)
{
var message = await reader.ReadAsync(readerCts.Token);

logger.LogInformation("Received message: [{MessageData}]", message.Data);

try
{
await message.CommitAsync();
}
catch (Exception e)
{
logger.LogError(e, "Failed commit message");
}
}
}
catch (OperationCanceledException)
{
}
});

await writerJob;
await readerJob;
await topicClient.DropTopic(new DropTopicSettings { Path = topicName });
17 changes: 17 additions & 0 deletions examples/src/Topic/Topic.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="5.0.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\src\Ydb.Sdk\src\Ydb.Sdk.csproj"/>
</ItemGroup>
</Project>
6 changes: 6 additions & 0 deletions examples/src/YdbExamples.sln
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DapperExample", "DapperExam
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "YC", "YC\YC.csproj", "{753E4F33-CB08-47B9-864F-4CC037B278C4}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Topic", "Topic\Topic.csproj", "{0FB9A1C2-4B0C-4AE4-9FA2-E0ED37802A6E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -45,6 +47,10 @@ Global
{753E4F33-CB08-47B9-864F-4CC037B278C4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{753E4F33-CB08-47B9-864F-4CC037B278C4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{753E4F33-CB08-47B9-864F-4CC037B278C4}.Release|Any CPU.Build.0 = Release|Any CPU
{0FB9A1C2-4B0C-4AE4-9FA2-E0ED37802A6E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{0FB9A1C2-4B0C-4AE4-9FA2-E0ED37802A6E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0FB9A1C2-4B0C-4AE4-9FA2-E0ED37802A6E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0FB9A1C2-4B0C-4AE4-9FA2-E0ED37802A6E}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
6 changes: 5 additions & 1 deletion src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ private async Task Initialize()
{
if (_disposeCts.IsCancellationRequested)
{
_logger.LogWarning("Reader writer is canceled because it has been disposed");
_logger.LogDebug("Initialize Reader[{ReaderConfig}] is stopped because it has been disposed", _config);

return;
}
Expand Down Expand Up @@ -335,6 +335,8 @@ private async Task RunProcessingStreamResponse()
throw new ArgumentOutOfRangeException();
}
}

Logger.LogInformation("ReaderSession[{SessionId}]: ResponseStream is closed", SessionId);
}
catch (Driver.TransportException e)
{
Expand Down Expand Up @@ -579,6 +581,8 @@ public override async ValueTask DisposeAsync()
{
await _runProcessingStreamRequest;
await Stream.RequestStreamComplete();
Logger.LogInformation("ReaderSession[{SessionId}]: RequestStream is closed", SessionId);

await _runProcessingStreamResponse; // waiting all ack's commits

_lifecycleReaderSessionCts.Cancel();
Expand Down
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/src/Services/Topic/TopicSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ protected async void ReconnectSession()
return;
}

Logger.LogInformation("TopicSession[{SessionId}] has been deactivated, starting to reconnect", SessionId);
Logger.LogDebug("TopicSession[{SessionId}] has been deactivated, starting to reconnect", SessionId);

await _initialize();
}
Expand Down
20 changes: 12 additions & 8 deletions src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ internal class Writer<TValue> : IWriter<TValue>
private volatile TaskCompletionSource _tcsBufferAvailableEvent = new();
private volatile IWriteSession _session = null!;
private volatile int _limitBufferMaxSize;
private volatile bool _isStopped;

internal Writer(IDriver driver, WriterConfig config, ISerializer<TValue> serializer)
{
Expand All @@ -52,9 +53,7 @@ public async Task<WriteResult> WriteAsync(Message<TValue> message, CancellationT
{
TaskCompletionSource<WriteResult> tcs = new();
await using var registrationUserCancellationTokenRegistration = cancellationToken.Register(
() => tcs.TrySetException(
new WriterException("The write operation was canceled before it could be completed")
), useSynchronizationContext: false
() => tcs.TrySetCanceled(), useSynchronizationContext: false
);
await using var writerDisposedCancellationTokenRegistration = _disposeCts.Token.Register(
() => tcs.TrySetException(new WriterException($"Writer[{_config}] is disposed")),
Expand Down Expand Up @@ -194,9 +193,9 @@ private async Task Initialize()

try
{
if (_disposeCts.IsCancellationRequested && _inFlightMessages.IsEmpty)
if (_isStopped)
{
_logger.LogWarning("Initialize writer is canceled because it has been disposed");
_logger.LogDebug("Initialize Writer[{WriterConfig}] is stopped because it has been disposed", _config);

return;
}
Expand Down Expand Up @@ -313,7 +312,6 @@ private async Task Initialize()
}

_session = newSession;
newSession.RunProcessingWriteAck();
WakeUpWorker(); // attempt send buffer
}
finally
Expand Down Expand Up @@ -367,6 +365,8 @@ public async ValueTask DisposeAsync()
}
}

_isStopped = true;

await _session.DisposeAsync();

_logger.LogInformation("Writer[{WriterConfig}] is disposed", _config);
Expand Down Expand Up @@ -443,6 +443,7 @@ internal class WriterSession : TopicSession<MessageFromClient, MessageFromServer
{
private readonly WriterConfig _config;
private readonly ConcurrentQueue<MessageSending> _inFlightMessages;
private readonly Task _processingResponseStream;

private long _seqNum;

Expand All @@ -466,6 +467,8 @@ ConcurrentQueue<MessageSending> inFlightMessages
_config = config;
_inFlightMessages = inFlightMessages;
Volatile.Write(ref _seqNum, lastSeqNo); // happens-before for Volatile.Read

_processingResponseStream = RunProcessingWriteAck();
}

public async Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
Expand Down Expand Up @@ -513,7 +516,7 @@ public async Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
}
}

internal async void RunProcessingWriteAck()
private async Task RunProcessingWriteAck()
{
try
{
Expand Down Expand Up @@ -573,7 +576,7 @@ Completing task on exception...
}
}

Logger.LogWarning("WriterSession[{SessionId}]: stream is closed", SessionId);
Logger.LogInformation("WriterSession[{SessionId}]: ResponseStream is closed", SessionId);
}
catch (Driver.TransportException e)
{
Expand Down Expand Up @@ -601,6 +604,7 @@ public override async ValueTask DisposeAsync()
Logger.LogDebug("WriterSession[{SessionId}]: start dispose process", SessionId);

await Stream.RequestStreamComplete();
await _processingResponseStream;

Stream.Dispose();
}
Expand Down
36 changes: 24 additions & 12 deletions src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ public async Task Initialize_WhenNotSupportedCodec_ThrowWriterExceptionOnWriteAs
public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSession_ShouldReconnectAndReturnWriteResult()
{
var moveTcs = new TaskCompletionSource<bool>();
var moveTcsRetry = new TaskCompletionSource<bool>();

_mockStream.SetupSequence(stream => stream.Write(It.IsAny<FromClient>()))
.Returns(Task.CompletedTask)
Expand All @@ -367,12 +368,17 @@ public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSession_Should
return new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled));
})
.Returns(Task.CompletedTask)
.Returns(Task.CompletedTask);
.Returns(() =>
{
moveTcsRetry.SetResult(true);

return Task.CompletedTask;
});
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
.ReturnsAsync(true)
.Returns(new ValueTask<bool>(moveTcs.Task))
.ReturnsAsync(true)
.ReturnsAsync(true)
.Returns(new ValueTask<bool>(moveTcsRetry.Task))
.Returns(_lastMoveNext);
_mockStream.SetupSequence(stream => stream.Current)
.Returns(new StreamWriteMessage.Types.FromServer
Expand Down Expand Up @@ -569,7 +575,6 @@ public async Task WriteAsync_WhenStreamIsClosingOnProcessingWriteAck_ShouldRecon
public async Task WriteAsync_WhenCancellationTokenIsClosed_ThrowCancellationException()
{
var cancellationTokenSource = new CancellationTokenSource();
var nextCompleted = new TaskCompletionSource<bool>();
_mockStream.Setup(stream => stream.Write(It.IsAny<FromClient>()))
.Returns(Task.CompletedTask);
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
Expand All @@ -582,10 +587,8 @@ public async Task WriteAsync_WhenCancellationTokenIsClosed_ThrowCancellationExce

var task = writer.WriteAsync(123L, cancellationTokenSource.Token);
cancellationTokenSource.Cancel();
nextCompleted.SetResult(true);

Assert.Equal("The write operation was canceled before it could be completed",
(await Assert.ThrowsAsync<WriterException>(() => task)).Message);
await Assert.ThrowsAsync<TaskCanceledException>(() => task);
}

[Fact]
Expand Down Expand Up @@ -638,6 +641,7 @@ public async Task WriteAsync_WhenInFlightBufferSendInInitialize_ReturnCompletedT
var writeTcs2 = new TaskCompletionSource();
var writeTcs3 = new TaskCompletionSource();
var moveTcs = new TaskCompletionSource<bool>();
var moveTcsRetry = new TaskCompletionSource<bool>();

_mockStream.SetupSequence(stream => stream.Write(It.IsAny<FromClient>()))
.Returns(Task.CompletedTask)
Expand All @@ -657,13 +661,17 @@ public async Task WriteAsync_WhenInFlightBufferSendInInitialize_ReturnCompletedT
return Task.CompletedTask;
})
.Returns(Task.CompletedTask)
.Returns(Task.CompletedTask);
.Returns(() =>
{
moveTcsRetry.SetResult(true);
return Task.CompletedTask;
});

_mockStream.SetupSequence(stream => stream.MoveNextAsync())
.ReturnsAsync(true)
.Returns(new ValueTask<bool>(moveTcs.Task))
.ReturnsAsync(true)
.ReturnsAsync(true)
.Returns(new ValueTask<bool>(moveTcsRetry.Task))
.Returns(_lastMoveNext);
_mockStream.SetupSequence(stream => stream.Current)
.Returns(new StreamWriteMessage.Types.FromServer
Expand Down Expand Up @@ -713,8 +721,7 @@ public async Task WriteAsync_WhenInFlightBufferSendInInitialize_ReturnCompletedT

moveTcs.SetResult(false); // Fail write ack stream => start reconnect

Assert.Equal("The write operation was canceled before it could be completed",
(await Assert.ThrowsAsync<WriterException>(() => runTaskWithCancel)).Message);
await Assert.ThrowsAsync<TaskCanceledException>(() => runTaskWithCancel);
Assert.Equal(PersistenceStatus.AlreadyWritten, (await runTask1).Status);
Assert.Equal(PersistenceStatus.Written, (await runTask2).Status);

Expand Down Expand Up @@ -863,6 +870,7 @@ public async Task DisposeAsync_WhenInFlightMessages_WaitingInFlightMessages()
{
var tcsDetectedWrite = new TaskCompletionSource();
var writeTcs1 = new TaskCompletionSource<bool>();
var moveTcsRetry = new TaskCompletionSource<bool>();

_mockStream.SetupSequence(stream => stream.Write(It.IsAny<FromClient>()))
.Returns(Task.CompletedTask)
Expand All @@ -872,12 +880,16 @@ public async Task DisposeAsync_WhenInFlightMessages_WaitingInFlightMessages()
return Task.CompletedTask;
})
.Returns(Task.CompletedTask)
.Returns(Task.CompletedTask);
.Returns(() =>
{
moveTcsRetry.SetResult(true);
return Task.CompletedTask;
});
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
.ReturnsAsync(true)
.Returns(new ValueTask<bool>(writeTcs1.Task))
.ReturnsAsync(true)
.ReturnsAsync(true)
.Returns(new ValueTask<bool>(moveTcsRetry.Task))
.Returns(_lastMoveNext);

_mockStream.SetupSequence(stream => stream.Current)
Expand Down
Loading