Skip to content

Commit 821e9b6

Browse files
first commit
1 parent 8516a99 commit 821e9b6

File tree

2 files changed

+240
-81
lines changed

2 files changed

+240
-81
lines changed

src/Ydb.Sdk/src/Services/Topic/Reader/Reader.cs

Lines changed: 120 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -254,14 +254,14 @@ internal class ReaderSession : TopicSession<MessageFromClient, MessageFromServer
254254
private readonly ChannelWriter<InternalBatchMessage> _channelWriter;
255255
private readonly CancellationTokenSource _lifecycleReaderSessionCts = new();
256256

257-
private readonly Channel<CommitSending> _channelCommitSending = Channel.CreateUnbounded<CommitSending>(
258-
new UnboundedChannelOptions
259-
{
260-
SingleReader = true,
261-
SingleWriter = true,
262-
AllowSynchronousContinuations = false
263-
}
264-
);
257+
private readonly Channel<MessageFromClient> _channelFromClientMessageSending =
258+
Channel.CreateUnbounded<MessageFromClient>(
259+
new UnboundedChannelOptions
260+
{
261+
SingleReader = true,
262+
AllowSynchronousContinuations = false
263+
}
264+
);
265265

266266
private readonly ConcurrentDictionary<long, PartitionSession> _partitionSessions = new();
267267

@@ -292,38 +292,9 @@ public async void RunProcessingTopic()
292292
{
293293
try
294294
{
295-
await foreach (var commitSending in _channelCommitSending.Reader.ReadAllAsync())
295+
await foreach (var fromClientMessage in _channelFromClientMessageSending.Reader.ReadAllAsync())
296296
{
297-
if (_partitionSessions.TryGetValue(commitSending.PartitionSessionId, out var partitionSession))
298-
{
299-
partitionSession.RegisterCommitRequest(commitSending);
300-
}
301-
else
302-
{
303-
Logger.LogWarning(
304-
"Offset range [{OffsetRange}] is requested to be committed, " +
305-
"but PartitionSession[PartitionSessionId={PartitionSessionId}] is already closed",
306-
commitSending.OffsetsRange, commitSending.PartitionSessionId);
307-
308-
commitSending.TcsCommit.TrySetException(new ReaderException("AD"));
309-
310-
continue;
311-
}
312-
313-
await Stream.Write(new MessageFromClient
314-
{
315-
CommitOffsetRequest = new StreamReadMessage.Types.CommitOffsetRequest
316-
{
317-
CommitOffsets =
318-
{
319-
new StreamReadMessage.Types.CommitOffsetRequest.Types.PartitionCommitOffset
320-
{
321-
Offsets = { commitSending.OffsetsRange },
322-
PartitionSessionId = commitSending.PartitionSessionId
323-
}
324-
}
325-
}
326-
});
297+
await Stream.Write(fromClientMessage);
327298
}
328299
}
329300
catch (Driver.TransportException e)
@@ -338,26 +309,28 @@ await Stream.Write(new MessageFromClient
338309

339310
try
340311
{
312+
long freeBytesSize = 0;
313+
341314
while (await Stream.MoveNextAsync())
342315
{
343-
var freeBytesSize = 0;
316+
var fromServerMessage = Stream.Current;
344317

345-
346-
switch (Stream.Current.ServerMessageCase)
318+
switch (fromServerMessage.ServerMessageCase)
347319
{
348320
case ServerMessageOneofCase.ReadResponse:
349-
await HandleReadResponse();
321+
await HandleReadResponse(fromServerMessage.ReadResponse);
350322
break;
351323
case ServerMessageOneofCase.StartPartitionSessionRequest:
352-
await HandleStartPartitionSessionRequest();
324+
await HandleStartPartitionSessionRequest(fromServerMessage.StartPartitionSessionRequest);
353325
break;
354326
case ServerMessageOneofCase.CommitOffsetResponse:
355-
freeBytesSize += HandleCommitOffsetResponse();
327+
freeBytesSize += HandleCommitOffsetResponse(fromServerMessage.CommitOffsetResponse);
356328
break;
357329
case ServerMessageOneofCase.PartitionSessionStatusResponse:
358330
case ServerMessageOneofCase.UpdateTokenResponse:
359331
case ServerMessageOneofCase.StopPartitionSessionRequest:
360-
freeBytesSize += StopPartitionSessionRequest();
332+
freeBytesSize +=
333+
await StopPartitionSessionRequest(fromServerMessage.StopPartitionSessionRequest);
361334
break;
362335
case ServerMessageOneofCase.InitResponse:
363336
case ServerMessageOneofCase.None:
@@ -366,12 +339,16 @@ await Stream.Write(new MessageFromClient
366339
throw new ArgumentOutOfRangeException();
367340
}
368341

369-
if (freeBytesSize >= FreeBufferCoefficient * _readerConfig.MemoryUsageMaxBytes)
342+
if (freeBytesSize < FreeBufferCoefficient * _readerConfig.MemoryUsageMaxBytes)
370343
{
371-
// await Stream.Write();
372-
373-
Interlocked.Add(ref _memoryUsageMaxBytes, freeBytesSize);
344+
continue;
374345
}
346+
347+
await _channelFromClientMessageSending.Writer.WriteAsync(new MessageFromClient
348+
{ ReadRequest = new StreamReadMessage.Types.ReadRequest { BytesSize = freeBytesSize } });
349+
350+
Interlocked.Add(ref _memoryUsageMaxBytes, freeBytesSize);
351+
freeBytesSize = 0;
375352
}
376353
}
377354
catch (Driver.TransportException e)
@@ -387,9 +364,9 @@ await Stream.Write(new MessageFromClient
387364
}
388365
}
389366

390-
private async Task HandleStartPartitionSessionRequest()
367+
private async Task HandleStartPartitionSessionRequest(
368+
StreamReadMessage.Types.StartPartitionSessionRequest startPartitionSessionRequest)
391369
{
392-
var startPartitionSessionRequest = Stream.Current.StartPartitionSessionRequest;
393370
var partitionSession = startPartitionSessionRequest.PartitionSession;
394371
_partitionSessions[partitionSession.PartitionSessionId] = new PartitionSession(
395372
Logger,
@@ -401,11 +378,11 @@ private async Task HandleStartPartitionSessionRequest()
401378

402379
Logger.LogInformation(
403380
"ReaderSession[{SessionId}] started PartitionSession[PartitionSessionId={PartitionSessionId}, " +
404-
"Path={Path}, PartitionId={PartitionId}, CommittedOffset={CommittedOffset}]",
381+
"Path=\"{Path}\", PartitionId={PartitionId}, CommittedOffset={CommittedOffset}]",
405382
SessionId, partitionSession.PartitionSessionId, partitionSession.Path,
406383
partitionSession.PartitionId, startPartitionSessionRequest.CommittedOffset);
407384

408-
await Stream.Write(new MessageFromClient
385+
await _channelFromClientMessageSending.Writer.WriteAsync(new MessageFromClient
409386
{
410387
StartPartitionSessionResponse = new StreamReadMessage.Types.StartPartitionSessionResponse
411388
{
@@ -415,14 +392,16 @@ await Stream.Write(new MessageFromClient
415392
});
416393
}
417394

418-
private int HandleCommitOffsetResponse()
395+
private long HandleCommitOffsetResponse(StreamReadMessage.Types.CommitOffsetResponse commitOffsetResponse)
419396
{
420-
foreach (var partitionsCommittedOffset in Stream.Current.CommitOffsetResponse.PartitionsCommittedOffsets)
397+
long calculateCommitedBytes = 0;
398+
399+
foreach (var partitionsCommittedOffset in commitOffsetResponse.PartitionsCommittedOffsets)
421400
{
422401
if (_partitionSessions.TryGetValue(partitionsCommittedOffset.PartitionSessionId,
423402
out var partitionSession))
424403
{
425-
partitionSession.HandleCommitedOffset(partitionSession.CommitedOffset);
404+
calculateCommitedBytes += partitionSession.HandleCommitedOffset(partitionSession.CommitedOffset);
426405
}
427406
else
428407
{
@@ -433,12 +412,31 @@ private int HandleCommitOffsetResponse()
433412
}
434413
}
435414

436-
throw new NotImplementedException();
415+
return calculateCommitedBytes;
437416
}
438417

439-
private int StopPartitionSessionRequest()
418+
private async Task<long> StopPartitionSessionRequest(
419+
StreamReadMessage.Types.StopPartitionSessionRequest stopPartitionSessionRequest)
440420
{
441-
throw new NotImplementedException();
421+
// stopPartitionSessionRequest.Graceful
422+
if (_partitionSessions.TryRemove(stopPartitionSessionRequest.PartitionSessionId, out var partitionSession))
423+
{
424+
await _channelFromClientMessageSending.Writer.WriteAsync(
425+
new MessageFromClient
426+
{
427+
StopPartitionSessionResponse = new StreamReadMessage.Types.StopPartitionSessionResponse
428+
{
429+
PartitionSessionId = partitionSession.PartitionSessionId
430+
}
431+
});
432+
433+
return partitionSession.Stop();
434+
}
435+
436+
Logger.LogError("Received StopPartitionSessionRequest[PartitionSessionId={}] for unknown PartitionSession",
437+
stopPartitionSessionRequest.PartitionSessionId);
438+
439+
return 0;
442440
}
443441

444442
public async Task CommitOffsetRange(OffsetsRange offsetsRange, long partitionId, long approximatelyBytesSize)
@@ -448,22 +446,44 @@ public async Task CommitOffsetRange(OffsetsRange offsetsRange, long partitionId,
448446
await using var register = _lifecycleReaderSessionCts.Token.Register(() => tcsCommit
449447
.TrySetException(new YdbException($"ReaderSession[{SessionId}] was deactivated")));
450448

451-
await _channelCommitSending.Writer.WriteAsync(
452-
new CommitSending(
453-
offsetsRange,
454-
partitionId,
455-
tcsCommit,
456-
approximatelyBytesSize
457-
)
458-
);
449+
var commitSending = new CommitSending(offsetsRange, partitionId, tcsCommit, approximatelyBytesSize);
450+
451+
if (_partitionSessions.TryGetValue(commitSending.PartitionSessionId, out var partitionSession))
452+
{
453+
partitionSession.RegisterCommitRequest(commitSending);
454+
455+
await _channelFromClientMessageSending.Writer.WriteAsync(new MessageFromClient
456+
{
457+
CommitOffsetRequest = new StreamReadMessage.Types.CommitOffsetRequest
458+
{
459+
CommitOffsets =
460+
{
461+
new StreamReadMessage.Types.CommitOffsetRequest.Types.PartitionCommitOffset
462+
{
463+
Offsets = { commitSending.OffsetsRange },
464+
PartitionSessionId = commitSending.PartitionSessionId
465+
}
466+
}
467+
}
468+
}
469+
);
470+
}
471+
else
472+
{
473+
Logger.LogWarning("Offset range [{OffsetRange}] is requested to be committed, " +
474+
"but PartitionSession[PartitionSessionId={PartitionSessionId}] is already closed",
475+
commitSending.OffsetsRange, commitSending.PartitionSessionId);
476+
477+
commitSending.TcsCommit.TrySetException(new ReaderException("TODO"));
478+
479+
Interlocked.Add(ref _memoryUsageMaxBytes, commitSending.ApproximatelyBytesSize);
480+
}
459481

460482
await tcsCommit.Task;
461483
}
462484

463-
private async Task HandleReadResponse()
485+
private async Task HandleReadResponse(StreamReadMessage.Types.ReadResponse readResponse)
464486
{
465-
var readResponse = Stream.Current.ReadResponse;
466-
467487
Interlocked.Add(ref _memoryUsageMaxBytes, -readResponse.BytesSize);
468488

469489
var bytesSize = readResponse.BytesSize;
@@ -547,13 +567,15 @@ await _channelWriter.WriteAsync(
547567

548568
private static long CalculateApproximatelyBytesSize(long bytesSize, int countParts, int currentIndex)
549569
{
550-
return bytesSize / countParts + currentIndex == countParts - 1 ? bytesSize % countParts : 0;
570+
return bytesSize / countParts + (currentIndex == countParts - 1 ? bytesSize % countParts : 0);
551571
}
552572

553573
private class PartitionSession
554574
{
555575
private readonly ILogger _logger;
556-
private readonly ConcurrentQueue<(long EndOffset, TaskCompletionSource TcsCommit)> _waitCommitMessages = new();
576+
577+
private readonly ConcurrentQueue<(CommitSending СommitSending, TaskCompletionSource TcsCommit)>
578+
_waitCommitMessages = new();
557579

558580
public PartitionSession(
559581
ILogger logger,
@@ -566,8 +588,8 @@ public PartitionSession(
566588
PartitionSessionId = partitionSessionId;
567589
TopicPath = topicPath;
568590
PartitionId = partitionId;
569-
CommitedOffset = commitedOffset;
570591
PrevEndOffsetMessage = commitedOffset;
592+
CommitedOffset = commitedOffset;
571593
}
572594

573595
// Identifier of partition session. Unique inside one RPC call.
@@ -579,11 +601,11 @@ public PartitionSession(
579601
// Partition identifier
580602
internal long PartitionId { get; }
581603

582-
// Each offset up to and including (committed_offset - 1) was fully processed.
583-
internal long CommitedOffset { get; set; }
584-
585604
internal long PrevEndOffsetMessage { get; set; }
586605

606+
// Each offset up to and including (committed_offset - 1) was fully processed.
607+
internal long CommitedOffset { get; private set; }
608+
587609
internal void RegisterCommitRequest(CommitSending commitSending)
588610
{
589611
var endOffset = commitSending.OffsetsRange.End;
@@ -594,11 +616,11 @@ internal void RegisterCommitRequest(CommitSending commitSending)
594616
}
595617
else
596618
{
597-
_waitCommitMessages.Enqueue((endOffset, commitSending.TcsCommit));
619+
_waitCommitMessages.Enqueue((commitSending, commitSending.TcsCommit));
598620
}
599621
}
600622

601-
internal void HandleCommitedOffset(long commitedOffset)
623+
internal long HandleCommitedOffset(long commitedOffset)
602624
{
603625
if (CommitedOffset >= commitedOffset)
604626
{
@@ -609,14 +631,31 @@ internal void HandleCommitedOffset(long commitedOffset)
609631
}
610632

611633
CommitedOffset = commitedOffset;
634+
long releaseCommitedBytes = 0;
612635

613-
while (_waitCommitMessages.TryPeek(out var waitCommitTcs) && waitCommitTcs.EndOffset <= commitedOffset)
636+
while (_waitCommitMessages.TryPeek(out var waitCommitTcs) &&
637+
waitCommitTcs.СommitSending.OffsetsRange.End <= commitedOffset)
614638
{
615639
_waitCommitMessages.TryDequeue(out _);
616640
waitCommitTcs.TcsCommit.SetResult();
641+
642+
releaseCommitedBytes += waitCommitTcs.СommitSending.ApproximatelyBytesSize;
643+
}
644+
645+
return releaseCommitedBytes;
646+
}
647+
648+
internal long Stop()
649+
{
650+
long releaseCommitedBytes = 0;
651+
while (_waitCommitMessages.TryDequeue(out var commitSending))
652+
{
653+
commitSending.TcsCommit.TrySetException(new ReaderException("TODO"));
654+
655+
releaseCommitedBytes += commitSending.СommitSending.ApproximatelyBytesSize;
617656
}
618657

619-
throw new NotImplementedException();
658+
return releaseCommitedBytes;
620659
}
621660
}
622661
}

0 commit comments

Comments
 (0)