Skip to content

Commit 8a94e5a

Browse files
feat: writer retry always
1 parent 8749518 commit 8a94e5a

File tree

7 files changed

+94
-59
lines changed

7 files changed

+94
-59
lines changed

src/Ydb.Sdk/src/Services/Topic/Exceptions.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,15 @@ public class WriterException : Exception
44
{
55
public WriterException(string message) : base(message)
66
{
7-
Status = new Status(StatusCode.Unspecified);
87
}
98

109
public WriterException(string message, Status status) : base(message + ": " + status)
1110
{
12-
Status = status;
1311
}
1412

15-
public WriterException(string message, Driver.TransportException e) : base(message, e)
13+
public WriterException(string message, Exception inner) : base(message, inner)
1614
{
17-
Status = e.Status;
1815
}
19-
20-
public Status Status { get; }
2116
}
2217

2318
public class ReaderException : Exception

src/Ydb.Sdk/src/Services/Topic/TopicSession.cs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ namespace Ydb.Sdk.Services.Topic;
55
internal abstract class TopicSession<TFromClient, TFromServer> : IDisposable
66
{
77
private readonly Func<Task> _initialize;
8-
private readonly Action<WriterException> _resetSessionOnTransportError;
98

109
protected readonly IBidirectionalStream<TFromClient, TFromServer> Stream;
1110
protected readonly ILogger Logger;
@@ -17,28 +16,23 @@ protected TopicSession(
1716
IBidirectionalStream<TFromClient, TFromServer> stream,
1817
ILogger logger,
1918
string sessionId,
20-
Func<Task> initialize,
21-
Action<WriterException> resetSessionOnTransportError)
19+
Func<Task> initialize)
2220
{
2321
Stream = stream;
2422
Logger = logger;
2523
SessionId = sessionId;
2624
_initialize = initialize;
27-
_resetSessionOnTransportError = resetSessionOnTransportError;
2825
}
2926

30-
protected async void ReconnectSession(WriterException exception)
31-
{
27+
protected async void ReconnectSession() {
3228
if (Interlocked.CompareExchange(ref _isActive, 0, 1) == 0)
3329
{
3430
Logger.LogDebug("Skipping reconnect. A reconnect session has already been initiated");
3531

3632
return;
3733
}
3834

39-
_resetSessionOnTransportError(exception);
40-
41-
Logger.LogInformation("WriterSession[{SessionId}] has been deactivated, starting to reconnect", SessionId);
35+
Logger.LogInformation("TopicSession[{SessionId}] has been deactivated, starting to reconnect", SessionId);
4236

4337
await _initialize();
4438
}

src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs

Lines changed: 62 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ internal class Writer<TValue> : IWriter<TValue>
2424
private readonly ISerializer<TValue> _serializer;
2525
private readonly ConcurrentQueue<MessageSending> _toSendBuffer = new();
2626
private readonly ConcurrentQueue<MessageSending> _inFlightMessages = new();
27-
private readonly CancellationTokenSource _disposeTokenSource = new();
27+
private readonly CancellationTokenSource _disposeCts = new();
2828

2929
private volatile TaskCompletionSource _tcsWakeUp = new();
30-
private volatile IWriteSession _session = new NotStartedWriterSession("Session not started!");
30+
private volatile IWriteSession _session = null!;
3131

3232
private int _limitBufferMaxSize;
3333

@@ -51,18 +51,27 @@ public async Task<WriteResult> WriteAsync(Message<TValue> message, CancellationT
5151
{
5252
TaskCompletionSource<WriteResult> tcs = new();
5353
cancellationToken.Register(
54-
() => tcs.TrySetCanceled(cancellationToken),
55-
useSynchronizationContext: false
54+
() => tcs.TrySetException(
55+
new WriterException("The write operation was canceled before it could be completed")
56+
), useSynchronizationContext: false
5657
);
5758

58-
var data = _serializer.Serialize(message.Data);
59+
byte[] data;
60+
try
61+
{
62+
data = _serializer.Serialize(message.Data);
63+
}
64+
catch (Exception e)
65+
{
66+
throw new WriterException("Error when serializing message data", e);
67+
}
68+
5969
var messageData = new MessageData
6070
{
6171
Data = ByteString.CopyFrom(data),
6272
CreatedAt = Timestamp.FromDateTime(message.Timestamp.ToUniversalTime()),
6373
UncompressedSize = data.Length
6474
};
65-
6675
foreach (var metadata in message.Metadata)
6776
{
6877
messageData.MetadataItems.Add(new MetadataItem
@@ -94,7 +103,14 @@ public async Task<WriteResult> WriteAsync(Message<TValue> message, CancellationT
94103
"Buffer overflow: the data size [{DataLength}] exceeds the current buffer limit ({CurLimitBufferSize}) [BufferMaxSize = {BufferMaxSize}]",
95104
data.Length, curLimitBufferSize, _config.BufferMaxSize);
96105

97-
throw new WriterException("Buffer overflow");
106+
try
107+
{
108+
await Task.Delay(_config.BufferOverflowRetryTimeoutMs, cancellationToken);
109+
}
110+
catch (TaskCanceledException)
111+
{
112+
throw new WriterException("Buffer overflow");
113+
}
98114
}
99115

100116
try
@@ -113,7 +129,7 @@ private async void StartWriteWorker()
113129
{
114130
await Initialize();
115131

116-
while (!_disposeTokenSource.Token.IsCancellationRequested)
132+
while (!_disposeCts.Token.IsCancellationRequested)
117133
{
118134
await _tcsWakeUp.Task;
119135
_tcsWakeUp = new TaskCompletionSource();
@@ -129,10 +145,14 @@ private void WakeUpWorker()
129145

130146
private async Task Initialize()
131147
{
148+
_session = DummyWriteSession.Instance;
149+
132150
try
133151
{
134-
if (_disposeTokenSource.IsCancellationRequested)
152+
if (_disposeCts.IsCancellationRequested)
135153
{
154+
_logger.LogWarning("Initialize writer is canceled because it has been disposed");
155+
136156
return;
137157
}
138158

@@ -162,7 +182,7 @@ private async Task Initialize()
162182
_session = new NotStartedWriterSession(
163183
$"Stream unexpectedly closed by YDB server. Current InitRequest: {initRequest}");
164184

165-
_ = Task.Run(Initialize, _disposeTokenSource.Token);
185+
_ = Task.Run(Initialize, _disposeCts.Token);
166186

167187
return;
168188
}
@@ -177,7 +197,7 @@ private async Task Initialize()
177197

178198
if (status.StatusCode != StatusCode.SchemeError)
179199
{
180-
_ = Task.Run(Initialize, _disposeTokenSource.Token);
200+
_ = Task.Run(Initialize, _disposeCts.Token);
181201
}
182202

183203
_logger.LogCritical("Writer initialization failed to start. Reason: {Status}", status);
@@ -207,17 +227,16 @@ private async Task Initialize()
207227
stream,
208228
initResponse,
209229
Initialize,
210-
e => { _session = new NotStartedWriterSession(e); },
211230
_logger,
212-
_inFlightMessages
231+
_toSendBuffer
213232
);
214233

215234
if (!_inFlightMessages.IsEmpty)
216235
{
217236
var copyInFlightMessages = new ConcurrentQueue<MessageSending>();
218237
while (_inFlightMessages.TryDequeue(out var sendData))
219238
{
220-
if (sendData.Tcs.Task.IsCanceled)
239+
if (sendData.Tcs.Task.IsFaulted)
221240
{
222241
_logger.LogWarning("Message[SeqNo={SeqNo}] is cancelled", sendData.MessageData.SeqNo);
223242

@@ -232,29 +251,31 @@ private async Task Initialize()
232251

233252
_session = newSession;
234253
newSession.RunProcessingWriteAck();
254+
if (!_toSendBuffer.IsEmpty)
255+
{
256+
WakeUpWorker(); // send buffer
257+
}
235258
}
236259
catch (Driver.TransportException e)
237260
{
238-
_logger.LogError(e, "Unable to connect the session");
239-
240-
_session = new NotStartedWriterSession(
241-
new WriterException("Transport error on creating WriterSession", e));
261+
_logger.LogError(e, "Transport error on creating WriterSession");
242262

243-
_ = Task.Run(Initialize, _disposeTokenSource.Token);
263+
_session = DummyWriteSession.Instance;
264+
_ = Task.Run(Initialize, _disposeCts.Token);
244265
}
245266
}
246267

247268
public void Dispose()
248269
{
249270
try
250271
{
251-
_disposeTokenSource.Cancel();
272+
_disposeCts.Cancel();
252273

253274
_session.Dispose();
254275
}
255276
finally
256277
{
257-
_disposeTokenSource.Dispose();
278+
_disposeCts.Dispose();
258279
}
259280
}
260281
}
@@ -280,11 +301,6 @@ public NotStartedWriterSession(string reasonExceptionMessage, Status status)
280301
_reasonException = new WriterException(reasonExceptionMessage, status);
281302
}
282303

283-
public NotStartedWriterSession(WriterException reasonException)
284-
{
285-
_reasonException = reasonException;
286-
}
287-
288304
public Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
289305
{
290306
while (toSendBuffer.TryDequeue(out var messageSending))
@@ -297,6 +313,21 @@ public Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
297313

298314
public void Dispose()
299315
{
316+
// Do nothing
317+
}
318+
}
319+
320+
internal class DummyWriteSession : IWriteSession
321+
{
322+
internal static readonly DummyWriteSession Instance = new();
323+
324+
public void Dispose()
325+
{
326+
}
327+
328+
public Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
329+
{
330+
return Task.CompletedTask;
300331
}
301332
}
302333

@@ -312,15 +343,13 @@ public WriterSession(
312343
WriterStream stream,
313344
InitResponse initResponse,
314345
Func<Task> initialize,
315-
Action<WriterException> resetSessionOnTransportError,
316346
ILogger logger,
317347
ConcurrentQueue<MessageSending> inFlightMessages
318348
) : base(
319349
stream,
320350
logger,
321351
initResponse.SessionId,
322-
initialize,
323-
resetSessionOnTransportError
352+
initialize
324353
)
325354
{
326355
_config = config;
@@ -357,10 +386,10 @@ public async Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
357386
}
358387
catch (Driver.TransportException e)
359388
{
360-
Logger.LogError(e, "WriterSession[{SessionId}] have error on Write, last SeqNo={SeqNo}",
389+
Logger.LogError(e, "WriterSession[{SessionId}] have transport error on Write, last SeqNo={SeqNo}",
361390
SessionId, Volatile.Read(ref _seqNum));
362391

363-
ReconnectSession(new WriterException("Transport error in the WriterSession on write messages", e));
392+
ReconnectSession();
364393
}
365394
}
366395

@@ -428,13 +457,13 @@ Completing task on exception...
428457
{
429458
Logger.LogError(e, "WriterSession[{SessionId}] have error on processing writeAck", SessionId);
430459

431-
ReconnectSession(new WriterException("Transport error in the WriterSession on processing writeAck", e));
460+
ReconnectSession();
432461

433462
return;
434463
}
435464

436465
Logger.LogWarning("WriterSession[{SessionId}]: stream is closed", SessionId);
437466

438-
ReconnectSession(new WriterException("WriterStream is closed"));
467+
ReconnectSession();
439468
}
440469
}

src/Ydb.Sdk/src/Services/Topic/Writer/WriterBuilder.cs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,36 @@ public WriterBuilder(IDriver driver, string topicPath)
3939
/// </summary>
4040
public long? PartitionId { get; set; }
4141

42+
/// <summary>
43+
/// The serializer to use to serialize values.
44+
/// </summary>
45+
/// <remarks>
46+
/// If your value serializer throws an exception, this will be
47+
/// wrapped in a WriterException with unspecified status.
48+
/// </remarks>
4249
public ISerializer<TValue>? Serializer { get; set; }
4350

51+
52+
/// <summary>
53+
/// Represents the timeout duration, in milliseconds, used when a buffer overflow is detected.
54+
/// This timeout specifies how long the system should wait before attempting to retry the operation.
55+
/// </summary>
56+
/// <remarks>
57+
/// This timeout is important for managing system performance and stability.
58+
/// Too short a timeout could lead to rapid retry attempts, potentially causing further resource contention
59+
/// and degrading system performance. Conversely, too long a timeout might delay processing significantly.
60+
/// </remarks>
61+
public int BufferOverflowRetryTimeoutMs { get; set; } = 10;
62+
4463
public IWriter<TValue> Build()
4564
{
4665
var config = new WriterConfig(
4766
topicPath: TopicPath,
4867
producerId: ProducerId,
4968
codec: Codec,
5069
bufferMaxSize: BufferMaxSize,
51-
partitionId: PartitionId
70+
partitionId: PartitionId,
71+
bufferOverflowRetryTimeoutMs: BufferOverflowRetryTimeoutMs
5272
);
5373

5474
return new Writer<TValue>(

src/Ydb.Sdk/src/Services/Topic/Writer/WriterConfig.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,15 @@ internal WriterConfig(
99
string? producerId,
1010
Codec codec,
1111
int bufferMaxSize,
12-
long? partitionId
13-
)
12+
long? partitionId,
13+
int bufferOverflowRetryTimeoutMs)
1414
{
1515
TopicPath = topicPath;
1616
ProducerId = producerId;
1717
Codec = codec;
1818
BufferMaxSize = bufferMaxSize;
1919
PartitionId = partitionId;
20+
BufferOverflowRetryTimeoutMs = bufferOverflowRetryTimeoutMs;
2021
}
2122

2223
public string TopicPath { get; }
@@ -28,6 +29,8 @@ internal WriterConfig(
2829
public int BufferMaxSize { get; }
2930

3031
public long? PartitionId { get; }
32+
33+
public int BufferOverflowRetryTimeoutMs { get; }
3134

3235
public override string ToString()
3336
{

src/Ydb.Sdk/tests/Topic/WriterIntegrationTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ public async Task WriteAsync_WhenTopicNotFound_ReturnNotFoundException()
4545
using var writer = new WriterBuilder<string>(_driver, _topicName + "_not_found")
4646
{ ProducerId = "producerId" }.Build();
4747

48-
Assert.Equal(StatusCode.SchemeError, (await Assert.ThrowsAsync<WriterException>(
49-
() => writer.WriteAsync("hello world"))).Status.StatusCode);
48+
Assert.Equal("StatusCode.SchemeError", (await Assert.ThrowsAsync<WriterException>(
49+
() => writer.WriteAsync("hello world"))).Message);
5050
}
5151

5252
[Fact]

0 commit comments

Comments
 (0)