Skip to content

Commit df674fc

Browse files
commit reconnection work
1 parent 7322f04 commit df674fc

File tree

3 files changed

+82
-25
lines changed

3 files changed

+82
-25
lines changed

src/Ydb.Sdk/src/Services/Topic/TopicSession.cs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,29 @@ namespace Ydb.Sdk.Services.Topic;
55
internal abstract class TopicSession<TFromClient, TFromServer> : IDisposable
66
{
77
private readonly Func<Task> _initialize;
8+
private readonly Action<WriterException> _resetSessionOnTransportError;
89

910
protected readonly IBidirectionalStream<TFromClient, TFromServer> Stream;
1011
protected readonly ILogger Logger;
1112
protected readonly string SessionId;
1213

1314
private int _isActive = 1;
1415

15-
protected TopicSession(IBidirectionalStream<TFromClient, TFromServer> stream, ILogger logger,
16-
string sessionId, Func<Task> initialize)
16+
protected TopicSession(
17+
IBidirectionalStream<TFromClient, TFromServer> stream,
18+
ILogger logger,
19+
string sessionId,
20+
Func<Task> initialize,
21+
Action<WriterException> resetSessionOnTransportError)
1722
{
1823
Stream = stream;
1924
Logger = logger;
2025
SessionId = sessionId;
2126
_initialize = initialize;
27+
_resetSessionOnTransportError = resetSessionOnTransportError;
2228
}
2329

24-
protected async void ReconnectSession()
30+
protected async void ReconnectSession(WriterException exception)
2531
{
2632
if (Interlocked.CompareExchange(ref _isActive, 0, 1) == 0)
2733
{
@@ -30,6 +36,8 @@ protected async void ReconnectSession()
3036
return;
3137
}
3238

39+
_resetSessionOnTransportError(exception);
40+
3341
Logger.LogInformation("WriterSession[{SessionId}] has been deactivated, starting to reconnect", SessionId);
3442

3543
await _initialize();

src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ internal class Writer<TValue> : IWriter<TValue>
2727
private readonly CancellationTokenSource _disposeTokenSource = new();
2828

2929
private volatile TaskCompletionSource _taskWakeUpCompletionSource = new();
30-
private volatile IWriteSession _session = null!;
30+
private volatile IWriteSession _session = new NotStartedWriterSession("Session not started!");
3131

3232
private int _limitBufferMaxSize;
3333

@@ -127,9 +127,12 @@ private async Task Initialize()
127127
{
128128
try
129129
{
130-
_logger.LogInformation("Writer session initialization started. WriterConfig: {WriterConfig}", _config);
130+
if (_disposeTokenSource.IsCancellationRequested)
131+
{
132+
return;
133+
}
131134

132-
_session = new NotStartedWriterSession("Session not started!");
135+
_logger.LogInformation("Writer session initialization started. WriterConfig: {WriterConfig}", _config);
133136

134137
var stream = _driver.BidirectionalStreamCall(
135138
TopicService.StreamWriteMethod,
@@ -195,15 +198,24 @@ private async Task Initialize()
195198
return;
196199
}
197200

198-
var newSession = new WriterSession(_config, stream, initResponse, Initialize, _logger, _inFlightMessages);
201+
var newSession = new WriterSession(
202+
_config,
203+
stream,
204+
initResponse,
205+
Initialize,
206+
e => _session = new NotStartedWriterSession(e),
207+
_logger,
208+
_inFlightMessages
209+
);
199210
if (!_inFlightMessages.IsEmpty)
200211
{
212+
var copyInFlightMessages = new ConcurrentQueue<MessageSending>();
201213
while (_inFlightMessages.TryDequeue(out var sendData))
202214
{
203-
_toSendBuffer.Enqueue(sendData);
215+
copyInFlightMessages.Enqueue(sendData);
204216
}
205217

206-
await newSession.Write(_toSendBuffer); // retry prev in flight messages
218+
await newSession.Write(copyInFlightMessages); // retry prev in flight messages
207219
}
208220

209221
_session = newSession;
@@ -213,7 +225,7 @@ private async Task Initialize()
213225
_logger.LogError(e, "Unable to connect the session");
214226

215227
_session = new NotStartedWriterSession(
216-
new WriterException("Transport error on creating write session", e));
228+
new WriterException("Transport error on creating WriterSession", e));
217229

218230
_ = Task.Run(Initialize, _disposeTokenSource.Token);
219231
}
@@ -288,8 +300,16 @@ public WriterSession(
288300
WriterStream stream,
289301
InitResponse initResponse,
290302
Func<Task> initialize,
303+
Action<WriterException> resetSessionOnTransportError,
291304
ILogger logger,
292-
ConcurrentQueue<MessageSending> inFlightMessages) : base(stream, logger, initResponse.SessionId, initialize)
305+
ConcurrentQueue<MessageSending> inFlightMessages
306+
) : base(
307+
stream,
308+
logger,
309+
initResponse.SessionId,
310+
initialize,
311+
resetSessionOnTransportError
312+
)
293313
{
294314
_config = config;
295315
_inFlightMessages = inFlightMessages;
@@ -326,7 +346,7 @@ public async Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
326346
Logger.LogError(e, "WriterSession[{SessionId}] have error on Write, last SeqNo={SeqNo}",
327347
SessionId, Volatile.Read(ref _seqNum));
328348

329-
ReconnectSession();
349+
ReconnectSession(new WriterException("Transport error in the WriterSession on write messages", e));
330350
}
331351
}
332352

@@ -393,10 +413,14 @@ Completing task on exception...
393413
catch (Driver.TransportException e)
394414
{
395415
Logger.LogError(e, "WriterSession[{SessionId}] have error on processing writeAck", SessionId);
416+
417+
ReconnectSession(new WriterException("Transport error in the WriterSession on processing writeAck", e));
418+
419+
return;
396420
}
397-
finally
398-
{
399-
ReconnectSession();
400-
}
421+
422+
Logger.LogWarning("WriterSession[{SessionId}]: stream is closed", SessionId);
423+
424+
ReconnectSession(new WriterException("WriterStream is closed"));
401425
}
402426
}

src/Ydb.Sdk/tests/Topic/WriterMockTests.cs

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public async Task Initialize_WhenFailWriteMessage_ThrowWriterExceptionOnWriteAsy
7676
{ ProducerId = "producerId" }).Build();
7777

7878
var writerException = await Assert.ThrowsAsync<WriterException>(() => writer.WriteAsync("abacaba"));
79-
Assert.Equal("Transport error on creating write session", writerException.Message);
79+
Assert.Equal("Transport error on creating WriterSession", writerException.Message);
8080
Assert.Equal(StatusCode.Cancelled, writerException.Status.StatusCode);
8181

8282
await taskNextComplete.Task;
@@ -104,7 +104,7 @@ public async Task Initialize_WhenFailMoveNextAsync_ThrowWriterExceptionOnWriteAs
104104
{ ProducerId = "producerId" }).Build();
105105

106106
var writerException = await Assert.ThrowsAsync<WriterException>(() => writer.WriteAsync("abacaba"));
107-
Assert.Equal("Transport error on creating write session", writerException.Message);
107+
Assert.Equal("Transport error on creating WriterSession", writerException.Message);
108108
Assert.Equal(StatusCode.ClientTransportTimeout, writerException.Status.StatusCode);
109109

110110
await taskNextComplete.Task;
@@ -279,6 +279,24 @@ public async Task WriteAsyncStress_WhenBufferIsOverflow_ThrowWriterExceptionOnBu
279279
}
280280
}
281281

282+
/*
283+
* Performed invocations:
284+
285+
Mock<IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>:1> (stream):
286+
287+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } })
288+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync()
289+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Current
290+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync()
291+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Write({ "writeRequest": { "messages": [ { "seqNo": "1", "createdAt": "2024-11-22T10:08:58.732882Z", "data": "AAAAAAAAAGQ=", "uncompressedSize": "8" } ], "codec": 1 } })
292+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Write({ "initRequest": { "path": "/topic", "producerId": "producerId" } })
293+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync()
294+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Current
295+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync()
296+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Write({ "writeRequest": { "messages": [ { "seqNo": "1", "createdAt": "2024-11-22T10:08:58.732882Z", "data": "AAAAAAAAAGQ=", "uncompressedSize": "8" } ], "codec": 1 } })
297+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.Current
298+
IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>.MoveNextAsync()
299+
*/
282300
[Fact]
283301
public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSessionThenReconnectSession_ReturnWriteResult()
284302
{
@@ -288,14 +306,10 @@ public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSessionThenRec
288306
var nextCompleted = new TaskCompletionSource();
289307
_mockStream.SetupSequence(stream => stream.Write(It.IsAny<FromClient>()))
290308
.Returns(Task.CompletedTask)
291-
.Throws(() =>
292-
{
293-
moveFirstNextSource.SetResult(false);
294-
return new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled));
295-
})
309+
.ThrowsAsync(new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled)))
296310
.Returns(() =>
297311
{
298-
moveSecondNextSource.SetResult(true);
312+
moveFirstNextSource.SetResult(false);
299313
return Task.CompletedTask;
300314
})
301315
.Returns(() =>
@@ -307,7 +321,8 @@ public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSessionThenRec
307321
.Returns(new ValueTask<bool>(true))
308322
.Returns(new ValueTask<bool>(moveFirstNextSource.Task))
309323
.Returns(new ValueTask<bool>(moveSecondNextSource.Task))
310-
.Returns(new ValueTask<bool>(moveThirdNextSource.Task));
324+
.Returns(new ValueTask<bool>(moveThirdNextSource.Task))
325+
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
311326
_mockStream.SetupSequence(stream => stream.Current)
312327
.Returns(new StreamWriteMessage.Types.FromServer
313328
{
@@ -342,9 +357,19 @@ public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSessionThenRec
342357
{ ProducerId = "producerId" }).Build();
343358

344359
var runTask = writer.WriteAsync(100L);
360+
361+
var writerExceptionAfterResetSession = await Assert.ThrowsAsync<WriterException>(() => writer.WriteAsync(100));
362+
Assert.Equal("Transport error in the WriterSession on write messages",
363+
writerExceptionAfterResetSession.Message);
364+
Assert.Equal(StatusCode.Cancelled, writerExceptionAfterResetSession.Status.StatusCode);
365+
366+
moveSecondNextSource.SetResult(true);
345367
await nextCompleted.Task;
346368
moveThirdNextSource.SetResult(true);
347369

348370
Assert.Equal(PersistenceStatus.Written, (await runTask).Status);
371+
_mockStream.Verify(stream => stream.Write(It.IsAny<FromClient>()), Times.Exactly(4));
372+
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(5));
373+
_mockStream.Verify(stream => stream.Current, Times.Exactly(3));
349374
}
350375
}

0 commit comments

Comments
 (0)