Skip to content

Commit a6094c0

Browse files
added test on DisposeAsync waiting in flight messages
1 parent 3671a8d commit a6094c0

File tree

4 files changed

+96
-5
lines changed

4 files changed

+96
-5
lines changed

src/Ydb.Sdk/src/Services/Topic/TopicSession.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,10 @@ protected async Task SendMessage(TFromClient fromClient)
6060

6161
protected abstract TFromClient GetSendUpdateTokenRequest(string token);
6262

63-
public async ValueTask DisposeAsync()
63+
public ValueTask DisposeAsync()
6464
{
65-
await Stream.DisposeAsync();
65+
Logger.LogInformation("TopicSession[{SessionId}] is being deleted", SessionId);
66+
67+
return Stream.DisposeAsync();
6668
}
6769
}

src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ private async Task Initialize()
268268
return;
269269
}
270270

271-
await _sendInFlightMessagesSemaphoreSlim.WaitAsync(_disposeCts.Token);
271+
await _sendInFlightMessagesSemaphoreSlim.WaitAsync();
272272
try
273273
{
274274
var copyInFlightMessages = new ConcurrentQueue<MessageSending>();
@@ -333,17 +333,22 @@ private async Task Initialize()
333333

334334
public async ValueTask DisposeAsync()
335335
{
336+
_logger.LogInformation("Starting Writer[{WriterConfig}] disposal process", _config);
337+
336338
await _sendInFlightMessagesSemaphoreSlim.WaitAsync();
337339
try
338340
{
341+
_logger.LogDebug("Signaling cancellation token to stop writing new messages");
342+
339343
_disposeCts.Cancel();
340344
}
341345
finally
342346
{
343347
_sendInFlightMessagesSemaphoreSlim.Release();
344348
}
345349

346-
// wait all messages
350+
_logger.LogDebug("Writer[{WriterConfig}] is waiting for all in-flight messages to complete...", _config);
351+
347352
foreach (var inFlightMessage in _inFlightMessages)
348353
{
349354
try
@@ -352,11 +357,14 @@ public async ValueTask DisposeAsync()
352357
}
353358
catch (Exception e)
354359
{
355-
_logger.LogWarning(e, "Failed in flight message");
360+
_logger.LogError(e, "Error occurred while waiting for in-flight message SeqNo: {SeqNo}",
361+
inFlightMessage.MessageData.SeqNo);
356362
}
357363
}
358364

359365
await _session.DisposeAsync();
366+
367+
_logger.LogInformation("Writer[{WriterConfig}] is disposed", _config);
360368
}
361369
}
362370

src/Ydb.Sdk/src/Services/Topic/Writer/WriterConfig.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ public override string ToString()
3737
toString.Append(", ProducerId: ").Append(ProducerId);
3838
}
3939

40+
if (PartitionId != null)
41+
{
42+
toString.Append(", PartitionId: ").Append(PartitionId);
43+
}
44+
4045
return toString.Append(", Codec: ").Append(Codec).ToString();
4146
}
4247
}

src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -841,6 +841,82 @@ public async Task WriteAsync_WhenTokenIsUpdatedOneTime_SuccessUpdateToken()
841841
msg.UpdateTokenRequest.Token == "Token2")));
842842
}
843843

844+
[Fact]
845+
public async Task DisposeAsync_WhenInFlightMessages_WaitingInFlightMessages()
846+
{
847+
var tcsDetectedWrite = new TaskCompletionSource();
848+
var writeTcs1 = new TaskCompletionSource<bool>();
849+
850+
_mockStream.SetupSequence(stream => stream.Write(It.IsAny<FromClient>()))
851+
.Returns(Task.CompletedTask)
852+
.Returns(() =>
853+
{
854+
tcsDetectedWrite.TrySetResult();
855+
return Task.CompletedTask;
856+
})
857+
.Returns(Task.CompletedTask)
858+
.Returns(Task.CompletedTask);
859+
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
860+
.ReturnsAsync(true)
861+
.Returns(new ValueTask<bool>(writeTcs1.Task))
862+
.ReturnsAsync(true)
863+
.ReturnsAsync(true)
864+
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
865+
866+
_mockStream.SetupSequence(stream => stream.Current)
867+
.Returns(new StreamWriteMessage.Types.FromServer
868+
{
869+
InitResponse = new StreamWriteMessage.Types.InitResponse
870+
{ LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" },
871+
Status = StatusIds.Types.StatusCode.Success
872+
})
873+
.Returns(new StreamWriteMessage.Types.FromServer
874+
{
875+
InitResponse = new StreamWriteMessage.Types.InitResponse
876+
{ LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" },
877+
Status = StatusIds.Types.StatusCode.Success
878+
})
879+
.Returns(new StreamWriteMessage.Types.FromServer
880+
{
881+
WriteResponse = new StreamWriteMessage.Types.WriteResponse
882+
{
883+
PartitionId = 1,
884+
Acks =
885+
{
886+
new StreamWriteMessage.Types.WriteResponse.Types.WriteAck
887+
{
888+
SeqNo = 1,
889+
Written = new StreamWriteMessage.Types.WriteResponse.Types.WriteAck.Types.Written
890+
{ Offset = 0 }
891+
}
892+
}
893+
},
894+
Status = StatusIds.Types.StatusCode.Success
895+
});
896+
897+
var writer = new WriterBuilder<long>(_mockIDriver.Object, "/topic-16")
898+
{ ProducerId = "producerId" }.Build();
899+
900+
var writeTask1 = writer.WriteAsync(100L);
901+
902+
await tcsDetectedWrite.Task;
903+
var disposedTask = writer.DisposeAsync();
904+
905+
Assert.False(writeTask1.IsCompleted);
906+
Assert.False(disposedTask.IsCompleted);
907+
writeTcs1.TrySetException(new Driver.TransportException(
908+
new RpcException(new Grpc.Core.Status(Grpc.Core.StatusCode.DeadlineExceeded, "Some message"))));
909+
Assert.Equal("Writer[TopicPath: /topic-16, ProducerId: producerId, Codec: Raw] is disposed",
910+
(await Assert.ThrowsAsync<WriterException>(() => writer.WriteAsync(12))).Message);
911+
912+
Assert.Equal(PersistenceStatus.Written, (await writeTask1).Status);
913+
914+
Assert.Equal("Writer[TopicPath: /topic-16, ProducerId: producerId, Codec: Raw] is disposed",
915+
(await Assert.ThrowsAsync<WriterException>(() => writer.WriteAsync(12))).Message);
916+
917+
await disposedTask;
918+
}
919+
844920
private ISetupSequentialResult<StreamWriteMessage.Types.FromServer> SetupReadOneWriteAckMessage()
845921
{
846922
return _mockStream.SetupSequence(stream => stream.Current)

0 commit comments

Comments
 (0)