Skip to content

Commit 77e34ca

Browse files
feat: Init Producer implementation
1 parent d0d22de commit 77e34ca

File tree

5 files changed

+290
-103
lines changed

5 files changed

+290
-103
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
namespace Ydb.Sdk.Services.Topic;
2+
3+
public class Message<TValue>
4+
{
5+
public Message(TValue data)
6+
{
7+
Data = data;
8+
}
9+
10+
public DateTime Timestamp { get; set; } = DateTime.Now;
11+
12+
public TValue Data { get; }
13+
14+
public List<Metadata> Metadata { get; } = new();
15+
}
16+
17+
public record Metadata(string Key, byte[] Value);
Lines changed: 175 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,88 +1,203 @@
1-
// using System.Collections.Concurrent;
2-
// using Microsoft.Extensions.Logging;
3-
// using Ydb.Topic;
1+
using System.Collections.Concurrent;
2+
using System.Transactions;
3+
using Google.Protobuf;
4+
using Google.Protobuf.WellKnownTypes;
5+
using Microsoft.Extensions.Logging;
6+
using Ydb.Topic;
7+
using Ydb.Topic.V1;
48

59
namespace Ydb.Sdk.Services.Topic;
610

7-
// using ProducerStream = Driver.BidirectionalStream<
8-
// StreamWriteMessage.Types.FromClient,
9-
// StreamWriteMessage.Types.FromServer
10-
// >;
11+
using InitResponse = StreamWriteMessage.Types.InitResponse;
12+
using MessageData = StreamWriteMessage.Types.WriteRequest.Types.MessageData;
13+
using MessageFromClient = StreamWriteMessage.Types.FromClient;
14+
using ProducerStream = Driver.BidirectionalStream<
15+
StreamWriteMessage.Types.FromClient,
16+
StreamWriteMessage.Types.FromServer
17+
>;
1118

1219
internal class Producer<TValue> : IProducer<TValue>
1320
{
14-
// private readonly Driver _driver;
15-
// private readonly ILogger<Producer<TValue>> _logger;
16-
// private readonly long _partitionId;
17-
// private readonly string _sessionId;
18-
// private readonly ISerializer<TValue> _serializer;
19-
//
20-
// private long _seqNum;
21-
//
22-
// private readonly ConcurrentQueue<StreamWriteMessage.Types.FromClient> _inFlightMessages;
23-
// private volatile ProducerStream _stream;
24-
//
25-
// internal Producer(
26-
// ProducerConfig producerConfig,
27-
// StreamWriteMessage.Types.InitResponse initResponse,
28-
// ProducerStream stream,
29-
// ISerializer<TValue> serializer)
30-
// {
31-
// _driver = producerConfig.Driver;
32-
// _stream = stream;
33-
// _serializer = serializer;
34-
// _logger = producerConfig.Driver.LoggerFactory.CreateLogger<Producer<TValue>>();
35-
// _partitionId = initResponse.PartitionId;
36-
// _sessionId = initResponse.SessionId;
37-
// _seqNum = initResponse.LastSeqNo;
38-
// _inFlightMessages = new ConcurrentQueue<StreamWriteMessage.Types.FromClient>();
39-
// }
21+
private readonly ProducerConfig _config;
22+
private readonly ILogger<Producer<TValue>> _logger;
23+
private readonly ISerializer<TValue> _serializer;
24+
25+
private readonly ConcurrentQueue<MessageSending> _inFlightMessages = new();
26+
private readonly ConcurrentQueue<MessageSending> _toSendBuffer = new();
27+
private readonly SemaphoreSlim _writeSemaphoreSlim = new(1);
28+
29+
private volatile ProducerSession _session = null!;
30+
31+
internal Producer(ProducerConfig producerConfig, ISerializer<TValue> serializer)
32+
{
33+
_config = producerConfig;
34+
_serializer = serializer;
35+
_logger = producerConfig.Driver.LoggerFactory.CreateLogger<Producer<TValue>>();
36+
}
37+
38+
internal async Task Initialize()
39+
{
40+
var stream = _config.Driver.BidirectionalStreamCall(TopicService.StreamWriteMethod,
41+
GrpcRequestSettings.DefaultInstance);
42+
43+
var initRequest = new StreamWriteMessage.Types.InitRequest { Path = _config.TopicPath };
44+
if (_config.ProducerId != null)
45+
{
46+
initRequest.ProducerId = _config.ProducerId;
47+
}
48+
49+
if (_config.MessageGroupId != null)
50+
{
51+
initRequest.MessageGroupId = _config.MessageGroupId;
52+
}
53+
54+
await stream.Write(new MessageFromClient { InitRequest = initRequest });
55+
if (!await stream.MoveNextAsync())
56+
{
57+
throw new YdbProducerException("Write stream is closed by YDB server");
58+
}
59+
60+
var receivedInitMessage = stream.Current;
61+
62+
Status.FromProto(receivedInitMessage.Status, receivedInitMessage.Issues).EnsureSuccess();
63+
64+
var initResponse = receivedInitMessage.InitResponse;
65+
66+
if (!initResponse.SupportedCodecs.Codecs.Contains((int)_config.Codec))
67+
{
68+
throw new YdbProducerException($"Topic is not supported codec: {_config.Codec}");
69+
}
70+
71+
_session = new ProducerSession(_config, stream, initResponse, Initialize, _logger);
72+
_ = _session.RunProcessingWriteAck(_inFlightMessages);
73+
}
4074

4175
public Task<SendResult> SendAsync(TValue data)
4276
{
43-
throw new NotImplementedException();
77+
return SendAsync(new Message<TValue>(data));
4478
}
4579

46-
public Task<SendResult> SendAsync(Message<TValue> message)
80+
public async Task<SendResult> SendAsync(Message<TValue> message)
4781
{
48-
throw new NotImplementedException();
82+
TaskCompletionSource<SendResult> completeTask = new();
83+
84+
var data = _serializer.Serialize(message.Data);
85+
var messageData = new MessageData
86+
{
87+
Data = ByteString.CopyFrom(data),
88+
CreatedAt = Timestamp.FromDateTime(message.Timestamp),
89+
UncompressedSize = data.Length
90+
};
91+
92+
foreach (var metadata in message.Metadata)
93+
{
94+
messageData.MetadataItems.Add(
95+
new MetadataItem { Key = metadata.Key, Value = ByteString.CopyFrom(metadata.Value) }
96+
);
97+
}
98+
99+
_toSendBuffer.Enqueue(new MessageSending(messageData, completeTask));
100+
101+
if (_toSendBuffer.IsEmpty) // concurrent sending
102+
{
103+
return await completeTask.Task;
104+
}
105+
106+
await _writeSemaphoreSlim.WaitAsync();
107+
try
108+
{
109+
await _session.Write(_toSendBuffer, _inFlightMessages);
110+
}
111+
finally
112+
{
113+
_writeSemaphoreSlim.Release();
114+
}
115+
116+
return await completeTask.Task;
49117
}
50118
}
51119

52-
public class Message<TValue>
120+
// No thread safe
121+
internal class ProducerSession : TopicSession
53122
{
54-
public Message(TValue data)
123+
private readonly ProducerConfig _config;
124+
private readonly ProducerStream _stream;
125+
126+
private long _seqNum;
127+
128+
public ProducerSession(
129+
ProducerConfig config,
130+
ProducerStream stream,
131+
InitResponse initResponse,
132+
Func<Task> initialize,
133+
ILogger logger) : base(logger, initResponse.SessionId, initialize)
55134
{
56-
Data = data;
135+
_config = config;
136+
_stream = stream;
137+
_seqNum = initResponse.LastSeqNo;
57138
}
58139

59-
public DateTime Timestamp { get; set; }
140+
internal async Task RunProcessingWriteAck(ConcurrentQueue<MessageSending> inFlightMessages)
141+
{
142+
await foreach (var messageFromServer in _stream)
143+
{
144+
var status = Status.FromProto(messageFromServer.Status, messageFromServer.Issues);
60145

61-
public TValue Data { get; }
146+
if (status.IsNotSuccess)
147+
{
148+
Logger.LogWarning("");
149+
return;
150+
}
62151

63-
public List<Metadata> Metadata { get; } = new();
64-
}
152+
foreach (var ack in messageFromServer.WriteResponse.Acks)
153+
{
154+
if (!inFlightMessages.TryDequeue(out var messageFromClient))
155+
{
156+
break;
157+
}
65158

66-
public record Metadata(string Key, byte[] Value);
159+
messageFromClient.TaskCompletionSource.SetResult(new SendResult(ack));
160+
}
161+
}
162+
}
67163

68-
public class SendResult
69-
{
70-
public SendResult(State status)
164+
internal async Task Write(ConcurrentQueue<MessageSending> toSendBuffer,
165+
ConcurrentQueue<MessageSending> inFlightMessages)
71166
{
72-
State = status;
73-
}
167+
try
168+
{
169+
var writeMessage = new StreamWriteMessage.Types.WriteRequest
170+
{
171+
Codec = (int)_config.Codec
172+
};
74173

75-
public State State { get; }
76-
}
174+
var currentSeqNum = Volatile.Read(ref _seqNum);
77175

78-
public enum State
79-
{
80-
Written,
81-
AlreadyWritten
82-
}
176+
while (toSendBuffer.TryDequeue(out var sendData))
177+
{
178+
var messageData = sendData.MessageData;
83179

84-
internal enum ProducerState
85-
{
86-
Ready
87-
// Broken
180+
messageData.SeqNo = ++currentSeqNum;
181+
writeMessage.Messages.Add(messageData);
182+
inFlightMessages.Enqueue(sendData);
183+
}
184+
185+
Volatile.Write(ref _seqNum, currentSeqNum);
186+
await _stream.Write(new MessageFromClient { WriteRequest = writeMessage });
187+
}
188+
catch (TransactionException e)
189+
{
190+
ReconnectSession();
191+
192+
Console.WriteLine(e);
193+
throw;
194+
}
195+
}
196+
197+
public ValueTask DisposeAsync()
198+
{
199+
return _stream.DisposeAsync();
200+
}
88201
}
202+
203+
internal record MessageSending(MessageData MessageData, TaskCompletionSource<SendResult> TaskCompletionSource);
Lines changed: 12 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
using Ydb.Topic;
2-
using Ydb.Topic.V1;
3-
41
namespace Ydb.Sdk.Services.Topic;
52

63
public class ProducerBuilder<TValue>
@@ -16,45 +13,17 @@ public ProducerBuilder(ProducerConfig config)
1613

1714
public async Task<IProducer<TValue>> Build()
1815
{
19-
var stream = _config.Driver.BidirectionalStreamCall(TopicService.StreamWriteMethod,
20-
GrpcRequestSettings.DefaultInstance);
21-
22-
var initRequest = new StreamWriteMessage.Types.InitRequest { Path = _config.TopicPath };
23-
if (_config.ProducerId != null)
24-
{
25-
initRequest.ProducerId = _config.ProducerId;
26-
}
27-
28-
if (_config.MessageGroupId != null)
29-
{
30-
initRequest.MessageGroupId = _config.MessageGroupId;
31-
}
32-
33-
await stream.Write(new StreamWriteMessage.Types.FromClient { InitRequest = initRequest });
34-
if (!await stream.MoveNextAsync())
35-
{
36-
throw new YdbProducerException("Write stream is closed by YDB server");
37-
}
38-
39-
var receivedInitMessage = stream.Current;
40-
41-
Status.FromProto(receivedInitMessage.Status, receivedInitMessage.Issues).EnsureSuccess();
42-
43-
var initResponse = receivedInitMessage.InitResponse;
44-
45-
if (!initResponse.SupportedCodecs.Codecs.Contains((int)_config.Codec))
46-
{
47-
throw new YdbProducerException($"Topic is not supported codec: {_config.Codec}");
48-
}
49-
50-
throw new NotImplementedException();
51-
// return new Producer<TValue>(
52-
// _config, initResponse, stream,
53-
// Serializer ?? (ISerializer<TValue>)(
54-
// Serializers.DefaultSerializers.TryGetValue(typeof(TValue), out var serializer)
55-
// ? serializer
56-
// : throw new YdbProducerException("The serializer is not set")
57-
// )
58-
// );
16+
var producer = new Producer<TValue>(
17+
_config,
18+
Serializer ?? (ISerializer<TValue>)(
19+
Serializers.DefaultSerializers.TryGetValue(typeof(TValue), out var serializer)
20+
? serializer
21+
: throw new YdbProducerException("The serializer is not set")
22+
)
23+
);
24+
25+
await producer.Initialize();
26+
27+
return producer;
5928
}
6029
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
using Ydb.Topic;
2+
3+
namespace Ydb.Sdk.Services.Topic;
4+
5+
public class SendResult
6+
{
7+
private readonly long _offset;
8+
9+
internal SendResult(StreamWriteMessage.Types.WriteResponse.Types.WriteAck ack)
10+
{
11+
switch (ack.MessageWriteStatusCase)
12+
{
13+
case StreamWriteMessage.Types.WriteResponse.Types.WriteAck.MessageWriteStatusOneofCase.Written:
14+
Status = PersistenceStatus.Written;
15+
_offset = ack.Written.Offset;
16+
break;
17+
case StreamWriteMessage.Types.WriteResponse.Types.WriteAck.MessageWriteStatusOneofCase.Skipped:
18+
Status = PersistenceStatus.AlreadyWritten;
19+
break;
20+
case StreamWriteMessage.Types.WriteResponse.Types.WriteAck.MessageWriteStatusOneofCase.None:
21+
default:
22+
throw new YdbProducerException($"Unexpected WriteAck status: {ack.MessageWriteStatusCase}");
23+
}
24+
}
25+
26+
public PersistenceStatus Status { get; }
27+
28+
public bool TryGetOffset(out long offset)
29+
{
30+
offset = _offset;
31+
32+
return Status == PersistenceStatus.Written;
33+
}
34+
}
35+
36+
public enum PersistenceStatus
37+
{
38+
Written,
39+
AlreadyWritten
40+
}

0 commit comments

Comments
 (0)