diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 44b28a1f07e4a..4c729f596063b 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -20,7 +20,6 @@ import java.lang.{Long => JLong} import java.util.concurrent.locks.ReentrantReadWriteLock import java.util.Optional import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, CopyOnWriteArrayList} -import kafka.controller.StateChangeLogger import kafka.log._ import kafka.server._ import kafka.server.share.DelayedShareFetch @@ -37,6 +36,7 @@ import org.apache.kafka.common.record.{FileRecords, MemoryRecords, RecordBatch} import org.apache.kafka.common.requests._ import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} import org.apache.kafka.common.utils.Time +import org.apache.kafka.logger.StateChangeLogger import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, MetadataCache, PartitionRegistration} import org.apache.kafka.server.common.RequestLocal import org.apache.kafka.server.log.remote.TopicPartitionLog @@ -322,7 +322,7 @@ class Partition(val topicPartition: TopicPartition, def topic: String = topicPartition.topic def partitionId: Int = topicPartition.partition - private val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext = false, None) + private val stateChangeLogger = new StateChangeLogger(localBrokerId) private val remoteReplicasMap = new ConcurrentHashMap[Int, Replica] // The read lock is only required when multiple reads are executed and needs to be in a consistent manner private val leaderIsrUpdateLock = new ReentrantReadWriteLock diff --git a/core/src/main/scala/kafka/controller/StateChangeLogger.scala b/core/src/main/scala/kafka/controller/StateChangeLogger.scala deleted file mode 100644 index 9f188fe33b74a..0000000000000 --- a/core/src/main/scala/kafka/controller/StateChangeLogger.scala +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.controller - -import com.typesafe.scalalogging.Logger -import kafka.utils.Logging - -object StateChangeLogger { - private val logger = Logger("state.change.logger") -} - -/** - * Simple class that sets `logIdent` appropriately depending on whether the state change logger is being used in the - * context of the KafkaController or not (e.g. ReplicaManager and MetadataCache log to the state change logger - * irrespective of whether the broker is the Controller). - */ -class StateChangeLogger(brokerId: Int, inControllerContext: Boolean, controllerEpoch: Option[Int]) extends Logging { - - if (controllerEpoch.isDefined && !inControllerContext) - throw new IllegalArgumentException("Controller epoch should only be defined if inControllerContext is true") - - override lazy val logger: Logger = StateChangeLogger.logger - - locally { - val prefix = if (inControllerContext) "Controller" else "Broker" - val epochEntry = controllerEpoch.fold("")(epoch => s" epoch=$epoch") - logIdent = s"[$prefix id=$brokerId$epochEntry] " - } - -} diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 10b41b88bd092..ff70d7ae34aec 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -18,7 +18,6 @@ package kafka.server import com.yammer.metrics.core.Meter import kafka.cluster.{Partition, PartitionListener} -import kafka.controller.StateChangeLogger import kafka.log.LogManager import kafka.server.HostedPartition.Online import kafka.server.QuotaFactory.QuotaManagers @@ -48,6 +47,7 @@ import org.apache.kafka.common.requests._ import org.apache.kafka.common.utils.{Exit, Time, Utils} import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionLogConfig} import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta} +import org.apache.kafka.logger.StateChangeLogger import org.apache.kafka.metadata.LeaderConstants.NO_LEADER import org.apache.kafka.metadata.MetadataCache import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition} @@ -272,7 +272,7 @@ class ReplicaManager(val config: KafkaConfig, @volatile private var isInControlledShutdown = false this.logIdent = s"[ReplicaManager broker=$localBrokerId] " - protected val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext = false, None) + protected val stateChangeLogger = new StateChangeLogger(localBrokerId) private var logDirFailureHandler: LogDirFailureHandler = _ @@ -789,9 +789,9 @@ class ReplicaManager(val config: KafkaConfig, hasCustomErrorMessage = customException.isDefined ) } - // In non-transaction paths, errorResults is typically empty, so we can + // In non-transaction paths, errorResults is typically empty, so we can // directly use entriesPerPartition instead of creating a new filtered collection - val entriesWithoutErrorsPerPartition = + val entriesWithoutErrorsPerPartition = if (errorResults.nonEmpty) entriesPerPartition.filter { case (key, _) => !errorResults.contains(key) } else entriesPerPartition @@ -1637,13 +1637,13 @@ class ReplicaManager(val config: KafkaConfig, remoteFetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Unit = { val remoteFetchTasks = new util.HashMap[TopicIdPartition, Future[Void]] val remoteFetchResults = new util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]] - + remoteFetchInfos.forEach { (topicIdPartition, remoteFetchInfo) => val (task, result) = processRemoteFetch(remoteFetchInfo) remoteFetchTasks.put(topicIdPartition, task) remoteFetchResults.put(topicIdPartition, result) } - + val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong val remoteFetch = new DelayedRemoteFetch(remoteFetchTasks, remoteFetchResults, remoteFetchInfos, remoteFetchMaxWaitMs, remoteFetchPartitionStatus, params, logReadResults, this, responseCallback) diff --git a/server-common/src/main/java/org/apache/kafka/logger/StateChangeLogger.java b/server-common/src/main/java/org/apache/kafka/logger/StateChangeLogger.java new file mode 100644 index 0000000000000..a8f7ed9cc9d38 --- /dev/null +++ b/server-common/src/main/java/org/apache/kafka/logger/StateChangeLogger.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.logger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Simple class that sets logIdent appropriately depending on whether the state change logger is being used in the + * context of the broker (e.g. ReplicaManager and Partition). + */ +public class StateChangeLogger { + private static final Logger LOGGER = LoggerFactory.getLogger("state.change.logger"); + + private final String logIdent; + + public StateChangeLogger(int brokerId) { + this.logIdent = String.format("[Broker id=%d] ", brokerId); + } + + public void trace(String message) { + LOGGER.info("{}{}", logIdent, message); + } + + public void info(String message) { + LOGGER.info("{}{}", logIdent, message); + } + + public void warn(String message) { + LOGGER.warn("{}{}", logIdent, message); + } + + public void error(String message) { + LOGGER.error("{}{}", logIdent, message); + } + + public void error(String message, Throwable e) { + LOGGER.error("{}{}", logIdent, message, e); + } +}