@@ -149,4 +149,79 @@ describe('Consumer commit', () => {
149
149
{ topic : topicName , partition : 2 , offset : '10' , metadata, leaderEpoch : expect . any ( Number ) }
150
150
] ) ;
151
151
} ) ;
152
+
153
+ it . each ( [ [ true ] , [ false ] ] ) ( 'should commit only resolved offsets while using eachBatch' , async ( isAutoCommit ) => {
154
+ /* Evenly distribute 3*30 messages across 3 partitions */
155
+ const numMsgs = 30 ;
156
+ let i = 0 ;
157
+ const messages = Array ( 3 * numMsgs )
158
+ . fill ( )
159
+ . map ( ( ) => {
160
+ const value = secureRandom ( )
161
+ return { value : `value-${ value } ` , partition : ( i ++ ) % 3 }
162
+ } )
163
+
164
+ await producer . connect ( ) ;
165
+ await producer . send ( { topic : topicName , messages } )
166
+ await producer . flush ( ) ;
167
+
168
+ consumer = createConsumer ( {
169
+ groupId,
170
+ maxWaitTimeInMs : 100 ,
171
+ fromBeginning : true ,
172
+ autoCommit : isAutoCommit ,
173
+ autoCommitInterval : 500 ,
174
+ } ) ;
175
+
176
+ let msgCount = 0 ;
177
+ await consumer . connect ( ) ;
178
+ await consumer . subscribe ( { topic : topicName } )
179
+ await consumer . run ( {
180
+ eachBatchAutoResolve : false ,
181
+ eachBatch : async ( { batch, resolveOffset, commitOffsetsIfNecessary } ) => {
182
+ for ( const message of batch . messages ) {
183
+ msgCount ++ ;
184
+ if ( ( + message . offset ) < numMsgs / 2 ) {
185
+ resolveOffset ( message . offset ) ;
186
+ }
187
+ }
188
+ if ( ! isAutoCommit )
189
+ await commitOffsetsIfNecessary ( ) ;
190
+ }
191
+ } ) ;
192
+ await waitFor ( ( ) => msgCount >= ( 3 * numMsgs ) , ( ) => null , { delay : 100 } ) ;
193
+
194
+ /* Disconnect should commit any uncommitted offsets */
195
+ await consumer . disconnect ( ) ;
196
+
197
+ consumer = createConsumer ( {
198
+ groupId,
199
+ maxWaitTimeInMs : 100 ,
200
+ fromBeginning : true ,
201
+ } ) ;
202
+
203
+ await consumer . connect ( ) ;
204
+ const toppars = Array ( 3 ) . fill ( ) . map ( ( _ , i ) => ( { topic : topicName , partition : i } ) ) ;
205
+ const committed = await consumer . committed ( toppars ) ;
206
+ const halfOffset = Math . floor ( numMsgs / 2 ) . toString ( ) ;
207
+ expect ( committed ) . toEqual (
208
+ expect . arrayContaining ( [
209
+ expect . objectContaining ( {
210
+ topic : topicName ,
211
+ partition : 0 ,
212
+ offset : halfOffset ,
213
+ } ) ,
214
+ expect . objectContaining ( {
215
+ topic : topicName ,
216
+ partition : 1 ,
217
+ offset : halfOffset ,
218
+ } ) ,
219
+ expect . objectContaining ( {
220
+ topic : topicName ,
221
+ partition : 2 ,
222
+ offset : halfOffset ,
223
+ } )
224
+ ] )
225
+ )
226
+ } ) ;
152
227
} ) ;
0 commit comments