11using System . Collections . Concurrent ;
2+ using System . Collections . Immutable ;
23using System . Net . Sockets ;
34using System . Threading . Channels ;
45using Google . Protobuf ;
@@ -42,14 +43,54 @@ internal Reader(IDriver driver, ReaderConfig config, IDeserializer<TValue> deser
4243 _ = Initialize ( ) ;
4344 }
4445
45- public ValueTask < Message < TValue > > ReadAsync ( CancellationToken cancellationToken = default )
46+ public async ValueTask < Message < TValue > > ReadAsync ( CancellationToken cancellationToken = default )
4647 {
47- throw new NotImplementedException ( ) ;
48+ while ( await _receivedMessagesChannel . Reader . WaitToReadAsync ( cancellationToken ) )
49+ {
50+ if ( _receivedMessagesChannel . Reader . TryPeek ( out var batchInternalMessage ) )
51+ {
52+ if ( batchInternalMessage . InternalMessages . TryDequeue ( out var message ) )
53+ {
54+ return message . ToPublicMessage ( _deserializer , batchInternalMessage . ReaderSession ) ;
55+ }
56+
57+ if ( ! _receivedMessagesChannel . Reader . TryRead ( out _ ) )
58+ {
59+ throw new ReaderException ( "Detect race condition on ReadAsync operation" ) ;
60+ }
61+ }
62+ else
63+ {
64+ throw new ReaderException ( "Detect race condition on ReadAsync operation" ) ;
65+ }
66+ }
67+
68+ throw new ReaderException ( "Reader is disposed" ) ;
4869 }
4970
50- public ValueTask < IReadOnlyList < Message < TValue > > > ReadBatchAsync ( CancellationToken cancellationToken = default )
71+ public async ValueTask < BatchMessage < TValue > > ReadBatchAsync ( CancellationToken cancellationToken = default )
5172 {
52- throw new NotImplementedException ( ) ;
73+ while ( await _receivedMessagesChannel . Reader . WaitToReadAsync ( cancellationToken ) )
74+ {
75+ if ( ! _receivedMessagesChannel . Reader . TryRead ( out var batchInternalMessage ) )
76+ {
77+ throw new ReaderException ( "Detect race condition on ReadBatchAsync operation" ) ;
78+ }
79+
80+ if ( batchInternalMessage . InternalMessages . Count == 0 )
81+ {
82+ continue ;
83+ }
84+
85+ return new BatchMessage < TValue > (
86+ batchInternalMessage . InternalMessages
87+ . Select ( message => message . ToPublicMessage ( _deserializer , batchInternalMessage . ReaderSession ) )
88+ . ToImmutableArray ( ) ,
89+ batchInternalMessage . ReaderSession
90+ ) ;
91+ }
92+
93+ throw new ReaderException ( "Reader is disposed" ) ;
5394 }
5495
5596 private async Task Initialize ( )
@@ -177,7 +218,7 @@ public void Dispose()
177218 try
178219 {
179220 _disposeCts . Cancel ( ) ;
180-
221+
181222 _readerSession ? . Dispose ( ) ;
182223 }
183224 finally
@@ -212,6 +253,7 @@ internal class ReaderSession : TopicSession<MessageFromClient, MessageFromServer
212253{
213254 private readonly ChannelWriter < InternalBatchMessage > _channelWriter ;
214255 private readonly ConcurrentDictionary < long , PartitionSession > _partitionSessions = new ( ) ;
256+ private readonly ConcurrentQueue < TaskCompletionSource > _tcsOnCommitedMessages = new ( ) ;
215257
216258 private long _memoryUsageMaxBytes ;
217259
@@ -258,6 +300,11 @@ public async void RunProcessingTopic()
258300 }
259301 }
260302
303+ public async Task CommitOffsetRange ( OffsetsRange offsetsRange )
304+ {
305+ throw new NotImplementedException ( ) ;
306+ }
307+
261308 private async Task HandleReadResponse ( )
262309 {
263310 var readResponse = Stream . Current . ReadResponse ;
@@ -277,20 +324,25 @@ private async Task HandleReadResponse()
277324 {
278325 foreach ( var messageData in batch . MessageData )
279326 {
280- internalBatchMessages . Enqueue (
281- new InternalMessage (
282- messageData . Data ,
283- new OffsetsRange { Start = partitionSession . CommitedOffset , End = messageData . Offset } ,
284- messageData . CreatedAt ,
285- messageData . MetadataItems
286- ) ) ;
327+ internalBatchMessages . Enqueue ( new InternalMessage (
328+ data : messageData . Data ,
329+ topic : partitionSession . TopicPath ,
330+ partitionId : partitionSession . PartitionId ,
331+ producerId : batch . ProducerId ,
332+ offsetsRange : new OffsetsRange
333+ { Start = partitionSession . CommitedOffset , End = messageData . Offset } ,
334+ createdAt : messageData . CreatedAt ,
335+ metadataItems : messageData . MetadataItems
336+ ) ) ;
287337
288338 partitionSession . CommitedOffset = endOffsetBatch = messageData . Offset + 1 ;
289339 }
290340 }
291341
292342 await _channelWriter . WriteAsync ( new InternalBatchMessage (
293- new OffsetsRange { Start = startOffsetBatch , End = endOffsetBatch } , internalBatchMessages )
343+ new OffsetsRange { Start = startOffsetBatch , End = endOffsetBatch } ,
344+ internalBatchMessages ,
345+ this )
294346 ) ;
295347 }
296348 else
@@ -348,34 +400,66 @@ internal class InternalMessage
348400{
349401 public InternalMessage (
350402 ByteString data ,
403+ string topic ,
404+ long partitionId ,
405+ string producerId ,
351406 OffsetsRange offsetsRange ,
352- Timestamp createAt ,
407+ Timestamp createdAt ,
353408 RepeatedField < MetadataItem > metadataItems )
354409 {
355410 Data = data ;
411+ Topic = topic ;
412+ PartitionId = partitionId ;
413+ ProducerId = producerId ;
356414 OffsetsRange = offsetsRange ;
357- CreateAt = createAt ;
415+ CreatedAt = createdAt ;
358416 MetadataItems = metadataItems ;
359417 }
360418
361- public ByteString Data { get ; }
419+ private ByteString Data { get ; }
420+
421+ private string Topic { get ; }
422+
423+ private long PartitionId { get ; }
424+
425+ private string ProducerId { get ; }
426+
427+ private OffsetsRange OffsetsRange { get ; }
362428
363- public OffsetsRange OffsetsRange { get ; }
429+ private Timestamp CreatedAt { get ; }
364430
365- public Timestamp CreateAt { get ; }
431+ private RepeatedField < MetadataItem > MetadataItems { get ; }
366432
367- public RepeatedField < MetadataItem > MetadataItems { get ; }
433+ internal Message < TValue > ToPublicMessage < TValue > ( IDeserializer < TValue > deserializer , ReaderSession readerSession )
434+ {
435+ return new Message < TValue > (
436+ data : deserializer . Deserialize ( Data . ToByteArray ( ) ) ,
437+ topic : Topic ,
438+ partitionId : PartitionId ,
439+ producerId : ProducerId ,
440+ createdAt : CreatedAt . ToDateTime ( ) ,
441+ metadata : MetadataItems . Select ( item => new Metadata ( item . Key , item . Value . ToByteArray ( ) ) ) . ToImmutableArray ( ) ,
442+ offsetsRange : OffsetsRange ,
443+ readerSession : readerSession
444+ ) ;
445+ }
368446}
369447
370448internal class InternalBatchMessage
371449{
372- public InternalBatchMessage ( OffsetsRange batchOffsetsRange , Queue < InternalMessage > internalMessages )
450+ public InternalBatchMessage (
451+ OffsetsRange batchOffsetsRange ,
452+ Queue < InternalMessage > internalMessages ,
453+ ReaderSession readerSession )
373454 {
374455 BatchOffsetsRange = batchOffsetsRange ;
375456 InternalMessages = internalMessages ;
457+ ReaderSession = readerSession ;
376458 }
377459
378- public OffsetsRange BatchOffsetsRange { get ; }
460+ internal OffsetsRange BatchOffsetsRange { get ; }
461+
462+ internal Queue < InternalMessage > InternalMessages { get ; }
379463
380- public Queue < InternalMessage > InternalMessages { get ; }
464+ internal ReaderSession ReaderSession { get ; }
381465}
0 commit comments