Skip to content

Commit d95857a

Browse files
committed
KAFKA-19504: Remove unused metrics reporter initialization in KafkaAdminClient (#20166)
The `AdminClient` adds a telemetry reporter to the metrics reporters list in the constructor. The problem is that the reporter was already added in the `createInternal` method. In the `createInternal` method call, the `clientTelemetryReporter` is added to a `List<MetricReporters>` which is passed to the `Metrics` object, will get closed when `Metrics.close()` is called. But adding a reporter to the reporters list in the constructor is not used by the `Metrics` object and hence doesn't get closed, causing a memory leak. All related tests pass after this change. Reviewers: Apoorv Mittal <[email protected]>, Matthias J. Sax <[email protected]>, Chia-Ping Tsai <[email protected]>, Jhen-Yung Hsu <[email protected]>
1 parent 80b9abe commit d95857a

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed

clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -577,10 +577,12 @@ static KafkaAdminClient createInternal(AdminClientConfig config,
577577
Time time) {
578578
Metrics metrics = null;
579579
String clientId = generateClientId(config);
580+
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
580581
Optional<ClientTelemetryReporter> clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
582+
clientTelemetryReporter.ifPresent(reporters::add);
581583

582584
try {
583-
metrics = new Metrics(new MetricConfig(), new LinkedList<>(), time);
585+
metrics = new Metrics(new MetricConfig(), reporters, time);
584586
LogContext logContext = createLogContext(clientId);
585587
return new KafkaAdminClient(config, clientId, time, metadataManager, metrics,
586588
client, null, logContext, clientTelemetryReporter);
@@ -625,9 +627,7 @@ private KafkaAdminClient(AdminClientConfig config,
625627
CommonClientConfigs.RETRY_BACKOFF_EXP_BASE,
626628
retryBackoffMaxMs,
627629
CommonClientConfigs.RETRY_BACKOFF_JITTER);
628-
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(this.clientId, config);
629630
this.clientTelemetryReporter = clientTelemetryReporter;
630-
this.clientTelemetryReporter.ifPresent(reporters::add);
631631
this.metadataRecoveryStrategy = MetadataRecoveryStrategy.forName(config.getString(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG));
632632
this.partitionLeaderCache = new HashMap<>();
633633
this.adminFetchMetricsManager = new AdminFetchMetricsManager(metrics);

0 commit comments

Comments
 (0)