Skip to content

Commit 5d5ad0c

Browse files
dev: ydb topic example & micro changes writer
1 parent aed3ca9 commit 5d5ad0c

File tree

6 files changed

+120
-12
lines changed

6 files changed

+120
-12
lines changed

examples/src/Topic/Program.cs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
using Microsoft.Extensions.Logging;
2+
using Ydb.Sdk;
3+
using Ydb.Sdk.Services.Topic;
4+
using Ydb.Sdk.Services.Topic.Reader;
5+
using Ydb.Sdk.Services.Topic.Writer;
6+
7+
const int countMessages = 100;
8+
const string topicName = "topic_name";
9+
10+
var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole().SetMinimumLevel(LogLevel.Information));
11+
12+
var logger = loggerFactory.CreateLogger<Program>();
13+
14+
var config = new DriverConfig(
15+
endpoint: "grpc://localhost:2136",
16+
database: "/local"
17+
);
18+
19+
await using var driver = await Driver.CreateInitialized(
20+
config: config,
21+
loggerFactory: loggerFactory
22+
);
23+
24+
var topicClient = new TopicClient(driver);
25+
26+
await topicClient.CreateTopic(new CreateTopicSettings
27+
{
28+
Path = topicName,
29+
Consumers = { new Consumer("Consumer_Example") }
30+
});
31+
32+
var readerCts = new CancellationTokenSource();
33+
34+
var writerJob = Task.Run(async () =>
35+
{
36+
// ReSharper disable once AccessToDisposedClosure
37+
await using var writer = new WriterBuilder<string>(driver, topicName)
38+
{
39+
ProducerId = "ProducerId_Example"
40+
}.Build();
41+
42+
for (var i = 0; i < countMessages; i++)
43+
{
44+
await writer.WriteAsync($"Message num {i}: Hello Example YDB Topics!");
45+
}
46+
47+
readerCts.CancelAfter(TimeSpan.FromSeconds(3));
48+
});
49+
50+
var readerJob = Task.Run(async () =>
51+
{
52+
// ReSharper disable once AccessToDisposedClosure
53+
await using var reader = new ReaderBuilder<string>(driver)
54+
{
55+
ConsumerName = "Consumer_Example",
56+
SubscribeSettings = { new SubscribeSettings(topicName) }
57+
}.Build();
58+
59+
try
60+
{
61+
while (!readerCts.IsCancellationRequested)
62+
{
63+
var message = await reader.ReadAsync(readerCts.Token);
64+
65+
logger.LogInformation("Received message: [{MessageData}]", message.Data);
66+
67+
try
68+
{
69+
await message.CommitAsync();
70+
}
71+
catch (Exception e)
72+
{
73+
logger.LogError(e, "Failed commit message");
74+
}
75+
}
76+
}
77+
catch (OperationCanceledException)
78+
{
79+
}
80+
});
81+
82+
await writerJob;
83+
await readerJob;
84+
await topicClient.DropTopic(new DropTopicSettings { Path = topicName });

examples/src/Topic/Topic.csproj

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>net8.0</TargetFramework>
6+
<ImplicitUsings>enable</ImplicitUsings>
7+
<Nullable>enable</Nullable>
8+
</PropertyGroup>
9+
10+
<ItemGroup>
11+
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="5.0.0" />
12+
<PackageReference Include="Microsoft.Extensions.Logging" Version="5.0.0" />
13+
</ItemGroup>
14+
<ItemGroup>
15+
<ProjectReference Include="..\..\..\src\Ydb.Sdk\src\Ydb.Sdk.csproj"/>
16+
</ItemGroup>
17+
</Project>

examples/src/YdbExamples.sln

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DapperExample", "DapperExam
1515
EndProject
1616
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "YC", "YC\YC.csproj", "{753E4F33-CB08-47B9-864F-4CC037B278C4}"
1717
EndProject
18+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Topic", "Topic\Topic.csproj", "{0FB9A1C2-4B0C-4AE4-9FA2-E0ED37802A6E}"
19+
EndProject
1820
Global
1921
GlobalSection(SolutionConfigurationPlatforms) = preSolution
2022
Debug|Any CPU = Debug|Any CPU
@@ -45,6 +47,10 @@ Global
4547
{753E4F33-CB08-47B9-864F-4CC037B278C4}.Debug|Any CPU.Build.0 = Debug|Any CPU
4648
{753E4F33-CB08-47B9-864F-4CC037B278C4}.Release|Any CPU.ActiveCfg = Release|Any CPU
4749
{753E4F33-CB08-47B9-864F-4CC037B278C4}.Release|Any CPU.Build.0 = Release|Any CPU
50+
{0FB9A1C2-4B0C-4AE4-9FA2-E0ED37802A6E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
51+
{0FB9A1C2-4B0C-4AE4-9FA2-E0ED37802A6E}.Debug|Any CPU.Build.0 = Debug|Any CPU
52+
{0FB9A1C2-4B0C-4AE4-9FA2-E0ED37802A6E}.Release|Any CPU.ActiveCfg = Release|Any CPU
53+
{0FB9A1C2-4B0C-4AE4-9FA2-E0ED37802A6E}.Release|Any CPU.Build.0 = Release|Any CPU
4854
EndGlobalSection
4955
GlobalSection(SolutionProperties) = preSolution
5056
HideSolutionNode = FALSE

src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ private async Task Initialize()
9696
{
9797
if (_disposeCts.IsCancellationRequested)
9898
{
99-
_logger.LogWarning("Reader writer is canceled because it has been disposed");
99+
_logger.LogInformation("Reader writer is canceled because it has been disposed");
100100

101101
return;
102102
}

src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ internal class Writer<TValue> : IWriter<TValue>
3131
private volatile TaskCompletionSource _tcsBufferAvailableEvent = new();
3232
private volatile IWriteSession _session = null!;
3333
private volatile int _limitBufferMaxSize;
34+
private volatile bool _isStopped;
3435

3536
internal Writer(IDriver driver, WriterConfig config, ISerializer<TValue> serializer)
3637
{
@@ -52,9 +53,7 @@ public async Task<WriteResult> WriteAsync(Message<TValue> message, CancellationT
5253
{
5354
TaskCompletionSource<WriteResult> tcs = new();
5455
await using var registrationUserCancellationTokenRegistration = cancellationToken.Register(
55-
() => tcs.TrySetException(
56-
new WriterException("The write operation was canceled before it could be completed")
57-
), useSynchronizationContext: false
56+
() => tcs.TrySetCanceled(), useSynchronizationContext: false
5857
);
5958
await using var writerDisposedCancellationTokenRegistration = _disposeCts.Token.Register(
6059
() => tcs.TrySetException(new WriterException($"Writer[{_config}] is disposed")),
@@ -194,9 +193,9 @@ private async Task Initialize()
194193

195194
try
196195
{
197-
if (_disposeCts.IsCancellationRequested && _inFlightMessages.IsEmpty)
196+
if (_isStopped)
198197
{
199-
_logger.LogWarning("Initialize writer is canceled because it has been disposed");
198+
_logger.LogInformation("Initialize writer is stopped because it has been disposed");
200199

201200
return;
202201
}
@@ -367,6 +366,8 @@ public async ValueTask DisposeAsync()
367366
}
368367
}
369368

369+
_isStopped = true;
370+
370371
await _session.DisposeAsync();
371372

372373
_logger.LogInformation("Writer[{WriterConfig}] is disposed", _config);
@@ -579,6 +580,10 @@ Completing task on exception...
579580
{
580581
Logger.LogError(e, "WriterSession[{SessionId}] have error on processing writeAck", SessionId);
581582
}
583+
catch (ObjectDisposedException)
584+
{
585+
Logger.LogDebug("WriterSession[{SessionId}]: stream is disposed", SessionId);
586+
}
582587
finally
583588
{
584589
ReconnectSession();

src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -569,7 +569,6 @@ public async Task WriteAsync_WhenStreamIsClosingOnProcessingWriteAck_ShouldRecon
569569
public async Task WriteAsync_WhenCancellationTokenIsClosed_ThrowCancellationException()
570570
{
571571
var cancellationTokenSource = new CancellationTokenSource();
572-
var nextCompleted = new TaskCompletionSource<bool>();
573572
_mockStream.Setup(stream => stream.Write(It.IsAny<FromClient>()))
574573
.Returns(Task.CompletedTask);
575574
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
@@ -582,10 +581,8 @@ public async Task WriteAsync_WhenCancellationTokenIsClosed_ThrowCancellationExce
582581

583582
var task = writer.WriteAsync(123L, cancellationTokenSource.Token);
584583
cancellationTokenSource.Cancel();
585-
nextCompleted.SetResult(true);
586584

587-
Assert.Equal("The write operation was canceled before it could be completed",
588-
(await Assert.ThrowsAsync<WriterException>(() => task)).Message);
585+
await Assert.ThrowsAsync<TaskCanceledException>(() => task);
589586
}
590587

591588
[Fact]
@@ -713,8 +710,7 @@ public async Task WriteAsync_WhenInFlightBufferSendInInitialize_ReturnCompletedT
713710

714711
moveTcs.SetResult(false); // Fail write ack stream => start reconnect
715712

716-
Assert.Equal("The write operation was canceled before it could be completed",
717-
(await Assert.ThrowsAsync<WriterException>(() => runTaskWithCancel)).Message);
713+
await Assert.ThrowsAsync<TaskCanceledException>(() => runTaskWithCancel);
718714
Assert.Equal(PersistenceStatus.AlreadyWritten, (await runTask1).Status);
719715
Assert.Equal(PersistenceStatus.Written, (await runTask2).Status);
720716

0 commit comments

Comments
 (0)