Skip to content

Commit f68a149

Browse files
authored
KAFKA-18509 Move StateChangeLogger to server-common module (#20637)
We can rewrite this class from scala to java and move to server-common module. To maintain backward compatibility, we should keep the logger name `state.change.logger`. Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 24cad50 commit f68a149

File tree

4 files changed

+62
-53
lines changed

4 files changed

+62
-53
lines changed

core/src/main/scala/kafka/cluster/Partition.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import java.lang.{Long => JLong}
2020
import java.util.concurrent.locks.ReentrantReadWriteLock
2121
import java.util.Optional
2222
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, CopyOnWriteArrayList}
23-
import kafka.controller.StateChangeLogger
2423
import kafka.log._
2524
import kafka.server._
2625
import kafka.server.share.DelayedShareFetch
@@ -37,6 +36,7 @@ import org.apache.kafka.common.record.{FileRecords, MemoryRecords, RecordBatch}
3736
import org.apache.kafka.common.requests._
3837
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
3938
import org.apache.kafka.common.utils.Time
39+
import org.apache.kafka.logger.StateChangeLogger
4040
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, MetadataCache, PartitionRegistration}
4141
import org.apache.kafka.server.common.RequestLocal
4242
import org.apache.kafka.server.log.remote.TopicPartitionLog
@@ -322,7 +322,7 @@ class Partition(val topicPartition: TopicPartition,
322322
def topic: String = topicPartition.topic
323323
def partitionId: Int = topicPartition.partition
324324

325-
private val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext = false, None)
325+
private val stateChangeLogger = new StateChangeLogger(localBrokerId)
326326
private val remoteReplicasMap = new ConcurrentHashMap[Int, Replica]
327327
// The read lock is only required when multiple reads are executed and needs to be in a consistent manner
328328
private val leaderIsrUpdateLock = new ReentrantReadWriteLock

core/src/main/scala/kafka/controller/StateChangeLogger.scala

Lines changed: 0 additions & 45 deletions
This file was deleted.

core/src/main/scala/kafka/server/ReplicaManager.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package kafka.server
1818

1919
import com.yammer.metrics.core.Meter
2020
import kafka.cluster.{Partition, PartitionListener}
21-
import kafka.controller.StateChangeLogger
2221
import kafka.log.LogManager
2322
import kafka.server.HostedPartition.Online
2423
import kafka.server.QuotaFactory.QuotaManagers
@@ -48,6 +47,7 @@ import org.apache.kafka.common.requests._
4847
import org.apache.kafka.common.utils.{Exit, Time, Utils}
4948
import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionLogConfig}
5049
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
50+
import org.apache.kafka.logger.StateChangeLogger
5151
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
5252
import org.apache.kafka.metadata.MetadataCache
5353
import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition}
@@ -272,7 +272,7 @@ class ReplicaManager(val config: KafkaConfig,
272272
@volatile private var isInControlledShutdown = false
273273

274274
this.logIdent = s"[ReplicaManager broker=$localBrokerId] "
275-
protected val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext = false, None)
275+
protected val stateChangeLogger = new StateChangeLogger(localBrokerId)
276276

277277
private var logDirFailureHandler: LogDirFailureHandler = _
278278

@@ -789,9 +789,9 @@ class ReplicaManager(val config: KafkaConfig,
789789
hasCustomErrorMessage = customException.isDefined
790790
)
791791
}
792-
// In non-transaction paths, errorResults is typically empty, so we can
792+
// In non-transaction paths, errorResults is typically empty, so we can
793793
// directly use entriesPerPartition instead of creating a new filtered collection
794-
val entriesWithoutErrorsPerPartition =
794+
val entriesWithoutErrorsPerPartition =
795795
if (errorResults.nonEmpty) entriesPerPartition.filter { case (key, _) => !errorResults.contains(key) }
796796
else entriesPerPartition
797797

@@ -1637,13 +1637,13 @@ class ReplicaManager(val config: KafkaConfig,
16371637
remoteFetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Unit = {
16381638
val remoteFetchTasks = new util.HashMap[TopicIdPartition, Future[Void]]
16391639
val remoteFetchResults = new util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]
1640-
1640+
16411641
remoteFetchInfos.forEach { (topicIdPartition, remoteFetchInfo) =>
16421642
val (task, result) = processRemoteFetch(remoteFetchInfo)
16431643
remoteFetchTasks.put(topicIdPartition, task)
16441644
remoteFetchResults.put(topicIdPartition, result)
16451645
}
1646-
1646+
16471647
val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong
16481648
val remoteFetch = new DelayedRemoteFetch(remoteFetchTasks, remoteFetchResults, remoteFetchInfos, remoteFetchMaxWaitMs,
16491649
remoteFetchPartitionStatus, params, logReadResults, this, responseCallback)
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.logger;
18+
19+
import org.slf4j.Logger;
20+
import org.slf4j.LoggerFactory;
21+
22+
/**
23+
* Simple class that sets logIdent appropriately depending on whether the state change logger is being used in the
24+
* context of the broker (e.g. ReplicaManager and Partition).
25+
*/
26+
public class StateChangeLogger {
27+
private static final Logger LOGGER = LoggerFactory.getLogger("state.change.logger");
28+
29+
private final String logIdent;
30+
31+
public StateChangeLogger(int brokerId) {
32+
this.logIdent = String.format("[Broker id=%d] ", brokerId);
33+
}
34+
35+
public void trace(String message) {
36+
LOGGER.info("{}{}", logIdent, message);
37+
}
38+
39+
public void info(String message) {
40+
LOGGER.info("{}{}", logIdent, message);
41+
}
42+
43+
public void warn(String message) {
44+
LOGGER.warn("{}{}", logIdent, message);
45+
}
46+
47+
public void error(String message) {
48+
LOGGER.error("{}{}", logIdent, message);
49+
}
50+
51+
public void error(String message, Throwable e) {
52+
LOGGER.error("{}{}", logIdent, message, e);
53+
}
54+
}

0 commit comments

Comments
 (0)