Skip to content

Commit 5c68107

Browse files
fix
1 parent bc83841 commit 5c68107

File tree

8 files changed

+112
-58
lines changed

8 files changed

+112
-58
lines changed

src/Ydb.Sdk/src/Services/Topic/Exceptions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ protected YdbTopicException(string message) : base(message)
77
}
88
}
99

10-
public class YdbProducerException : YdbTopicException
10+
public class YdbWriterException : YdbTopicException
1111
{
12-
public YdbProducerException(string message) : base(message)
12+
public YdbWriterException(string message) : base(message)
1313
{
1414
}
1515
}

src/Ydb.Sdk/src/Services/Topic/IProducer.cs

Lines changed: 0 additions & 8 deletions
This file was deleted.
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
namespace Ydb.Sdk.Services.Topic;
2+
3+
public interface IWriter<TValue>
4+
{
5+
public Task<WriteResult> WriteAsync(TValue data);
6+
7+
public Task<WriteResult> WriteAsync(Message<TValue> message);
8+
}

src/Ydb.Sdk/src/Services/Topic/TopicSession.cs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ internal abstract class TopicSession : IDisposable
1010
protected readonly string SessionId;
1111

1212
private int _isActive = 1;
13+
private bool _disposed;
1314

1415
protected TopicSession(ILogger logger, string sessionId, Func<Task> initialize)
1516
{
@@ -22,13 +23,14 @@ protected async void ReconnectSession()
2223
{
2324
if (Interlocked.CompareExchange(ref _isActive, 0, 1) == 0)
2425
{
25-
Logger.LogWarning("The reconnect has already been launched");
26+
Logger.LogWarning("Skipping reconnect. A reconnect session has already been initiated");
2627

2728
return;
2829
}
29-
3030

31-
while (true)
31+
Logger.LogInformation("WriterSession[{SessionId}] has been deactivated, starting to reconnect", SessionId);
32+
33+
while (!_disposed)
3234
{
3335
try
3436
{
@@ -37,12 +39,16 @@ protected async void ReconnectSession()
3739
}
3840
catch (Exception e)
3941
{
40-
Logger.LogError(e, "Error reconnect session!");
42+
Logger.LogError(e, "Unable to reconnect the session due to the following error");
4143
}
4244
}
4345
}
4446

4547
public void Dispose()
4648
{
49+
lock (this)
50+
{
51+
_disposed = true;
52+
}
4753
}
4854
}

src/Ydb.Sdk/src/Services/Topic/SendResult.cs renamed to src/Ydb.Sdk/src/Services/Topic/WriteResult.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22

33
namespace Ydb.Sdk.Services.Topic;
44

5-
public class SendResult
5+
public class WriteResult
66
{
77
private readonly long _offset;
88

9-
internal SendResult(StreamWriteMessage.Types.WriteResponse.Types.WriteAck ack)
9+
internal WriteResult(StreamWriteMessage.Types.WriteResponse.Types.WriteAck ack)
1010
{
1111
switch (ack.MessageWriteStatusCase)
1212
{
@@ -19,7 +19,7 @@ internal SendResult(StreamWriteMessage.Types.WriteResponse.Types.WriteAck ack)
1919
break;
2020
case StreamWriteMessage.Types.WriteResponse.Types.WriteAck.MessageWriteStatusOneofCase.None:
2121
default:
22-
throw new YdbProducerException($"Unexpected WriteAck status: {ack.MessageWriteStatusCase}");
22+
throw new YdbWriterException($"Unexpected WriteAck status: {ack.MessageWriteStatusCase}");
2323
}
2424
}
2525

src/Ydb.Sdk/src/Services/Topic/Producer.cs renamed to src/Ydb.Sdk/src/Services/Topic/Writer.cs

Lines changed: 79 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -11,33 +11,33 @@ namespace Ydb.Sdk.Services.Topic;
1111
using InitResponse = StreamWriteMessage.Types.InitResponse;
1212
using MessageData = StreamWriteMessage.Types.WriteRequest.Types.MessageData;
1313
using MessageFromClient = StreamWriteMessage.Types.FromClient;
14-
using ProducerStream = Driver.BidirectionalStream<
14+
using WriterStream = Driver.BidirectionalStream<
1515
StreamWriteMessage.Types.FromClient,
1616
StreamWriteMessage.Types.FromServer
1717
>;
1818

19-
internal class Producer<TValue> : IProducer<TValue>
19+
internal class Writer<TValue> : IWriter<TValue>
2020
{
21-
private readonly ProducerConfig _config;
22-
private readonly ILogger<Producer<TValue>> _logger;
21+
private readonly WriterConfig _config;
22+
private readonly ILogger<Writer<TValue>> _logger;
2323
private readonly ISerializer<TValue> _serializer;
2424

2525
private readonly ConcurrentQueue<MessageSending> _inFlightMessages = new();
2626
private readonly ConcurrentQueue<MessageSending> _toSendBuffer = new();
2727
private readonly SemaphoreSlim _writeSemaphoreSlim = new(1);
2828

29-
private volatile ProducerSession _session = null!;
29+
private volatile WriterSession _session = null!;
3030

31-
internal Producer(ProducerConfig producerConfig, ISerializer<TValue> serializer)
31+
internal Writer(WriterConfig config, ISerializer<TValue> serializer)
3232
{
33-
_config = producerConfig;
33+
_config = config;
3434
_serializer = serializer;
35-
_logger = producerConfig.Driver.LoggerFactory.CreateLogger<Producer<TValue>>();
35+
_logger = config.Driver.LoggerFactory.CreateLogger<Writer<TValue>>();
3636
}
3737

3838
internal async Task Initialize()
3939
{
40-
_logger.LogInformation("Producer session initialization started. ProducerConfig: {ProducerConfig}", _config);
40+
_logger.LogInformation("Writer session initialization started. WriterConfig: {WriterConfig}", _config);
4141

4242
var stream = _config.Driver.BidirectionalStreamCall(
4343
TopicService.StreamWriteMethod,
@@ -55,10 +55,12 @@ internal async Task Initialize()
5555
initRequest.MessageGroupId = _config.MessageGroupId;
5656
}
5757

58+
_logger.LogDebug("Sending initialization request for the write stream: {InitRequest}", initRequest);
59+
5860
await stream.Write(new MessageFromClient { InitRequest = initRequest });
5961
if (!await stream.MoveNextAsync())
6062
{
61-
throw new YdbProducerException(
63+
throw new YdbWriterException(
6264
$"Stream unexpectedly closed by YDB server. Current InitRequest: {initRequest}");
6365
}
6466

@@ -68,23 +70,39 @@ internal async Task Initialize()
6870

6971
var initResponse = receivedInitMessage.InitResponse;
7072

73+
_logger.LogDebug("Received a response for the initialization request on the write stream: {InitResponse}",
74+
initResponse);
75+
7176
if (!initResponse.SupportedCodecs.Codecs.Contains((int)_config.Codec))
7277
{
73-
throw new YdbProducerException($"Topic is not supported codec: {_config.Codec}");
78+
throw new YdbWriterException($"Topic[{_config.TopicPath}] is not supported codec: {_config.Codec}");
79+
}
80+
81+
_session = new WriterSession(_config, stream, initResponse, Initialize, _logger);
82+
83+
await _writeSemaphoreSlim.WaitAsync();
84+
try
85+
{
86+
_logger.LogDebug("Retrying to send pending in-flight messages after stream restart");
87+
88+
await _session.Write(_inFlightMessages, _inFlightMessages);
89+
}
90+
finally
91+
{
92+
_writeSemaphoreSlim.Release();
7493
}
7594

76-
_session = new ProducerSession(_config, stream, initResponse, Initialize, _logger);
7795
_ = _session.RunProcessingWriteAck(_inFlightMessages);
7896
}
7997

80-
public Task<SendResult> SendAsync(TValue data)
98+
public Task<WriteResult> WriteAsync(TValue data)
8199
{
82-
return SendAsync(new Message<TValue>(data));
100+
return WriteAsync(new Message<TValue>(data));
83101
}
84102

85-
public async Task<SendResult> SendAsync(Message<TValue> message)
103+
public async Task<WriteResult> WriteAsync(Message<TValue> message)
86104
{
87-
TaskCompletionSource<SendResult> completeTask = new();
105+
TaskCompletionSource<WriteResult> completeTask = new();
88106

89107
var data = _serializer.Serialize(message.Data);
90108
var messageData = new MessageData
@@ -123,16 +141,16 @@ public async Task<SendResult> SendAsync(Message<TValue> message)
123141
}
124142

125143
// No thread safe
126-
internal class ProducerSession : TopicSession
144+
internal class WriterSession : TopicSession
127145
{
128-
private readonly ProducerConfig _config;
129-
private readonly ProducerStream _stream;
146+
private readonly WriterConfig _config;
147+
private readonly WriterStream _stream;
130148

131149
private long _seqNum;
132150

133-
public ProducerSession(
134-
ProducerConfig config,
135-
ProducerStream stream,
151+
public WriterSession(
152+
WriterConfig config,
153+
WriterStream stream,
136154
InitResponse initResponse,
137155
Func<Task> initialize,
138156
ILogger logger) : base(logger, initResponse.SessionId, initialize)
@@ -146,7 +164,7 @@ internal async Task RunProcessingWriteAck(ConcurrentQueue<MessageSending> inFlig
146164
{
147165
try
148166
{
149-
Logger.LogInformation("ProducerSession[{SessionId}] is running processing writeAck", SessionId);
167+
Logger.LogInformation("WriterSession[{SessionId}] is running processing writeAck", SessionId);
150168

151169
await foreach (var messageFromServer in _stream)
152170
{
@@ -155,25 +173,55 @@ internal async Task RunProcessingWriteAck(ConcurrentQueue<MessageSending> inFlig
155173
if (status.IsNotSuccess)
156174
{
157175
Logger.LogWarning(
158-
"ProducerSession[{SessionId}] received unsuccessful status while processing writeAck: {Status}",
176+
"WriterSession[{SessionId}] received unsuccessful status while processing writeAck: {Status}",
159177
SessionId, status);
160178
return;
161179
}
162180

163181
foreach (var ack in messageFromServer.WriteResponse.Acks)
164182
{
165-
if (!inFlightMessages.TryDequeue(out var messageFromClient))
183+
if (!inFlightMessages.TryPeek(out var messageFromClient))
166184
{
185+
Logger.LogCritical("No client message was found upon receipt of an acknowledgement: {WriteAck}",
186+
ack);
187+
167188
break;
168189
}
169190

170-
messageFromClient.TaskCompletionSource.SetResult(new SendResult(ack));
191+
if (messageFromClient.MessageData.SeqNo > ack.SeqNo)
192+
{
193+
Logger.LogCritical(
194+
@"The sequence number of the client's message in the queue is greater than the server's write acknowledgment number.
195+
Skipping the WriteAck...
196+
Client SeqNo: {SeqNo}, WriteAck: {WriteAck}",
197+
messageFromClient.MessageData.SeqNo, ack);
198+
199+
continue;
200+
}
201+
202+
if (messageFromClient.MessageData.SeqNo < ack.SeqNo)
203+
{
204+
Logger.LogCritical(
205+
@"The sequence number of the client's message in the queue is less than the server's write acknowledgment number.
206+
Completing task on exception...
207+
Client SeqNo: {SeqNo}, WriteAck: {WriteAck}",
208+
messageFromClient.MessageData.SeqNo, ack);
209+
210+
messageFromClient.TaskCompletionSource.SetException(new YdbWriterException(
211+
$"Client SeqNo[{messageFromClient.MessageData.SeqNo}] is less then server's WriteAck[{ack}]"));
212+
}
213+
else
214+
{
215+
messageFromClient.TaskCompletionSource.SetResult(new WriteResult(ack));
216+
}
217+
218+
inFlightMessages.TryDequeue(out _); // Dequeue
171219
}
172220
}
173221
}
174222
catch (Exception e)
175223
{
176-
Logger.LogError(e, "ProducerSession[{SessionId}] have error on processing writeAck", SessionId);
224+
Logger.LogError(e, "WriterSession[{SessionId}] have error on processing writeAck", SessionId);
177225
}
178226
finally
179227
{
@@ -207,10 +255,10 @@ internal async Task Write(ConcurrentQueue<MessageSending> toSendBuffer,
207255
}
208256
catch (TransactionException e)
209257
{
210-
ReconnectSession();
258+
Logger.LogError(e, "WriterSession[{SessionId}] have error on Write, last SeqNo={SeqNo}",
259+
SessionId, Volatile.Read(ref _seqNum));
211260

212-
Console.WriteLine(e);
213-
throw;
261+
ReconnectSession();
214262
}
215263
}
216264

@@ -220,4 +268,4 @@ public ValueTask DisposeAsync()
220268
}
221269
}
222270

223-
internal record MessageSending(MessageData MessageData, TaskCompletionSource<SendResult> TaskCompletionSource);
271+
internal record MessageSending(MessageData MessageData, TaskCompletionSource<WriteResult> TaskCompletionSource);
Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,29 @@
11
namespace Ydb.Sdk.Services.Topic;
22

3-
public class ProducerBuilder<TValue>
3+
public class WriterBuilder<TValue>
44
{
5-
private readonly ProducerConfig _config;
5+
private readonly WriterConfig _config;
66

7-
public ProducerBuilder(ProducerConfig config)
7+
public WriterBuilder(WriterConfig config)
88
{
99
_config = config;
1010
}
1111

1212
public ISerializer<TValue>? Serializer { get; set; }
1313

14-
public async Task<IProducer<TValue>> Build()
14+
public async Task<IWriter<TValue>> Build()
1515
{
16-
var producer = new Producer<TValue>(
16+
var writer = new Writer<TValue>(
1717
_config,
1818
Serializer ?? (ISerializer<TValue>)(
1919
Serializers.DefaultSerializers.TryGetValue(typeof(TValue), out var serializer)
2020
? serializer
21-
: throw new YdbProducerException("The serializer is not set")
21+
: throw new YdbWriterException("The serializer is not set")
2222
)
2323
);
2424

25-
await producer.Initialize();
25+
await writer.Initialize();
2626

27-
return producer;
27+
return writer;
2828
}
2929
}

src/Ydb.Sdk/src/Services/Topic/ProducerConfig.cs renamed to src/Ydb.Sdk/src/Services/Topic/WriterConfig.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
namespace Ydb.Sdk.Services.Topic;
44

5-
public class ProducerConfig
5+
public class WriterConfig
66
{
7-
public ProducerConfig(Driver driver, string topicPath)
7+
public WriterConfig(Driver driver, string topicPath)
88
{
99
Driver = driver;
1010
TopicPath = topicPath;

0 commit comments

Comments
 (0)