Skip to content

Commit 5b8d65a

Browse files
feat: IProducer, ProducerConfig, ProducerBuilder + default Serializers
1 parent 6982a40 commit 5b8d65a

File tree

6 files changed

+265
-0
lines changed

6 files changed

+265
-0
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
namespace Ydb.Sdk.Services.Topic;
2+
3+
public class YdbTopicException : Exception
4+
{
5+
protected YdbTopicException(string message) : base(message)
6+
{
7+
}
8+
}
9+
10+
public class YdbProducerException : YdbTopicException
11+
{
12+
public YdbProducerException(string message) : base(message)
13+
{
14+
}
15+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
namespace Ydb.Sdk.Services.Topic;
2+
3+
public interface IProducer<TValue>
4+
{
5+
public Task<SendResult> SendAsync(TValue data);
6+
7+
public Task<SendResult> SendAsync(Message<TValue> message);
8+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
using System.Collections.Concurrent;
2+
using Microsoft.Extensions.Logging;
3+
using Ydb.Topic;
4+
5+
namespace Ydb.Sdk.Services.Topic;
6+
7+
using ProducerStream = Driver.BidirectionalStream<
8+
StreamWriteMessage.Types.FromClient,
9+
StreamWriteMessage.Types.FromServer
10+
>;
11+
12+
internal class Producer<TValue> : IProducer<TValue>
13+
{
14+
private readonly Driver _driver;
15+
private readonly ILogger<Producer<TValue>> _logger;
16+
private readonly long _partitionId;
17+
private readonly string _sessionId;
18+
private readonly ISerializer<TValue> _serializer;
19+
20+
private long _seqNum;
21+
22+
private readonly ConcurrentQueue<StreamWriteMessage.Types.FromClient> _inFlightMessages;
23+
private volatile ProducerStream _stream;
24+
25+
internal Producer(
26+
ProducerConfig producerConfig,
27+
StreamWriteMessage.Types.InitResponse initResponse,
28+
ProducerStream stream,
29+
ISerializer<TValue> serializer)
30+
{
31+
_driver = producerConfig.Driver;
32+
_stream = stream;
33+
_serializer = serializer;
34+
_logger = producerConfig.Driver.LoggerFactory.CreateLogger<Producer<TValue>>();
35+
_partitionId = initResponse.PartitionId;
36+
_sessionId = initResponse.SessionId;
37+
_seqNum = initResponse.LastSeqNo;
38+
_inFlightMessages = new ConcurrentQueue<StreamWriteMessage.Types.FromClient>();
39+
}
40+
41+
public Task<SendResult> SendAsync(TValue data)
42+
{
43+
throw new NotImplementedException();
44+
}
45+
46+
public Task<SendResult> SendAsync(Message<TValue> message)
47+
{
48+
throw new NotImplementedException();
49+
}
50+
}
51+
52+
public class Message<TValue>
53+
{
54+
public DateTime Timestamp { get; set; }
55+
56+
public TValue Data { get; set; }
57+
58+
public List<Metadata> Metadata { get; } = new();
59+
}
60+
61+
public record Metadata(string Key, byte[] Value);
62+
63+
public class SendResult
64+
{
65+
public SendResult(State status)
66+
{
67+
State = status;
68+
}
69+
70+
public State State { get; }
71+
}
72+
73+
public enum State
74+
{
75+
Written,
76+
AlreadyWritten,
77+
}
78+
79+
internal enum ProducerState
80+
{
81+
Ready,
82+
// Broken
83+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
using Ydb.Topic;
2+
using Ydb.Topic.V1;
3+
4+
namespace Ydb.Sdk.Services.Topic;
5+
6+
public class ProducerBuilder<TValue>
7+
{
8+
private readonly ProducerConfig _config;
9+
10+
public ProducerBuilder(ProducerConfig config)
11+
{
12+
_config = config;
13+
}
14+
15+
public ISerializer<TValue>? Serializer { get; set; }
16+
17+
public async Task<IProducer<TValue>> Build()
18+
{
19+
var stream = _config.Driver.BidirectionalStreamCall(TopicService.StreamWriteMethod,
20+
GrpcRequestSettings.DefaultInstance);
21+
22+
var initRequest = new StreamWriteMessage.Types.InitRequest { Path = _config.TopicPath };
23+
if (_config.ProducerId != null)
24+
{
25+
initRequest.ProducerId = _config.ProducerId;
26+
}
27+
28+
if (_config.MessageGroupId != null)
29+
{
30+
initRequest.MessageGroupId = _config.MessageGroupId;
31+
}
32+
33+
await stream.Write(new StreamWriteMessage.Types.FromClient { InitRequest = initRequest });
34+
if (!await stream.MoveNextAsync())
35+
{
36+
throw new YdbProducerException("Write stream is closed by YDB server");
37+
}
38+
39+
var receivedInitMessage = stream.Current;
40+
41+
Status.FromProto(receivedInitMessage.Status, receivedInitMessage.Issues).EnsureSuccess();
42+
43+
var initResponse = receivedInitMessage.InitResponse;
44+
45+
if (!initResponse.SupportedCodecs.Codecs.Contains((int)_config.Codec))
46+
{
47+
throw new YdbProducerException($"Topic is not supported codec: {_config.Codec}");
48+
}
49+
50+
return new Producer<TValue>(
51+
_config, initResponse, stream,
52+
Serializer ?? (ISerializer<TValue>)(
53+
Serializers.DefaultSerializers.TryGetValue(typeof(TValue), out var serializer)
54+
? serializer
55+
: throw new YdbProducerException("The serializer is not set")
56+
)
57+
);
58+
}
59+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
2+
namespace Ydb.Sdk.Services.Topic;
3+
4+
public class ProducerConfig
5+
{
6+
public ProducerConfig(Driver driver, string topicPath)
7+
{
8+
Driver = driver;
9+
TopicPath = topicPath;
10+
}
11+
12+
public Driver Driver { get; }
13+
public string TopicPath { get; }
14+
public string? ProducerId { get; set; }
15+
public string? MessageGroupId { get; set; }
16+
public Codec Codec { get; set; } = Codec.Raw; // TODO Supported only Raw
17+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
using System.Text;
2+
using Grpc.Core;
3+
4+
namespace Ydb.Sdk.Services.Topic;
5+
6+
public interface ISerializer<in TValue>
7+
{
8+
public byte[] Serialize(TValue data);
9+
}
10+
11+
public static class Serializers
12+
{
13+
/// <summary>String (UTF8) serializer.</summary>
14+
public static readonly ISerializer<string> Utf8 = new Utf8Serializer();
15+
16+
/// <summary>
17+
/// System.Int64 (big endian, network byte order) serializer.
18+
/// </summary>
19+
public static readonly ISerializer<long> Int64 = new Int64Serializer();
20+
21+
/// <summary>
22+
/// System.Int32 (big endian, network byte order) serializer.
23+
/// </summary>
24+
public static readonly ISerializer<int> Int32 = new Int32Serializer();
25+
26+
/// <summary>
27+
/// System.Byte[] (nullable) serializer.</summary>
28+
/// <remarks>Byte order is original order.</remarks>
29+
public static readonly ISerializer<byte[]> ByteArray = new ByteArraySerializer();
30+
31+
internal static readonly Dictionary<System.Type, object> DefaultSerializers = new()
32+
{
33+
{ typeof(int), Int32 },
34+
{ typeof(long), Int64 },
35+
{ typeof(string), Utf8 },
36+
{ typeof(byte[]), ByteArray }
37+
};
38+
39+
private class Utf8Serializer : ISerializer<string>
40+
{
41+
public byte[] Serialize(string data)
42+
{
43+
return Encoding.UTF8.GetBytes(data);
44+
}
45+
}
46+
47+
private class Int64Serializer : ISerializer<long>
48+
{
49+
public byte[] Serialize(long data)
50+
{
51+
return new[]
52+
{
53+
(byte)(data >> 56),
54+
(byte)(data >> 48),
55+
(byte)(data >> 40),
56+
(byte)(data >> 32),
57+
(byte)(data >> 24),
58+
(byte)(data >> 16),
59+
(byte)(data >> 8),
60+
(byte)data
61+
};
62+
}
63+
}
64+
65+
private class Int32Serializer : ISerializer<int>
66+
{
67+
public byte[] Serialize(int data)
68+
{
69+
return new[]
70+
{
71+
(byte)(data >> 24),
72+
(byte)(data >> 16),
73+
(byte)(data >> 8),
74+
(byte)data
75+
};
76+
}
77+
}
78+
79+
private class ByteArraySerializer : ISerializer<byte[]>
80+
{
81+
public byte[] Serialize(byte[] data) => data;
82+
}
83+
}

0 commit comments

Comments
 (0)