Skip to content

Commit e3294a9

Browse files
KirillKurdyukovmessieurMe
authored andcommitted
feat: Impl Reader.AsyncDispose (ydb-platform#280)
1 parent af7649b commit e3294a9

File tree

10 files changed

+255
-104
lines changed

10 files changed

+255
-104
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
1+
- Feat: `Writer.DisposeAsync()` waits for all in-flight messages to complete.
2+
- Feat: `Reader.DisposeAsync()` waits for all pending commits to be completed.
3+
- **Breaking Change**: `IReader` now implements `IAsyncDisposable` instead of `IDisposable`.
4+
This change requires updates to code that disposes `IReader` instances. Use `await using` instead of `using`.
15
- **Breaking Change**: `IWriter` now implements `IAsyncDisposable` instead of `IDisposable`.
26
This change requires updates to code that disposes `IWriter` instances. Use `await using` instead of `using`.
37
- Topic `Reader` & `Writer`: update auth token in bidirectional stream.
48

59
## v0.14.1
10+
611
- Fixed bug: public key presented not for certificate signature.
712
- Fixed: YdbDataReader does not throw YdbException when CloseAsync is called for UPDATE/INSERT statements with no
813
result.

slo/src/TopicService/SloTopicContext.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public async Task Run(RunConfig config)
125125
{
126126
try
127127
{
128-
using var reader = new ReaderBuilder<string>(driver)
128+
await using var reader = new ReaderBuilder<string>(driver)
129129
{
130130
ConsumerName = ConsumerName,
131131
SubscribeSettings =

src/Ydb.Sdk/src/IDriver.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public IBidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TReques
3030
ILoggerFactory LoggerFactory { get; }
3131
}
3232

33-
public interface IBidirectionalStream<in TRequest, out TResponse> : IAsyncDisposable
33+
public interface IBidirectionalStream<in TRequest, out TResponse> : IDisposable
3434
{
3535
public Task Write(TRequest request);
3636

@@ -39,6 +39,8 @@ public interface IBidirectionalStream<in TRequest, out TResponse> : IAsyncDispos
3939
public TResponse Current { get; }
4040

4141
public string? AuthToken { get; }
42+
43+
public Task RequestStreamComplete();
4244
}
4345

4446
public abstract class BaseDriver : IDriver
@@ -225,7 +227,6 @@ internal class BidirectionalStream<TRequest, TResponse> : IBidirectionalStream<T
225227
private readonly AsyncDuplexStreamingCall<TRequest, TResponse> _stream;
226228
private readonly Action<RpcException> _rpcErrorAction;
227229
private readonly ICredentialsProvider _credentialsProvider;
228-
private readonly TaskCompletionSource _closedResponseStreamTcs = new();
229230

230231
internal BidirectionalStream(
231232
AsyncDuplexStreamingCall<TRequest, TResponse> stream,
@@ -259,7 +260,6 @@ public async ValueTask<bool> MoveNextAsync()
259260
}
260261
catch (RpcException e)
261262
{
262-
_closedResponseStreamTcs.SetResult();
263263
_rpcErrorAction(e);
264264

265265
throw new Driver.TransportException(e);
@@ -270,7 +270,7 @@ public async ValueTask<bool> MoveNextAsync()
270270

271271
public string? AuthToken => _credentialsProvider.GetAuthInfo();
272272

273-
public async ValueTask DisposeAsync()
273+
public async Task RequestStreamComplete()
274274
{
275275
try
276276
{
@@ -280,7 +280,10 @@ public async ValueTask DisposeAsync()
280280
{
281281
_rpcErrorAction(e);
282282
}
283+
}
283284

285+
public void Dispose()
286+
{
284287
_stream.Dispose();
285288
}
286289
}

src/Ydb.Sdk/src/Services/Topic/IReader.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
namespace Ydb.Sdk.Services.Topic;
44

5-
public interface IReader<TValue> : IDisposable
5+
public interface IReader<TValue> : IAsyncDisposable
66
{
77
public ValueTask<Message<TValue>> ReadAsync(CancellationToken cancellationToken = default);
88

src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs

Lines changed: 82 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ internal class Reader<TValue> : IReader<TValue>
2121
private readonly ReaderConfig _config;
2222
private readonly IDeserializer<TValue> _deserializer;
2323
private readonly ILogger _logger;
24-
private readonly GrpcRequestSettings _readerGrpcRequestSettings;
24+
private readonly GrpcRequestSettings _readerGrpcRequestSettings = new();
25+
26+
private ReaderSession<TValue>? _currentReaderSession;
2527

2628
private readonly Channel<InternalBatchMessages<TValue>> _receivedMessagesChannel =
2729
Channel.CreateUnbounded<InternalBatchMessages<TValue>>(
@@ -41,7 +43,6 @@ internal Reader(IDriver driver, ReaderConfig config, IDeserializer<TValue> deser
4143
_config = config;
4244
_deserializer = deserializer;
4345
_logger = driver.LoggerFactory.CreateLogger<Reader<TValue>>();
44-
_readerGrpcRequestSettings = new GrpcRequestSettings { CancellationToken = _disposeCts.Token };
4546

4647
_ = Initialize();
4748
}
@@ -68,7 +69,7 @@ public async ValueTask<Message<TValue>> ReadAsync(CancellationToken cancellation
6869
}
6970
}
7071

71-
throw new ObjectDisposedException("Reader");
72+
throw new ReaderException("Reader is disposed");
7273
}
7374

7475
public async ValueTask<BatchMessages<TValue>> ReadBatchAsync(CancellationToken cancellationToken = default)
@@ -86,7 +87,7 @@ public async ValueTask<BatchMessages<TValue>> ReadBatchAsync(CancellationToken c
8687
}
8788
}
8889

89-
throw new ObjectDisposedException("Reader");
90+
throw new ReaderException("Reader is disposed");
9091
}
9192

9293
private async Task Initialize()
@@ -185,15 +186,15 @@ await stream.Write(new MessageFromClient
185186
ReadRequest = new StreamReadMessage.Types.ReadRequest { BytesSize = _config.MemoryUsageMaxBytes }
186187
});
187188

188-
new ReaderSession<TValue>(
189+
_currentReaderSession = new ReaderSession<TValue>(
189190
_config,
190191
stream,
191192
initResponse.SessionId,
192193
Initialize,
193194
_logger,
194195
_receivedMessagesChannel.Writer,
195196
_deserializer
196-
).RunProcessingTopic();
197+
);
197198
}
198199
catch (Driver.TransportException e)
199200
{
@@ -203,18 +204,19 @@ await stream.Write(new MessageFromClient
203204
}
204205
}
205206

206-
public void Dispose()
207+
public async ValueTask DisposeAsync()
207208
{
208-
try
209-
{
210-
_receivedMessagesChannel.Writer.TryComplete();
211-
212-
_disposeCts.Cancel();
213-
}
214-
finally
209+
if (_disposeCts.IsCancellationRequested)
215210
{
216-
_disposeCts.Dispose();
211+
return;
217212
}
213+
214+
_receivedMessagesChannel.Writer.TryComplete();
215+
_disposeCts.Cancel();
216+
217+
await (_currentReaderSession?.DisposeAsync() ?? ValueTask.CompletedTask);
218+
219+
_logger.LogInformation("Reader[{WriterConfig}] is disposed", _config);
218220
}
219221
}
220222

@@ -247,6 +249,8 @@ internal class ReaderSession<TValue> : TopicSession<MessageFromClient, MessageFr
247249
private readonly ChannelWriter<InternalBatchMessages<TValue>> _channelWriter;
248250
private readonly CancellationTokenSource _lifecycleReaderSessionCts = new();
249251
private readonly IDeserializer<TValue> _deserializer;
252+
private readonly Task _runProcessingStreamResponse;
253+
private readonly Task _runProcessingStreamRequest;
250254

251255
private readonly Channel<MessageFromClient> _channelFromClientMessageSending =
252256
Channel.CreateUnbounded<MessageFromClient>(
@@ -279,29 +283,13 @@ IDeserializer<TValue> deserializer
279283
_readerConfig = config;
280284
_channelWriter = channelWriter;
281285
_deserializer = deserializer;
286+
287+
_runProcessingStreamResponse = RunProcessingStreamResponse();
288+
_runProcessingStreamRequest = RunProcessingStreamRequest();
282289
}
283290

284-
public async void RunProcessingTopic()
291+
private async Task RunProcessingStreamResponse()
285292
{
286-
_ = Task.Run(async () =>
287-
{
288-
try
289-
{
290-
await foreach (var messageFromClient in _channelFromClientMessageSending.Reader.ReadAllAsync())
291-
{
292-
await SendMessage(messageFromClient);
293-
}
294-
}
295-
catch (Driver.TransportException e)
296-
{
297-
Logger.LogError(e, "ReaderSession[{SessionId}] have transport error on Write", SessionId);
298-
299-
ReconnectSession();
300-
301-
_lifecycleReaderSessionCts.Cancel();
302-
}
303-
});
304-
305293
try
306294
{
307295
while (await Stream.MoveNextAsync())
@@ -357,6 +345,25 @@ public async void RunProcessingTopic()
357345
}
358346
}
359347

348+
private async Task RunProcessingStreamRequest()
349+
{
350+
try
351+
{
352+
await foreach (var messageFromClient in _channelFromClientMessageSending.Reader.ReadAllAsync())
353+
{
354+
await SendMessage(messageFromClient);
355+
}
356+
}
357+
catch (Driver.TransportException e)
358+
{
359+
Logger.LogError(e, "ReaderSession[{SessionId}] have transport error on Write", SessionId);
360+
361+
ReconnectSession();
362+
363+
_lifecycleReaderSessionCts.Cancel();
364+
}
365+
}
366+
360367
internal async void TryReadRequestBytes(long bytes)
361368
{
362369
var readRequestBytes = Interlocked.Add(ref _readRequestBytes, bytes);
@@ -465,21 +472,28 @@ public async Task CommitOffsetRange(OffsetsRange offsetsRange, long partitionSes
465472
{
466473
partitionSession.RegisterCommitRequest(commitSending);
467474

468-
await _channelFromClientMessageSending.Writer.WriteAsync(new MessageFromClient
469-
{
470-
CommitOffsetRequest = new StreamReadMessage.Types.CommitOffsetRequest
475+
try
476+
{
477+
await _channelFromClientMessageSending.Writer.WriteAsync(new MessageFromClient
471478
{
472-
CommitOffsets =
479+
CommitOffsetRequest = new StreamReadMessage.Types.CommitOffsetRequest
473480
{
474-
new StreamReadMessage.Types.CommitOffsetRequest.Types.PartitionCommitOffset
481+
CommitOffsets =
475482
{
476-
Offsets = { commitSending.OffsetsRange },
477-
PartitionSessionId = partitionSessionId
483+
new StreamReadMessage.Types.CommitOffsetRequest.Types.PartitionCommitOffset
484+
{
485+
Offsets = { commitSending.OffsetsRange },
486+
PartitionSessionId = partitionSessionId
487+
}
478488
}
479489
}
480490
}
481-
}
482-
);
491+
);
492+
}
493+
catch (ChannelClosedException)
494+
{
495+
throw new ReaderException("Reader is disposed");
496+
}
483497
}
484498
else
485499
{
@@ -550,4 +564,28 @@ protected override MessageFromClient GetSendUpdateTokenRequest(string token)
550564
}
551565
};
552566
}
567+
568+
public override async ValueTask DisposeAsync()
569+
{
570+
Logger.LogDebug("ReaderSession[{SessionId}]: start dispose process", SessionId);
571+
572+
_channelFromClientMessageSending.Writer.Complete();
573+
574+
try
575+
{
576+
await _runProcessingStreamRequest;
577+
await Stream.RequestStreamComplete();
578+
await _runProcessingStreamResponse; // waiting all ack's commits
579+
580+
_lifecycleReaderSessionCts.Cancel();
581+
}
582+
catch (Exception e)
583+
{
584+
Logger.LogError(e, "ReaderSession[{SessionId}]: error on disposing", SessionId);
585+
}
586+
finally
587+
{
588+
Stream.Dispose();
589+
}
590+
}
553591
}

src/Ydb.Sdk/src/Services/Topic/TopicSession.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,5 @@ protected async Task SendMessage(TFromClient fromClient)
6060

6161
protected abstract TFromClient GetSendUpdateTokenRequest(string token);
6262

63-
public ValueTask DisposeAsync()
64-
{
65-
Logger.LogInformation("TopicSession[{SessionId}] is being deleted", SessionId);
66-
67-
return Stream.DisposeAsync();
68-
}
63+
public abstract ValueTask DisposeAsync();
6964
}

src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ private async Task Initialize()
224224
_logger.LogError("Stream unexpectedly closed by YDB server. Current InitRequest: {initRequest}",
225225
initRequest);
226226

227-
_ = Task.Run(Initialize, _disposeCts.Token);
227+
_ = Task.Run(Initialize);
228228

229229
return;
230230
}
@@ -239,7 +239,7 @@ private async Task Initialize()
239239
{
240240
_logger.LogError("Writer initialization failed to start. Reason: {Status}", status);
241241

242-
_ = Task.Run(Initialize, _disposeCts.Token);
242+
_ = Task.Run(Initialize);
243243
}
244244
else
245245
{
@@ -323,7 +323,7 @@ private async Task Initialize()
323323
{
324324
_logger.LogError(e, "Transport error on creating WriterSession");
325325

326-
_ = Task.Run(Initialize, _disposeCts.Token);
326+
_ = Task.Run(Initialize);
327327
}
328328
catch (OperationCanceledException)
329329
{
@@ -338,8 +338,6 @@ public async ValueTask DisposeAsync()
338338
return;
339339
}
340340

341-
_logger.LogInformation("Starting Writer[{WriterConfig}] disposal process", _config);
342-
343341
await _sendInFlightMessagesSemaphoreSlim.WaitAsync();
344342
try
345343
{
@@ -593,4 +591,13 @@ protected override MessageFromClient GetSendUpdateTokenRequest(string token)
593591
}
594592
};
595593
}
594+
595+
public override async ValueTask DisposeAsync()
596+
{
597+
Logger.LogDebug("WriterSession[{SessionId}]: start dispose process", SessionId);
598+
599+
await Stream.RequestStreamComplete();
600+
601+
Stream.Dispose();
602+
}
596603
}

src/Ydb.Sdk/tests/Topic/ReaderIntegrationTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public async Task StressTest_WhenReadingThenCommiting_ReturnMessages()
4242
await message.CommitAsync();
4343
}
4444

45-
reader.Dispose();
45+
await reader.DisposeAsync();
4646

4747
var readerNext = new ReaderBuilder<string>(_driver)
4848
{
@@ -59,7 +59,7 @@ public async Task StressTest_WhenReadingThenCommiting_ReturnMessages()
5959
await message.CommitAsync();
6060
}
6161

62-
readerNext.Dispose();
62+
await readerNext.DisposeAsync();
6363

6464
await topicClient.DropTopic(new DropTopicSettings { Path = _topicName });
6565
}

0 commit comments

Comments
 (0)