Skip to content

Commit f7ca606

Browse files
feat: impl disposing Writer.cs
1 parent e12f15a commit f7ca606

File tree

8 files changed

+92
-43
lines changed

8 files changed

+92
-43
lines changed

slo/src/TopicService/SloTopicContext.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public async Task Run(RunConfig config)
7373
{
7474
try
7575
{
76-
using var writer = new WriterBuilder<string>(driver, PathTopic)
76+
await using var writer = new WriterBuilder<string>(driver, PathTopic)
7777
{
7878
BufferMaxSize = 8 * 1024 * 1024,
7979
ProducerId = "producer-" + partitionId,

src/Ydb.Sdk/src/IDriver.cs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public IBidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TReques
3030
ILoggerFactory LoggerFactory { get; }
3131
}
3232

33-
public interface IBidirectionalStream<in TRequest, out TResponse> : IDisposable
33+
public interface IBidirectionalStream<in TRequest, out TResponse> : IAsyncDisposable
3434
{
3535
public Task Write(TRequest request);
3636

@@ -225,6 +225,7 @@ internal class BidirectionalStream<TRequest, TResponse> : IBidirectionalStream<T
225225
private readonly AsyncDuplexStreamingCall<TRequest, TResponse> _stream;
226226
private readonly Action<RpcException> _rpcErrorAction;
227227
private readonly ICredentialsProvider _credentialsProvider;
228+
private readonly TaskCompletionSource _closedResponseStreamTcs = new();
228229

229230
internal BidirectionalStream(
230231
AsyncDuplexStreamingCall<TRequest, TResponse> stream,
@@ -258,6 +259,7 @@ public async ValueTask<bool> MoveNextAsync()
258259
}
259260
catch (RpcException e)
260261
{
262+
_closedResponseStreamTcs.SetResult();
261263
_rpcErrorAction(e);
262264

263265
throw new Driver.TransportException(e);
@@ -268,8 +270,17 @@ public async ValueTask<bool> MoveNextAsync()
268270

269271
public string? AuthToken => _credentialsProvider.GetAuthInfo();
270272

271-
public void Dispose()
273+
public async ValueTask DisposeAsync()
272274
{
275+
try
276+
{
277+
await _stream.RequestStream.CompleteAsync();
278+
}
279+
catch (RpcException e)
280+
{
281+
_rpcErrorAction(e);
282+
}
283+
273284
_stream.Dispose();
274285
}
275286
}

src/Ydb.Sdk/src/Services/Topic/IWriter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
namespace Ydb.Sdk.Services.Topic;
44

5-
public interface IWriter<TValue> : IDisposable
5+
public interface IWriter<TValue> : IAsyncDisposable
66
{
77
/// <summary>
88
/// Asynchronously send a data to a YDB Topic.

src/Ydb.Sdk/src/Services/Topic/TopicSession.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
namespace Ydb.Sdk.Services.Topic;
44

5-
internal abstract class TopicSession<TFromClient, TFromServer> : IDisposable
5+
internal abstract class TopicSession<TFromClient, TFromServer> : IAsyncDisposable
66
{
77
private readonly Func<Task> _initialize;
88

@@ -58,10 +58,10 @@ protected async Task SendMessage(TFromClient fromClient)
5858
await Stream.Write(fromClient);
5959
}
6060

61-
public void Dispose()
61+
protected abstract TFromClient GetSendUpdateTokenRequest(string token);
62+
63+
public async ValueTask DisposeAsync()
6264
{
63-
Stream.Dispose();
65+
await Stream.DisposeAsync();
6466
}
65-
66-
protected abstract TFromClient GetSendUpdateTokenRequest(string token);
6767
}

src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs

Lines changed: 52 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ internal class Writer<TValue> : IWriter<TValue>
2121
private readonly WriterConfig _config;
2222
private readonly ILogger<Writer<TValue>> _logger;
2323
private readonly ISerializer<TValue> _serializer;
24-
private readonly GrpcRequestSettings _writerGrpcRequestSettings;
24+
private readonly GrpcRequestSettings _writerGrpcRequestSettings = new();
2525
private readonly ConcurrentQueue<MessageSending> _toSendBuffer = new();
2626
private readonly ConcurrentQueue<MessageSending> _inFlightMessages = new();
2727
private readonly CancellationTokenSource _disposeCts = new();
28-
private readonly SemaphoreSlim _clearInFlightMessagesSemaphoreSlim = new(1);
28+
private readonly SemaphoreSlim _sendInFlightMessagesSemaphoreSlim = new(1);
2929

3030
private volatile TaskCompletionSource _tcsWakeUp = new();
3131
private volatile TaskCompletionSource _tcsBufferAvailableEvent = new();
@@ -39,7 +39,6 @@ internal Writer(IDriver driver, WriterConfig config, ISerializer<TValue> seriali
3939
_logger = driver.LoggerFactory.CreateLogger<Writer<TValue>>();
4040
_serializer = serializer;
4141
_limitBufferMaxSize = config.BufferMaxSize;
42-
_writerGrpcRequestSettings = new GrpcRequestSettings { CancellationToken = _disposeCts.Token };
4342

4443
StartWriteWorker();
4544
}
@@ -95,7 +94,9 @@ public async Task<WriteResult> WriteAsync(Message<TValue> message, CancellationT
9594
if (Interlocked.CompareExchange(ref _limitBufferMaxSize,
9695
curLimitBufferSize - data.Length, curLimitBufferSize) == curLimitBufferSize)
9796
{
98-
_toSendBuffer.Enqueue(new MessageSending(messageData, tcs));
97+
_toSendBuffer.Enqueue(
98+
new MessageSending(messageData, tcs, writerDisposedCancellationTokenRegistration)
99+
);
99100
WakeUpWorker();
100101

101102
break;
@@ -162,7 +163,7 @@ private async void StartWriteWorker()
162163
continue;
163164
}
164165

165-
await _clearInFlightMessagesSemaphoreSlim.WaitAsync(_disposeCts.Token);
166+
await _sendInFlightMessagesSemaphoreSlim.WaitAsync(_disposeCts.Token);
166167
try
167168
{
168169
if (_session.IsActive)
@@ -172,7 +173,7 @@ private async void StartWriteWorker()
172173
}
173174
finally
174175
{
175-
_clearInFlightMessagesSemaphoreSlim.Release();
176+
_sendInFlightMessagesSemaphoreSlim.Release();
176177
}
177178
}
178179
}
@@ -193,7 +194,7 @@ private async Task Initialize()
193194

194195
try
195196
{
196-
if (_disposeCts.IsCancellationRequested)
197+
if (_disposeCts.IsCancellationRequested && _inFlightMessages.IsEmpty)
197198
{
198199
_logger.LogWarning("Initialize writer is canceled because it has been disposed");
199200

@@ -267,7 +268,7 @@ private async Task Initialize()
267268
return;
268269
}
269270

270-
await _clearInFlightMessagesSemaphoreSlim.WaitAsync(_disposeCts.Token);
271+
await _sendInFlightMessagesSemaphoreSlim.WaitAsync(_disposeCts.Token);
271272
try
272273
{
273274
var copyInFlightMessages = new ConcurrentQueue<MessageSending>();
@@ -315,7 +316,7 @@ private async Task Initialize()
315316
}
316317
finally
317318
{
318-
_clearInFlightMessagesSemaphoreSlim.Release();
319+
_sendInFlightMessagesSemaphoreSlim.Release();
319320
}
320321
}
321322
catch (Driver.TransportException e)
@@ -330,17 +331,42 @@ private async Task Initialize()
330331
}
331332
}
332333

333-
public void Dispose()
334+
public async ValueTask DisposeAsync()
334335
{
335-
_disposeCts.Cancel();
336+
await _sendInFlightMessagesSemaphoreSlim.WaitAsync();
337+
try
338+
{
339+
_disposeCts.Dispose();
340+
}
341+
finally
342+
{
343+
_sendInFlightMessagesSemaphoreSlim.Release();
344+
}
336345

337-
_session = new NotStartedWriterSession("Writer is disposed");
346+
// wait all messages
347+
foreach (var inFlightMessage in _inFlightMessages)
348+
{
349+
try
350+
{
351+
await inFlightMessage.Tcs.Task;
352+
}
353+
catch (Exception e)
354+
{
355+
_logger.LogWarning(e, "Failed in flight message");
356+
}
357+
}
358+
359+
await _session.DisposeAsync();
338360
}
339361
}
340362

341-
internal record MessageSending(MessageData MessageData, TaskCompletionSource<WriteResult> Tcs);
363+
internal record MessageSending(
364+
MessageData MessageData,
365+
TaskCompletionSource<WriteResult> Tcs,
366+
CancellationTokenRegistration DisposedCtr
367+
);
342368

343-
internal interface IWriteSession
369+
internal interface IWriteSession : IAsyncDisposable
344370
{
345371
Task Write(ConcurrentQueue<MessageSending> toSendBuffer);
346372

@@ -372,6 +398,11 @@ public Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
372398
}
373399

374400
public bool IsActive => true;
401+
402+
public ValueTask DisposeAsync()
403+
{
404+
return ValueTask.CompletedTask;
405+
}
375406
}
376407

377408
internal class DummyWriterSession : IWriteSession
@@ -388,6 +419,11 @@ public Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
388419
}
389420

390421
public bool IsActive => false;
422+
423+
public ValueTask DisposeAsync()
424+
{
425+
return ValueTask.CompletedTask;
426+
}
391427
}
392428

393429
internal class WriterSession : TopicSession<MessageFromClient, MessageFromServer>, IWriteSession
@@ -437,6 +473,8 @@ public async Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
437473
continue;
438474
}
439475

476+
sendData.DisposedCtr.Unregister();
477+
440478
var messageData = sendData.MessageData;
441479

442480
if (messageData.SeqNo == default)

src/Ydb.Sdk/tests/Topic/ReaderIntegrationTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public async Task StressTest_WhenReadingThenCommiting_ReturnMessages()
2525
topicSettings.Consumers.Add(new Consumer("Consumer"));
2626
await topicClient.CreateTopic(topicSettings);
2727

28-
using var writer = new WriterBuilder<string>(_driver, _topicName)
28+
await using var writer = new WriterBuilder<string>(_driver, _topicName)
2929
{ ProducerId = "producerId" }.Build();
3030
var reader = new ReaderBuilder<string>(_driver)
3131
{

src/Ydb.Sdk/tests/Topic/WriterIntegrationTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public async Task WriteAsync_WhenOneMessage_ReturnWritten()
3030
};
3131
await topicClient.CreateTopic(topicSettings);
3232

33-
using var writer = new WriterBuilder<string>(_driver, _topicName) { ProducerId = "producerId" }.Build();
33+
await using var writer = new WriterBuilder<string>(_driver, _topicName) { ProducerId = "producerId" }.Build();
3434

3535
var result = await writer.WriteAsync("abacaba");
3636

@@ -42,7 +42,7 @@ public async Task WriteAsync_WhenOneMessage_ReturnWritten()
4242
[Fact]
4343
public async Task WriteAsync_WhenTopicNotFound_ReturnNotFoundException()
4444
{
45-
using var writer = new WriterBuilder<string>(_driver, _topicName + "_not_found")
45+
await using var writer = new WriterBuilder<string>(_driver, _topicName + "_not_found")
4646
{ ProducerId = "producerId" }.Build();
4747

4848
Assert.Contains(
@@ -61,7 +61,7 @@ public async Task WriteAsync_When1000Messages_ReturnWriteResultIsPersisted()
6161
topicSettings.Consumers.Add(new Consumer("Consumer"));
6262
await topicClient.CreateTopic(topicSettings);
6363

64-
using var writer = new WriterBuilder<int>(_driver, topicName)
64+
await using var writer = new WriterBuilder<int>(_driver, topicName)
6565
{ ProducerId = "producerId" }.Build();
6666

6767
var tasks = new List<Task>();

0 commit comments

Comments
 (0)