Skip to content

Commit 3ab99a9

Browse files
update writer
1 parent 391f86d commit 3ab99a9

File tree

4 files changed

+60
-81
lines changed

4 files changed

+60
-81
lines changed

src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ internal class Writer<TValue> : IWriter<TValue>
2626
private readonly CancellationTokenSource _disposeCts = new();
2727

2828
private volatile TaskCompletionSource _tcsWakeUp = new();
29+
private volatile TaskCompletionSource _tcsBufferAvailableEvent = new();
2930
private volatile IWriteSession _session = null!;
30-
31-
private int _limitBufferMaxSize;
31+
private volatile int _limitBufferMaxSize;
3232

3333
internal Writer(IDriver driver, WriterConfig config, ISerializer<TValue> serializer)
3434
{
@@ -79,7 +79,7 @@ public async Task<WriteResult> WriteAsync(Message<TValue> message, CancellationT
7979

8080
while (true)
8181
{
82-
var curLimitBufferSize = Volatile.Read(ref _limitBufferMaxSize);
82+
var curLimitBufferSize = _limitBufferMaxSize;
8383

8484
if ( // sending one biggest message anyway
8585
(curLimitBufferSize == _config.BufferMaxSize && data.Length > curLimitBufferSize)
@@ -98,15 +98,15 @@ public async Task<WriteResult> WriteAsync(Message<TValue> message, CancellationT
9898
continue;
9999
}
100100

101-
_logger.LogWarning(
102-
"Buffer overflow: the data size [{DataLength}] exceeds the current buffer limit ({CurLimitBufferSize}) [BufferMaxSize = {BufferMaxSize}]",
103-
data.Length, curLimitBufferSize, _config.BufferMaxSize);
101+
// _logger.LogWarning(
102+
// "Buffer overflow: the data size [{DataLength}] exceeds the current buffer limit ({CurLimitBufferSize}) [BufferMaxSize = {BufferMaxSize}]",
103+
// data.Length, curLimitBufferSize, _config.BufferMaxSize);
104104

105105
try
106106
{
107-
await Task.Delay(_config.BufferOverflowRetryTimeoutMs, cancellationToken);
107+
await WaitBufferAvailable(cancellationToken);
108108
}
109-
catch (TaskCanceledException)
109+
catch (OperationCanceledException)
110110
{
111111
throw new WriterException("Buffer overflow");
112112
}
@@ -121,24 +121,46 @@ public async Task<WriteResult> WriteAsync(Message<TValue> message, CancellationT
121121
finally
122122
{
123123
Interlocked.Add(ref _limitBufferMaxSize, data.Length);
124+
125+
_tcsBufferAvailableEvent.TrySetResult();
124126
}
125127
}
126128

129+
private async Task WaitBufferAvailable(CancellationToken cancellationToken)
130+
{
131+
var tcsBufferAvailableEvent = _tcsBufferAvailableEvent;
132+
133+
await tcsBufferAvailableEvent.Task.WaitAsync(cancellationToken);
134+
135+
Interlocked.CompareExchange(
136+
ref _tcsBufferAvailableEvent,
137+
new TaskCompletionSource(),
138+
tcsBufferAvailableEvent
139+
);
140+
}
141+
127142
private async void StartWriteWorker()
128143
{
129144
await Initialize();
130145

131-
while (!_disposeCts.Token.IsCancellationRequested)
146+
try
132147
{
133-
await _tcsWakeUp.Task;
134-
_tcsWakeUp = new TaskCompletionSource();
135-
136-
if (_toSendBuffer.IsEmpty)
148+
while (!_disposeCts.Token.IsCancellationRequested)
137149
{
138-
continue;
139-
}
150+
await _tcsWakeUp.Task.WaitAsync(_disposeCts.Token);
151+
_tcsWakeUp = new TaskCompletionSource();
140152

141-
await _session.Write(_toSendBuffer);
153+
if (_toSendBuffer.IsEmpty)
154+
{
155+
continue;
156+
}
157+
158+
await _session.Write(_toSendBuffer);
159+
}
160+
}
161+
catch (OperationCanceledException)
162+
{
163+
_logger.LogInformation("WriteWorker[{WriterConfig}] is disposed", _config);
142164
}
143165
}
144166

src/Ydb.Sdk/src/Services/Topic/Writer/WriterBuilder.cs

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -48,27 +48,14 @@ public WriterBuilder(IDriver driver, string topicPath)
4848
/// </remarks>
4949
public ISerializer<TValue>? Serializer { get; set; }
5050

51-
52-
/// <summary>
53-
/// Represents the timeout duration, in milliseconds, used when a buffer overflow is detected.
54-
/// This timeout specifies how long the system should wait before attempting to retry the operation.
55-
/// </summary>
56-
/// <remarks>
57-
/// This timeout is important for managing system performance and stability.
58-
/// Too short a timeout could lead to rapid retry attempts, potentially causing further resource contention
59-
/// and degrading system performance. Conversely, too long a timeout might delay processing significantly.
60-
/// </remarks>
61-
public int BufferOverflowRetryTimeoutMs { get; set; } = 10;
62-
6351
public IWriter<TValue> Build()
6452
{
6553
var config = new WriterConfig(
6654
topicPath: TopicPath,
6755
producerId: ProducerId,
6856
codec: Codec,
6957
bufferMaxSize: BufferMaxSize,
70-
partitionId: PartitionId,
71-
bufferOverflowRetryTimeoutMs: BufferOverflowRetryTimeoutMs
58+
partitionId: PartitionId
7259
);
7360

7461
return new Writer<TValue>(

src/Ydb.Sdk/src/Services/Topic/Writer/WriterConfig.cs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,13 @@ internal WriterConfig(
99
string? producerId,
1010
Codec codec,
1111
int bufferMaxSize,
12-
long? partitionId,
13-
int bufferOverflowRetryTimeoutMs)
12+
long? partitionId)
1413
{
1514
TopicPath = topicPath;
1615
ProducerId = producerId;
1716
Codec = codec;
1817
BufferMaxSize = bufferMaxSize;
1918
PartitionId = partitionId;
20-
BufferOverflowRetryTimeoutMs = bufferOverflowRetryTimeoutMs;
2119
}
2220

2321
public string TopicPath { get; }
@@ -30,17 +28,15 @@ internal WriterConfig(
3028

3129
public long? PartitionId { get; }
3230

33-
public int BufferOverflowRetryTimeoutMs { get; }
34-
3531
public override string ToString()
3632
{
37-
var toString = new StringBuilder().Append("[TopicPath: ").Append(TopicPath);
33+
var toString = new StringBuilder().Append("TopicPath: ").Append(TopicPath);
3834

3935
if (ProducerId != null)
4036
{
4137
toString.Append(", ProducerId: ").Append(ProducerId);
4238
}
4339

44-
return toString.Append(", Codec: ").Append(Codec).Append(']').ToString();
40+
return toString.Append(", Codec: ").Append(Codec).ToString();
4541
}
4642
}

src/Ydb.Sdk/tests/Topic/WriterUnitTests.cs

Lines changed: 18 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ public async Task Initialize_WhenNotSupportedCodec_ThrowWriterExceptionOnWriteAs
318318
}
319319

320320
[Fact]
321-
public async Task WriteAsyncStress_WhenBufferIsOverflow_ThrowWriterExceptionOnBufferOverflow()
321+
public async Task WriteAsyncStress_WhenBufferIsOverflow_ThrowException()
322322
{
323323
const int countBatchSendingSize = 1000;
324324
const int batchTasksSize = 100;
@@ -331,77 +331,51 @@ public async Task WriteAsyncStress_WhenBufferIsOverflow_ThrowWriterExceptionOnBu
331331
var taskSource = new TaskCompletionSource<bool>();
332332
_mockStream.Setup(stream => stream.Write(It.IsAny<FromClient>()))
333333
.Returns(Task.CompletedTask);
334-
var mockNextAsync = _mockStream.SetupSequence(stream => stream.MoveNextAsync())
334+
_mockStream.SetupSequence(stream => stream.MoveNextAsync())
335335
.Returns(new ValueTask<bool>(true))
336336
.Returns(new ValueTask<bool>(taskSource.Task));
337-
var sequentialResult = _mockStream.SetupSequence(stream => stream.Current)
338-
.Returns(new StreamWriteMessage.Types.FromServer
339-
{
340-
InitResponse = new StreamWriteMessage.Types.InitResponse
341-
{ LastSeqNo = 0, PartitionId = 1, SessionId = "SessionId" },
342-
Status = StatusIds.Types.StatusCode.Success
343-
});
344337
using var writer = new WriterBuilder<int>(_mockIDriver.Object, "/topic")
345338
{
346339
ProducerId = "producerId",
347-
BufferMaxSize = bufferSize /* bytes */,
348-
BufferOverflowRetryTimeoutMs = 1_000
340+
BufferMaxSize = bufferSize /* bytes */
349341
}.Build();
350342

351343
for (var attempt = 0; attempt < countBatchSendingSize; attempt++)
352344
{
353345
_testOutputHelper.WriteLine($"Processing attempt {attempt}");
354346
var cts = new CancellationTokenSource();
347+
cts.CancelAfter(10);
355348

356349
var tasks = new List<Task<WriteResult>>();
357-
var serverAck = new StreamWriteMessage.Types.FromServer
358-
{
359-
WriteResponse = new StreamWriteMessage.Types.WriteResponse { PartitionId = 1 },
360-
Status = StatusIds.Types.StatusCode.Success
361-
};
350+
362351
for (var i = 0; i < batchTasksSize; i++)
363352
{
364353
tasks.Add(writer.WriteAsync(100, cts.Token));
365-
serverAck.WriteResponse.Acks.Add(new StreamWriteMessage.Types.WriteResponse.Types.WriteAck
366-
{
367-
SeqNo = bufferSize / messageSize * attempt + i + 1,
368-
Written = new StreamWriteMessage.Types.WriteResponse.Types.WriteAck.Types.Written
369-
{ Offset = i * messageSize + bufferSize * attempt }
370-
});
371354
}
372355

373-
sequentialResult.Returns(() =>
374-
{
375-
// ReSharper disable once AccessToModifiedClosure
376-
Volatile.Write(ref taskSource, new TaskCompletionSource<bool>());
377-
mockNextAsync.Returns(() =>
378-
{
379-
cts.Cancel();
380-
return new ValueTask<bool>(Volatile.Read(ref taskSource).Task);
381-
});
382-
return serverAck;
383-
});
384-
taskSource.SetResult(true);
385-
386-
var countSuccess = 0;
387-
var countErrors = 0;
356+
var countErrorCancel = 0;
357+
var countErrorBuffer = 0;
388358
foreach (var task in tasks)
389359
{
390360
try
391361
{
392-
var res = await task;
393-
countSuccess++;
394-
Assert.Equal(PersistenceStatus.Written, res.Status);
362+
await task;
395363
}
396364
catch (WriterException e)
397365
{
398-
countErrors++;
399-
Assert.Equal("Buffer overflow", e.Message);
366+
if ("Buffer overflow" == e.Message)
367+
{
368+
countErrorBuffer++;
369+
}
370+
else
371+
{
372+
countErrorCancel++;
373+
}
400374
}
401375
}
402376

403-
Assert.Equal(bufferSize / messageSize, countSuccess);
404-
Assert.Equal(batchTasksSize - bufferSize / messageSize, countErrors);
377+
Assert.Equal(bufferSize / messageSize, countErrorCancel);
378+
Assert.Equal(batchTasksSize - bufferSize / messageSize, countErrorBuffer);
405379
}
406380
}
407381

0 commit comments

Comments
 (0)