Skip to content

Commit d0d22de

Browse files
feat: IProducer, ProducerConfig, ProducerBuilder + default Serializers (#202)
* commit * fix linter * fix linter
1 parent 6982a40 commit d0d22de

File tree

6 files changed

+272
-0
lines changed

6 files changed

+272
-0
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
namespace Ydb.Sdk.Services.Topic;
2+
3+
public class YdbTopicException : Exception
4+
{
5+
protected YdbTopicException(string message) : base(message)
6+
{
7+
}
8+
}
9+
10+
public class YdbProducerException : YdbTopicException
11+
{
12+
public YdbProducerException(string message) : base(message)
13+
{
14+
}
15+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
namespace Ydb.Sdk.Services.Topic;
2+
3+
public interface IProducer<TValue>
4+
{
5+
public Task<SendResult> SendAsync(TValue data);
6+
7+
public Task<SendResult> SendAsync(Message<TValue> message);
8+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// using System.Collections.Concurrent;
2+
// using Microsoft.Extensions.Logging;
3+
// using Ydb.Topic;
4+
5+
namespace Ydb.Sdk.Services.Topic;
6+
7+
// using ProducerStream = Driver.BidirectionalStream<
8+
// StreamWriteMessage.Types.FromClient,
9+
// StreamWriteMessage.Types.FromServer
10+
// >;
11+
12+
internal class Producer<TValue> : IProducer<TValue>
13+
{
14+
// private readonly Driver _driver;
15+
// private readonly ILogger<Producer<TValue>> _logger;
16+
// private readonly long _partitionId;
17+
// private readonly string _sessionId;
18+
// private readonly ISerializer<TValue> _serializer;
19+
//
20+
// private long _seqNum;
21+
//
22+
// private readonly ConcurrentQueue<StreamWriteMessage.Types.FromClient> _inFlightMessages;
23+
// private volatile ProducerStream _stream;
24+
//
25+
// internal Producer(
26+
// ProducerConfig producerConfig,
27+
// StreamWriteMessage.Types.InitResponse initResponse,
28+
// ProducerStream stream,
29+
// ISerializer<TValue> serializer)
30+
// {
31+
// _driver = producerConfig.Driver;
32+
// _stream = stream;
33+
// _serializer = serializer;
34+
// _logger = producerConfig.Driver.LoggerFactory.CreateLogger<Producer<TValue>>();
35+
// _partitionId = initResponse.PartitionId;
36+
// _sessionId = initResponse.SessionId;
37+
// _seqNum = initResponse.LastSeqNo;
38+
// _inFlightMessages = new ConcurrentQueue<StreamWriteMessage.Types.FromClient>();
39+
// }
40+
41+
public Task<SendResult> SendAsync(TValue data)
42+
{
43+
throw new NotImplementedException();
44+
}
45+
46+
public Task<SendResult> SendAsync(Message<TValue> message)
47+
{
48+
throw new NotImplementedException();
49+
}
50+
}
51+
52+
public class Message<TValue>
53+
{
54+
public Message(TValue data)
55+
{
56+
Data = data;
57+
}
58+
59+
public DateTime Timestamp { get; set; }
60+
61+
public TValue Data { get; }
62+
63+
public List<Metadata> Metadata { get; } = new();
64+
}
65+
66+
public record Metadata(string Key, byte[] Value);
67+
68+
public class SendResult
69+
{
70+
public SendResult(State status)
71+
{
72+
State = status;
73+
}
74+
75+
public State State { get; }
76+
}
77+
78+
public enum State
79+
{
80+
Written,
81+
AlreadyWritten
82+
}
83+
84+
internal enum ProducerState
85+
{
86+
Ready
87+
// Broken
88+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
using Ydb.Topic;
2+
using Ydb.Topic.V1;
3+
4+
namespace Ydb.Sdk.Services.Topic;
5+
6+
public class ProducerBuilder<TValue>
7+
{
8+
private readonly ProducerConfig _config;
9+
10+
public ProducerBuilder(ProducerConfig config)
11+
{
12+
_config = config;
13+
}
14+
15+
public ISerializer<TValue>? Serializer { get; set; }
16+
17+
public async Task<IProducer<TValue>> Build()
18+
{
19+
var stream = _config.Driver.BidirectionalStreamCall(TopicService.StreamWriteMethod,
20+
GrpcRequestSettings.DefaultInstance);
21+
22+
var initRequest = new StreamWriteMessage.Types.InitRequest { Path = _config.TopicPath };
23+
if (_config.ProducerId != null)
24+
{
25+
initRequest.ProducerId = _config.ProducerId;
26+
}
27+
28+
if (_config.MessageGroupId != null)
29+
{
30+
initRequest.MessageGroupId = _config.MessageGroupId;
31+
}
32+
33+
await stream.Write(new StreamWriteMessage.Types.FromClient { InitRequest = initRequest });
34+
if (!await stream.MoveNextAsync())
35+
{
36+
throw new YdbProducerException("Write stream is closed by YDB server");
37+
}
38+
39+
var receivedInitMessage = stream.Current;
40+
41+
Status.FromProto(receivedInitMessage.Status, receivedInitMessage.Issues).EnsureSuccess();
42+
43+
var initResponse = receivedInitMessage.InitResponse;
44+
45+
if (!initResponse.SupportedCodecs.Codecs.Contains((int)_config.Codec))
46+
{
47+
throw new YdbProducerException($"Topic is not supported codec: {_config.Codec}");
48+
}
49+
50+
throw new NotImplementedException();
51+
// return new Producer<TValue>(
52+
// _config, initResponse, stream,
53+
// Serializer ?? (ISerializer<TValue>)(
54+
// Serializers.DefaultSerializers.TryGetValue(typeof(TValue), out var serializer)
55+
// ? serializer
56+
// : throw new YdbProducerException("The serializer is not set")
57+
// )
58+
// );
59+
}
60+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
namespace Ydb.Sdk.Services.Topic;
2+
3+
public class ProducerConfig
4+
{
5+
public ProducerConfig(Driver driver, string topicPath)
6+
{
7+
Driver = driver;
8+
TopicPath = topicPath;
9+
}
10+
11+
public Driver Driver { get; }
12+
public string TopicPath { get; }
13+
public string? ProducerId { get; set; }
14+
public string? MessageGroupId { get; set; }
15+
public Codec Codec { get; set; } = Codec.Raw; // TODO Supported only Raw
16+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
using System.Text;
2+
3+
namespace Ydb.Sdk.Services.Topic;
4+
5+
public interface ISerializer<in TValue>
6+
{
7+
public byte[] Serialize(TValue data);
8+
}
9+
10+
public static class Serializers
11+
{
12+
/// <summary>String (UTF8) serializer.</summary>
13+
public static readonly ISerializer<string> Utf8 = new Utf8Serializer();
14+
15+
/// <summary>
16+
/// System.Int64 (big endian, network byte order) serializer.
17+
/// </summary>
18+
public static readonly ISerializer<long> Int64 = new Int64Serializer();
19+
20+
/// <summary>
21+
/// System.Int32 (big endian, network byte order) serializer.
22+
/// </summary>
23+
public static readonly ISerializer<int> Int32 = new Int32Serializer();
24+
25+
/// <summary>
26+
/// System.Byte[] (nullable) serializer.</summary>
27+
/// <remarks>Byte order is original order.</remarks>
28+
public static readonly ISerializer<byte[]> ByteArray = new ByteArraySerializer();
29+
30+
internal static readonly Dictionary<System.Type, object> DefaultSerializers = new()
31+
{
32+
{ typeof(int), Int32 },
33+
{ typeof(long), Int64 },
34+
{ typeof(string), Utf8 },
35+
{ typeof(byte[]), ByteArray }
36+
};
37+
38+
private class Utf8Serializer : ISerializer<string>
39+
{
40+
public byte[] Serialize(string data)
41+
{
42+
return Encoding.UTF8.GetBytes(data);
43+
}
44+
}
45+
46+
private class Int64Serializer : ISerializer<long>
47+
{
48+
public byte[] Serialize(long data)
49+
{
50+
return new[]
51+
{
52+
(byte)(data >> 56),
53+
(byte)(data >> 48),
54+
(byte)(data >> 40),
55+
(byte)(data >> 32),
56+
(byte)(data >> 24),
57+
(byte)(data >> 16),
58+
(byte)(data >> 8),
59+
(byte)data
60+
};
61+
}
62+
}
63+
64+
private class Int32Serializer : ISerializer<int>
65+
{
66+
public byte[] Serialize(int data)
67+
{
68+
return new[]
69+
{
70+
(byte)(data >> 24),
71+
(byte)(data >> 16),
72+
(byte)(data >> 8),
73+
(byte)data
74+
};
75+
}
76+
}
77+
78+
private class ByteArraySerializer : ISerializer<byte[]>
79+
{
80+
public byte[] Serialize(byte[] data)
81+
{
82+
return data;
83+
}
84+
}
85+
}

0 commit comments

Comments
 (0)