Skip to content

Commit b4f0adf

Browse files
authored
feat(metrics): add metrics on the log append permits (#2081)
Signed-off-by: Ning Yu <[email protected]>
1 parent 42d1272 commit b4f0adf

File tree

3 files changed

+21
-0
lines changed

3 files changed

+21
-0
lines changed

core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import org.apache.kafka.common.utils.{ThreadUtils, Time}
2727
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
2828
import org.apache.kafka.metadata.stream.StreamTags
2929
import org.apache.kafka.server.metrics.KafkaMetricsGroup
30+
import org.apache.kafka.server.metrics.s3stream.S3StreamKafkaMetricsManager
3031
import org.apache.kafka.server.util.Scheduler
3132
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
3233
import org.apache.kafka.storage.internals.log._
@@ -583,6 +584,7 @@ class ElasticLog(val metaStream: MetaStream,
583584
object ElasticLog extends Logging {
584585
private val APPEND_PERMIT = 100 * 1024 * 1024
585586
private val APPEND_PERMIT_SEMAPHORE = new Semaphore(APPEND_PERMIT)
587+
S3StreamKafkaMetricsManager.setLogAppendPermitNumSupplier(() => APPEND_PERMIT_SEMAPHORE.availablePermits())
586588

587589
private val LAST_RECORD_TIMESTAMP = new AtomicLong()
588590
private val KafkaMetricsGroup = new KafkaMetricsGroup(ElasticLog.getClass)

server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public class S3StreamKafkaMetricsConstants {
2121
public static final String STREAM_OBJECT_NUM = "stream_object_num";
2222
public static final String FETCH_LIMITER_PERMIT_NUM = "fetch_limiter_permit_num";
2323
public static final String FETCH_PENDING_TASK_NUM = "fetch_pending_task_num";
24+
public static final String LOG_APPEND_PERMIT_NUM = "log_append_permit_num";
2425
public static final String SLOW_BROKER_METRIC_NAME = "slow_broker_count";
2526
public static final String TOPIC_PARTITION_COUNT_METRIC_NAME = "topic_partition_count";
2627

server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ public class S3StreamKafkaMetricsManager {
6363
private static Supplier<Map<String, Integer>> fetchLimiterPermitNumSupplier = Collections::emptyMap;
6464
private static ObservableLongGauge fetchPendingTaskNumMetrics = new NoopObservableLongGauge();
6565
private static Supplier<Map<String, Integer>> fetchPendingTaskNumSupplier = Collections::emptyMap;
66+
private static ObservableLongGauge logAppendPermitNumMetrics = new NoopObservableLongGauge();
67+
private static Supplier<Integer> logAppendPermitNumSupplier = () -> 0;
6668
private static MetricsConfig metricsConfig = new MetricsConfig(MetricsLevel.INFO, Attributes.empty());
6769
private static ObservableLongGauge slowBrokerMetrics = new NoopObservableLongGauge();
6870
private static Supplier<Map<Integer, Boolean>> slowBrokerSupplier = Collections::emptyMap;
@@ -86,6 +88,7 @@ public static void initMetrics(Meter meter, String prefix) {
8688
initAutoBalancerMetrics(meter, prefix);
8789
initObjectMetrics(meter, prefix);
8890
initFetchMetrics(meter, prefix);
91+
initLogAppendMetrics(meter, prefix);
8992
initPartitionStatusStatisticsMetrics(meter, prefix);
9093
}
9194

@@ -203,6 +206,17 @@ private static void initFetchMetrics(Meter meter, String prefix) {
203206
});
204207
}
205208

209+
private static void initLogAppendMetrics(Meter meter, String prefix) {
210+
logAppendPermitNumMetrics = meter.gaugeBuilder(prefix + S3StreamKafkaMetricsConstants.LOG_APPEND_PERMIT_NUM)
211+
.setDescription("The number of permits in elastic log append limiter")
212+
.ofLongs()
213+
.buildWithCallback(result -> {
214+
if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) {
215+
result.record(logAppendPermitNumSupplier.get(), metricsConfig.getBaseAttributes());
216+
}
217+
});
218+
}
219+
206220
private static void initPartitionStatusStatisticsMetrics(Meter meter, String prefix) {
207221
partitionStatusStatisticsMetrics = meter.gaugeBuilder(prefix + S3StreamKafkaMetricsConstants.PARTITION_STATUS_STATISTICS_METRIC_NAME)
208222
.setDescription("The statistics of partition status")
@@ -248,6 +262,10 @@ public static void setFetchPendingTaskNumSupplier(Supplier<Map<String, Integer>>
248262
S3StreamKafkaMetricsManager.fetchPendingTaskNumSupplier = fetchPendingTaskNumSupplier;
249263
}
250264

265+
public static void setLogAppendPermitNumSupplier(Supplier<Integer> logAppendPermitNumSupplier) {
266+
S3StreamKafkaMetricsManager.logAppendPermitNumSupplier = logAppendPermitNumSupplier;
267+
}
268+
251269
public static void setSlowBrokerSupplier(Supplier<Map<Integer, Boolean>> slowBrokerSupplier) {
252270
S3StreamKafkaMetricsManager.slowBrokerSupplier = slowBrokerSupplier;
253271
}

0 commit comments

Comments
 (0)