Skip to content

Commit 6e2a66d

Browse files
push
1 parent 580409e commit 6e2a66d

File tree

4 files changed

+249
-16
lines changed

4 files changed

+249
-16
lines changed

src/Ydb.Sdk/src/Services/Topic/Reader/InternalBatchMessages.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ internal bool TryDequeueMessage([MaybeNullWhen(false)] out Message<TValue> messa
4040
}
4141

4242
var index = _startMessageDataIndex++;
43-
var approximatelyMessageBytesSize = Utils
44-
.CalculateApproximatelyBytesSize(_approximatelyBatchSize, OriginalMessageCount, index);
4543
var messageData = _batch.MessageData[index];
44+
_readerSession.TryReadRequestBytes(Utils
45+
.CalculateApproximatelyBytesSize(_approximatelyBatchSize, OriginalMessageCount, index));
4646

4747
TValue value;
4848
try
@@ -53,8 +53,7 @@ internal bool TryDequeueMessage([MaybeNullWhen(false)] out Message<TValue> messa
5353
{
5454
throw new ReaderException("Error when deserializing message data", e);
5555
}
56-
57-
_readerSession.TryReadRequestBytes(approximatelyMessageBytesSize);
56+
5857
var nextCommitedOffset = messageData.Offset + 1;
5958

6059
message = new Message<TValue>(

src/Ydb.Sdk/src/Services/Topic/Reader/PartitionSession.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,19 @@ internal void HandleCommitedOffset(long commitedOffset)
7878
}
7979
}
8080

81-
internal void Stop()
81+
internal void Stop(long commitedOffset)
8282
{
8383
_isStopped = true;
8484
while (_waitCommitMessages.TryDequeue(out var commitSending))
8585
{
86-
Utils.SetPartitionClosedException(commitSending, PartitionSessionId);
86+
if (commitSending.OffsetsRange.End <= commitedOffset)
87+
{
88+
commitSending.TcsCommit.SetResult();
89+
}
90+
else
91+
{
92+
Utils.SetPartitionClosedException(commitSending, PartitionSessionId);
93+
}
8794
}
8895
}
8996
}

src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -420,14 +420,19 @@ private async Task StopPartitionSessionRequest(
420420
{
421421
if (stopPartitionSessionRequest.Graceful)
422422
{
423+
partitionSession.Stop(stopPartitionSessionRequest.CommittedOffset);
424+
423425
await _channelFromClientMessageSending.Writer.WriteAsync(new MessageFromClient
424426
{
425427
StopPartitionSessionResponse = new StreamReadMessage.Types.StopPartitionSessionResponse
426428
{ PartitionSessionId = partitionSession.PartitionSessionId }
427429
});
428430
}
429-
430-
partitionSession.Stop();
431+
else
432+
{
433+
// Maybe a race condition with the server dropping all waiters before they can commit.
434+
partitionSession.Stop(-1);
435+
}
431436
}
432437
else
433438
{

src/Ydb.Sdk/tests/Topic/ReaderUnitTests.cs

Lines changed: 230 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System.ComponentModel;
12
using System.Text;
23
using Google.Protobuf;
34
using Google.Protobuf.WellKnownTypes;
@@ -1089,15 +1090,48 @@ public async Task
10891090
msg.StartPartitionSessionResponse != null &&
10901091
msg.StartPartitionSessionResponse.PartitionSessionId == 1)), Times.Exactly(2));
10911092
_mockStream.Verify(stream => stream.Write(It.Is<FromClient>(msg =>
1092-
msg.ReadRequest != null && msg.ReadRequest.BytesSize == 100)));
1093+
msg.ReadRequest != null && msg.ReadRequest.BytesSize == 25)), Times.Exactly(4));
10931094
_mockStream.Verify(stream => stream.Write(It.Is<FromClient>(msg =>
10941095
msg.CommitOffsetRequest != null &&
10951096
msg.CommitOffsetRequest.CommitOffsets[0].PartitionSessionId == 1 &&
10961097
msg.CommitOffsetRequest.CommitOffsets[0].Offsets[0].Start == 0 &&
10971098
msg.CommitOffsetRequest.CommitOffsets[0].Offsets[0].End == 21)));
10981099
}
10991100

1100-
//[Fact]
1101+
/*
1102+
*
1103+
Performed invocations:
1104+
1105+
Mock<IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>:1> (stream):
1106+
1107+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.Write({ "initRequest": { "topicsReadSettings": [ { "path": "/topic" } ], "consumer": "Consumer" } })
1108+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.MoveNextAsync()
1109+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.Current
1110+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.Write({ "readRequest": { "bytesSize": "1000" } })
1111+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.MoveNextAsync()
1112+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.Current
1113+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.MoveNextAsync()
1114+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.Write({ "startPartitionSessionResponse": { "partitionSessionId": "1" } })
1115+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.Current
1116+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.MoveNextAsync()
1117+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.Write({ "commitOffsetRequest": { "commitOffsets": [ { "partitionSessionId": "1", "offsets": [ { "end": "23" } ] } ] } })
1118+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.Write({ "initRequest": { "topicsReadSettings": [ { "path": "/topic" } ], "consumer": "Consumer" } })
1119+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.MoveNextAsync()
1120+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.Current
1121+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.Write({ "readRequest": { "bytesSize": "1000" } })
1122+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.MoveNextAsync()
1123+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.Current
1124+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.MoveNextAsync()
1125+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.Write({ "startPartitionSessionResponse": { "partitionSessionId": "1" } })
1126+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.Current
1127+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.MoveNextAsync()
1128+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.Write({ "commitOffsetRequest": { "commitOffsets": [ { "partitionSessionId": "1", "offsets": [ { "end": "23" } ] } ] } })
1129+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.Current
1130+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.MoveNextAsync()
1131+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.Current
1132+
IBidirectionalStream<StreamReadMessage.Types.FromClient, StreamReadMessage.Types.FromServer>.MoveNextAsync() [Maybe]
1133+
*/
1134+
[Fact]
11011135
public async Task
11021136
RunProcessingTopic_WhenMoveNextAsyncThrowTransportExceptionAfterCommit_ShouldRetryInitializeAndReadThenCommitMessages()
11031137
{
@@ -1120,7 +1154,7 @@ public async Task
11201154
{
11211155
tcsMoveNextSecond.TrySetException(
11221156
new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled)));
1123-
1157+
11241158
return Task.CompletedTask;
11251159
})
11261160
.Returns(Task.CompletedTask)
@@ -1131,8 +1165,6 @@ public async Task
11311165

11321166
return Task.CompletedTask;
11331167
})
1134-
.Returns(Task.CompletedTask)
1135-
.Returns(Task.CompletedTask)
11361168
.Returns(() =>
11371169
{
11381170
tcsCommitMessage.SetResult(true);
@@ -1149,6 +1181,7 @@ public async Task
11491181
.ReturnsAsync(true)
11501182
.Returns(new ValueTask<bool>(tcsMoveNextThird.Task))
11511183
.Returns(new ValueTask<bool>(tcsCommitMessage.Task))
1184+
.ReturnsAsync(true)
11521185
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
11531186

11541187
_mockStream.SetupSequence(stream => stream.Current)
@@ -1159,15 +1192,15 @@ public async Task
11591192
.Returns(StartPartitionSessionRequest())
11601193
.Returns(ReadResponse(20, bytes, bytes, bytes))
11611194
.Returns(CommitOffsetResponse(10))
1162-
.Returns(CommitOffsetResponse(20));
1195+
.Returns(CommitOffsetResponse(23));
11631196

11641197
using var reader = new ReaderBuilder<byte[]>(_mockIDriver.Object)
11651198
{
11661199
ConsumerName = "Consumer",
11671200
MemoryUsageMaxBytes = 1000,
11681201
SubscribeSettings = { new SubscribeSettings("/topic") }
11691202
}.Build();
1170-
1203+
11711204
var batch = await reader.ReadBatchAsync();
11721205
Assert.Equal("ReaderSession[SessionId] was deactivated",
11731206
(await Assert.ThrowsAsync<ReaderException>(() => batch.CommitBatchAsync())).Message);
@@ -1184,6 +1217,196 @@ public async Task
11841217
{
11851218
Assert.Equal(bytes, message.Data);
11861219
}
1220+
1221+
_mockStream.Verify(stream => stream.Write(It.IsAny<FromClient>()), Times.Exactly(8));
1222+
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.Between(9, 10, Range.Inclusive));
1223+
_mockStream.Verify(stream => stream.Current, Times.Exactly(8));
1224+
1225+
_mockStream.Verify(stream => stream.Write(It.Is<FromClient>(msg =>
1226+
msg.InitRequest != null &&
1227+
msg.InitRequest.Consumer == "Consumer" &&
1228+
msg.InitRequest.TopicsReadSettings[0].Path == "/topic")), Times.Exactly(2));
1229+
_mockStream.Verify(stream => stream.Write(It.Is<FromClient>(msg =>
1230+
msg.ReadRequest != null && msg.ReadRequest.BytesSize == 1000)), Times.Exactly(2));
1231+
_mockStream.Verify(stream => stream.Write(It.Is<FromClient>(msg =>
1232+
msg.ReadRequest != null)), Times.Exactly(2));
1233+
_mockStream.Verify(stream => stream.Write(It.Is<FromClient>(msg =>
1234+
msg.StartPartitionSessionResponse != null &&
1235+
msg.StartPartitionSessionResponse.PartitionSessionId == 1)), Times.Exactly(2));
1236+
_mockStream.Verify(stream => stream.Write(It.Is<FromClient>(msg =>
1237+
msg.CommitOffsetRequest != null &&
1238+
msg.CommitOffsetRequest.CommitOffsets[0].PartitionSessionId == 1 &&
1239+
msg.CommitOffsetRequest.CommitOffsets[0].Offsets[0].Start == 0 &&
1240+
msg.CommitOffsetRequest.CommitOffsets[0].Offsets[0].End == 23)));
1241+
}
1242+
1243+
[Theory]
1244+
[InlineData(true)]
1245+
[InlineData(false)]
1246+
public async Task RunProcessingTopic_WhenStopPartitionSessionRequestBeforeCommit_ThrowReaderExceptionOnCommit(
1247+
bool graceful)
1248+
{
1249+
var tcsMoveNext = new TaskCompletionSource<bool>();
1250+
var stopPartitionSessionRequest = new TaskCompletionSource<bool>();
1251+
1252+
var sequentialResultWrite = _mockStream.SetupSequence(stream => stream.Write(It.IsAny<FromClient>()))
1253+
.Returns(Task.CompletedTask)
1254+
.Returns(Task.CompletedTask)
1255+
.Returns(() =>
1256+
{
1257+
tcsMoveNext.SetResult(true);
1258+
1259+
return Task.CompletedTask;
1260+
})
1261+
.Returns(() =>
1262+
{
1263+
stopPartitionSessionRequest.SetResult(true);
1264+
1265+
return Task.CompletedTask;
1266+
});
1267+
1268+
TaskCompletionSource waitStopPartitionSessionResponse;
1269+
if (graceful)
1270+
{
1271+
waitStopPartitionSessionResponse = new TaskCompletionSource();
1272+
sequentialResultWrite.Returns(() =>
1273+
{
1274+
waitStopPartitionSessionResponse.SetResult();
1275+
return Task.CompletedTask;
1276+
});
1277+
}
1278+
else
1279+
{
1280+
waitStopPartitionSessionResponse = new TaskCompletionSource();
1281+
waitStopPartitionSessionResponse.SetResult();
1282+
}
1283+
1284+
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
1285+
.ReturnsAsync(true)
1286+
.ReturnsAsync(true)
1287+
.Returns(new ValueTask<bool>(tcsMoveNext.Task))
1288+
.Returns(new ValueTask<bool>(stopPartitionSessionRequest.Task))
1289+
.ReturnsAsync(true)
1290+
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
1291+
1292+
_mockStream.SetupSequence(stream => stream.Current)
1293+
.Returns(InitResponseFromServer)
1294+
.Returns(StartPartitionSessionRequest())
1295+
.Returns(ReadResponse(0, BitConverter.GetBytes(100), BitConverter.GetBytes(100)))
1296+
.Returns(new FromServer
1297+
{
1298+
Status = StatusIds.Types.StatusCode.Success,
1299+
StopPartitionSessionRequest = new StreamReadMessage.Types.StopPartitionSessionRequest
1300+
{
1301+
PartitionSessionId = 1,
1302+
CommittedOffset = 1,
1303+
Graceful = graceful
1304+
}
1305+
})
1306+
.Returns(CommitOffsetResponse(2));
1307+
1308+
using var reader = new ReaderBuilder<int>(_mockIDriver.Object)
1309+
{
1310+
ConsumerName = "Consumer",
1311+
MemoryUsageMaxBytes = 1000,
1312+
SubscribeSettings = { new SubscribeSettings("/topic") }
1313+
}.Build();
1314+
1315+
var firstMessage = await reader.ReadAsync();
1316+
Assert.Equal(100, firstMessage.Data);
1317+
if (graceful)
1318+
{
1319+
await firstMessage.CommitAsync();
1320+
}
1321+
else
1322+
{
1323+
Assert.Equal("PartitionSession[1] was closed by server.",
1324+
(await Assert.ThrowsAsync<ReaderException>(() => firstMessage.CommitAsync())).Message);
1325+
}
1326+
1327+
var secondMessage = await reader.ReadAsync();
1328+
Assert.Equal(100, secondMessage.Data);
1329+
Assert.Equal("PartitionSession[1] was closed by server.",
1330+
(await Assert.ThrowsAsync<ReaderException>(() => secondMessage.CommitAsync())).Message);
1331+
1332+
await waitStopPartitionSessionResponse.Task;
1333+
1334+
_mockStream.Verify(stream => stream.Write(It.IsAny<FromClient>()), Times.Exactly(4 + (graceful ? 1 : 0)));
1335+
_mockStream.Verify(stream => stream.MoveNextAsync(),
1336+
Times.Between(4, 6, Range.Inclusive));
1337+
_mockStream.Verify(stream => stream.Current, Times.Between(4, 5, Range.Inclusive));
1338+
1339+
_mockStream.Verify(stream => stream.Write(It.Is<FromClient>(msg =>
1340+
msg.InitRequest != null &&
1341+
msg.InitRequest.Consumer == "Consumer" &&
1342+
msg.InitRequest.TopicsReadSettings[0].Path == "/topic")));
1343+
_mockStream.Verify(stream => stream.Write(It.Is<FromClient>(msg =>
1344+
msg.ReadRequest != null && msg.ReadRequest.BytesSize == 1000)));
1345+
_mockStream.Verify(stream => stream.Write(It.Is<FromClient>(msg =>
1346+
msg.ReadRequest != null)));
1347+
_mockStream.Verify(stream => stream.Write(It.Is<FromClient>(msg =>
1348+
msg.StartPartitionSessionResponse != null &&
1349+
msg.StartPartitionSessionResponse.PartitionSessionId == 1)));
1350+
_mockStream.Verify(stream => stream.Write(It.Is<FromClient>(msg =>
1351+
msg.CommitOffsetRequest != null &&
1352+
msg.CommitOffsetRequest.CommitOffsets[0].PartitionSessionId == 1 &&
1353+
msg.CommitOffsetRequest.CommitOffsets[0].Offsets[0].Start == 0 &&
1354+
msg.CommitOffsetRequest.CommitOffsets[0].Offsets[0].End == 1)));
1355+
1356+
if (graceful)
1357+
{
1358+
_mockStream.Verify(stream => stream.Write(It.Is<FromClient>(msg =>
1359+
msg.StopPartitionSessionResponse != null &&
1360+
msg.StopPartitionSessionResponse.PartitionSessionId == 1)));
1361+
}
1362+
}
1363+
1364+
[Fact]
1365+
public async Task ReadAsync_WhenFailDeserializer_ThrowReaderExceptionAndInvokeReadRequest()
1366+
{
1367+
var tcsMoveNext = new TaskCompletionSource<bool>();
1368+
1369+
_mockStream.SetupSequence(stream => stream.Write(It.IsAny<FromClient>()))
1370+
.ThrowsAsync(new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled)))
1371+
.Returns(Task.CompletedTask)
1372+
.Returns(Task.CompletedTask)
1373+
.Returns(() =>
1374+
{
1375+
tcsMoveNext.SetResult(true);
1376+
1377+
return Task.CompletedTask;
1378+
})
1379+
.Returns(Task.CompletedTask);
1380+
1381+
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
1382+
.ReturnsAsync(true)
1383+
.ReturnsAsync(true)
1384+
.Returns(new ValueTask<bool>(tcsMoveNext.Task))
1385+
.Returns(new ValueTask<bool>(new TaskCompletionSource<bool>().Task));
1386+
1387+
_mockStream.SetupSequence(stream => stream.Current)
1388+
.Returns(InitResponseFromServer)
1389+
.Returns(StartPartitionSessionRequest())
1390+
.Returns(ReadResponse(0, BitConverter.GetBytes(100)));
1391+
1392+
using var reader = new ReaderBuilder<int>(_mockIDriver.Object)
1393+
{
1394+
ConsumerName = "Consumer",
1395+
MemoryUsageMaxBytes = 100,
1396+
SubscribeSettings = { new SubscribeSettings("/topic") },
1397+
Deserializer = new FailDeserializer()
1398+
}.Build();
1399+
1400+
Assert.Equal("Error when deserializing message data",
1401+
(await Assert.ThrowsAsync<ReaderException>(() => reader.ReadAsync().AsTask())).Message);
1402+
}
1403+
1404+
private class FailDeserializer : IDeserializer<int>
1405+
{
1406+
public int Deserialize(byte[] data)
1407+
{
1408+
throw new Exception("Some serialize exception");
1409+
}
11871410
}
11881411

11891412
private static FromServer StartPartitionSessionRequest(int commitedOffset = 0)
@@ -1201,7 +1424,6 @@ private static FromServer StartPartitionSessionRequest(int commitedOffset = 0)
12011424
};
12021425
}
12031426

1204-
12051427
private static FromServer ReadResponse(int commitedOffset = 0, params byte[][] args)
12061428
{
12071429
var batch = new StreamReadMessage.Types.ReadResponse.Types.Batch

0 commit comments

Comments
 (0)