77
88namespace Ydb . Sdk . Services . Topic . Writer ;
99
10- using InitResponse = StreamWriteMessage . Types . InitResponse ;
1110using MessageData = StreamWriteMessage . Types . WriteRequest . Types . MessageData ;
1211using MessageFromClient = StreamWriteMessage . Types . FromClient ;
1312using MessageFromServer = StreamWriteMessage . Types . FromServer ;
@@ -134,6 +133,11 @@ private async void StartWriteWorker()
134133 await _tcsWakeUp . Task ;
135134 _tcsWakeUp = new TaskCompletionSource ( ) ;
136135
136+ if ( _toSendBuffer . IsEmpty )
137+ {
138+ continue ;
139+ }
140+
137141 await _session . Write ( _toSendBuffer ) ;
138142 }
139143 }
@@ -179,8 +183,8 @@ private async Task Initialize()
179183 await stream . Write ( new MessageFromClient { InitRequest = initRequest } ) ;
180184 if ( ! await stream . MoveNextAsync ( ) )
181185 {
182- _session = new NotStartedWriterSession (
183- $ "Stream unexpectedly closed by YDB server. Current InitRequest: { initRequest } " ) ;
186+ _logger . LogError ( "Stream unexpectedly closed by YDB server. Current InitRequest: {initRequest}" ,
187+ initRequest ) ;
184188
185189 _ = Task . Run ( Initialize , _disposeCts . Token ) ;
186190
@@ -193,14 +197,18 @@ private async Task Initialize()
193197
194198 if ( status . IsNotSuccess )
195199 {
196- _session = new NotStartedWriterSession ( "Initialization failed" , status ) ;
197-
198- if ( status . StatusCode != StatusCode . SchemeError )
200+ if ( RetrySettings . DefaultInstance . GetRetryRule ( status . StatusCode ) . Policy != RetryPolicy . None )
199201 {
202+ _logger . LogError ( "Writer initialization failed to start. Reason: {Status}" , status ) ;
203+
200204 _ = Task . Run ( Initialize , _disposeCts . Token ) ;
201205 }
206+ else
207+ {
208+ _logger . LogCritical ( "Writer initialization failed to start. Reason: {Status}" , status ) ;
202209
203- _logger . LogCritical ( "Writer initialization failed to start. Reason: {Status}" , status ) ;
210+ _session = new NotStartedWriterSession ( "Initialization failed" , status ) ;
211+ }
204212
205213 return ;
206214 }
@@ -222,45 +230,59 @@ private async Task Initialize()
222230 return ;
223231 }
224232
225- var newSession = new WriterSession (
226- _config ,
227- stream ,
228- initResponse ,
229- Initialize ,
230- _logger ,
231- _toSendBuffer
232- ) ;
233-
234- if ( ! _inFlightMessages . IsEmpty )
233+ var copyInFlightMessages = new ConcurrentQueue < MessageSending > ( ) ;
234+ var lastSeqNo = initResponse . LastSeqNo ;
235+ while ( _inFlightMessages . TryDequeue ( out var sendData ) )
235236 {
236- var copyInFlightMessages = new ConcurrentQueue < MessageSending > ( ) ;
237- while ( _inFlightMessages . TryDequeue ( out var sendData ) )
237+ if ( sendData . Tcs . Task . IsFaulted )
238238 {
239- if ( sendData . Tcs . Task . IsFaulted )
240- {
241- _logger . LogWarning ( "Message[SeqNo={SeqNo}] is cancelled" , sendData . MessageData . SeqNo ) ;
239+ _logger . LogWarning ( "Message[SeqNo={SeqNo}] is cancelled" , sendData . MessageData . SeqNo ) ;
242240
243- continue ;
244- }
241+ continue ;
242+ }
245243
246- copyInFlightMessages . Enqueue ( sendData ) ;
244+ if ( lastSeqNo >= sendData . MessageData . SeqNo )
245+ {
246+ _logger . LogWarning (
247+ "Message[SeqNo={SeqNo}] has been skipped because its sequence number " +
248+ "is less than or equal to the last processed server's SeqNo[{LastSeqNo}]" ,
249+ sendData . MessageData . SeqNo , lastSeqNo ) ;
250+
251+ sendData . Tcs . TrySetResult ( WriteResult . Skipped ) ;
252+
253+ continue ;
247254 }
248255
249- await newSession . Write ( copyInFlightMessages ) ; // retry prev in flight messages
256+
257+ // Calculate the next sequence number from the calculated previous messages.
258+ lastSeqNo = Math . Max ( lastSeqNo , sendData . MessageData . SeqNo ) ;
259+
260+ copyInFlightMessages . Enqueue ( sendData ) ;
250261 }
251262
252- _session = newSession ;
253- newSession . RunProcessingWriteAck ( ) ;
254- if ( ! _toSendBuffer . IsEmpty )
263+ var newSession = new WriterSession (
264+ config : _config ,
265+ stream : stream ,
266+ lastSeqNo : lastSeqNo ,
267+ sessionId : initResponse . SessionId ,
268+ initialize : Initialize ,
269+ logger : _logger ,
270+ inFlightMessages : _inFlightMessages
271+ ) ;
272+
273+ if ( ! copyInFlightMessages . IsEmpty )
255274 {
256- WakeUpWorker ( ) ; // send buffer
275+ await newSession . Write ( copyInFlightMessages ) ; // retry prev in flight messages
257276 }
277+
278+ _session = newSession ;
279+ newSession . RunProcessingWriteAck ( ) ;
280+ WakeUpWorker ( ) ; // attempt send buffer
258281 }
259282 catch ( Driver . TransportException e )
260283 {
261284 _logger . LogError ( e , "Transport error on creating WriterSession" ) ;
262285
263- _session = DummyWriteSession . Instance ;
264286 _ = Task . Run ( Initialize , _disposeCts . Token ) ;
265287 }
266288 }
@@ -321,6 +343,10 @@ internal class DummyWriteSession : IWriteSession
321343{
322344 internal static readonly DummyWriteSession Instance = new ( ) ;
323345
346+ private DummyWriteSession ( )
347+ {
348+ }
349+
324350 public void Dispose ( )
325351 {
326352 }
@@ -341,20 +367,21 @@ internal class WriterSession : TopicSession<MessageFromClient, MessageFromServer
341367 public WriterSession (
342368 WriterConfig config ,
343369 WriterStream stream ,
344- InitResponse initResponse ,
370+ long lastSeqNo ,
371+ string sessionId ,
345372 Func < Task > initialize ,
346373 ILogger logger ,
347374 ConcurrentQueue < MessageSending > inFlightMessages
348375 ) : base (
349376 stream ,
350377 logger ,
351- initResponse . SessionId ,
378+ sessionId ,
352379 initialize
353380 )
354381 {
355382 _config = config ;
356383 _inFlightMessages = inFlightMessages ;
357- Volatile . Write ( ref _seqNum , initResponse . LastSeqNo ) ; // happens-before for Volatile.Read
384+ Volatile . Write ( ref _seqNum , lastSeqNo ) ; // happens-before for Volatile.Read
358385 }
359386
360387 public async Task Write ( ConcurrentQueue < MessageSending > toSendBuffer )
@@ -406,7 +433,7 @@ internal async void RunProcessingWriteAck()
406433
407434 if ( status . IsNotSuccess )
408435 {
409- Logger . LogWarning (
436+ Logger . LogError (
410437 "WriterSession[{SessionId}] received unsuccessful status while processing writeAck: {Status}" ,
411438 SessionId , status ) ;
412439 return ;
0 commit comments