24
24
import com .arpnetworking .test .CollectorPeriodicMetrics ;
25
25
import com .arpnetworking .test .StringToRecordParser ;
26
26
import com .google .common .collect .ImmutableList ;
27
+ import com .google .common .collect .Maps ;
27
28
import org .apache .kafka .clients .consumer .Consumer ;
28
29
import org .apache .kafka .clients .consumer .ConsumerRecord ;
29
30
import org .apache .kafka .clients .consumer .ConsumerRecords ;
30
31
import org .apache .kafka .clients .consumer .MockConsumer ;
31
- import org .apache .kafka .clients .consumer .OffsetResetStrategy ;
32
+ import org .apache .kafka .clients .consumer .internals . AutoOffsetResetStrategy ;
32
33
import org .apache .kafka .common .KafkaException ;
33
34
import org .apache .kafka .common .TopicPartition ;
34
35
import org .junit .After ;
@@ -327,7 +328,7 @@ private static List<String> createValues(final String prefix, final int num) {
327
328
328
329
private static MockConsumer <String , String > createMockConsumer (
329
330
final ConsumerRecords <String , String > consumerRecords ) {
330
- final MockConsumer <String , String > consumer = new MockConsumer <>(OffsetResetStrategy . EARLIEST );
331
+ final MockConsumer <String , String > consumer = new MockConsumer <>(AutoOffsetResetStrategy . StrategyType . EARLIEST . toString () );
331
332
consumer .assign (Collections .singletonList (new TopicPartition (TOPIC , PARTITION )));
332
333
consumer .updateBeginningOffsets (Collections .singletonMap (new TopicPartition (TOPIC , PARTITION ), 0L ));
333
334
for (final ConsumerRecord <String , String > record : consumerRecords ) {
@@ -342,7 +343,9 @@ private static ConsumerRecords<String, String> expectedConsumerRecords() {
342
343
for (final String value : EXPECTED ) {
343
344
records .add (new ConsumerRecord <>(TOPIC , PARTITION , offset ++, "" + offset , value ));
344
345
}
345
- return new ConsumerRecords <>(Collections .singletonMap (new TopicPartition (TOPIC , PARTITION ), records ));
346
+ return new ConsumerRecords <>(
347
+ Collections .singletonMap (new TopicPartition (TOPIC , PARTITION ), records ),
348
+ Maps .newHashMap ());
346
349
}
347
350
348
351
private static class FillingBlockingQueue extends ArrayBlockingQueue <String > {
0 commit comments