-
Notifications
You must be signed in to change notification settings - Fork 28
feat: init IReader, ReaderConfig, ReaderBuilder #207
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
aef7f2a
df03c59
7876dc7
ffffbbd
2dd2fba
0f15f53
e1c5b55
d5d5e02
b6822bb
d34c568
7de2c0d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,90 @@ | ||
| using System.Text; | ||
|
|
||
| namespace Ydb.Sdk.Services.Topic; | ||
|
|
||
| public interface IDeserializer<out TValue> | ||
| { | ||
| /// <summary>Deserialize a message key or value.</summary> | ||
| /// <param name="data">The data to deserialize.</param> | ||
| /// <returns>The deserialized value.</returns> | ||
| TValue Deserialize(byte[] data); | ||
| } | ||
|
|
||
| public static class Deserializers | ||
| { | ||
| /// <summary> | ||
| /// String (UTF8 encoded) deserializer. | ||
| /// </summary> | ||
| public static IDeserializer<string> Utf8 = new Utf8Deserializer(); | ||
|
|
||
| /// <summary> | ||
| /// System.Int64 (big endian encoded, network byte ordered) deserializer. | ||
| /// </summary> | ||
| public static IDeserializer<long> Int64 = new Int64Deserializer(); | ||
|
|
||
| /// <summary> | ||
| /// System.Int32 (big endian encoded, network byte ordered) deserializer. | ||
| /// </summary> | ||
| public static IDeserializer<int> Int32 = new Int32Deserializer(); | ||
|
|
||
| /// <summary> | ||
| /// System.Byte[] deserializer. | ||
| /// </summary> | ||
| /// <remarks> | ||
| /// Byte ordering is original order. | ||
| /// </remarks> | ||
| public static IDeserializer<byte[]> ByteArray = new ByteArrayDeserializer(); | ||
|
|
||
| internal static readonly Dictionary<System.Type, object> DefaultSerializers = new() | ||
| { | ||
| { typeof(int), Int32 }, | ||
| { typeof(long), Int64 }, | ||
| { typeof(string), Utf8 }, | ||
| { typeof(byte[]), ByteArray } | ||
| }; | ||
|
|
||
| private class Utf8Deserializer : IDeserializer<string> | ||
| { | ||
| public string Deserialize(byte[] data) | ||
| { | ||
| return Encoding.UTF8.GetString(data); | ||
| } | ||
| } | ||
|
|
||
| private class Int64Deserializer : IDeserializer<long> | ||
| { | ||
| public long Deserialize(byte[] data) | ||
| { | ||
| if (data.Length != 8) | ||
| { | ||
| throw new ArgumentException( | ||
| $"Deserializer<Long> encountered data of length ${data.Length}. Expecting data length to be 8"); | ||
| } | ||
|
|
||
| return (long)data[0] << 56 | (long)data[1] << 48 | (long)data[2] << 40 | (long)data[3] << 32 | | ||
| (long)data[4] << 24 | (long)data[5] << 16 | (long)data[6] << 8 | data[7]; | ||
| } | ||
| } | ||
|
|
||
| private class Int32Deserializer : IDeserializer<int> | ||
| { | ||
| public int Deserialize(byte[] data) | ||
| { | ||
| if (data.Length != 4) | ||
| { | ||
| throw new ArgumentException( | ||
| $"Deserializer<Int32> encountered data of length ${data.Length}. Expecting data length to be 4"); | ||
| } | ||
|
|
||
| return data[0] << 24 | data[1] << 16 | data[2] << 8 | data[3]; | ||
| } | ||
| } | ||
|
|
||
| private class ByteArrayDeserializer : IDeserializer<byte[]> | ||
| { | ||
| public byte[] Deserialize(byte[] data) | ||
| { | ||
| return data; | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| using Ydb.Sdk.Services.Topic.Writer; | ||
|
|
||
| namespace Ydb.Sdk.Services.Topic; | ||
|
|
||
| public interface IReader<TValue> | ||
| { | ||
| public Task<TValue> ReadAsync(); | ||
|
|
||
| public Task<Message<TValue>> ReadMessageAsync(); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,5 @@ | ||
| using Ydb.Sdk.Services.Topic.Writer; | ||
|
|
||
| namespace Ydb.Sdk.Services.Topic; | ||
|
|
||
| public interface IWriter<TValue> | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| namespace Ydb.Sdk.Services.Topic.Reader; | ||
|
|
||
| public class Message<TValue> | ||
| { | ||
|
|
||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| namespace Ydb.Sdk.Services.Topic.Reader; | ||
|
|
||
| internal class Reader<TValue> : IReader<TValue> | ||
| { | ||
| public Reader(Driver driver, ReaderConfig config, IDeserializer<TValue> deserializer) | ||
|
Check warning on line 5 in src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs
|
||
|
||
| { | ||
| } | ||
|
|
||
| internal Task Initialize() | ||
| { | ||
| throw new NotImplementedException(); | ||
| } | ||
|
|
||
| public Task<TValue> ReadAsync() | ||
| { | ||
| throw new NotImplementedException(); | ||
| } | ||
|
|
||
| public Task<Writer.Message<TValue>> ReadMessageAsync() | ||
KirillKurdyukov marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| { | ||
| throw new NotImplementedException(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,32 @@ | ||
| namespace Ydb.Sdk.Services.Topic.Reader; | ||
|
|
||
| public class ReaderBuilder<TValue> | ||
| { | ||
| private readonly ReaderConfig _config; | ||
| private readonly Driver _driver; | ||
|
|
||
| public ReaderBuilder(Driver driver, ReaderConfig config) | ||
| { | ||
| _driver = driver; | ||
| _config = config; | ||
| } | ||
|
|
||
| public IDeserializer<TValue>? Deserializer { get; set; } | ||
|
|
||
| public async Task<IReader<TValue>> Build() | ||
| { | ||
| var reader = new Reader<TValue>( | ||
| _driver, | ||
| _config, | ||
| Deserializer ?? (IDeserializer<TValue>)( | ||
| Deserializers.DefaultSerializers.TryGetValue(typeof(TValue), out var deserializer) | ||
| ? deserializer | ||
| : throw new YdbWriterException("The serializer is not set") | ||
| ) | ||
| ); | ||
|
|
||
| await reader.Initialize(); | ||
|
|
||
| return reader; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| namespace Ydb.Sdk.Services.Topic.Reader; | ||
|
|
||
| public class ReaderConfig | ||
| { | ||
| /// <summary> | ||
| /// Message that describes topic to read. | ||
| /// Topics that will be read by this reader. | ||
| /// </summary> | ||
| public List<SubscribeSettings> SubscribeSettings { get; } = new(); | ||
|
|
||
| /// <summary> | ||
| /// Path of consumer that is used for reading by this session. | ||
| /// </summary> | ||
| public string? ConsumerName { get; set; } | ||
|
|
||
| /// <summary> | ||
| /// Optional name. Will be shown in debug stat. | ||
| /// </summary> | ||
| public string? ReaderName { get; set; } | ||
|
|
||
| /// <summary> | ||
| /// Direct reading from a partition node. | ||
| /// </summary> | ||
| public bool DirectRead { get; set; } | ||
| } | ||
|
|
||
| public class SubscribeSettings | ||
| { | ||
| public string TopicPath { get; } | ||
|
|
||
| /// <param name="topicPath">Topic path</param> | ||
| public SubscribeSettings(string topicPath) | ||
| { | ||
| TopicPath = topicPath; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Partitions that will be read by this session. | ||
| /// If list is empty - then session will read all partitions. | ||
| /// </summary> | ||
| public List<long> PartitionIds { get; } = new(); | ||
|
|
||
| /// <summary> | ||
| /// Skip all messages that has write timestamp smaller than now - max_lag. | ||
| /// Zero means infinite lag. | ||
| /// </summary> | ||
| public TimeSpan? MaxLag { get; set; } | ||
|
|
||
| /// <summary> | ||
| /// Read data only after this timestamp from this topic. | ||
| /// Read only messages with 'written_at' value greater or equal than this timestamp. | ||
| /// </summary> | ||
| public DateTime? ReadFrom { get; set; } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| namespace Ydb.Sdk.Services.Topic; | ||
| namespace Ydb.Sdk.Services.Topic.Writer; | ||
|
|
||
| public class Message<TValue> | ||
| { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| namespace Ydb.Sdk.Services.Topic; | ||
| namespace Ydb.Sdk.Services.Topic.Writer; | ||
|
|
||
| public class WriterBuilder<TValue> | ||
| { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.