@@ -1669,7 +1669,7 @@ private ConsumerRecords<K, V> doPoll() {
16691669 }
16701670 TopicPartition next = this .batchIterator .next ();
16711671 List <ConsumerRecord <K , V >> subBatch = Objects .requireNonNull (this .lastBatch ).records (next );
1672- records = new ConsumerRecords <>(Collections .singletonMap (next , subBatch ));
1672+ records = new ConsumerRecords <>(Collections .singletonMap (next , subBatch ), Map . of () );
16731673 if (!this .batchIterator .hasNext ()) {
16741674 this .batchIterator = null ;
16751675 }
@@ -1911,7 +1911,7 @@ private void checkIdle() {
19111911 long idleEventInterval2 = idleEventInterval ;
19121912 long now = System .currentTimeMillis ();
19131913 if (!this .receivedSome ) {
1914- idleEventInterval2 *= this .containerProperties .getIdleBeforeDataMultiplier ();
1914+ idleEventInterval2 = ( long ) ( idleEventInterval2 * this .containerProperties .getIdleBeforeDataMultiplier () );
19151915 }
19161916 if (now > this .lastReceive + idleEventInterval2
19171917 && now > this .lastAlertAt + idleEventInterval2 ) {
@@ -2649,7 +2649,7 @@ private boolean checkImmediatePause(Iterator<ConsumerRecord<K, V>> iterator) {
26492649 tp -> new ArrayList <ConsumerRecord <K , V >>()).add (next );
26502650 }
26512651 if (!remaining .isEmpty ()) {
2652- this .remainingRecords = new ConsumerRecords <>(remaining );
2652+ this .remainingRecords = new ConsumerRecords <>(remaining , Map . of () );
26532653 return true ;
26542654 }
26552655 }
@@ -2950,7 +2950,7 @@ private void invokeErrorHandlerBySingleRecord(FailedRecordTuple<K, V> failedReco
29502950 tp -> new ArrayList <>()).add (cRecord );
29512951 }
29522952 if (!records .isEmpty ()) {
2953- this .remainingRecords = new ConsumerRecords <>(records );
2953+ this .remainingRecords = new ConsumerRecords <>(records , Map . of () );
29542954 this .pauseForPending = true ;
29552955 }
29562956 }
@@ -2996,7 +2996,7 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> cRecord,
29962996 }
29972997 }
29982998 if (!records .isEmpty ()) {
2999- this .remainingRecords = new ConsumerRecords <>(records );
2999+ this .remainingRecords = new ConsumerRecords <>(records , Map . of () );
30003000 this .pauseForPending = true ;
30013001 }
30023002 }
@@ -3614,7 +3614,7 @@ public void acknowledge(int index) {
36143614 tp -> new ArrayList <>()).add (record );
36153615 }
36163616 if (!offsetsToCommit .isEmpty ()) {
3617- processAcks (new ConsumerRecords <>(offsetsToCommit ));
3617+ processAcks (new ConsumerRecords <>(offsetsToCommit , Map . of () ));
36183618 }
36193619 this .partial = index ;
36203620 }
@@ -3644,7 +3644,7 @@ public void nack(int index, Duration sleep) {
36443644 newRecords .computeIfAbsent (new TopicPartition (cRecord .topic (), cRecord .partition ()),
36453645 tp -> new LinkedList <>()).add (cRecord );
36463646 }
3647- processAcks (new ConsumerRecords <K , V >(newRecords ));
3647+ processAcks (new ConsumerRecords <K , V >(newRecords , Map . of () ));
36483648 }
36493649
36503650 @ Override
@@ -3727,7 +3727,7 @@ private void removeRevocationsFromPending(Collection<TopicPartition> partitions)
37273727 if (!remainingParts .isEmpty ()) {
37283728 Map <TopicPartition , List <ConsumerRecord <K , V >>> trimmed = new LinkedHashMap <>();
37293729 remainingParts .forEach (part -> trimmed .computeIfAbsent (part , tp -> remaining .records (tp )));
3730- ListenerConsumer .this .remainingRecords = new ConsumerRecords <>(trimmed );
3730+ ListenerConsumer .this .remainingRecords = new ConsumerRecords <>(trimmed , Map . of () );
37313731 }
37323732 else {
37333733 ListenerConsumer .this .remainingRecords = null ;
0 commit comments