33using System . Threading . Channels ;
44using Google . Protobuf . WellKnownTypes ;
55using Microsoft . Extensions . Logging ;
6+ using Ydb . Sdk . Ado ;
67using Ydb . Topic ;
78using Ydb . Topic . V1 ;
89using static Ydb . Topic . StreamReadMessage . Types . FromServer ;
@@ -249,6 +250,7 @@ public void Dispose()
249250internal class ReaderSession : TopicSession < MessageFromClient , MessageFromServer >
250251{
251252 private readonly ChannelWriter < InternalBatchMessage > _channelWriter ;
253+ private readonly CancellationTokenSource _lifecycleReaderSessionCts = new ( ) ;
252254
253255 private readonly Channel < CommitSending > _channelCommitSending = Channel . CreateUnbounded < CommitSending > (
254256 new UnboundedChannelOptions
@@ -289,6 +291,18 @@ public async void RunProcessingTopic()
289291 {
290292 await foreach ( var commitSending in _channelCommitSending . Reader . ReadAllAsync ( ) )
291293 {
294+ if ( _partitionSessions . TryGetValue ( commitSending . PartitionSessionId , out var partitionSession ) )
295+ {
296+ partitionSession . RegisterCommitRequest ( commitSending ) ;
297+ }
298+ else
299+ {
300+ Logger . LogWarning (
301+ "Offset range [{OffsetRange}] is requested to be committed, " +
302+ "but PartitionSession[PartitionSessionId={PartitionSessionId}] is already closed" ,
303+ commitSending . OffsetsRange , commitSending . PartitionSessionId ) ;
304+ }
305+
292306 await Stream . Write ( new MessageFromClient
293307 {
294308 CommitOffsetRequest = new StreamReadMessage . Types . CommitOffsetRequest
@@ -309,6 +323,8 @@ await Stream.Write(new MessageFromClient
309323 {
310324 Logger . LogError ( e , "ReaderSession[{SessionId}] have transport error on Commit" , SessionId ) ;
311325
326+ _lifecycleReaderSessionCts . Cancel ( ) ;
327+
312328 ReconnectSession ( ) ;
313329 }
314330 } ) ;
@@ -323,25 +339,10 @@ await Stream.Write(new MessageFromClient
323339 await HandleReadResponse ( ) ;
324340 break ;
325341 case ServerMessageOneofCase . StartPartitionSessionRequest :
326- var startPartitionSessionRequest = Stream . Current . StartPartitionSessionRequest ;
327- var partitionSession = startPartitionSessionRequest . PartitionSession ;
328-
329- _partitionSessions [ partitionSession . PartitionSessionId ] = new PartitionSession (
330- partitionSession . PartitionSessionId ,
331- partitionSession . Path ,
332- partitionSession . PartitionId ,
333- startPartitionSessionRequest . CommittedOffset
334- ) ;
335-
336- Logger . LogInformation ( "ReaderSession[{SessionId}] started PartitionSession[]" , SessionId ) ;
342+ await HandleStartPartitionSessionRequest ( ) ;
337343 break ;
338344 case ServerMessageOneofCase . CommitOffsetResponse :
339- // foreach (var offset in Stream.Current.CommitOffsetResponse.PartitionsCommittedOffsets)
340- // {
341- // offset.CommittedOffset;
342- // offset.PartitionSessionId;
343- // }
344-
345+ HandleCommitOffsetResponse ( ) ;
345346 break ;
346347 case ServerMessageOneofCase . PartitionSessionStatusResponse :
347348 case ServerMessageOneofCase . UpdateTokenResponse :
@@ -361,91 +362,179 @@ await Stream.Write(new MessageFromClient
361362 }
362363 finally
363364 {
365+ _lifecycleReaderSessionCts . Cancel ( ) ;
366+
364367 ReconnectSession ( ) ;
365368 }
366369 }
367370
368- public async Task < TopicPartitionOffset > CommitOffsetRange ( OffsetsRange offsetsRange , long partitionId )
371+ private async Task HandleStartPartitionSessionRequest ( )
372+ {
373+ var startPartitionSessionRequest = Stream . Current . StartPartitionSessionRequest ;
374+ var partitionSession = startPartitionSessionRequest . PartitionSession ;
375+ _partitionSessions [ partitionSession . PartitionSessionId ] = new PartitionSession (
376+ Logger ,
377+ partitionSession . PartitionSessionId ,
378+ partitionSession . Path ,
379+ partitionSession . PartitionId ,
380+ startPartitionSessionRequest . CommittedOffset
381+ ) ;
382+
383+ Logger . LogInformation (
384+ "ReaderSession[{SessionId}] started PartitionSession[PartitionSessionId={PartitionSessionId}, " +
385+ "Path={Path}, PartitionId={PartitionId}, CommittedOffset={CommittedOffset}]" ,
386+ SessionId , partitionSession . PartitionSessionId , partitionSession . Path ,
387+ partitionSession . PartitionId , startPartitionSessionRequest . CommittedOffset ) ;
388+
389+ await Stream . Write ( new MessageFromClient
390+ {
391+ StartPartitionSessionResponse = new StreamReadMessage . Types . StartPartitionSessionResponse
392+ {
393+ PartitionSessionId = partitionSession . PartitionSessionId
394+ /* Simple client doesn't have read_offset or commit_offset settings */
395+ }
396+ } ) ;
397+ }
398+
399+ private void HandleCommitOffsetResponse ( )
400+ {
401+ foreach ( var partitionsCommittedOffset in Stream . Current . CommitOffsetResponse . PartitionsCommittedOffsets )
402+ {
403+ if ( _partitionSessions . TryGetValue ( partitionsCommittedOffset . PartitionSessionId ,
404+ out var partitionSession ) )
405+ {
406+ partitionSession . HandleCommitedOffset ( partitionSession . CommitedOffset ) ;
407+ }
408+ else
409+ {
410+ Logger . LogError (
411+ "Received CommitOffsetResponse[CommittedOffset={CommittedOffset}] " +
412+ "for unknown PartitionSession[PartitionSessionId={PartitionSessionId}]" ,
413+ partitionsCommittedOffset . CommittedOffset , partitionsCommittedOffset . PartitionSessionId ) ;
414+ }
415+ }
416+ }
417+
418+ public async Task CommitOffsetRange ( OffsetsRange offsetsRange , long partitionId )
369419 {
370- var tcsCommit = new TaskCompletionSource < TopicPartitionOffset > ( ) ;
420+ var tcsCommit = new TaskCompletionSource ( ) ;
421+
422+ await using var register = _lifecycleReaderSessionCts . Token . Register ( ( ) => tcsCommit
423+ . TrySetException ( new YdbException ( $ "ReaderSession[{ SessionId } ] was deactivated") ) ) ;
371424
372425 await _channelCommitSending . Writer . WriteAsync ( new CommitSending ( offsetsRange , partitionId , tcsCommit ) ) ;
373426
374- return await tcsCommit . Task ;
427+ await tcsCommit . Task ;
375428 }
376429
377430 private async Task HandleReadResponse ( )
378431 {
379432 var readResponse = Stream . Current . ReadResponse ;
380433
381434 Interlocked . Add ( ref _memoryUsageMaxBytes , - readResponse . BytesSize ) ;
382- var readResponsesInBatch = 0 ;
383435
384- foreach ( var partition in readResponse . PartitionData )
436+ var bytesSize = readResponse . BytesSize ;
437+ var partitionCount = readResponse . PartitionData . Count ;
438+
439+ for ( var partitionIndex = 0 ; partitionIndex < partitionCount ; partitionIndex ++ )
385440 {
441+ var partition = readResponse . PartitionData [ partitionIndex ] ;
386442 var partitionSessionId = partition . PartitionSessionId ;
443+ var approximatelyPartitionBytesSize = CalculateApproximatelyBytesSize (
444+ bytesSize : bytesSize ,
445+ countParts : partitionCount ,
446+ currentIndex : partitionIndex
447+ ) ;
387448
388449 if ( _partitionSessions . TryGetValue ( partitionSessionId , out var partitionSession ) )
389450 {
390451 var startOffsetBatch = partitionSession . CommitedOffset ;
391452 var endOffsetBatch = partitionSession . CommitedOffset ;
392453
454+ var batchCount = partition . Batches . Count ;
393455 var batch = partition . Batches ;
394- for ( var i = 0 ; i < partition . Batches . Count ; i ++ )
456+
457+ for ( var batchIndex = 0 ; batchIndex < batchCount ; batchIndex ++ )
395458 {
459+ var approximatelyBatchBytesSize = CalculateApproximatelyBytesSize (
460+ bytesSize : approximatelyPartitionBytesSize ,
461+ countParts : batchCount ,
462+ currentIndex : batchIndex
463+ ) ;
464+
396465 var internalBatchMessages = new Queue < InternalMessage > ( ) ;
397- var actuallySummaryBatchPayload = 0 ;
466+ var messagesCount = batch [ batchIndex ] . MessageData . Count ;
398467
399- foreach ( var messageData in batch [ i ] . MessageData )
468+ for ( var messageIndex = 0 ; messageIndex < messagesCount ; messageIndex ++ )
400469 {
401- actuallySummaryBatchPayload += messageData . Data . Length ;
402-
403- internalBatchMessages . Enqueue ( new InternalMessage (
404- data : messageData . Data ,
405- topic : partitionSession . TopicPath ,
406- partitionId : partitionSession . PartitionId ,
407- producerId : batch [ i ] . ProducerId ,
408- offsetsRange : new OffsetsRange
409- { Start = partitionSession . CommitedOffset , End = messageData . Offset } ,
410- createdAt : messageData . CreatedAt ,
411- metadataItems : messageData . MetadataItems ,
412- 0
413- ) ) ;
414-
415- partitionSession . CommitedOffset = endOffsetBatch = messageData . Offset + 1 ;
416- }
470+ var messageData = batch [ batchIndex ] . MessageData [ messageIndex ] ;
471+
472+ internalBatchMessages . Enqueue (
473+ new InternalMessage (
474+ data : messageData . Data ,
475+ topic : partitionSession . TopicPath ,
476+ partitionId : partitionSession . PartitionId ,
477+ producerId : batch [ batchIndex ] . ProducerId ,
478+ offsetsRange : new OffsetsRange
479+ { Start = partitionSession . PrevEndOffsetMessage , End = messageData . Offset } ,
480+ createdAt : messageData . CreatedAt ,
481+ metadataItems : messageData . MetadataItems ,
482+ CalculateApproximatelyBytesSize (
483+ bytesSize : approximatelyBatchBytesSize ,
484+ countParts : messagesCount ,
485+ currentIndex : messageIndex
486+ )
487+ )
488+ ) ;
417489
418- readResponsesInBatch -= actuallySummaryBatchPayload ;
490+ partitionSession . PrevEndOffsetMessage = endOffsetBatch = messageData . Offset + 1 ;
491+ }
419492
420- await _channelWriter . WriteAsync ( new InternalBatchMessage (
421- new OffsetsRange { Start = startOffsetBatch , End = endOffsetBatch } ,
422- internalBatchMessages ,
423- this )
493+ await _channelWriter . WriteAsync (
494+ new InternalBatchMessage (
495+ new OffsetsRange { Start = startOffsetBatch , End = endOffsetBatch } ,
496+ internalBatchMessages ,
497+ this ,
498+ approximatelyBatchBytesSize
499+ )
424500 ) ;
425501 }
426502 }
427503 else
428504 {
429- Logger . LogCritical (
505+ Logger . LogError (
430506 "ReaderSession[{SessionId}]: received PartitionData for unknown(closed?) " +
431507 "PartitionSession[{PartitionSessionId}], all messages were skipped!" ,
432508 SessionId , partitionSessionId ) ;
509+
510+ Interlocked . Add ( ref _memoryUsageMaxBytes , approximatelyPartitionBytesSize ) ;
433511 }
434512 }
435513 }
436514
515+ private static long CalculateApproximatelyBytesSize ( long bytesSize , int countParts , int currentIndex )
516+ {
517+ return bytesSize / countParts + currentIndex == countParts - 1 ? bytesSize % countParts : 0 ;
518+ }
519+
437520 private class PartitionSession
438521 {
522+ private readonly ILogger _logger ;
523+ private readonly ConcurrentQueue < ( long EndOffset , TaskCompletionSource TcsCommit ) > _waitCommitMessages = new ( ) ;
524+
439525 public PartitionSession (
526+ ILogger logger ,
440527 long partitionSessionId ,
441528 string topicPath ,
442529 long partitionId ,
443530 long commitedOffset )
444531 {
532+ _logger = logger ;
445533 PartitionSessionId = partitionSessionId ;
446534 TopicPath = topicPath ;
447535 PartitionId = partitionId ;
448536 CommitedOffset = commitedOffset ;
537+ PrevEndOffsetMessage = commitedOffset ;
449538 }
450539
451540 // Identifier of partition session. Unique inside one RPC call.
@@ -459,5 +548,40 @@ public PartitionSession(
459548
460549 // Each offset up to and including (committed_offset - 1) was fully processed.
461550 internal long CommitedOffset { get ; set ; }
551+
552+ internal long PrevEndOffsetMessage { get ; set ; }
553+
554+ internal void RegisterCommitRequest ( CommitSending commitSending )
555+ {
556+ var endOffset = commitSending . OffsetsRange . End ;
557+
558+ if ( endOffset <= CommitedOffset )
559+ {
560+ commitSending . TcsCommit . SetResult ( ) ;
561+ }
562+ else
563+ {
564+ _waitCommitMessages . Enqueue ( ( endOffset , commitSending . TcsCommit ) ) ;
565+ }
566+ }
567+
568+ internal void HandleCommitedOffset ( long commitedOffset )
569+ {
570+ if ( CommitedOffset >= commitedOffset )
571+ {
572+ _logger . LogError (
573+ "Received CommitOffsetResponse[CommitedOffset={CommitedOffset}] " +
574+ "which is not greater than previous committed offset: {PrevCommitedOffset}" ,
575+ commitedOffset , CommitedOffset ) ;
576+ }
577+
578+ CommitedOffset = commitedOffset ;
579+
580+ while ( _waitCommitMessages . TryPeek ( out var waitCommitTcs ) && waitCommitTcs . EndOffset <= commitedOffset )
581+ {
582+ _waitCommitMessages . TryDequeue ( out _ ) ;
583+ waitCommitTcs . TcsCommit . SetResult ( ) ;
584+ }
585+ }
462586 }
463587}
0 commit comments