diff --git a/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs b/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs new file mode 100644 index 00000000..b9b80a1c --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/Exceptions.cs @@ -0,0 +1,15 @@ +namespace Ydb.Sdk.Services.Topic; + +public class YdbTopicException : Exception +{ + protected YdbTopicException(string message) : base(message) + { + } +} + +public class YdbProducerException : YdbTopicException +{ + public YdbProducerException(string message) : base(message) + { + } +} diff --git a/src/Ydb.Sdk/src/Services/Topic/IProducer.cs b/src/Ydb.Sdk/src/Services/Topic/IProducer.cs new file mode 100644 index 00000000..f528e1c7 --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/IProducer.cs @@ -0,0 +1,8 @@ +namespace Ydb.Sdk.Services.Topic; + +public interface IProducer +{ + public Task SendAsync(TValue data); + + public Task SendAsync(Message message); +} diff --git a/src/Ydb.Sdk/src/Services/Topic/Producer.cs b/src/Ydb.Sdk/src/Services/Topic/Producer.cs new file mode 100644 index 00000000..a7d9dc3b --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/Producer.cs @@ -0,0 +1,88 @@ +// using System.Collections.Concurrent; +// using Microsoft.Extensions.Logging; +// using Ydb.Topic; + +namespace Ydb.Sdk.Services.Topic; + +// using ProducerStream = Driver.BidirectionalStream< +// StreamWriteMessage.Types.FromClient, +// StreamWriteMessage.Types.FromServer +// >; + +internal class Producer : IProducer +{ + // private readonly Driver _driver; + // private readonly ILogger> _logger; + // private readonly long _partitionId; + // private readonly string _sessionId; + // private readonly ISerializer _serializer; + // + // private long _seqNum; + // + // private readonly ConcurrentQueue _inFlightMessages; + // private volatile ProducerStream _stream; + // + // internal Producer( + // ProducerConfig producerConfig, + // StreamWriteMessage.Types.InitResponse initResponse, + // ProducerStream stream, + // ISerializer serializer) + // { + // _driver = producerConfig.Driver; + // _stream = stream; + // _serializer = serializer; + // _logger = producerConfig.Driver.LoggerFactory.CreateLogger>(); + // _partitionId = initResponse.PartitionId; + // _sessionId = initResponse.SessionId; + // _seqNum = initResponse.LastSeqNo; + // _inFlightMessages = new ConcurrentQueue(); + // } + + public Task SendAsync(TValue data) + { + throw new NotImplementedException(); + } + + public Task SendAsync(Message message) + { + throw new NotImplementedException(); + } +} + +public class Message +{ + public Message(TValue data) + { + Data = data; + } + + public DateTime Timestamp { get; set; } + + public TValue Data { get; } + + public List Metadata { get; } = new(); +} + +public record Metadata(string Key, byte[] Value); + +public class SendResult +{ + public SendResult(State status) + { + State = status; + } + + public State State { get; } +} + +public enum State +{ + Written, + AlreadyWritten +} + +internal enum ProducerState +{ + Ready + // Broken +} diff --git a/src/Ydb.Sdk/src/Services/Topic/ProducerBuilder.cs b/src/Ydb.Sdk/src/Services/Topic/ProducerBuilder.cs new file mode 100644 index 00000000..668c147b --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/ProducerBuilder.cs @@ -0,0 +1,60 @@ +using Ydb.Topic; +using Ydb.Topic.V1; + +namespace Ydb.Sdk.Services.Topic; + +public class ProducerBuilder +{ + private readonly ProducerConfig _config; + + public ProducerBuilder(ProducerConfig config) + { + _config = config; + } + + public ISerializer? Serializer { get; set; } + + public async Task> Build() + { + var stream = _config.Driver.BidirectionalStreamCall(TopicService.StreamWriteMethod, + GrpcRequestSettings.DefaultInstance); + + var initRequest = new StreamWriteMessage.Types.InitRequest { Path = _config.TopicPath }; + if (_config.ProducerId != null) + { + initRequest.ProducerId = _config.ProducerId; + } + + if (_config.MessageGroupId != null) + { + initRequest.MessageGroupId = _config.MessageGroupId; + } + + await stream.Write(new StreamWriteMessage.Types.FromClient { InitRequest = initRequest }); + if (!await stream.MoveNextAsync()) + { + throw new YdbProducerException("Write stream is closed by YDB server"); + } + + var receivedInitMessage = stream.Current; + + Status.FromProto(receivedInitMessage.Status, receivedInitMessage.Issues).EnsureSuccess(); + + var initResponse = receivedInitMessage.InitResponse; + + if (!initResponse.SupportedCodecs.Codecs.Contains((int)_config.Codec)) + { + throw new YdbProducerException($"Topic is not supported codec: {_config.Codec}"); + } + + throw new NotImplementedException(); + // return new Producer( + // _config, initResponse, stream, + // Serializer ?? (ISerializer)( + // Serializers.DefaultSerializers.TryGetValue(typeof(TValue), out var serializer) + // ? serializer + // : throw new YdbProducerException("The serializer is not set") + // ) + // ); + } +} diff --git a/src/Ydb.Sdk/src/Services/Topic/ProducerConfig.cs b/src/Ydb.Sdk/src/Services/Topic/ProducerConfig.cs new file mode 100644 index 00000000..fcf72af3 --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/ProducerConfig.cs @@ -0,0 +1,16 @@ +namespace Ydb.Sdk.Services.Topic; + +public class ProducerConfig +{ + public ProducerConfig(Driver driver, string topicPath) + { + Driver = driver; + TopicPath = topicPath; + } + + public Driver Driver { get; } + public string TopicPath { get; } + public string? ProducerId { get; set; } + public string? MessageGroupId { get; set; } + public Codec Codec { get; set; } = Codec.Raw; // TODO Supported only Raw +} diff --git a/src/Ydb.Sdk/src/Services/Topic/Serializer.cs b/src/Ydb.Sdk/src/Services/Topic/Serializer.cs new file mode 100644 index 00000000..46db81ad --- /dev/null +++ b/src/Ydb.Sdk/src/Services/Topic/Serializer.cs @@ -0,0 +1,85 @@ +using System.Text; + +namespace Ydb.Sdk.Services.Topic; + +public interface ISerializer +{ + public byte[] Serialize(TValue data); +} + +public static class Serializers +{ + /// String (UTF8) serializer. + public static readonly ISerializer Utf8 = new Utf8Serializer(); + + /// + /// System.Int64 (big endian, network byte order) serializer. + /// + public static readonly ISerializer Int64 = new Int64Serializer(); + + /// + /// System.Int32 (big endian, network byte order) serializer. + /// + public static readonly ISerializer Int32 = new Int32Serializer(); + + /// + /// System.Byte[] (nullable) serializer. + /// Byte order is original order. + public static readonly ISerializer ByteArray = new ByteArraySerializer(); + + internal static readonly Dictionary DefaultSerializers = new() + { + { typeof(int), Int32 }, + { typeof(long), Int64 }, + { typeof(string), Utf8 }, + { typeof(byte[]), ByteArray } + }; + + private class Utf8Serializer : ISerializer + { + public byte[] Serialize(string data) + { + return Encoding.UTF8.GetBytes(data); + } + } + + private class Int64Serializer : ISerializer + { + public byte[] Serialize(long data) + { + return new[] + { + (byte)(data >> 56), + (byte)(data >> 48), + (byte)(data >> 40), + (byte)(data >> 32), + (byte)(data >> 24), + (byte)(data >> 16), + (byte)(data >> 8), + (byte)data + }; + } + } + + private class Int32Serializer : ISerializer + { + public byte[] Serialize(int data) + { + return new[] + { + (byte)(data >> 24), + (byte)(data >> 16), + (byte)(data >> 8), + (byte)data + }; + } + } + + private class ByteArraySerializer : ISerializer + { + public byte[] Serialize(byte[] data) + { + return data; + } + } +}