Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/Deserializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
using System.Text;

namespace Ydb.Sdk.Services.Topic;

public interface IDeserializer<out TValue>
{
/// <summary>Deserialize a message key or value.</summary>
/// <param name="data">The data to deserialize.</param>
/// <returns>The deserialized value.</returns>
TValue Deserialize(byte[] data);
}

public static class Deserializers
{
/// <summary>
/// String (UTF8 encoded) deserializer.
/// </summary>
public static IDeserializer<string> Utf8 = new Utf8Deserializer();

/// <summary>
/// System.Int64 (big endian encoded, network byte ordered) deserializer.
/// </summary>
public static IDeserializer<long> Int64 = new Int64Deserializer();

/// <summary>
/// System.Int32 (big endian encoded, network byte ordered) deserializer.
/// </summary>
public static IDeserializer<int> Int32 = new Int32Deserializer();

/// <summary>
/// System.Byte[] deserializer.
/// </summary>
/// <remarks>
/// Byte ordering is original order.
/// </remarks>
public static IDeserializer<byte[]> ByteArray = new ByteArrayDeserializer();

internal static readonly Dictionary<System.Type, object> DefaultDeserializers = new()
{
{ typeof(int), Int32 },
{ typeof(long), Int64 },
{ typeof(string), Utf8 },
{ typeof(byte[]), ByteArray }
};

private class Utf8Deserializer : IDeserializer<string>
{
public string Deserialize(byte[] data)
{
return Encoding.UTF8.GetString(data);
}
}

private class Int64Deserializer : IDeserializer<long>
{
public long Deserialize(byte[] data)
{
if (data.Length != 8)
{
throw new ArgumentException(
$"Deserializer<Long> encountered data of length ${data.Length}. Expecting data length to be 8");
}

return ((long)data[0] << 56) | ((long)data[1] << 48) | ((long)data[2] << 40) | ((long)data[3] << 32) |
((long)data[4] << 24) | ((long)data[5] << 16) | ((long)data[6] << 8) | data[7];
}
}

private class Int32Deserializer : IDeserializer<int>
{
public int Deserialize(byte[] data)
{
if (data.Length != 4)
{
throw new ArgumentException(
$"Deserializer<Int32> encountered data of length ${data.Length}. Expecting data length to be 4");
}

return (data[0] << 24) | (data[1] << 16) | (data[2] << 8) | data[3];
}
}

private class ByteArrayDeserializer : IDeserializer<byte[]>
{
public byte[] Deserialize(byte[] data)
{
return data;
}
}
}
7 changes: 7 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/Exceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,10 @@ public YdbWriterException(string message) : base(message)
{
}
}

public class YdbReaderException : YdbTopicException
{
protected YdbReaderException(string message) : base(message)
{
}
}
10 changes: 10 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/IReader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using Ydb.Sdk.Services.Topic.Reader;

namespace Ydb.Sdk.Services.Topic;

public interface IReader<TValue>
{
public Task<TValue> ReadAsync();

public Task<Message<TValue>> ReadMessageAsync();
}
2 changes: 2 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/IWriter.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using Ydb.Sdk.Services.Topic.Writer;

namespace Ydb.Sdk.Services.Topic;

public interface IWriter<TValue>
Expand Down
5 changes: 5 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/Reader/Message.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
namespace Ydb.Sdk.Services.Topic.Reader;

public class Message<TValue>
{
}
23 changes: 23 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace Ydb.Sdk.Services.Topic.Reader;

internal class Reader<TValue> : IReader<TValue>
{
// internal Reader(Driver driver, ReaderConfig config, IDeserializer<TValue> deserializer)
// {
// }

internal Task Initialize()
{
throw new NotImplementedException();
}

public Task<TValue> ReadAsync()
{
throw new NotImplementedException();
}

public Task<Message<TValue>> ReadMessageAsync()
{
throw new NotImplementedException();
}
}
33 changes: 33 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/Reader/ReaderBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
namespace Ydb.Sdk.Services.Topic.Reader;

public class ReaderBuilder<TValue>
{
// private readonly ReaderConfig _config;
// private readonly Driver _driver;
//
// public ReaderBuilder(Driver driver, ReaderConfig config)
// {
// _driver = driver;
// _config = config;
// }

public IDeserializer<TValue>? Deserializer { get; set; }

public Task<IReader<TValue>> Build()
{
throw new NotImplementedException();
// var reader = new Reader<TValue>(
// _driver,
// _config,
// Deserializer ?? (IDeserializer<TValue>)(
// Deserializers.DefaultDeserializers.TryGetValue(typeof(TValue), out var deserializer)
// ? deserializer
// : throw new YdbWriterException("The serializer is not set")
// )
// );
//
// await reader.Initialize();
//
// return reader;
}
}
54 changes: 54 additions & 0 deletions src/Ydb.Sdk/src/Services/Topic/Reader/ReaderConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
namespace Ydb.Sdk.Services.Topic.Reader;

public class ReaderConfig
{
/// <summary>
/// Message that describes topic to read.
/// Topics that will be read by this reader.
/// </summary>
public List<SubscribeSettings> SubscribeSettings { get; } = new();

/// <summary>
/// Path of consumer that is used for reading by this session.
/// </summary>
public string? ConsumerName { get; set; }

/// <summary>
/// Optional name. Will be shown in debug stat.
/// </summary>
public string? ReaderName { get; set; }

/// <summary>
/// Direct reading from a partition node.
/// </summary>
public bool DirectRead { get; set; }
}

public class SubscribeSettings
{
public string TopicPath { get; }

/// <param name="topicPath">Topic path</param>
public SubscribeSettings(string topicPath)
{
TopicPath = topicPath;
}

/// <summary>
/// Partitions that will be read by this session.
/// If list is empty - then session will read all partitions.
/// </summary>
public List<long> PartitionIds { get; } = new();

/// <summary>
/// Skip all messages that has write timestamp smaller than now - max_lag.
/// Zero means infinite lag.
/// </summary>
public TimeSpan? MaxLag { get; set; }

/// <summary>
/// Read data only after this timestamp from this topic.
/// Read only messages with 'written_at' value greater or equal than this timestamp.
/// </summary>
public DateTime? ReadFrom { get; set; }
}
6 changes: 4 additions & 2 deletions src/Ydb.Sdk/src/Services/Topic/Serializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ public static class Serializers
public static readonly ISerializer<int> Int32 = new Int32Serializer();

/// <summary>
/// System.Byte[] (nullable) serializer.</summary>
/// <remarks>Byte order is original order.</remarks>
/// System.Byte[] serializer.</summary>
/// <remarks>
/// Byte order is original order.
/// </remarks>
public static readonly ISerializer<byte[]> ByteArray = new ByteArraySerializer();

internal static readonly Dictionary<System.Type, object> DefaultSerializers = new()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace Ydb.Sdk.Services.Topic;
namespace Ydb.Sdk.Services.Topic.Writer;

public class Message<TValue>
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using Ydb.Topic;

namespace Ydb.Sdk.Services.Topic;
namespace Ydb.Sdk.Services.Topic.Writer;

public class WriteResult
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
using Ydb.Topic;
using Ydb.Topic.V1;

namespace Ydb.Sdk.Services.Topic;
namespace Ydb.Sdk.Services.Topic.Writer;

using InitResponse = StreamWriteMessage.Types.InitResponse;
using MessageData = StreamWriteMessage.Types.WriteRequest.Types.MessageData;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace Ydb.Sdk.Services.Topic;
namespace Ydb.Sdk.Services.Topic.Writer;

public class WriterBuilder<TValue>
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,35 @@
using System.Text;

namespace Ydb.Sdk.Services.Topic;
namespace Ydb.Sdk.Services.Topic.Writer;

public class WriterConfig
{
/// <param name="topicPath">Full path of topic to write to.</param>
public WriterConfig(string topicPath)
{
TopicPath = topicPath;
}

/// <summary>
/// Full path of topic to write to.
/// </summary>
public string TopicPath { get; }

/// <summary>
/// Producer identifier of client data stream.
/// Used for message deduplication by sequence numbers.
/// </summary>
public string? ProducerId { get; set; }

/// <summary>
/// All messages with given pair (producer_id, message_group_id) go to single partition in order of writes.
/// </summary>
public string? MessageGroupId { get; set; }

/// <summary>
/// Codec that is used for data compression.
/// See enum Codec above for values.
/// </summary>
public Codec Codec { get; set; } = Codec.Raw; // TODO Supported only Raw

public override string ToString()
Expand Down
Loading