Skip to content

Commit 5372f75

Browse files
feat: updating Serializer / Deserializer, added stress Integration Test
1 parent 2a87cb1 commit 5372f75

File tree

5 files changed

+114
-35
lines changed

5 files changed

+114
-35
lines changed

src/Ydb.Sdk/src/Services/Topic/Deserializer.cs

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -55,28 +55,15 @@ private class Int64Deserializer : IDeserializer<long>
5555
{
5656
public long Deserialize(byte[] data)
5757
{
58-
if (data.Length != 8)
59-
{
60-
throw new ArgumentException(
61-
$"Deserializer<Long> encountered data of length ${data.Length}. Expecting data length to be 8");
62-
}
63-
64-
return ((long)data[0] << 56) | ((long)data[1] << 48) | ((long)data[2] << 40) | ((long)data[3] << 32) |
65-
((long)data[4] << 24) | ((long)data[5] << 16) | ((long)data[6] << 8) | data[7];
58+
return BitConverter.ToInt64(data);
6659
}
6760
}
6861

6962
private class Int32Deserializer : IDeserializer<int>
7063
{
7164
public int Deserialize(byte[] data)
7265
{
73-
if (data.Length != 4)
74-
{
75-
throw new ArgumentException(
76-
$"Deserializer<Int32> encountered data of length ${data.Length}. Expecting data length to be 4");
77-
}
78-
79-
return (data[0] << 24) | (data[1] << 16) | (data[2] << 8) | data[3];
66+
return BitConverter.ToInt32(data);
8067
}
8168
}
8269

src/Ydb.Sdk/src/Services/Topic/Serializer.cs

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -49,31 +49,15 @@ private class Int64Serializer : ISerializer<long>
4949
{
5050
public byte[] Serialize(long data)
5151
{
52-
return new[]
53-
{
54-
(byte)(data >> 56),
55-
(byte)(data >> 48),
56-
(byte)(data >> 40),
57-
(byte)(data >> 32),
58-
(byte)(data >> 24),
59-
(byte)(data >> 16),
60-
(byte)(data >> 8),
61-
(byte)data
62-
};
52+
return BitConverter.GetBytes(data);
6353
}
6454
}
6555

6656
private class Int32Serializer : ISerializer<int>
6757
{
6858
public byte[] Serialize(int data)
6959
{
70-
return new[]
71-
{
72-
(byte)(data >> 24),
73-
(byte)(data >> 16),
74-
(byte)(data >> 8),
75-
(byte)data
76-
};
60+
return BitConverter.GetBytes(data);
7761
}
7862
}
7963

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
using Xunit;
2+
using Ydb.Sdk.Services.Topic;
3+
4+
namespace Ydb.Sdk.Tests.Topic;
5+
6+
public class SerializerDeserializerUnitTests
7+
{
8+
[Fact]
9+
public void SerializeDeserialize_WhenSerializer64Deserializer32_ReturnInt32()
10+
{
11+
Assert.Equal(32, Deserializers.Int32.Deserialize(Serializers.Int32.Serialize(32)));
12+
}
13+
14+
[Fact]
15+
public void SerializeDeserialize_WhenSerializer64Deserializer64_ReturnInt64()
16+
{
17+
Assert.Equal(32 * 1_000_000_000L,
18+
Deserializers.Int64.Deserialize(Serializers.Int64.Serialize(32 * 1_000_000_000L)));
19+
}
20+
21+
[Fact]
22+
public void SerializeDeserialize_WhenSerializerUtf8DeserializerUtf8_ReturnString()
23+
{
24+
Assert.Equal("abacaba",
25+
Deserializers.Utf8.Deserialize(Serializers.Utf8.Serialize("abacaba")));
26+
}
27+
}

src/Ydb.Sdk/tests/Topic/WriterIntegrationTests.cs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
using Google.Protobuf.WellKnownTypes;
12
using Xunit;
23
using Ydb.Sdk.Services.Topic;
34
using Ydb.Sdk.Services.Topic.Writer;
45
using Ydb.Sdk.Tests.Fixture;
6+
using Ydb.Topic;
7+
using Consumer = Ydb.Sdk.Services.Topic.Consumer;
58

69
namespace Ydb.Sdk.Tests.Topic;
710

@@ -44,4 +47,82 @@ public async Task WriteAsync_WhenTopicNotFound_ReturnNotFoundException()
4447
Assert.Equal(StatusCode.SchemeError, (await Assert.ThrowsAsync<WriterException>(
4548
() => writer.WriteAsync("hello world"))).Status.StatusCode);
4649
}
50+
51+
[Fact]
52+
public async Task WriteAsync_When1000Messages_ReturnWriteResultIsPersisted()
53+
{
54+
const int messageCount = 1000;
55+
var topicName = _topicName + "_stress";
56+
var topicClient = new TopicClient(_driver);
57+
var topicSettings = new CreateTopicSettings { Path = topicName };
58+
topicSettings.Consumers.Add(new Consumer("Consumer"));
59+
await topicClient.CreateTopic(topicSettings);
60+
61+
using var writer = new WriterBuilder<int>(_driver, topicName) { ProducerId = "producerId" }.Build();
62+
63+
var tasks = new List<Task>();
64+
for (var i = 0; i < messageCount; i++)
65+
{
66+
var i1 = i;
67+
tasks.Add(Task.Run(async () =>
68+
{
69+
// ReSharper disable once AccessToDisposedClosure
70+
var message = await writer.WriteAsync(i1);
71+
72+
Assert.Equal(PersistenceStatus.Written, message.Status);
73+
}));
74+
}
75+
76+
await Task.WhenAll(tasks);
77+
78+
var initStream =
79+
_driver.BidirectionalStreamCall(Ydb.Topic.V1.TopicService.StreamReadMethod, new GrpcRequestSettings());
80+
await initStream.Write(new StreamReadMessage.Types.FromClient
81+
{
82+
InitRequest = new StreamReadMessage.Types.InitRequest
83+
{
84+
Consumer = "Consumer", ReaderName = "reader-test", TopicsReadSettings =
85+
{
86+
new StreamReadMessage.Types.InitRequest.Types.TopicReadSettings
87+
{ ReadFrom = new Timestamp(), Path = topicName }
88+
}
89+
}
90+
});
91+
92+
var ans = 0;
93+
94+
await initStream.MoveNextAsync();
95+
await initStream.Write(new StreamReadMessage.Types.FromClient
96+
{
97+
ReadRequest = new StreamReadMessage.Types.ReadRequest
98+
{
99+
BytesSize = 2000 * messageCount * sizeof(int)
100+
}
101+
});
102+
await initStream.MoveNextAsync();
103+
var startRequest = initStream.Current.StartPartitionSessionRequest;
104+
await initStream.Write(new StreamReadMessage.Types.FromClient
105+
{
106+
StartPartitionSessionResponse = new StreamReadMessage.Types.StartPartitionSessionResponse
107+
{
108+
CommitOffset = startRequest.CommittedOffset,
109+
PartitionSessionId = startRequest.PartitionSession.PartitionSessionId,
110+
}
111+
});
112+
var receivedMessageCount = 0;
113+
while (receivedMessageCount < messageCount)
114+
{
115+
await initStream.MoveNextAsync();
116+
ans += initStream.Current.ReadResponse.PartitionData.Sum(data => data.Batches.Sum(batch =>
117+
batch.MessageData.Sum(message =>
118+
{
119+
receivedMessageCount++;
120+
return Deserializers.Int32.Deserialize(message.Data.ToByteArray());
121+
})));
122+
}
123+
124+
Assert.Equal(messageCount * (messageCount - 1) / 2, ans);
125+
126+
await topicClient.DropTopic(new DropTopicSettings { Path = topicName });
127+
}
47128
}

src/Ydb.Sdk/tests/Topic/WriterMockTests.cs renamed to src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@ namespace Ydb.Sdk.Tests.Topic;
1313
using WriterStream = IBidirectionalStream<StreamWriteMessage.Types.FromClient, StreamWriteMessage.Types.FromServer>;
1414
using FromClient = StreamWriteMessage.Types.FromClient;
1515

16-
public class WriterMockTests
16+
public class WriterUnitTests
1717
{
1818
private readonly ITestOutputHelper _testOutputHelper;
1919
private readonly Mock<IDriver> _mockIDriver = new();
2020
private readonly Mock<WriterStream> _mockStream = new();
2121

22-
public WriterMockTests(ITestOutputHelper testOutputHelper)
22+
public WriterUnitTests(ITestOutputHelper testOutputHelper)
2323
{
2424
_testOutputHelper = testOutputHelper;
2525
_mockIDriver.Setup(driver => driver.BidirectionalStreamCall(

0 commit comments

Comments
 (0)