Skip to content

Commit 299f120

Browse files
feat: impl disposing reader
1 parent 4dbb834 commit 299f120

File tree

7 files changed

+92
-65
lines changed

7 files changed

+92
-65
lines changed

src/Ydb.Sdk/src/IDriver.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public IBidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TReques
3030
ILoggerFactory LoggerFactory { get; }
3131
}
3232

33-
public interface IBidirectionalStream<in TRequest, out TResponse> : IAsyncDisposable
33+
public interface IBidirectionalStream<in TRequest, out TResponse> : IDisposable
3434
{
3535
public Task Write(TRequest request);
3636

@@ -39,6 +39,8 @@ public interface IBidirectionalStream<in TRequest, out TResponse> : IAsyncDispos
3939
public TResponse Current { get; }
4040

4141
public string? AuthToken { get; }
42+
43+
public Task RequestStreamComplete();
4244
}
4345

4446
public abstract class BaseDriver : IDriver
@@ -225,7 +227,6 @@ internal class BidirectionalStream<TRequest, TResponse> : IBidirectionalStream<T
225227
private readonly AsyncDuplexStreamingCall<TRequest, TResponse> _stream;
226228
private readonly Action<RpcException> _rpcErrorAction;
227229
private readonly ICredentialsProvider _credentialsProvider;
228-
private readonly TaskCompletionSource _closedResponseStreamTcs = new();
229230

230231
internal BidirectionalStream(
231232
AsyncDuplexStreamingCall<TRequest, TResponse> stream,
@@ -259,7 +260,6 @@ public async ValueTask<bool> MoveNextAsync()
259260
}
260261
catch (RpcException e)
261262
{
262-
_closedResponseStreamTcs.SetResult();
263263
_rpcErrorAction(e);
264264

265265
throw new Driver.TransportException(e);
@@ -270,7 +270,7 @@ public async ValueTask<bool> MoveNextAsync()
270270

271271
public string? AuthToken => _credentialsProvider.GetAuthInfo();
272272

273-
public async ValueTask DisposeAsync()
273+
public async Task RequestStreamComplete()
274274
{
275275
try
276276
{
@@ -280,7 +280,10 @@ public async ValueTask DisposeAsync()
280280
{
281281
_rpcErrorAction(e);
282282
}
283+
}
283284

285+
public void Dispose()
286+
{
284287
_stream.Dispose();
285288
}
286289
}

src/Ydb.Sdk/src/Services/Topic/IReader.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
namespace Ydb.Sdk.Services.Topic;
44

5-
public interface IReader<TValue> : IDisposable
5+
public interface IReader<TValue> : IAsyncDisposable
66
{
77
public ValueTask<Message<TValue>> ReadAsync(CancellationToken cancellationToken = default);
88

src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs

Lines changed: 56 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ internal class Reader<TValue> : IReader<TValue>
2121
private readonly ReaderConfig _config;
2222
private readonly IDeserializer<TValue> _deserializer;
2323
private readonly ILogger _logger;
24-
private readonly GrpcRequestSettings _readerGrpcRequestSettings;
24+
private readonly GrpcRequestSettings _readerGrpcRequestSettings = new();
25+
26+
private ReaderSession<TValue>? _currentReaderSession;
2527

2628
private readonly Channel<InternalBatchMessages<TValue>> _receivedMessagesChannel =
2729
Channel.CreateUnbounded<InternalBatchMessages<TValue>>(
@@ -41,7 +43,6 @@ internal Reader(IDriver driver, ReaderConfig config, IDeserializer<TValue> deser
4143
_config = config;
4244
_deserializer = deserializer;
4345
_logger = driver.LoggerFactory.CreateLogger<Reader<TValue>>();
44-
_readerGrpcRequestSettings = new GrpcRequestSettings { CancellationToken = _disposeCts.Token };
4546

4647
_ = Initialize();
4748
}
@@ -68,7 +69,7 @@ public async ValueTask<Message<TValue>> ReadAsync(CancellationToken cancellation
6869
}
6970
}
7071

71-
throw new ObjectDisposedException("Reader");
72+
throw new ReaderException("Reader is disposed");
7273
}
7374

7475
public async ValueTask<BatchMessages<TValue>> ReadBatchAsync(CancellationToken cancellationToken = default)
@@ -86,7 +87,7 @@ public async ValueTask<BatchMessages<TValue>> ReadBatchAsync(CancellationToken c
8687
}
8788
}
8889

89-
throw new ObjectDisposedException("Reader");
90+
throw new ReaderException("Reader is disposed");
9091
}
9192

9293
private async Task Initialize()
@@ -185,15 +186,15 @@ await stream.Write(new MessageFromClient
185186
ReadRequest = new StreamReadMessage.Types.ReadRequest { BytesSize = _config.MemoryUsageMaxBytes }
186187
});
187188

188-
new ReaderSession<TValue>(
189+
_currentReaderSession = new ReaderSession<TValue>(
189190
_config,
190191
stream,
191192
initResponse.SessionId,
192193
Initialize,
193194
_logger,
194195
_receivedMessagesChannel.Writer,
195196
_deserializer
196-
).RunProcessingTopic();
197+
);
197198
}
198199
catch (Driver.TransportException e)
199200
{
@@ -203,18 +204,12 @@ await stream.Write(new MessageFromClient
203204
}
204205
}
205206

206-
public void Dispose()
207+
public ValueTask DisposeAsync()
207208
{
208-
try
209-
{
210-
_receivedMessagesChannel.Writer.TryComplete();
209+
_receivedMessagesChannel.Writer.TryComplete();
210+
_disposeCts.Cancel();
211211

212-
_disposeCts.Cancel();
213-
}
214-
finally
215-
{
216-
_disposeCts.Dispose();
217-
}
212+
return _currentReaderSession?.DisposeAsync() ?? ValueTask.CompletedTask;
218213
}
219214
}
220215

@@ -247,6 +242,8 @@ internal class ReaderSession<TValue> : TopicSession<MessageFromClient, MessageFr
247242
private readonly ChannelWriter<InternalBatchMessages<TValue>> _channelWriter;
248243
private readonly CancellationTokenSource _lifecycleReaderSessionCts = new();
249244
private readonly IDeserializer<TValue> _deserializer;
245+
private readonly Task _runProcessingStreamResponse;
246+
private readonly Task _runProcessingStreamRequest;
250247

251248
private readonly Channel<MessageFromClient> _channelFromClientMessageSending =
252249
Channel.CreateUnbounded<MessageFromClient>(
@@ -279,29 +276,13 @@ IDeserializer<TValue> deserializer
279276
_readerConfig = config;
280277
_channelWriter = channelWriter;
281278
_deserializer = deserializer;
279+
280+
_runProcessingStreamResponse = RunProcessingStreamResponse();
281+
_runProcessingStreamRequest = RunProcessingStreamRequest();
282282
}
283283

284-
public async void RunProcessingTopic()
284+
private async Task RunProcessingStreamResponse()
285285
{
286-
_ = Task.Run(async () =>
287-
{
288-
try
289-
{
290-
await foreach (var messageFromClient in _channelFromClientMessageSending.Reader.ReadAllAsync())
291-
{
292-
await SendMessage(messageFromClient);
293-
}
294-
}
295-
catch (Driver.TransportException e)
296-
{
297-
Logger.LogError(e, "ReaderSession[{SessionId}] have transport error on Write", SessionId);
298-
299-
ReconnectSession();
300-
301-
_lifecycleReaderSessionCts.Cancel();
302-
}
303-
});
304-
305286
try
306287
{
307288
while (await Stream.MoveNextAsync())
@@ -349,6 +330,10 @@ public async void RunProcessingTopic()
349330
Logger.LogError(e, "ReaderSession[{SessionId}] have transport error on processing server messages",
350331
SessionId);
351332
}
333+
catch (OperationCanceledException)
334+
{
335+
Logger.LogInformation("");
336+
}
352337
finally
353338
{
354339
ReconnectSession();
@@ -357,6 +342,25 @@ public async void RunProcessingTopic()
357342
}
358343
}
359344

345+
private async Task RunProcessingStreamRequest()
346+
{
347+
try
348+
{
349+
await foreach (var messageFromClient in _channelFromClientMessageSending.Reader.ReadAllAsync())
350+
{
351+
await SendMessage(messageFromClient);
352+
}
353+
}
354+
catch (Driver.TransportException e)
355+
{
356+
Logger.LogError(e, "ReaderSession[{SessionId}] have transport error on Write", SessionId);
357+
358+
ReconnectSession();
359+
360+
_lifecycleReaderSessionCts.Cancel();
361+
}
362+
}
363+
360364
internal async void TryReadRequestBytes(long bytes)
361365
{
362366
var readRequestBytes = Interlocked.Add(ref _readRequestBytes, bytes);
@@ -550,4 +554,20 @@ protected override MessageFromClient GetSendUpdateTokenRequest(string token)
550554
}
551555
};
552556
}
557+
558+
public override async ValueTask DisposeAsync()
559+
{
560+
_channelFromClientMessageSending.Writer.Complete();
561+
562+
try
563+
{
564+
await _runProcessingStreamRequest;
565+
await Stream.RequestStreamComplete();
566+
await _runProcessingStreamResponse; // waiting all ack's commits
567+
}
568+
finally
569+
{
570+
Stream.Dispose();
571+
}
572+
}
553573
}

src/Ydb.Sdk/src/Services/Topic/TopicSession.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,5 @@ protected async Task SendMessage(TFromClient fromClient)
6060

6161
protected abstract TFromClient GetSendUpdateTokenRequest(string token);
6262

63-
public ValueTask DisposeAsync()
64-
{
65-
Logger.LogInformation("TopicSession[{SessionId}] is being deleted", SessionId);
66-
67-
return Stream.DisposeAsync();
68-
}
63+
public abstract ValueTask DisposeAsync();
6964
}

src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ private async Task Initialize()
224224
_logger.LogError("Stream unexpectedly closed by YDB server. Current InitRequest: {initRequest}",
225225
initRequest);
226226

227-
_ = Task.Run(Initialize, _disposeCts.Token);
227+
_ = Task.Run(Initialize);
228228

229229
return;
230230
}
@@ -239,7 +239,7 @@ private async Task Initialize()
239239
{
240240
_logger.LogError("Writer initialization failed to start. Reason: {Status}", status);
241241

242-
_ = Task.Run(Initialize, _disposeCts.Token);
242+
_ = Task.Run(Initialize);
243243
}
244244
else
245245
{
@@ -323,7 +323,7 @@ private async Task Initialize()
323323
{
324324
_logger.LogError(e, "Transport error on creating WriterSession");
325325

326-
_ = Task.Run(Initialize, _disposeCts.Token);
326+
_ = Task.Run(Initialize);
327327
}
328328
catch (OperationCanceledException)
329329
{
@@ -593,4 +593,13 @@ protected override MessageFromClient GetSendUpdateTokenRequest(string token)
593593
}
594594
};
595595
}
596+
597+
public override async ValueTask DisposeAsync()
598+
{
599+
Logger.LogInformation("TopicSession[{SessionId}] is being deleted", SessionId);
600+
601+
await Stream.RequestStreamComplete();
602+
603+
Stream.Dispose();
604+
}
596605
}

src/Ydb.Sdk/tests/Topic/ReaderIntegrationTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public async Task StressTest_WhenReadingThenCommiting_ReturnMessages()
4242
await message.CommitAsync();
4343
}
4444

45-
reader.Dispose();
45+
await reader.DisposeAsync();
4646

4747
var readerNext = new ReaderBuilder<string>(_driver)
4848
{
@@ -59,7 +59,7 @@ public async Task StressTest_WhenReadingThenCommiting_ReturnMessages()
5959
await message.CommitAsync();
6060
}
6161

62-
readerNext.Dispose();
62+
await reader.DisposeAsync();
6363

6464
await topicClient.DropTopic(new DropTopicSettings { Path = _topicName });
6565
}

0 commit comments

Comments
 (0)