Skip to content

Commit df0639e

Browse files
feat: implement consistent retry logic for topic writer (#223)
* Do not send messages that have a timeout by cancelToken. * If your value serializer throws an exception, this will be wrapped in a WriterException with unspecified status. * Added BufferOverflowRetryTimeoutMs to the next try write. * Rename _disposeTokenSource -> _disposeCts. * Optimize write worker: if (_toSendBuffer.IsEmpty) continue. * On RPC errors create DummyWriterSession. * Message has been skipped because its sequence number is less than or equal to the last processed server's SeqNo. * Calculate the next sequence number from the calculated previous messages.
1 parent 8749518 commit df0639e

File tree

9 files changed

+515
-284
lines changed

9 files changed

+515
-284
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
- Topic Writer updated release candidate
2+
13
## v0.9.0-rc0
24
- Topic Writer release candidate
35
- Fixed: grpc requests go via proxy on Grpc.NET.Client >= 2.44

src/Ydb.Sdk/src/Services/Topic/Exceptions.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,15 @@ public class WriterException : Exception
44
{
55
public WriterException(string message) : base(message)
66
{
7-
Status = new Status(StatusCode.Unspecified);
87
}
98

109
public WriterException(string message, Status status) : base(message + ": " + status)
1110
{
12-
Status = status;
1311
}
1412

15-
public WriterException(string message, Driver.TransportException e) : base(message, e)
13+
public WriterException(string message, Exception inner) : base(message, inner)
1614
{
17-
Status = e.Status;
1815
}
19-
20-
public Status Status { get; }
2116
}
2217

2318
public class ReaderException : Exception

src/Ydb.Sdk/src/Services/Topic/TopicSession.cs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ namespace Ydb.Sdk.Services.Topic;
55
internal abstract class TopicSession<TFromClient, TFromServer> : IDisposable
66
{
77
private readonly Func<Task> _initialize;
8-
private readonly Action<WriterException> _resetSessionOnTransportError;
98

109
protected readonly IBidirectionalStream<TFromClient, TFromServer> Stream;
1110
protected readonly ILogger Logger;
@@ -17,17 +16,15 @@ protected TopicSession(
1716
IBidirectionalStream<TFromClient, TFromServer> stream,
1817
ILogger logger,
1918
string sessionId,
20-
Func<Task> initialize,
21-
Action<WriterException> resetSessionOnTransportError)
19+
Func<Task> initialize)
2220
{
2321
Stream = stream;
2422
Logger = logger;
2523
SessionId = sessionId;
2624
_initialize = initialize;
27-
_resetSessionOnTransportError = resetSessionOnTransportError;
2825
}
2926

30-
protected async void ReconnectSession(WriterException exception)
27+
protected async void ReconnectSession()
3128
{
3229
if (Interlocked.CompareExchange(ref _isActive, 0, 1) == 0)
3330
{
@@ -36,9 +33,7 @@ protected async void ReconnectSession(WriterException exception)
3633
return;
3734
}
3835

39-
_resetSessionOnTransportError(exception);
40-
41-
Logger.LogInformation("WriterSession[{SessionId}] has been deactivated, starting to reconnect", SessionId);
36+
Logger.LogInformation("TopicSession[{SessionId}] has been deactivated, starting to reconnect", SessionId);
4237

4338
await _initialize();
4439
}

src/Ydb.Sdk/src/Services/Topic/Writer/WriteResult.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ namespace Ydb.Sdk.Services.Topic.Writer;
44

55
public class WriteResult
66
{
7+
internal static readonly WriteResult Skipped = new();
8+
79
private readonly long _offset;
810

911
internal WriteResult(StreamWriteMessage.Types.WriteResponse.Types.WriteAck ack)
@@ -23,6 +25,11 @@ internal WriteResult(StreamWriteMessage.Types.WriteResponse.Types.WriteAck ack)
2325
}
2426
}
2527

28+
private WriteResult()
29+
{
30+
Status = PersistenceStatus.AlreadyWritten;
31+
}
32+
2633
public PersistenceStatus Status { get; }
2734

2835
public bool TryGetOffset(out long offset)

0 commit comments

Comments
 (0)