Skip to content

Commit bf97959

Browse files
BufferOverflow processing: WriteAsync
1 parent cb32d5e commit bf97959

File tree

2 files changed

+99
-57
lines changed

2 files changed

+99
-57
lines changed

src/Ydb.Sdk/tests/Topic/WriterIntegrationTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ public async Task WriteAsync_WhenOneMessage_ReturnWritten()
4040
[Fact]
4141
public async Task WriteAsync_WhenTopicNotFound_ReturnNotFoundException()
4242
{
43-
using var writer = new WriterBuilder<string>(_driver, new WriterConfig(_topicName + "_not_found") { ProducerId = "producerId" })
44-
.Build();
43+
using var writer = new WriterBuilder<string>(_driver, new WriterConfig(_topicName + "_not_found")
44+
{ ProducerId = "producerId" }).Build();
4545

4646
Assert.Equal(StatusCode.SchemeError, (await Assert.ThrowsAsync<WriterException>(
4747
() => writer.WriteAsync("hello world"))).Status.StatusCode);

src/Ydb.Sdk/tests/Topic/WriterMockTests.cs

Lines changed: 97 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
using System.Collections.Concurrent;
12
using Grpc.Core;
23
using Moq;
34
using Xunit;
5+
using Xunit.Abstractions;
46
using Ydb.Issue;
57
using Ydb.Sdk.Services.Topic;
68
using Ydb.Sdk.Services.Topic.Writer;
@@ -10,16 +12,19 @@
1012
namespace Ydb.Sdk.Tests.Topic;
1113

1214
using WriterStream = IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>;
15+
using FromClient = StreamWriteMessage.Types.FromClient;
1316

1417
public class WriterMockTests
1518
{
19+
private readonly ITestOutputHelper _testOutputHelper;
1620
private readonly Mock<IDriver> _mockIDriver = new();
1721
private readonly Mock<WriterStream> _mockStream = new();
1822

19-
public WriterMockTests()
23+
public WriterMockTests(ITestOutputHelper testOutputHelper)
2024
{
25+
_testOutputHelper = testOutputHelper;
2126
_mockIDriver.Setup(driver => driver.BidirectionalStreamCall(
22-
It.IsAny<Method<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>>(),
27+
It.IsAny<Method<FromClient, StreamWriteMessage.Types.FromServer>>(),
2328
It.IsAny<GrpcRequestSettings>())
2429
).Returns(_mockStream.Object);
2530

@@ -32,7 +37,7 @@ public async Task Initialize_WhenStreamIsClosedByServer_ThrowWriterExceptionOnWr
3237
var moveNextTry = new TaskCompletionSource<bool>();
3338
var taskNextComplete = new TaskCompletionSource();
3439

35-
_mockStream.Setup(stream => stream.Write(It.IsAny<StreamWriteMessage.Types.FromClient>()))
40+
_mockStream.Setup(stream => stream.Write(It.IsAny<FromClient>()))
3641
.Returns(Task.CompletedTask);
3742
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
3843
.ReturnsAsync(false)
@@ -60,7 +65,7 @@ public async Task Initialize_WhenFailWriteMessage_ThrowWriterExceptionOnWriteAsy
6065
{
6166
var taskSource = new TaskCompletionSource();
6267
var taskNextComplete = new TaskCompletionSource();
63-
_mockStream.SetupSequence(stream => stream.Write(It.IsAny<StreamWriteMessage.Types.FromClient>()))
68+
_mockStream.SetupSequence(stream => stream.Write(It.IsAny<FromClient>()))
6469
.ThrowsAsync(new Driver.TransportException(new RpcException(Grpc.Core.Status.DefaultCancelled)))
6570
.Returns(() =>
6671
{
@@ -77,15 +82,15 @@ public async Task Initialize_WhenFailWriteMessage_ThrowWriterExceptionOnWriteAsy
7782

7883
await taskNextComplete.Task;
7984
// check attempt repeated!!!
80-
_mockStream.Verify(stream => stream.Write(It.IsAny<StreamWriteMessage.Types.FromClient>()), Times.Exactly(2));
85+
_mockStream.Verify(stream => stream.Write(It.IsAny<FromClient>()), Times.Exactly(2));
8186
}
8287

8388
[Fact]
8489
public async Task Initialize_WhenFailMoveNextAsync_ThrowWriterExceptionOnWriteAsyncAndTryNextInitialize()
8590
{
8691
var taskSource = new TaskCompletionSource<bool>();
8792
var taskNextComplete = new TaskCompletionSource();
88-
_mockStream.Setup(stream => stream.Write(It.IsAny<StreamWriteMessage.Types.FromClient>()))
93+
_mockStream.Setup(stream => stream.Write(It.IsAny<FromClient>()))
8994
.Returns(Task.CompletedTask);
9095
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
9196
.ThrowsAsync(new Driver.TransportException(
@@ -105,7 +110,7 @@ public async Task Initialize_WhenFailMoveNextAsync_ThrowWriterExceptionOnWriteAs
105110

106111
await taskNextComplete.Task;
107112
// check attempt repeated!!!
108-
_mockStream.Verify(stream => stream.Write(It.IsAny<StreamWriteMessage.Types.FromClient>()), Times.Exactly(2));
113+
_mockStream.Verify(stream => stream.Write(It.IsAny<FromClient>()), Times.Exactly(2));
109114
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(2));
110115
}
111116

@@ -114,7 +119,7 @@ public async Task Initialize_WhenInitResponseNotSuccess_ThrowWriterExceptionOnWr
114119
{
115120
var taskSource = new TaskCompletionSource<bool>();
116121
var taskNextComplete = new TaskCompletionSource();
117-
_mockStream.Setup(stream => stream.Write(It.IsAny<StreamWriteMessage.Types.FromClient>()))
122+
_mockStream.Setup(stream => stream.Write(It.IsAny<FromClient>()))
118123
.Returns(Task.CompletedTask);
119124
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
120125
.Returns(new ValueTask<bool>(true))
@@ -138,14 +143,14 @@ public async Task Initialize_WhenInitResponseNotSuccess_ThrowWriterExceptionOnWr
138143

139144
await taskNextComplete.Task;
140145
// check attempt repeated!!!
141-
_mockStream.Verify(stream => stream.Write(It.IsAny<StreamWriteMessage.Types.FromClient>()), Times.Exactly(2));
146+
_mockStream.Verify(stream => stream.Write(It.IsAny<FromClient>()), Times.Exactly(2));
142147
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.Exactly(2));
143148
}
144149

145150
[Fact]
146151
public async Task Initialize_WhenInitResponseIsSchemaError_ThrowWriterExceptionOnWriteAsyncAndStopInitializing()
147152
{
148-
_mockStream.Setup(stream => stream.Write(It.IsAny<StreamWriteMessage.Types.FromClient>()))
153+
_mockStream.Setup(stream => stream.Write(It.IsAny<FromClient>()))
149154
.Returns(Task.CompletedTask);
150155
_mockStream.Setup(stream => stream.MoveNextAsync())
151156
.Returns(new ValueTask<bool>(true));
@@ -163,14 +168,14 @@ public async Task Initialize_WhenInitResponseIsSchemaError_ThrowWriterExceptionO
163168
(await Assert.ThrowsAsync<WriterException>(() => writer.WriteAsync(123L))).Message);
164169

165170
// check not attempt repeated!!!
166-
_mockStream.Verify(stream => stream.Write(It.IsAny<StreamWriteMessage.Types.FromClient>()), Times.Once);
171+
_mockStream.Verify(stream => stream.Write(It.IsAny<FromClient>()), Times.Once);
167172
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.Once);
168173
}
169174

170175
[Fact]
171176
public async Task Initialize_WhenNotSupportedCodec_ThrowWriterExceptionOnWriteAsyncAndStopInitializing()
172177
{
173-
_mockStream.Setup(stream => stream.Write(It.IsAny<StreamWriteMessage.Types.FromClient>()))
178+
_mockStream.Setup(stream => stream.Write(It.IsAny<FromClient>()))
174179
.Returns(Task.CompletedTask);
175180
_mockStream.Setup(stream => stream.MoveNextAsync())
176181
.Returns(new ValueTask<bool>(true));
@@ -182,7 +187,7 @@ public async Task Initialize_WhenNotSupportedCodec_ThrowWriterExceptionOnWriteAs
182187
LastSeqNo = 1, PartitionId = 1, SessionId = "SessionId",
183188
SupportedCodecs = new SupportedCodecs { Codecs = { 2 /* Gzip */, 3 /* Lzop */ } }
184189
},
185-
Status = StatusIds.Types.StatusCode.Success,
190+
Status = StatusIds.Types.StatusCode.Success
186191
});
187192

188193
using var writer = new WriterBuilder<long>(_mockIDriver.Object, new WriterConfig("/topic")
@@ -195,46 +200,83 @@ public async Task Initialize_WhenNotSupportedCodec_ThrowWriterExceptionOnWriteAs
195200
_mockStream.Verify(stream => stream.Write(It.IsAny<StreamWriteMessage.Types.FromClient>()), Times.Once);
196201
_mockStream.Verify(stream => stream.MoveNextAsync(), Times.Once);
197202
}
198-
199-
200-
201-
/*
202-
* _mockStream.Setup(stream => stream.Current)
203-
.Returns(new StreamWriteMessage.Types.FromServer
204-
{
205-
InitResponse = new StreamWriteMessage.Types.InitResponse
206-
{ LastSeqNo = 1, PartitionId = 1, SessionId = "SessionId" },
207-
Status = StatusIds.Types.StatusCode.Success,
208-
});
209-
moveNextTry.SetResult(true);
210-
await Task.Yield();
211-
212-
var writeTask = writer.WriteAsync(100);
213-
moveNextTryWriteAck.SetResult(true);
214-
215-
_mockStream.Setup(stream => stream.Current).Returns(
216-
new StreamWriteMessage.Types.FromServer
217-
{
218-
WriteResponse = new StreamWriteMessage.Types.WriteResponse
219-
{
220-
Acks =
221-
{
222-
new StreamWriteMessage.Types.WriteResponse.Types.WriteAck
223-
{
224-
SeqNo = 1, Written =
225-
new StreamWriteMessage.Types.WriteResponse.Types.WriteAck.Types.Written
226-
{ Offset = 2 }
227-
}
228-
}
229-
},
230-
Status = StatusIds.Types.StatusCode.Success
231-
});
232-
_mockStream.Setup(stream => stream.MoveNextAsync()).ReturnsAsync(true);
233-
234-
var writeResult = await writeTask;
235-
Assert.Equal(PersistenceStatus.Written, writeResult.Status);
236-
Assert.True(writeResult.TryGetOffset(out var offset));
237-
Assert.Equal(2, offset);
238-
_mockStream.Setup(stream => stream.MoveNextAsync()).ReturnsAsync(false);
239-
*/
203+
204+
[Fact]
205+
public async Task WriteAsyncStress_WhenBufferIsOverflow_ThrowWriterExceptionOnBufferOverflow()
206+
{
207+
const int countBatchSendingSize = 1000;
208+
const int batchTasksSize = 100;
209+
const int bufferSize = 100;
210+
const int messageSize = sizeof(int);
211+
212+
Assert.True(batchTasksSize > bufferSize / 4);
213+
Assert.True(bufferSize % 4 == 0);
214+
215+
var taskSource = new TaskCompletionSource<bool>();
216+
_mockStream.Setup(stream => stream.Write(It.IsAny<FromClient>()))
217+
.Returns(Task.CompletedTask);
218+
var mockNextAsync = _mockStream.SetupSequence(stream => stream.MoveNextAsync())
219+
.Returns(new ValueTask<bool>(true))
220+
.Returns(new ValueTask<bool>(taskSource.Task));
221+
var sequentialResult = _mockStream.SetupSequence(stream => stream.Current)
222+
.Returns(new StreamWriteMessage.Types.FromServer
223+
{
224+
InitResponse = new StreamWriteMessage.Types.InitResponse
225+
{ LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" },
226+
Status = StatusIds.Types.StatusCode.Success
227+
});
228+
using var writer = new WriterBuilder<int>(_mockIDriver.Object, new WriterConfig("/topic")
229+
{ ProducerId = "producerId", BufferMaxSize = bufferSize /* bytes */ }).Build();
230+
231+
for (var attempt = 0; attempt < countBatchSendingSize; attempt++)
232+
{
233+
_testOutputHelper.WriteLine($"Processing attempt {attempt}");
234+
235+
var tasks = new List<Task<WriteResult>>();
236+
var serverAck = new StreamWriteMessage.Types.FromServer
237+
{
238+
WriteResponse = new StreamWriteMessage.Types.WriteResponse { PartitionId = 1 },
239+
Status = StatusIds.Types.StatusCode.Success
240+
};
241+
for (var i = 0; i < batchTasksSize; i++)
242+
{
243+
tasks.Add(writer.WriteAsync(100));
244+
serverAck.WriteResponse.Acks.Add(new StreamWriteMessage.Types.WriteResponse.Types.WriteAck
245+
{
246+
SeqNo = bufferSize / messageSize * attempt + i + 1,
247+
Written = new StreamWriteMessage.Types.WriteResponse.Types.WriteAck.Types.Written
248+
{ Offset = i * messageSize + bufferSize * attempt }
249+
});
250+
}
251+
252+
sequentialResult.Returns(() =>
253+
{
254+
// ReSharper disable once AccessToModifiedClosure
255+
Volatile.Write(ref taskSource, new TaskCompletionSource<bool>());
256+
mockNextAsync.Returns(new ValueTask<bool>(Volatile.Read(ref taskSource).Task));
257+
return serverAck;
258+
});
259+
taskSource.SetResult(true);
260+
261+
var countSuccess = 0;
262+
var countErrors = 0;
263+
foreach (var task in tasks)
264+
{
265+
try
266+
{
267+
var res = await task;
268+
countSuccess++;
269+
Assert.Equal(PersistenceStatus.Written, res.Status);
270+
}
271+
catch (WriterException e)
272+
{
273+
countErrors++;
274+
Assert.Equal("Buffer overflow", e.Message);
275+
}
276+
}
277+
278+
Assert.Equal(bufferSize / messageSize, countSuccess);
279+
Assert.Equal(batchTasksSize - bufferSize / messageSize, countErrors);
280+
}
281+
}
240282
}

0 commit comments

Comments
 (0)