@@ -21,7 +21,9 @@ internal class Reader<TValue> : IReader<TValue>
2121 private readonly ReaderConfig _config ;
2222 private readonly IDeserializer < TValue > _deserializer ;
2323 private readonly ILogger _logger ;
24- private readonly GrpcRequestSettings _readerGrpcRequestSettings ;
24+ private readonly GrpcRequestSettings _readerGrpcRequestSettings = new ( ) ;
25+
26+ private ReaderSession < TValue > ? _currentReaderSession ;
2527
2628 private readonly Channel < InternalBatchMessages < TValue > > _receivedMessagesChannel =
2729 Channel . CreateUnbounded < InternalBatchMessages < TValue > > (
@@ -41,7 +43,6 @@ internal Reader(IDriver driver, ReaderConfig config, IDeserializer<TValue> deser
4143 _config = config ;
4244 _deserializer = deserializer ;
4345 _logger = driver . LoggerFactory . CreateLogger < Reader < TValue > > ( ) ;
44- _readerGrpcRequestSettings = new GrpcRequestSettings { CancellationToken = _disposeCts . Token } ;
4546
4647 _ = Initialize ( ) ;
4748 }
@@ -68,7 +69,7 @@ public async ValueTask<Message<TValue>> ReadAsync(CancellationToken cancellation
6869 }
6970 }
7071
71- throw new ObjectDisposedException ( "Reader" ) ;
72+ throw new ReaderException ( "Reader is disposed " ) ;
7273 }
7374
7475 public async ValueTask < BatchMessages < TValue > > ReadBatchAsync ( CancellationToken cancellationToken = default )
@@ -86,7 +87,7 @@ public async ValueTask<BatchMessages<TValue>> ReadBatchAsync(CancellationToken c
8687 }
8788 }
8889
89- throw new ObjectDisposedException ( "Reader" ) ;
90+ throw new ReaderException ( "Reader is disposed " ) ;
9091 }
9192
9293 private async Task Initialize ( )
@@ -185,15 +186,15 @@ await stream.Write(new MessageFromClient
185186 ReadRequest = new StreamReadMessage . Types . ReadRequest { BytesSize = _config . MemoryUsageMaxBytes }
186187 } ) ;
187188
188- new ReaderSession < TValue > (
189+ _currentReaderSession = new ReaderSession < TValue > (
189190 _config ,
190191 stream ,
191192 initResponse . SessionId ,
192193 Initialize ,
193194 _logger ,
194195 _receivedMessagesChannel . Writer ,
195196 _deserializer
196- ) . RunProcessingTopic ( ) ;
197+ ) ;
197198 }
198199 catch ( Driver . TransportException e )
199200 {
@@ -203,18 +204,19 @@ await stream.Write(new MessageFromClient
203204 }
204205 }
205206
206- public void Dispose ( )
207+ public async ValueTask DisposeAsync ( )
207208 {
208- try
209- {
210- _receivedMessagesChannel . Writer . TryComplete ( ) ;
211-
212- _disposeCts . Cancel ( ) ;
213- }
214- finally
209+ if ( _disposeCts . IsCancellationRequested )
215210 {
216- _disposeCts . Dispose ( ) ;
211+ return ;
217212 }
213+
214+ _receivedMessagesChannel . Writer . TryComplete ( ) ;
215+ _disposeCts . Cancel ( ) ;
216+
217+ await ( _currentReaderSession ? . DisposeAsync ( ) ?? ValueTask . CompletedTask ) ;
218+
219+ _logger . LogInformation ( "Reader[{WriterConfig}] is disposed" , _config ) ;
218220 }
219221}
220222
@@ -247,6 +249,8 @@ internal class ReaderSession<TValue> : TopicSession<MessageFromClient, MessageFr
247249 private readonly ChannelWriter < InternalBatchMessages < TValue > > _channelWriter ;
248250 private readonly CancellationTokenSource _lifecycleReaderSessionCts = new ( ) ;
249251 private readonly IDeserializer < TValue > _deserializer ;
252+ private readonly Task _runProcessingStreamResponse ;
253+ private readonly Task _runProcessingStreamRequest ;
250254
251255 private readonly Channel < MessageFromClient > _channelFromClientMessageSending =
252256 Channel . CreateUnbounded < MessageFromClient > (
@@ -279,29 +283,13 @@ IDeserializer<TValue> deserializer
279283 _readerConfig = config ;
280284 _channelWriter = channelWriter ;
281285 _deserializer = deserializer ;
286+
287+ _runProcessingStreamResponse = RunProcessingStreamResponse ( ) ;
288+ _runProcessingStreamRequest = RunProcessingStreamRequest ( ) ;
282289 }
283290
284- public async void RunProcessingTopic ( )
291+ private async Task RunProcessingStreamResponse ( )
285292 {
286- _ = Task . Run ( async ( ) =>
287- {
288- try
289- {
290- await foreach ( var messageFromClient in _channelFromClientMessageSending . Reader . ReadAllAsync ( ) )
291- {
292- await SendMessage ( messageFromClient ) ;
293- }
294- }
295- catch ( Driver . TransportException e )
296- {
297- Logger . LogError ( e , "ReaderSession[{SessionId}] have transport error on Write" , SessionId ) ;
298-
299- ReconnectSession ( ) ;
300-
301- _lifecycleReaderSessionCts . Cancel ( ) ;
302- }
303- } ) ;
304-
305293 try
306294 {
307295 while ( await Stream . MoveNextAsync ( ) )
@@ -357,6 +345,25 @@ public async void RunProcessingTopic()
357345 }
358346 }
359347
348+ private async Task RunProcessingStreamRequest ( )
349+ {
350+ try
351+ {
352+ await foreach ( var messageFromClient in _channelFromClientMessageSending . Reader . ReadAllAsync ( ) )
353+ {
354+ await SendMessage ( messageFromClient ) ;
355+ }
356+ }
357+ catch ( Driver . TransportException e )
358+ {
359+ Logger . LogError ( e , "ReaderSession[{SessionId}] have transport error on Write" , SessionId ) ;
360+
361+ ReconnectSession ( ) ;
362+
363+ _lifecycleReaderSessionCts . Cancel ( ) ;
364+ }
365+ }
366+
360367 internal async void TryReadRequestBytes ( long bytes )
361368 {
362369 var readRequestBytes = Interlocked . Add ( ref _readRequestBytes , bytes ) ;
@@ -465,21 +472,28 @@ public async Task CommitOffsetRange(OffsetsRange offsetsRange, long partitionSes
465472 {
466473 partitionSession . RegisterCommitRequest ( commitSending ) ;
467474
468- await _channelFromClientMessageSending . Writer . WriteAsync ( new MessageFromClient
469- {
470- CommitOffsetRequest = new StreamReadMessage . Types . CommitOffsetRequest
475+ try
476+ {
477+ await _channelFromClientMessageSending . Writer . WriteAsync ( new MessageFromClient
471478 {
472- CommitOffsets =
479+ CommitOffsetRequest = new StreamReadMessage . Types . CommitOffsetRequest
473480 {
474- new StreamReadMessage . Types . CommitOffsetRequest . Types . PartitionCommitOffset
481+ CommitOffsets =
475482 {
476- Offsets = { commitSending . OffsetsRange } ,
477- PartitionSessionId = partitionSessionId
483+ new StreamReadMessage . Types . CommitOffsetRequest . Types . PartitionCommitOffset
484+ {
485+ Offsets = { commitSending . OffsetsRange } ,
486+ PartitionSessionId = partitionSessionId
487+ }
478488 }
479489 }
480490 }
481- }
482- ) ;
491+ ) ;
492+ }
493+ catch ( ChannelClosedException )
494+ {
495+ throw new ReaderException ( "Reader is disposed" ) ;
496+ }
483497 }
484498 else
485499 {
@@ -550,4 +564,28 @@ protected override MessageFromClient GetSendUpdateTokenRequest(string token)
550564 }
551565 } ;
552566 }
567+
568+ public override async ValueTask DisposeAsync ( )
569+ {
570+ Logger . LogDebug ( "ReaderSession[{SessionId}]: start dispose process" , SessionId ) ;
571+
572+ _channelFromClientMessageSending . Writer . Complete ( ) ;
573+
574+ try
575+ {
576+ await _runProcessingStreamRequest ;
577+ await Stream . RequestStreamComplete ( ) ;
578+ await _runProcessingStreamResponse ; // waiting all ack's commits
579+
580+ _lifecycleReaderSessionCts . Cancel ( ) ;
581+ }
582+ catch ( Exception e )
583+ {
584+ Logger . LogError ( e , "ReaderSession[{SessionId}]: error on disposing" , SessionId ) ;
585+ }
586+ finally
587+ {
588+ Stream . Dispose ( ) ;
589+ }
590+ }
553591}
0 commit comments