Skip to content

Commit a95c8d8

Browse files
feat: renaming tcs
1 parent 6507193 commit a95c8d8

File tree

1 file changed

+12
-12
lines changed
  • src/Ydb.Sdk/src/Services/Topic/Writer

1 file changed

+12
-12
lines changed

src/Ydb.Sdk/src/Services/Topic/Writer/Writer.cs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ internal class Writer<TValue> : IWriter<TValue>
2626
private readonly ConcurrentQueue<MessageSending> _inFlightMessages = new();
2727
private readonly CancellationTokenSource _disposeTokenSource = new();
2828

29-
private volatile TaskCompletionSource _taskWakeUpCompletionSource = new();
29+
private volatile TaskCompletionSource _tcsWakeUp = new();
3030
private volatile IWriteSession _session = new NotStartedWriterSession("Session not started!");
3131

3232
private int _limitBufferMaxSize;
@@ -49,9 +49,9 @@ public Task<WriteResult> WriteAsync(TValue data, CancellationToken cancellationT
4949

5050
public async Task<WriteResult> WriteAsync(Message<TValue> message, CancellationToken cancellationToken)
5151
{
52-
TaskCompletionSource<WriteResult> completeTask = new();
52+
TaskCompletionSource<WriteResult> tcs = new();
5353
cancellationToken.Register(
54-
() => completeTask.TrySetCanceled(cancellationToken),
54+
() => tcs.TrySetCanceled(cancellationToken),
5555
useSynchronizationContext: false
5656
);
5757

@@ -80,7 +80,7 @@ public async Task<WriteResult> WriteAsync(Message<TValue> message, CancellationT
8080
if (Interlocked.CompareExchange(ref _limitBufferMaxSize,
8181
curLimitBufferSize - data.Length, curLimitBufferSize) == curLimitBufferSize)
8282
{
83-
_toSendBuffer.Enqueue(new MessageSending(messageData, completeTask));
83+
_toSendBuffer.Enqueue(new MessageSending(messageData, tcs));
8484
WakeUpWorker();
8585

8686
break;
@@ -99,7 +99,7 @@ public async Task<WriteResult> WriteAsync(Message<TValue> message, CancellationT
9999

100100
try
101101
{
102-
var writeResult = await completeTask.Task;
102+
var writeResult = await tcs.Task;
103103

104104
return writeResult;
105105
}
@@ -115,16 +115,16 @@ private async void StartWriteWorker()
115115

116116
while (!_disposeTokenSource.Token.IsCancellationRequested)
117117
{
118-
await _taskWakeUpCompletionSource.Task;
119-
_taskWakeUpCompletionSource = new TaskCompletionSource();
118+
await _tcsWakeUp.Task;
119+
_tcsWakeUp = new TaskCompletionSource();
120120

121121
await _session.Write(_toSendBuffer);
122122
}
123123
}
124124

125125
private void WakeUpWorker()
126126
{
127-
_taskWakeUpCompletionSource.TrySetResult();
127+
_tcsWakeUp.TrySetResult();
128128
}
129129

130130
private async Task Initialize()
@@ -252,7 +252,7 @@ public void Dispose()
252252
}
253253
}
254254

255-
internal record MessageSending(MessageData MessageData, TaskCompletionSource<WriteResult> TaskCompletionSource);
255+
internal record MessageSending(MessageData MessageData, TaskCompletionSource<WriteResult> Tcs);
256256

257257
internal interface IWriteSession : IDisposable
258258
{
@@ -282,7 +282,7 @@ public Task Write(ConcurrentQueue<MessageSending> toSendBuffer)
282282
{
283283
while (toSendBuffer.TryDequeue(out var messageSending))
284284
{
285-
messageSending.TaskCompletionSource.TrySetException(_reasonException);
285+
messageSending.Tcs.TrySetException(_reasonException);
286286
}
287287

288288
return Task.CompletedTask;
@@ -401,12 +401,12 @@ Completing task on exception...
401401
Client SeqNo: {SeqNo}, WriteAck: {WriteAck}",
402402
messageFromClient.MessageData.SeqNo, ack);
403403

404-
messageFromClient.TaskCompletionSource.TrySetException(new WriterException(
404+
messageFromClient.Tcs.TrySetException(new WriterException(
405405
$"Client SeqNo[{messageFromClient.MessageData.SeqNo}] is less then server's WriteAck[{ack}]"));
406406
}
407407
else
408408
{
409-
messageFromClient.TaskCompletionSource.TrySetResult(new WriteResult(ack));
409+
messageFromClient.Tcs.TrySetResult(new WriteResult(ack));
410410
}
411411

412412
_inFlightMessages.TryDequeue(out _); // Dequeue

0 commit comments

Comments
 (0)