@@ -53,13 +53,13 @@ import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition, TopicOptionalIdPartition}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.network.BrokerEndPoint
-import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DeleteRecordsPartitionStatus, TopicPartitionOperationKey}
+import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteListOffsets, DeleteRecordsPartitionStatus, ListOffsetsPartitionStatus, TopicPartitionOperationKey}
 import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFetchPartitionKey}
 import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
 import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
-import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, ListOffsetsPartitionStatus, common}
+import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common}
 import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
-import org.apache.kafka.storage.internals.log._
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
 import java.io.File
@@ -70,6 +70,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.Lock
 import java.util.concurrent.{CompletableFuture, Future, RejectedExecutionException, TimeUnit}
 import java.util.{Collections, Optional, OptionalInt, OptionalLong}
+import java.util.function.Consumer
 import scala.collection.{Map, Seq, Set, immutable, mutable}
 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters.{RichOption, RichOptional}
@@ -841,7 +842,7 @@ class ReplicaManager(val config: KafkaConfig,
     )
 
     val retryTimeoutMs = Math.min(config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMaxMs(), config.requestTimeoutMs)
-    val addPartitionsRetryBackoffMs = config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMs
+    val addPartitionsRetryBackoffMs = config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMs()
     val startVerificationTimeMs = time.milliseconds
     def maybeRetryOnConcurrentTransactions(results: (Map[TopicPartition, Errors], Map[TopicPartition, VerificationGuard])): Unit = {
       if (time.milliseconds() - startVerificationTimeMs >= retryTimeoutMs) {
@@ -1470,7 +1471,7 @@ class ReplicaManager(val config: KafkaConfig,
                   correlationId: Int,
                   version: Short,
                   buildErrorResponse: (Errors, ListOffsetsPartition) => ListOffsetsPartitionResponse,
-                  responseCallback: List[ListOffsetsTopicResponse] => Unit,
+                  responseCallback: Consumer[util.Collection[ListOffsetsTopicResponse]],
                   timeoutMs: Int = 0): Unit = {
     val statusByPartition = mutable.Map[TopicPartition, ListOffsetsPartitionStatus]()
     topics.foreach { topic =>
@@ -1569,7 +1570,7 @@ class ReplicaManager(val config: KafkaConfig,
     if (delayedRemoteListOffsetsRequired(statusByPartition)) {
       val delayMs: Long = if (timeoutMs > 0) timeoutMs else config.remoteLogManagerConfig.remoteListOffsetsRequestTimeoutMs()
       // create delayed remote list offsets operation
-      val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version, statusByPartition, this, responseCallback)
+      val delayedRemoteListOffsets = new DelayedRemoteListOffsets(delayMs, version, statusByPartition.asJava, tp => getPartitionOrException(tp), responseCallback)
       // create a list of (topic, partition) pairs to use as keys for this delayed remote list offsets operation
       val listOffsetsRequestKeys = statusByPartition.keys.map(new TopicPartitionOperationKey(_)).toList
       // try to complete the request immediately, otherwise put it into the purgatory
@@ -1580,7 +1581,7 @@ class ReplicaManager(val config: KafkaConfig,
         case (topic, status) =>
           new ListOffsetsTopicResponse().setName(topic).setPartitions(status.values.flatMap(s => Some(s.responseOpt.get())).toList.asJava)
       }.toList
-      responseCallback(responseTopics)
+      responseCallback.accept(responseTopics.asJava)
     }
   }
 
@@ -1899,7 +1900,7 @@ class ReplicaManager(val config: KafkaConfig,
       createLogReadResult(highWatermark, leaderLogStartOffset, leaderLogEndOffset,
         new OffsetMovedToTieredStorageException("Given offset" + offset + " is moved to tiered storage"))
     } else {
-      val throttleTimeMs = remoteLogManager.get.getFetchThrottleTimeMs()
+      val throttleTimeMs = remoteLogManager.get.getFetchThrottleTimeMs
       val fetchDataInfo = if (throttleTimeMs > 0) {
         // Record the throttle time for the remote log fetches
         remoteLogManager.get.fetchThrottleTimeSensor().record(throttleTimeMs, time.milliseconds())
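For context on the callback change above: fetchOffset now takes a java.util.function.Consumer over a Java collection instead of a Scala List => Unit function. The sketch below (not part of this diff; the object name, handler, and demo data are hypothetical) shows how a Scala call site could adapt an existing List-based handler via SAM conversion and CollectionConverters.

import java.util
import java.util.function.Consumer

import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse

import scala.jdk.CollectionConverters._

object ListOffsetsCallbackExample {
  // Existing Scala-side handler that works on an immutable List (hypothetical).
  def handleResponses(topics: List[ListOffsetsTopicResponse]): Unit =
    topics.foreach(t => println(s"topic=${t.name} partitions=${t.partitions.size}"))

  // Adapter: a Scala lambda is SAM-converted to java.util.function.Consumer,
  // and the Java collection is converted back to a Scala List for the handler.
  val responseCallback: Consumer[util.Collection[ListOffsetsTopicResponse]] =
    responses => handleResponses(responses.asScala.toList)

  def main(args: Array[String]): Unit = {
    // Hypothetical responses, only to exercise the adapter end to end.
    val responses: util.Collection[ListOffsetsTopicResponse] =
      util.List.of(new ListOffsetsTopicResponse().setName("demo-topic"))
    responseCallback.accept(responses)
  }
}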