diff --git a/src/Ydb.Sdk/src/Services/Topic/Deserializer.cs b/src/Ydb.Sdk/src/Services/Topic/Deserializer.cs new file mode 100644 index 00000000..c0b8b11f --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/Deserializer.cs @@ -0,0 +1,90 @@ +using System.Text; + +namespace Ydb.Sdk.Services.Topic; + +public interface IDeserializer +{ + /// Deserialize a message key or value. + /// The data to deserialize. + /// The deserialized value. + TValue Deserialize(byte[] data); +} + +public static class Deserializers +{ + /// + /// String (UTF8 encoded) deserializer. + /// + public static IDeserializer Utf8 = new Utf8Deserializer(); + + /// + /// System.Int64 (big endian encoded, network byte ordered) deserializer. + /// + public static IDeserializer Int64 = new Int64Deserializer(); + + /// + /// System.Int32 (big endian encoded, network byte ordered) deserializer. + /// + public static IDeserializer Int32 = new Int32Deserializer(); + + /// + /// System.Byte[] deserializer. + /// + /// + /// Byte ordering is original order. + /// + public static IDeserializer ByteArray = new ByteArrayDeserializer(); + + internal static readonly Dictionary DefaultDeserializers = new() + { + { typeof(int), Int32 }, + { typeof(long), Int64 }, + { typeof(string), Utf8 }, + { typeof(byte[]), ByteArray } + }; + + private class Utf8Deserializer : IDeserializer + { + public string Deserialize(byte[] data) + { + return Encoding.UTF8.GetString(data); + } + } + + private class Int64Deserializer : IDeserializer + { + public long Deserialize(byte[] data) + { + if (data.Length != 8) + { + throw new ArgumentException( + $"Deserializer encountered data of length ${data.Length}. Expecting data length to be 8"); + } + + return ((long)data[0] << 56) | ((long)data[1] << 48) | ((long)data[2] << 40) | ((long)data[3] << 32) | + ((long)data[4] << 24) | ((long)data[5] << 16) | ((long)data[6] << 8) | data[7]; + } + } + + private class Int32Deserializer : IDeserializer + { + public int Deserialize(byte[] data) + { + if (data.Length != 4) + { + throw new ArgumentException( + $"Deserializer encountered data of length ${data.Length}. Expecting data length to be 4"); + } + + return (data[0] << 24) | (data[1] << 16) | (data[2] << 8) | data[3]; + } + } + + private class ByteArrayDeserializer : IDeserializer + { + public byte[] Deserialize(byte[] data) + { + return data; + } + } +} diff --git a/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs b/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs index cd1c08fd..84648c71 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs @@ -13,3 +13,10 @@ public YdbWriterException(string message) : base(message) { } } + +public class YdbReaderException : YdbTopicException +{ + protected YdbReaderException(string message) : base(message) + { + } +} diff --git a/src/Ydb.Sdk/src/Services/Topic/IReader.cs b/src/Ydb.Sdk/src/Services/Topic/IReader.cs new file mode 100644 index 00000000..fc021168 --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/IReader.cs @@ -0,0 +1,10 @@ +using Ydb.Sdk.Services.Topic.Reader; + +namespace Ydb.Sdk.Services.Topic; + +public interface IReader +{ + public Task ReadAsync(); + + public Task> ReadMessageAsync(); +} diff --git a/src/Ydb.Sdk/src/Services/Topic/IWriter.cs b/src/Ydb.Sdk/src/Services/Topic/IWriter.cs index eabe571f..5df9cfed 100644 --- a/src/Ydb.Sdk/src/Services/Topic/IWriter.cs +++ b/src/Ydb.Sdk/src/Services/Topic/IWriter.cs @@ -1,3 +1,5 @@ +using Ydb.Sdk.Services.Topic.Writer; + namespace Ydb.Sdk.Services.Topic; public interface IWriter diff --git a/src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs b/src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs new file mode 100644 index 00000000..1feb5c7d --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs @@ -0,0 +1,5 @@ +namespace Ydb.Sdk.Services.Topic.Reader; + +public class Message +{ +} diff --git a/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs b/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs new file mode 100644 index 00000000..caf87943 --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs @@ -0,0 +1,23 @@ +namespace Ydb.Sdk.Services.Topic.Reader; + +internal class Reader : IReader +{ + // internal Reader(Driver driver, ReaderConfig config, IDeserializer deserializer) + // { + // } + + internal Task Initialize() + { + throw new NotImplementedException(); + } + + public Task ReadAsync() + { + throw new NotImplementedException(); + } + + public Task> ReadMessageAsync() + { + throw new NotImplementedException(); + } +} diff --git a/src/Ydb.Sdk/src/Services/Topic/Reader/ReaderBuilder.cs b/src/Ydb.Sdk/src/Services/Topic/Reader/ReaderBuilder.cs new file mode 100644 index 00000000..ebfc67f6 --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/Reader/ReaderBuilder.cs @@ -0,0 +1,33 @@ +namespace Ydb.Sdk.Services.Topic.Reader; + +public class ReaderBuilder +{ + // private readonly ReaderConfig _config; + // private readonly Driver _driver; + // + // public ReaderBuilder(Driver driver, ReaderConfig config) + // { + // _driver = driver; + // _config = config; + // } + + public IDeserializer? Deserializer { get; set; } + + public Task> Build() + { + throw new NotImplementedException(); + // var reader = new Reader( + // _driver, + // _config, + // Deserializer ?? (IDeserializer)( + // Deserializers.DefaultDeserializers.TryGetValue(typeof(TValue), out var deserializer) + // ? deserializer + // : throw new YdbWriterException("The serializer is not set") + // ) + // ); + // + // await reader.Initialize(); + // + // return reader; + } +} diff --git a/src/Ydb.Sdk/src/Services/Topic/Reader/ReaderConfig.cs b/src/Ydb.Sdk/src/Services/Topic/Reader/ReaderConfig.cs new file mode 100644 index 00000000..b9bb8851 --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/Reader/ReaderConfig.cs @@ -0,0 +1,54 @@ +namespace Ydb.Sdk.Services.Topic.Reader; + +public class ReaderConfig +{ + /// + /// Message that describes topic to read. + /// Topics that will be read by this reader. + /// + public List SubscribeSettings { get; } = new(); + + /// + /// Path of consumer that is used for reading by this session. + /// + public string? ConsumerName { get; set; } + + /// + /// Optional name. Will be shown in debug stat. + /// + public string? ReaderName { get; set; } + + /// + /// Direct reading from a partition node. + /// + public bool DirectRead { get; set; } +} + +public class SubscribeSettings +{ + public string TopicPath { get; } + + /// Topic path + public SubscribeSettings(string topicPath) + { + TopicPath = topicPath; + } + + /// + /// Partitions that will be read by this session. + /// If list is empty - then session will read all partitions. + /// + public List PartitionIds { get; } = new(); + + /// + /// Skip all messages that has write timestamp smaller than now - max_lag. + /// Zero means infinite lag. + /// + public TimeSpan? MaxLag { get; set; } + + /// + /// Read data only after this timestamp from this topic. + /// Read only messages with 'written_at' value greater or equal than this timestamp. + /// + public DateTime? ReadFrom { get; set; } +} diff --git a/src/Ydb.Sdk/src/Services/Topic/Serializer.cs b/src/Ydb.Sdk/src/Services/Topic/Serializer.cs index 46db81ad..58423169 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Serializer.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Serializer.cs @@ -23,8 +23,10 @@ public static class Serializers public static readonly ISerializer Int32 = new Int32Serializer(); /// - /// System.Byte[] (nullable) serializer. - /// Byte order is original order. + /// System.Byte[] serializer. + /// + /// Byte order is original order. + /// public static readonly ISerializer ByteArray = new ByteArraySerializer(); internal static readonly Dictionary DefaultSerializers = new() diff --git a/src/Ydb.Sdk/src/Services/Topic/Message.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/Message.cs similarity index 87% rename from src/Ydb.Sdk/src/Services/Topic/Message.cs rename to src/Ydb.Sdk/src/Services/Topic/Writer/Message.cs index 820923a2..1d0db4b1 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Message.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/Message.cs @@ -1,4 +1,4 @@ -namespace Ydb.Sdk.Services.Topic; +namespace Ydb.Sdk.Services.Topic.Writer; public class Message { diff --git a/src/Ydb.Sdk/src/Services/Topic/WriteResult.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/WriteResult.cs similarity index 96% rename from src/Ydb.Sdk/src/Services/Topic/WriteResult.cs rename to src/Ydb.Sdk/src/Services/Topic/Writer/WriteResult.cs index 1291946d..1320c5a4 100644 --- a/src/Ydb.Sdk/src/Services/Topic/WriteResult.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/WriteResult.cs @@ -1,6 +1,6 @@ using Ydb.Topic; -namespace Ydb.Sdk.Services.Topic; +namespace Ydb.Sdk.Services.Topic.Writer; public class WriteResult { diff --git a/src/Ydb.Sdk/src/Services/Topic/Writer.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs similarity index 99% rename from src/Ydb.Sdk/src/Services/Topic/Writer.cs rename to src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs index 0e3c8b15..7e1553a9 100644 --- a/src/Ydb.Sdk/src/Services/Topic/Writer.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs @@ -6,7 +6,7 @@ using Ydb.Topic; using Ydb.Topic.V1; -namespace Ydb.Sdk.Services.Topic; +namespace Ydb.Sdk.Services.Topic.Writer; using InitResponse = StreamWriteMessage.Types.InitResponse; using MessageData = StreamWriteMessage.Types.WriteRequest.Types.MessageData; diff --git a/src/Ydb.Sdk/src/Services/Topic/WriterBuilder.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/WriterBuilder.cs similarity index 95% rename from src/Ydb.Sdk/src/Services/Topic/WriterBuilder.cs rename to src/Ydb.Sdk/src/Services/Topic/Writer/WriterBuilder.cs index f7110786..2f565aad 100644 --- a/src/Ydb.Sdk/src/Services/Topic/WriterBuilder.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/WriterBuilder.cs @@ -1,4 +1,4 @@ -namespace Ydb.Sdk.Services.Topic; +namespace Ydb.Sdk.Services.Topic.Writer; public class WriterBuilder { diff --git a/src/Ydb.Sdk/src/Services/Topic/WriterConfig.cs b/src/Ydb.Sdk/src/Services/Topic/Writer/WriterConfig.cs similarity index 56% rename from src/Ydb.Sdk/src/Services/Topic/WriterConfig.cs rename to src/Ydb.Sdk/src/Services/Topic/Writer/WriterConfig.cs index f0e1b56d..bf5dba66 100644 --- a/src/Ydb.Sdk/src/Services/Topic/WriterConfig.cs +++ b/src/Ydb.Sdk/src/Services/Topic/Writer/WriterConfig.cs @@ -1,17 +1,35 @@ using System.Text; -namespace Ydb.Sdk.Services.Topic; +namespace Ydb.Sdk.Services.Topic.Writer; public class WriterConfig { + /// Full path of topic to write to. public WriterConfig(string topicPath) { TopicPath = topicPath; } + /// + /// Full path of topic to write to. + /// public string TopicPath { get; } + + /// + /// Producer identifier of client data stream. + /// Used for message deduplication by sequence numbers. + /// public string? ProducerId { get; set; } + + /// + /// All messages with given pair (producer_id, message_group_id) go to single partition in order of writes. + /// public string? MessageGroupId { get; set; } + + /// + /// Codec that is used for data compression. + /// See enum Codec above for values. + /// public Codec Codec { get; set; } = Codec.Raw; // TODO Supported only Raw public override string ToString()