File tree Expand file tree Collapse file tree 1 file changed +3
-5
lines changed
RabbitMQ.Stream.Client/Reliable Expand file tree Collapse file tree 1 file changed +3
-5
lines changed Original file line number Diff line number Diff line change 33// Copyright (c) 2007-2023 VMware, Inc.
44
55using System . Collections . Concurrent ;
6- using System . Linq ;
76using System . Threading ;
87using System . Threading . Tasks ;
98
@@ -80,13 +79,12 @@ private async Task<IConsumer> SuperConsumer(bool boot)
8079 {
8180 ConcurrentDictionary < string , IOffsetType > offsetSpecs = new ( ) ;
8281 // if is not the boot time and at least one message was consumed
83- // it can restart consuming from the last consumer offset + 1 (+1 since we need to consume fro the next)
82+ // it can restart consuming from the last consumer offset + 1 (+1 since we need to consume from the next)
8483 if ( ! boot && _consumedFirstTime )
8584 {
86- for ( var i = 0 ; i < _lastOffsetConsumed . Count ; i ++ )
85+ foreach ( var ( stream , offset ) in _lastOffsetConsumed )
8786 {
88- offsetSpecs [ _lastOffsetConsumed . Keys . ElementAt ( i ) ] =
89- new OffsetTypeOffset ( _lastOffsetConsumed . Values . ElementAt ( i ) + 1 ) ;
87+ offsetSpecs [ stream ] = new OffsetTypeOffset ( offset + 1 ) ;
9088 }
9189 }
9290 else
You can’t perform that action at this time.
0 commit comments