@@ -21,11 +21,11 @@ internal class Writer<TValue> : IWriter<TValue>
2121 private readonly WriterConfig _config ;
2222 private readonly ILogger < Writer < TValue > > _logger ;
2323 private readonly ISerializer < TValue > _serializer ;
24- private readonly GrpcRequestSettings _writerGrpcRequestSettings ;
24+ private readonly GrpcRequestSettings _writerGrpcRequestSettings = new ( ) ;
2525 private readonly ConcurrentQueue < MessageSending > _toSendBuffer = new ( ) ;
2626 private readonly ConcurrentQueue < MessageSending > _inFlightMessages = new ( ) ;
2727 private readonly CancellationTokenSource _disposeCts = new ( ) ;
28- private readonly SemaphoreSlim _clearInFlightMessagesSemaphoreSlim = new ( 1 ) ;
28+ private readonly SemaphoreSlim _sendInFlightMessagesSemaphoreSlim = new ( 1 ) ;
2929
3030 private volatile TaskCompletionSource _tcsWakeUp = new ( ) ;
3131 private volatile TaskCompletionSource _tcsBufferAvailableEvent = new ( ) ;
@@ -39,7 +39,6 @@ internal Writer(IDriver driver, WriterConfig config, ISerializer<TValue> seriali
3939 _logger = driver . LoggerFactory . CreateLogger < Writer < TValue > > ( ) ;
4040 _serializer = serializer ;
4141 _limitBufferMaxSize = config . BufferMaxSize ;
42- _writerGrpcRequestSettings = new GrpcRequestSettings { CancellationToken = _disposeCts . Token } ;
4342
4443 StartWriteWorker ( ) ;
4544 }
@@ -95,7 +94,9 @@ public async Task<WriteResult> WriteAsync(Message<TValue> message, CancellationT
9594 if ( Interlocked . CompareExchange ( ref _limitBufferMaxSize ,
9695 curLimitBufferSize - data . Length , curLimitBufferSize ) == curLimitBufferSize )
9796 {
98- _toSendBuffer . Enqueue ( new MessageSending ( messageData , tcs ) ) ;
97+ _toSendBuffer . Enqueue (
98+ new MessageSending ( messageData , tcs , writerDisposedCancellationTokenRegistration )
99+ ) ;
99100 WakeUpWorker ( ) ;
100101
101102 break ;
@@ -162,7 +163,7 @@ private async void StartWriteWorker()
162163 continue ;
163164 }
164165
165- await _clearInFlightMessagesSemaphoreSlim . WaitAsync ( _disposeCts . Token ) ;
166+ await _sendInFlightMessagesSemaphoreSlim . WaitAsync ( _disposeCts . Token ) ;
166167 try
167168 {
168169 if ( _session . IsActive )
@@ -172,7 +173,7 @@ private async void StartWriteWorker()
172173 }
173174 finally
174175 {
175- _clearInFlightMessagesSemaphoreSlim . Release ( ) ;
176+ _sendInFlightMessagesSemaphoreSlim . Release ( ) ;
176177 }
177178 }
178179 }
@@ -193,7 +194,7 @@ private async Task Initialize()
193194
194195 try
195196 {
196- if ( _disposeCts . IsCancellationRequested )
197+ if ( _disposeCts . IsCancellationRequested && _inFlightMessages . IsEmpty )
197198 {
198199 _logger . LogWarning ( "Initialize writer is canceled because it has been disposed" ) ;
199200
@@ -267,7 +268,7 @@ private async Task Initialize()
267268 return ;
268269 }
269270
270- await _clearInFlightMessagesSemaphoreSlim . WaitAsync ( _disposeCts . Token ) ;
271+ await _sendInFlightMessagesSemaphoreSlim . WaitAsync ( ) ;
271272 try
272273 {
273274 var copyInFlightMessages = new ConcurrentQueue < MessageSending > ( ) ;
@@ -315,7 +316,7 @@ private async Task Initialize()
315316 }
316317 finally
317318 {
318- _clearInFlightMessagesSemaphoreSlim . Release ( ) ;
319+ _sendInFlightMessagesSemaphoreSlim . Release ( ) ;
319320 }
320321 }
321322 catch ( Driver . TransportException e )
@@ -330,17 +331,55 @@ private async Task Initialize()
330331 }
331332 }
332333
333- public void Dispose ( )
334+ public async ValueTask DisposeAsync ( )
334335 {
335- _disposeCts . Cancel ( ) ;
336+ if ( _disposeCts . IsCancellationRequested )
337+ {
338+ return ;
339+ }
340+
341+ _logger . LogInformation ( "Starting Writer[{WriterConfig}] disposal process" , _config ) ;
342+
343+ await _sendInFlightMessagesSemaphoreSlim . WaitAsync ( ) ;
344+ try
345+ {
346+ _logger . LogDebug ( "Signaling cancellation token to stop writing new messages" ) ;
347+
348+ _disposeCts . Cancel ( ) ;
349+ }
350+ finally
351+ {
352+ _sendInFlightMessagesSemaphoreSlim . Release ( ) ;
353+ }
354+
355+ _logger . LogDebug ( "Writer[{WriterConfig}] is waiting for all in-flight messages to complete..." , _config ) ;
356+
357+ foreach ( var inFlightMessage in _inFlightMessages )
358+ {
359+ try
360+ {
361+ await inFlightMessage . Tcs . Task ;
362+ }
363+ catch ( Exception e )
364+ {
365+ _logger . LogError ( e , "Error occurred while waiting for in-flight message SeqNo: {SeqNo}" ,
366+ inFlightMessage . MessageData . SeqNo ) ;
367+ }
368+ }
369+
370+ await _session . DisposeAsync ( ) ;
336371
337- _session = new NotStartedWriterSession ( "Writer is disposed" ) ;
372+ _logger . LogInformation ( "Writer[{WriterConfig}] is disposed" , _config ) ;
338373 }
339374}
340375
341- internal record MessageSending ( MessageData MessageData , TaskCompletionSource < WriteResult > Tcs ) ;
376+ internal record MessageSending (
377+ MessageData MessageData ,
378+ TaskCompletionSource < WriteResult > Tcs ,
379+ CancellationTokenRegistration DisposedCtr
380+ ) ;
342381
343- internal interface IWriteSession
382+ internal interface IWriteSession : IAsyncDisposable
344383{
345384 Task Write ( ConcurrentQueue < MessageSending > toSendBuffer ) ;
346385
@@ -372,6 +411,11 @@ public Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
372411 }
373412
374413 public bool IsActive => true ;
414+
415+ public ValueTask DisposeAsync ( )
416+ {
417+ return ValueTask . CompletedTask ;
418+ }
375419}
376420
377421internal class DummyWriterSession : IWriteSession
@@ -388,6 +432,11 @@ public Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
388432 }
389433
390434 public bool IsActive => false ;
435+
436+ public ValueTask DisposeAsync ( )
437+ {
438+ return ValueTask . CompletedTask ;
439+ }
391440}
392441
393442internal class WriterSession : TopicSession < MessageFromClient , MessageFromServer > , IWriteSession
@@ -437,6 +486,8 @@ public async Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
437486 continue ;
438487 }
439488
489+ sendData . DisposedCtr . Unregister ( ) ;
490+
440491 var messageData = sendData . MessageData ;
441492
442493 if ( messageData . SeqNo == default )
0 commit comments