Skip to content

Commit 7322f04

Browse files
added
WriteAsync_WhenTransportExceptionOnWriteInWriterSessionThenReconnectSession_ReturnWriteResult
1 parent 014bc38 commit 7322f04

File tree

4 files changed

+97
-25
lines changed

4 files changed

+97
-25
lines changed

src/Ydb.Sdk/src/IDriver.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ public IBidirectionalStream<TRequest, TResponse> BidirectionalStreamCall<TReques
3232
public interface IBidirectionalStream<in TRequest, out TResponse> : IDisposable
3333
{
3434
public Task Write(TRequest request);
35+
3536
public ValueTask<bool> MoveNextAsync();
37+
3638
public TResponse Current { get; }
3739
}
3840

src/Ydb.Sdk/src/Services/Topic/TopicSession.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ protected async void ReconnectSession()
2525
{
2626
if (Interlocked.CompareExchange(ref _isActive, 0, 1) == 0)
2727
{
28-
Logger.LogWarning("Skipping reconnect. A reconnect session has already been initiated");
28+
Logger.LogDebug("Skipping reconnect. A reconnect session has already been initiated");
2929

3030
return;
3131
}

src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,11 @@ internal class Writer<TValue> : IWriter<TValue>
2323
private readonly ILogger<Writer<TValue>> _logger;
2424
private readonly ISerializer<TValue> _serializer;
2525
private readonly ConcurrentQueue<MessageSending> _toSendBuffer = new();
26+
private readonly ConcurrentQueue<MessageSending> _inFlightMessages = new();
2627
private readonly CancellationTokenSource _disposeTokenSource = new();
2728

2829
private volatile TaskCompletionSource _taskWakeUpCompletionSource = new();
29-
private volatile IWriteSession _session = new NotStartedWriterSession("Session not started!");
30+
private volatile IWriteSession _session = null!;
3031

3132
private int _limitBufferMaxSize;
3233

@@ -60,9 +61,8 @@ public async Task<WriteResult> WriteAsync(Message<TValue> message)
6061

6162
foreach (var metadata in message.Metadata)
6263
{
63-
messageData.MetadataItems.Add(
64-
new MetadataItem { Key = metadata.Key, Value = ByteString.CopyFrom(metadata.Value) }
65-
);
64+
messageData.MetadataItems.Add(new MetadataItem
65+
{ Key = metadata.Key, Value = ByteString.CopyFrom(metadata.Value) });
6666
}
6767

6868
while (true)
@@ -77,7 +77,6 @@ public async Task<WriteResult> WriteAsync(Message<TValue> message)
7777
curLimitBufferSize - data.Length, curLimitBufferSize) == curLimitBufferSize)
7878
{
7979
_toSendBuffer.Enqueue(new MessageSending(messageData, completeTask));
80-
8180
WakeUpWorker();
8281

8382
break;
@@ -130,6 +129,8 @@ private async Task Initialize()
130129
{
131130
_logger.LogInformation("Writer session initialization started. WriterConfig: {WriterConfig}", _config);
132131

132+
_session = new NotStartedWriterSession("Session not started!");
133+
133134
var stream = _driver.BidirectionalStreamCall(
134135
TopicService.StreamWriteMethod,
135136
GrpcRequestSettings.DefaultInstance
@@ -194,7 +195,18 @@ private async Task Initialize()
194195
return;
195196
}
196197

197-
_session = new WriterSession(_config, stream, initResponse, Initialize, _logger);
198+
var newSession = new WriterSession(_config, stream, initResponse, Initialize, _logger, _inFlightMessages);
199+
if (!_inFlightMessages.IsEmpty)
200+
{
201+
while (_inFlightMessages.TryDequeue(out var sendData))
202+
{
203+
_toSendBuffer.Enqueue(sendData);
204+
}
205+
206+
await newSession.Write(_toSendBuffer); // retry prev in flight messages
207+
}
208+
209+
_session = newSession;
198210
}
199211
catch (Driver.TransportException e)
200212
{
@@ -252,7 +264,7 @@ public Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
252264
{
253265
while (toSendBuffer.TryDequeue(out var messageSending))
254266
{
255-
messageSending.TaskCompletionSource.SetException(_reasonException);
267+
messageSending.TaskCompletionSource.TrySetException(_reasonException);
256268
}
257269

258270
return Task.CompletedTask;
@@ -267,7 +279,7 @@ public void Dispose()
267279
internal class WriterSession : TopicSession<MessageFromClient, MessageFromServer>, IWriteSession
268280
{
269281
private readonly WriterConfig _config;
270-
private readonly ConcurrentQueue<MessageSending> _inFlightMessages = new();
282+
private readonly ConcurrentQueue<MessageSending> _inFlightMessages;
271283

272284
private long _seqNum;
273285

@@ -276,9 +288,11 @@ public WriterSession(
276288
WriterStream stream,
277289
InitResponse initResponse,
278290
Func<Task> initialize,
279-
ILogger logger) : base(stream, logger, initResponse.SessionId, initialize)
291+
ILogger logger,
292+
ConcurrentQueue<MessageSending> inFlightMessages) : base(stream, logger, initResponse.SessionId, initialize)
280293
{
281294
_config = config;
295+
_inFlightMessages = inFlightMessages;
282296
Volatile.Write(ref _seqNum, initResponse.LastSeqNo); // happens-before for Volatile.Read
283297

284298
RunProcessingWriteAck();
@@ -312,8 +326,6 @@ public async Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
312326
Logger.LogError(e, "WriterSession[{SessionId}] have error on Write, last SeqNo={SeqNo}",
313327
SessionId, Volatile.Read(ref _seqNum));
314328

315-
ClearInFlightMessages(e);
316-
317329
ReconnectSession();
318330
}
319331
}
@@ -366,12 +378,12 @@ Completing task on exception...
366378
Client SeqNo: {SeqNo}, WriteAck: {WriteAck}",
367379
messageFromClient.MessageData.SeqNo, ack);
368380

369-
messageFromClient.TaskCompletionSource.SetException(new WriterException(
381+
messageFromClient.TaskCompletionSource.TrySetException(new WriterException(
370382
$"Client SeqNo[{messageFromClient.MessageData.SeqNo}] is less then server's WriteAck[{ack}]"));
371383
}
372384
else
373385
{
374-
messageFromClient.TaskCompletionSource.SetResult(new WriteResult(ack));
386+
messageFromClient.TaskCompletionSource.TrySetResult(new WriteResult(ack));
375387
}
376388

377389
_inFlightMessages.TryDequeue(out _); // Dequeue
@@ -381,20 +393,10 @@ Completing task on exception...
381393
catch (Driver.TransportException e)
382394
{
383395
Logger.LogError(e, "WriterSession[{SessionId}] have error on processing writeAck", SessionId);
384-
385-
ClearInFlightMessages(e);
386396
}
387397
finally
388398
{
389399
ReconnectSession();
390400
}
391401
}
392-
393-
private void ClearInFlightMessages(Driver.TransportException e)
394-
{
395-
while (_inFlightMessages.TryDequeue(out var sendData))
396-
{
397-
sendData.TaskCompletionSource.SetException(e);
398-
}
399-
}
400402
}

src/Ydb.Sdk/tests/Topic/WriterMockTests.cs

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
using System.Collections.Concurrent;
21
using Grpc.Core;
32
using Moq;
43
using Xunit;
@@ -279,4 +278,73 @@ public async Task WriteAsyncStress_WhenBufferIsOverflow_ThrowWriterExceptionOnBu
279278
Assert.Equal(batchTasksSize - bufferSize / messageSize, countErrors);
280279
}
281280
}
281+
282+
[Fact]
283+
public async Task WriteAsync_WhenTransportExceptionOnWriteInWriterSessionThenReconnectSession_ReturnWriteResult()
284+
{
285+
var moveFirstNextSource = new TaskCompletionSource<bool>();
286+
var moveSecondNextSource = new TaskCompletionSource<bool>();
287+
var moveThirdNextSource = new TaskCompletionSource<bool>();
288+
var nextCompleted = new TaskCompletionSource();
289+
_mockStream.SetupSequence(stream => stream.Write(It.IsAny<FromClient>()))
290+
.Returns(Task.CompletedTask)
291+
.Throws(() =>
292+
{
293+
moveFirstNextSource.SetResult(false);
294+
return new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled));
295+
})
296+
.Returns(() =>
297+
{
298+
moveSecondNextSource.SetResult(true);
299+
return Task.CompletedTask;
300+
})
301+
.Returns(() =>
302+
{
303+
nextCompleted.SetResult();
304+
return Task.CompletedTask;
305+
});
306+
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
307+
.Returns(new ValueTask<bool>(true))
308+
.Returns(new ValueTask<bool>(moveFirstNextSource.Task))
309+
.Returns(new ValueTask<bool>(moveSecondNextSource.Task))
310+
.Returns(new ValueTask<bool>(moveThirdNextSource.Task));
311+
_mockStream.SetupSequence(stream => stream.Current)
312+
.Returns(new StreamWriteMessage.Types.FromServer
313+
{
314+
InitResponse = new StreamWriteMessage.Types.InitResponse
315+
{ LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" },
316+
Status = StatusIds.Types.StatusCode.Success
317+
})
318+
.Returns(new StreamWriteMessage.Types.FromServer
319+
{
320+
InitResponse = new StreamWriteMessage.Types.InitResponse
321+
{ LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" },
322+
Status = StatusIds.Types.StatusCode.Success
323+
})
324+
.Returns(new StreamWriteMessage.Types.FromServer
325+
{
326+
WriteResponse = new StreamWriteMessage.Types.WriteResponse
327+
{
328+
PartitionId = 1,
329+
Acks =
330+
{
331+
new StreamWriteMessage.Types.WriteResponse.Types.WriteAck
332+
{
333+
SeqNo = 1,
334+
Written = new StreamWriteMessage.Types.WriteResponse.Types.WriteAck.Types.Written
335+
{ Offset = 0 }
336+
}
337+
}
338+
},
339+
Status = StatusIds.Types.StatusCode.Success
340+
});
341+
using var writer = new WriterBuilder<long>(_mockIDriver.Object, new WriterConfig("/topic")
342+
{ ProducerId = "producerId" }).Build();
343+
344+
var runTask = writer.WriteAsync(100L);
345+
await nextCompleted.Task;
346+
moveThirdNextSource.SetResult(true);
347+
348+
Assert.Equal(PersistenceStatus.Written, (await runTask).Status);
349+
}
282350
}

0 commit comments

Comments
 (0)