Skip to content

Commit 676c69e

Browse files
feat: init IReader, ReaderConfig, ReaderBuilder (#207)
1 parent 5a964cd commit 676c69e

File tree

14 files changed

+251
-7
lines changed

14 files changed

+251
-7
lines changed
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
using System.Text;
2+
3+
namespace Ydb.Sdk.Services.Topic;
4+
5+
public interface IDeserializer<out TValue>
6+
{
7+
/// <summary>Deserialize a message key or value.</summary>
8+
/// <param name="data">The data to deserialize.</param>
9+
/// <returns>The deserialized value.</returns>
10+
TValue Deserialize(byte[] data);
11+
}
12+
13+
public static class Deserializers
14+
{
15+
/// <summary>
16+
/// String (UTF8 encoded) deserializer.
17+
/// </summary>
18+
public static IDeserializer<string> Utf8 = new Utf8Deserializer();
19+
20+
/// <summary>
21+
/// System.Int64 (big endian encoded, network byte ordered) deserializer.
22+
/// </summary>
23+
public static IDeserializer<long> Int64 = new Int64Deserializer();
24+
25+
/// <summary>
26+
/// System.Int32 (big endian encoded, network byte ordered) deserializer.
27+
/// </summary>
28+
public static IDeserializer<int> Int32 = new Int32Deserializer();
29+
30+
/// <summary>
31+
/// System.Byte[] deserializer.
32+
/// </summary>
33+
/// <remarks>
34+
/// Byte ordering is original order.
35+
/// </remarks>
36+
public static IDeserializer<byte[]> ByteArray = new ByteArrayDeserializer();
37+
38+
internal static readonly Dictionary<System.Type, object> DefaultDeserializers = new()
39+
{
40+
{ typeof(int), Int32 },
41+
{ typeof(long), Int64 },
42+
{ typeof(string), Utf8 },
43+
{ typeof(byte[]), ByteArray }
44+
};
45+
46+
private class Utf8Deserializer : IDeserializer<string>
47+
{
48+
public string Deserialize(byte[] data)
49+
{
50+
return Encoding.UTF8.GetString(data);
51+
}
52+
}
53+
54+
private class Int64Deserializer : IDeserializer<long>
55+
{
56+
public long Deserialize(byte[] data)
57+
{
58+
if (data.Length != 8)
59+
{
60+
throw new ArgumentException(
61+
$"Deserializer<Long> encountered data of length ${data.Length}. Expecting data length to be 8");
62+
}
63+
64+
return ((long)data[0] << 56) | ((long)data[1] << 48) | ((long)data[2] << 40) | ((long)data[3] << 32) |
65+
((long)data[4] << 24) | ((long)data[5] << 16) | ((long)data[6] << 8) | data[7];
66+
}
67+
}
68+
69+
private class Int32Deserializer : IDeserializer<int>
70+
{
71+
public int Deserialize(byte[] data)
72+
{
73+
if (data.Length != 4)
74+
{
75+
throw new ArgumentException(
76+
$"Deserializer<Int32> encountered data of length ${data.Length}. Expecting data length to be 4");
77+
}
78+
79+
return (data[0] << 24) | (data[1] << 16) | (data[2] << 8) | data[3];
80+
}
81+
}
82+
83+
private class ByteArrayDeserializer : IDeserializer<byte[]>
84+
{
85+
public byte[] Deserialize(byte[] data)
86+
{
87+
return data;
88+
}
89+
}
90+
}

src/Ydb.Sdk/src/Services/Topic/Exceptions.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,10 @@ public YdbWriterException(string message) : base(message)
1313
{
1414
}
1515
}
16+
17+
public class YdbReaderException : YdbTopicException
18+
{
19+
protected YdbReaderException(string message) : base(message)
20+
{
21+
}
22+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using Ydb.Sdk.Services.Topic.Reader;
2+
3+
namespace Ydb.Sdk.Services.Topic;
4+
5+
public interface IReader<TValue>
6+
{
7+
public Task<TValue> ReadAsync();
8+
9+
public Task<Message<TValue>> ReadMessageAsync();
10+
}

src/Ydb.Sdk/src/Services/Topic/IWriter.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
using Ydb.Sdk.Services.Topic.Writer;
2+
13
namespace Ydb.Sdk.Services.Topic;
24

35
public interface IWriter<TValue>
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
namespace Ydb.Sdk.Services.Topic.Reader;
2+
3+
public class Message<TValue>
4+
{
5+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
namespace Ydb.Sdk.Services.Topic.Reader;
2+
3+
internal class Reader<TValue> : IReader<TValue>
4+
{
5+
// internal Reader(Driver driver, ReaderConfig config, IDeserializer<TValue> deserializer)
6+
// {
7+
// }
8+
9+
internal Task Initialize()
10+
{
11+
throw new NotImplementedException();
12+
}
13+
14+
public Task<TValue> ReadAsync()
15+
{
16+
throw new NotImplementedException();
17+
}
18+
19+
public Task<Message<TValue>> ReadMessageAsync()
20+
{
21+
throw new NotImplementedException();
22+
}
23+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
namespace Ydb.Sdk.Services.Topic.Reader;
2+
3+
public class ReaderBuilder<TValue>
4+
{
5+
// private readonly ReaderConfig _config;
6+
// private readonly Driver _driver;
7+
//
8+
// public ReaderBuilder(Driver driver, ReaderConfig config)
9+
// {
10+
// _driver = driver;
11+
// _config = config;
12+
// }
13+
14+
public IDeserializer<TValue>? Deserializer { get; set; }
15+
16+
public Task<IReader<TValue>> Build()
17+
{
18+
throw new NotImplementedException();
19+
// var reader = new Reader<TValue>(
20+
// _driver,
21+
// _config,
22+
// Deserializer ?? (IDeserializer<TValue>)(
23+
// Deserializers.DefaultDeserializers.TryGetValue(typeof(TValue), out var deserializer)
24+
// ? deserializer
25+
// : throw new YdbWriterException("The serializer is not set")
26+
// )
27+
// );
28+
//
29+
// await reader.Initialize();
30+
//
31+
// return reader;
32+
}
33+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
namespace Ydb.Sdk.Services.Topic.Reader;
2+
3+
public class ReaderConfig
4+
{
5+
/// <summary>
6+
/// Message that describes topic to read.
7+
/// Topics that will be read by this reader.
8+
/// </summary>
9+
public List<SubscribeSettings> SubscribeSettings { get; } = new();
10+
11+
/// <summary>
12+
/// Path of consumer that is used for reading by this session.
13+
/// </summary>
14+
public string? ConsumerName { get; set; }
15+
16+
/// <summary>
17+
/// Optional name. Will be shown in debug stat.
18+
/// </summary>
19+
public string? ReaderName { get; set; }
20+
21+
/// <summary>
22+
/// Direct reading from a partition node.
23+
/// </summary>
24+
public bool DirectRead { get; set; }
25+
}
26+
27+
public class SubscribeSettings
28+
{
29+
public string TopicPath { get; }
30+
31+
/// <param name="topicPath">Topic path</param>
32+
public SubscribeSettings(string topicPath)
33+
{
34+
TopicPath = topicPath;
35+
}
36+
37+
/// <summary>
38+
/// Partitions that will be read by this session.
39+
/// If list is empty - then session will read all partitions.
40+
/// </summary>
41+
public List<long> PartitionIds { get; } = new();
42+
43+
/// <summary>
44+
/// Skip all messages that has write timestamp smaller than now - max_lag.
45+
/// Zero means infinite lag.
46+
/// </summary>
47+
public TimeSpan? MaxLag { get; set; }
48+
49+
/// <summary>
50+
/// Read data only after this timestamp from this topic.
51+
/// Read only messages with 'written_at' value greater or equal than this timestamp.
52+
/// </summary>
53+
public DateTime? ReadFrom { get; set; }
54+
}

src/Ydb.Sdk/src/Services/Topic/Serializer.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@ public static class Serializers
2323
public static readonly ISerializer<int> Int32 = new Int32Serializer();
2424

2525
/// <summary>
26-
/// System.Byte[] (nullable) serializer.</summary>
27-
/// <remarks>Byte order is original order.</remarks>
26+
/// System.Byte[] serializer.</summary>
27+
/// <remarks>
28+
/// Byte order is original order.
29+
/// </remarks>
2830
public static readonly ISerializer<byte[]> ByteArray = new ByteArraySerializer();
2931

3032
internal static readonly Dictionary<System.Type, object> DefaultSerializers = new()

src/Ydb.Sdk/src/Services/Topic/Message.cs renamed to src/Ydb.Sdk/src/Services/Topic/Writer/Message.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
namespace Ydb.Sdk.Services.Topic;
1+
namespace Ydb.Sdk.Services.Topic.Writer;
22

33
public class Message<TValue>
44
{

0 commit comments

Comments
 (0)