@@ -23,22 +23,109 @@ internal class Writer<TValue> : IWriter<TValue>
2323 private readonly WriterConfig _config ;
2424 private readonly ILogger < Writer < TValue > > _logger ;
2525 private readonly ISerializer < TValue > _serializer ;
26-
27- private readonly ConcurrentQueue < MessageSending > _inFlightMessages = new ( ) ;
2826 private readonly ConcurrentQueue < MessageSending > _toSendBuffer = new ( ) ;
29- private readonly SemaphoreSlim _writeSemaphoreSlim = new ( 1 ) ;
3027
31- private volatile WriterSession _session = null ! ;
28+ private volatile TaskCompletionSource _taskWakeUpCompletionSource = new ( ) ;
29+ private volatile IWriteSession _session = null ! ;
30+ private volatile bool _disposed ;
31+
32+ private int _limitBufferMaxSize ;
3233
3334 internal Writer ( IDriver driver , WriterConfig config , ISerializer < TValue > serializer )
3435 {
3536 _driver = driver ;
3637 _config = config ;
37- _serializer = serializer ;
3838 _logger = driver . LoggerFactory . CreateLogger < Writer < TValue > > ( ) ;
39+ _serializer = serializer ;
40+ _limitBufferMaxSize = config . BufferMaxSize ;
41+
42+ StartWriteWorker ( ) ;
43+ }
44+
45+ public Task < WriteResult > WriteAsync ( TValue data )
46+ {
47+ return WriteAsync ( new Message < TValue > ( data ) ) ;
48+ }
49+
50+ public async Task < WriteResult > WriteAsync ( Message < TValue > message )
51+ {
52+ TaskCompletionSource < WriteResult > completeTask = new ( ) ;
53+
54+ var data = _serializer . Serialize ( message . Data ) ;
55+ var messageData = new MessageData
56+ {
57+ Data = ByteString . CopyFrom ( data ) ,
58+ CreatedAt = Timestamp . FromDateTime ( message . Timestamp ) ,
59+ UncompressedSize = data . Length
60+ } ;
61+
62+ foreach ( var metadata in message . Metadata )
63+ {
64+ messageData . MetadataItems . Add (
65+ new MetadataItem { Key = metadata . Key , Value = ByteString . CopyFrom ( metadata . Value ) }
66+ ) ;
67+ }
68+
69+ while ( true )
70+ {
71+ var curLimitBufferSize = Volatile . Read ( ref _limitBufferMaxSize ) ;
72+
73+ if ( // sending one biggest message anyway
74+ curLimitBufferSize == _config . BufferMaxSize && data . Length > curLimitBufferSize
75+ || curLimitBufferSize >= data . Length )
76+ {
77+ if ( Interlocked . CompareExchange ( ref _limitBufferMaxSize , curLimitBufferSize ,
78+ curLimitBufferSize - data . Length ) == curLimitBufferSize )
79+ {
80+ _toSendBuffer . Enqueue ( new MessageSending ( messageData , completeTask ) ) ;
81+
82+ WakeUpWorker ( ) ;
83+
84+ break ;
85+ }
86+
87+ // Next try on race condition
88+ continue ;
89+ }
90+
91+ _logger . LogWarning (
92+ "Buffer overflow: the data size [{DataLength}] exceeds the current buffer limit ({CurLimitBufferSize}) [BufferMaxSize = {BufferMaxSize}]" ,
93+ data . Length , curLimitBufferSize , _config . BufferMaxSize ) ;
94+
95+ throw new YdbWriterException ( "Buffer overflow" ) ;
96+ }
97+
98+ try
99+ {
100+ var writeResult = await completeTask . Task ;
101+
102+ return writeResult ;
103+ }
104+ finally
105+ {
106+ Interlocked . Add ( ref _limitBufferMaxSize , data . Length ) ;
107+ }
39108 }
40109
41- internal async Task Initialize ( )
110+ private async void StartWriteWorker ( )
111+ {
112+ await Initialize ( ) ;
113+
114+ while ( ! _disposed )
115+ {
116+ await _taskWakeUpCompletionSource . Task ;
117+ _taskWakeUpCompletionSource = new TaskCompletionSource ( ) ;
118+
119+ await _session . Write ( _toSendBuffer ) ;
120+ }
121+ }
122+
123+ private void WakeUpWorker ( )
124+ {
125+ _taskWakeUpCompletionSource . TrySetResult ( ) ;
126+ }
127+
128+ private async Task Initialize ( )
42129 {
43130 _logger . LogInformation ( "Writer session initialization started. WriterConfig: {WriterConfig}" , _config ) ;
44131
@@ -63,13 +150,26 @@ internal async Task Initialize()
63150 await stream . Write ( new MessageFromClient { InitRequest = initRequest } ) ;
64151 if ( ! await stream . MoveNextAsync ( ) )
65152 {
66- throw new YdbWriterException (
153+ _session = new NotStartedWriterSession (
67154 $ "Stream unexpectedly closed by YDB server. Current InitRequest: { initRequest } ") ;
155+
156+ _ = Task . Run ( Initialize ) ;
157+
158+ return ;
68159 }
69160
70161 var receivedInitMessage = stream . Current ;
71162
72- Status . FromProto ( receivedInitMessage . Status , receivedInitMessage . Issues ) . EnsureSuccess ( ) ;
163+ var status = Status . FromProto ( receivedInitMessage . Status , receivedInitMessage . Issues ) ;
164+
165+ if ( status . IsNotSuccess )
166+ {
167+ _session = new NotStartedWriterSession ( status . ToString ( ) ) ;
168+
169+ _ = Task . Run ( Initialize ) ;
170+
171+ return ;
172+ }
73173
74174 var initResponse = receivedInitMessage . InitResponse ;
75175
@@ -78,75 +178,54 @@ internal async Task Initialize()
78178
79179 if ( ! initResponse . SupportedCodecs . Codecs . Contains ( ( int ) _config . Codec ) )
80180 {
81- throw new YdbWriterException ( $ "Topic[{ _config . TopicPath } ] is not supported codec: { _config . Codec } ") ;
82- }
83-
84- _session = new WriterSession ( _config , stream , initResponse , Initialize , _logger ) ;
181+ _logger . LogCritical ( "Topic[{TopicPath}] is not supported codec: {Codec}" , _config . TopicPath , _config . Codec ) ;
85182
86- await _writeSemaphoreSlim . WaitAsync ( ) ;
87- try
88- {
89- _logger . LogDebug ( "Retrying to send pending in-flight messages after stream restart" ) ;
90-
91- await _session . Write ( _inFlightMessages , _inFlightMessages ) ;
92- }
93- finally
94- {
95- _writeSemaphoreSlim . Release ( ) ;
183+ _session = new NotStartedWriterSession (
184+ $ "Topic[{ _config . TopicPath } ] is not supported codec: { _config . Codec } ") ;
185+ return ;
96186 }
97187
98- _ = _session . RunProcessingWriteAck ( _inFlightMessages ) ;
188+ _session = new WriterSession ( _config , stream , initResponse , Initialize , _logger ) ;
99189 }
100190
101- public Task < WriteResult > WriteAsync ( TValue data )
191+ public void Dispose ( )
102192 {
103- return WriteAsync ( new Message < TValue > ( data ) ) ;
193+ _disposed = true ;
104194 }
195+ }
105196
106- public async Task < WriteResult > WriteAsync ( Message < TValue > message )
107- {
108- TaskCompletionSource < WriteResult > completeTask = new ( ) ;
109-
110- var data = _serializer . Serialize ( message . Data ) ;
111- var messageData = new MessageData
112- {
113- Data = ByteString . CopyFrom ( data ) ,
114- CreatedAt = Timestamp . FromDateTime ( message . Timestamp ) ,
115- UncompressedSize = data . Length
116- } ;
197+ internal record MessageSending ( MessageData MessageData , TaskCompletionSource < WriteResult > TaskCompletionSource ) ;
117198
118- foreach ( var metadata in message . Metadata )
119- {
120- messageData . MetadataItems . Add (
121- new MetadataItem { Key = metadata . Key , Value = ByteString . CopyFrom ( metadata . Value ) }
122- ) ;
123- }
199+ internal interface IWriteSession
200+ {
201+ Task Write ( ConcurrentQueue < MessageSending > toSendBuffer ) ;
202+ }
124203
125- _toSendBuffer . Enqueue ( new MessageSending ( messageData , completeTask ) ) ;
204+ internal class NotStartedWriterSession : IWriteSession
205+ {
206+ private readonly YdbWriterException _reasonException ;
126207
127- if ( _toSendBuffer . IsEmpty ) // concurrent sending
128- {
129- return await completeTask . Task ;
130- }
208+ public NotStartedWriterSession ( string reasonExceptionMessage )
209+ {
210+ _reasonException = new YdbWriterException ( reasonExceptionMessage ) ;
211+ }
131212
132- await _writeSemaphoreSlim . WaitAsync ( ) ;
133- try
134- {
135- await _session . Write ( _toSendBuffer , _inFlightMessages ) ;
136- }
137- finally
213+ public Task Write ( ConcurrentQueue < MessageSending > toSendBuffer )
214+ {
215+ foreach ( var messageSending in toSendBuffer )
138216 {
139- _writeSemaphoreSlim . Release ( ) ;
217+ messageSending . TaskCompletionSource . SetException ( _reasonException ) ;
140218 }
141219
142- return await completeTask . Task ;
220+ return Task . CompletedTask ;
143221 }
144222}
145223
146224// No thread safe
147- internal class WriterSession : TopicSession < MessageFromClient , MessageFromServer >
225+ internal class WriterSession : TopicSession < MessageFromClient , MessageFromServer > , IWriteSession
148226{
149227 private readonly WriterConfig _config ;
228+ private readonly ConcurrentQueue < MessageSending > _inFlightMessages = new ( ) ;
150229
151230 private long _seqNum ;
152231
@@ -159,9 +238,48 @@ public WriterSession(
159238 {
160239 _config = config ;
161240 Volatile . Write ( ref _seqNum , initResponse . LastSeqNo ) ; // happens-before for Volatile.Read
241+
242+ RunProcessingWriteAck ( ) ;
162243 }
163244
164- internal async Task RunProcessingWriteAck ( ConcurrentQueue < MessageSending > inFlightMessages )
245+ public async Task Write ( ConcurrentQueue < MessageSending > toSendBuffer )
246+ {
247+ try
248+ {
249+ var writeMessage = new StreamWriteMessage . Types . WriteRequest
250+ {
251+ Codec = ( int ) _config . Codec
252+ } ;
253+
254+ var currentSeqNum = Volatile . Read ( ref _seqNum ) ;
255+
256+ while ( toSendBuffer . TryDequeue ( out var sendData ) )
257+ {
258+ var messageData = sendData . MessageData ;
259+
260+ messageData . SeqNo = ++ currentSeqNum ;
261+ writeMessage . Messages . Add ( messageData ) ;
262+ _inFlightMessages . Enqueue ( sendData ) ;
263+ }
264+
265+ Volatile . Write ( ref _seqNum , currentSeqNum ) ;
266+ await Stream . Write ( new MessageFromClient { WriteRequest = writeMessage } ) ;
267+ }
268+ catch ( TransactionException e )
269+ {
270+ Logger . LogError ( e , "WriterSession[{SessionId}] have error on Write, last SeqNo={SeqNo}" ,
271+ SessionId , Volatile . Read ( ref _seqNum ) ) ;
272+
273+ ReconnectSession ( ) ;
274+
275+ while ( _inFlightMessages . TryDequeue ( out var sendData ) )
276+ {
277+ sendData . TaskCompletionSource . SetException ( e ) ;
278+ }
279+ }
280+ }
281+
282+ private async void RunProcessingWriteAck ( )
165283 {
166284 try
167285 {
@@ -182,7 +300,7 @@ internal async Task RunProcessingWriteAck(ConcurrentQueue<MessageSending> inFlig
182300
183301 foreach ( var ack in messageFromServer . WriteResponse . Acks )
184302 {
185- if ( ! inFlightMessages . TryPeek ( out var messageFromClient ) )
303+ if ( ! _inFlightMessages . TryPeek ( out var messageFromClient ) )
186304 {
187305 Logger . LogCritical ( "No client message was found upon receipt of an acknowledgement: {WriteAck}" ,
188306 ack ) ;
@@ -217,7 +335,7 @@ Completing task on exception...
217335 messageFromClient . TaskCompletionSource . SetResult ( new WriteResult ( ack ) ) ;
218336 }
219337
220- inFlightMessages . TryDequeue ( out _ ) ; // Dequeue
338+ _inFlightMessages . TryDequeue ( out _ ) ; // Dequeue
221339 }
222340 }
223341 }
@@ -230,39 +348,4 @@ Completing task on exception...
230348 ReconnectSession ( ) ;
231349 }
232350 }
233-
234- internal async Task Write ( ConcurrentQueue < MessageSending > toSendBuffer ,
235- ConcurrentQueue < MessageSending > inFlightMessages )
236- {
237- try
238- {
239- var writeMessage = new StreamWriteMessage . Types . WriteRequest
240- {
241- Codec = ( int ) _config . Codec
242- } ;
243-
244- var currentSeqNum = Volatile . Read ( ref _seqNum ) ;
245-
246- while ( toSendBuffer . TryDequeue ( out var sendData ) )
247- {
248- var messageData = sendData . MessageData ;
249-
250- messageData . SeqNo = ++ currentSeqNum ;
251- writeMessage . Messages . Add ( messageData ) ;
252- inFlightMessages . Enqueue ( sendData ) ;
253- }
254-
255- Volatile . Write ( ref _seqNum , currentSeqNum ) ;
256- await Stream . Write ( new MessageFromClient { WriteRequest = writeMessage } ) ;
257- }
258- catch ( TransactionException e )
259- {
260- Logger . LogError ( e , "WriterSession[{SessionId}] have error on Write, last SeqNo={SeqNo}" ,
261- SessionId , Volatile . Read ( ref _seqNum ) ) ;
262-
263- ReconnectSession ( ) ;
264- }
265- }
266351}
267-
268- internal record MessageSending ( MessageData MessageData , TaskCompletionSource < WriteResult > TaskCompletionSource ) ;
0 commit comments