@@ -24,10 +24,10 @@ internal class Writer<TValue> : IWriter<TValue>
2424 private readonly ILogger < Writer < TValue > > _logger ;
2525 private readonly ISerializer < TValue > _serializer ;
2626 private readonly ConcurrentQueue < MessageSending > _toSendBuffer = new ( ) ;
27+ private readonly CancellationTokenSource _disposeTokenSource = new ( ) ;
2728
2829 private volatile TaskCompletionSource _taskWakeUpCompletionSource = new ( ) ;
29- private volatile IWriteSession _session = null ! ;
30- private volatile bool _disposed ;
30+ private volatile IWriteSession _session = new NotStartedWriterSession ( "Session not started!" ) ;
3131
3232 private int _limitBufferMaxSize ;
3333
@@ -55,7 +55,7 @@ public async Task<WriteResult> WriteAsync(Message<TValue> message)
5555 var messageData = new MessageData
5656 {
5757 Data = ByteString . CopyFrom ( data ) ,
58- CreatedAt = Timestamp . FromDateTime ( message . Timestamp ) ,
58+ CreatedAt = Timestamp . FromDateTime ( message . Timestamp . ToUniversalTime ( ) ) ,
5959 UncompressedSize = data . Length
6060 } ;
6161
@@ -111,7 +111,7 @@ private async void StartWriteWorker()
111111 {
112112 await Initialize ( ) ;
113113
114- while ( ! _disposed )
114+ while ( ! _disposeTokenSource . Token . IsCancellationRequested )
115115 {
116116 await _taskWakeUpCompletionSource . Task ;
117117 _taskWakeUpCompletionSource = new TaskCompletionSource ( ) ;
@@ -127,76 +127,100 @@ private void WakeUpWorker()
127127
128128 private async Task Initialize ( )
129129 {
130- _logger . LogInformation ( "Writer session initialization started. WriterConfig: {WriterConfig}" , _config ) ;
130+ try
131+ {
132+ _logger . LogInformation ( "Writer session initialization started. WriterConfig: {WriterConfig}" , _config ) ;
131133
132- var stream = _driver . BidirectionalStreamCall (
133- TopicService . StreamWriteMethod ,
134- GrpcRequestSettings . DefaultInstance
135- ) ;
134+ var stream = _driver . BidirectionalStreamCall (
135+ TopicService . StreamWriteMethod ,
136+ GrpcRequestSettings . DefaultInstance
137+ ) ;
136138
137- var initRequest = new StreamWriteMessage . Types . InitRequest { Path = _config . TopicPath } ;
138- if ( _config . ProducerId != null )
139- {
140- initRequest . ProducerId = _config . ProducerId ;
141- }
139+ var initRequest = new StreamWriteMessage . Types . InitRequest { Path = _config . TopicPath } ;
140+ if ( _config . ProducerId != null )
141+ {
142+ initRequest . ProducerId = _config . ProducerId ;
143+ }
142144
143- if ( _config . MessageGroupId != null )
144- {
145- initRequest . MessageGroupId = _config . MessageGroupId ;
146- }
145+ if ( _config . MessageGroupId != null )
146+ {
147+ initRequest . MessageGroupId = _config . MessageGroupId ;
148+ }
147149
148- _logger . LogDebug ( "Sending initialization request for the write stream: {InitRequest}" , initRequest ) ;
150+ _logger . LogDebug ( "Sending initialization request for the write stream: {InitRequest}" , initRequest ) ;
149151
150- await stream . Write ( new MessageFromClient { InitRequest = initRequest } ) ;
151- if ( ! await stream . MoveNextAsync ( ) )
152- {
153- _session = new NotStartedWriterSession (
154- $ "Stream unexpectedly closed by YDB server. Current InitRequest: { initRequest } ") ;
152+ await stream . Write ( new MessageFromClient { InitRequest = initRequest } ) ;
153+ if ( ! await stream . MoveNextAsync ( ) )
154+ {
155+ _session = new NotStartedWriterSession (
156+ $ "Stream unexpectedly closed by YDB server. Current InitRequest: { initRequest } ") ;
155157
156- _ = Task . Run ( Initialize ) ;
158+ _ = Task . Run ( Initialize , _disposeTokenSource . Token ) ;
157159
158- return ;
159- }
160+ return ;
161+ }
160162
161- var receivedInitMessage = stream . Current ;
163+ var receivedInitMessage = stream . Current ;
162164
163- var status = Status . FromProto ( receivedInitMessage . Status , receivedInitMessage . Issues ) ;
165+ var status = Status . FromProto ( receivedInitMessage . Status , receivedInitMessage . Issues ) ;
164166
165- if ( status . IsNotSuccess )
166- {
167- _session = new NotStartedWriterSession ( status . ToString ( ) ) ;
167+ if ( status . IsNotSuccess )
168+ {
169+ _session = new NotStartedWriterSession ( "Initialization failed" , status ) ;
168170
169- _ = Task . Run ( Initialize ) ;
171+ if ( status . StatusCode != StatusCode . SchemeError )
172+ {
173+ _ = Task . Run ( Initialize , _disposeTokenSource . Token ) ;
174+ }
170175
171- return ;
172- }
176+ return ;
177+ }
173178
174- var initResponse = receivedInitMessage . InitResponse ;
179+ var initResponse = receivedInitMessage . InitResponse ;
175180
176- _logger . LogDebug ( "Received a response for the initialization request on the write stream: {InitResponse}" ,
177- initResponse ) ;
181+ _logger . LogDebug ( "Received a response for the initialization request on the writer stream: {InitResponse}" ,
182+ initResponse ) ;
178183
179- if ( ! initResponse . SupportedCodecs . Codecs . Contains ( ( int ) _config . Codec ) )
184+ if ( initResponse . SupportedCodecs != null &&
185+ ! initResponse . SupportedCodecs . Codecs . Contains ( ( int ) _config . Codec ) )
186+ {
187+ _logger . LogCritical ( "Topic[{TopicPath}] is not supported codec: {Codec}" , _config . TopicPath ,
188+ _config . Codec ) ;
189+
190+ _session = new NotStartedWriterSession (
191+ $ "Topic[{ _config . TopicPath } ] is not supported codec: { _config . Codec } ") ;
192+ return ;
193+ }
194+
195+ _session = new WriterSession ( _config , stream , initResponse , Initialize , _logger ) ;
196+ }
197+ catch ( Driver . TransportException e )
180198 {
181- _logger . LogCritical ( "Topic[{TopicPath}] is not supported codec: {Codec}" , _config . TopicPath , _config . Codec ) ;
199+ _logger . LogError ( e , "Unable to connect the session" ) ;
182200
183201 _session = new NotStartedWriterSession (
184- $ "Topic[{ _config . TopicPath } ] is not supported codec: { _config . Codec } ") ;
185- return ;
202+ new YdbWriterException ( "Transport error on creating write session" , e ) ) ;
186203 }
187-
188- _session = new WriterSession ( _config , stream , initResponse , Initialize , _logger ) ;
189204 }
190205
191206 public void Dispose ( )
192207 {
193- _disposed = true ;
208+ try
209+ {
210+ _disposeTokenSource . Cancel ( ) ;
211+
212+ _session . Dispose ( ) ;
213+ }
214+ finally
215+ {
216+ _disposeTokenSource . Dispose ( ) ;
217+ }
194218 }
195219}
196220
197221internal record MessageSending ( MessageData MessageData , TaskCompletionSource < WriteResult > TaskCompletionSource ) ;
198222
199- internal interface IWriteSession
223+ internal interface IWriteSession : IDisposable
200224{
201225 Task Write ( ConcurrentQueue < MessageSending > toSendBuffer ) ;
202226}
@@ -210,6 +234,16 @@ public NotStartedWriterSession(string reasonExceptionMessage)
210234 _reasonException = new YdbWriterException ( reasonExceptionMessage ) ;
211235 }
212236
237+ public NotStartedWriterSession ( string reasonExceptionMessage , Status status )
238+ {
239+ _reasonException = new YdbWriterException ( reasonExceptionMessage , status ) ;
240+ }
241+
242+ public NotStartedWriterSession ( YdbWriterException reasonException )
243+ {
244+ _reasonException = reasonException ;
245+ }
246+
213247 public Task Write ( ConcurrentQueue < MessageSending > toSendBuffer )
214248 {
215249 foreach ( var messageSending in toSendBuffer )
@@ -219,6 +253,10 @@ public Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
219253
220254 return Task . CompletedTask ;
221255 }
256+
257+ public void Dispose ( )
258+ {
259+ }
222260}
223261
224262// No thread safe
0 commit comments