Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/main/java/com/inyo/ducklake/connect/DucklakeMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.utils.Time;

/**
* Manages metrics for the Ducklake connector using Kafka's metrics system. Metrics are
Expand All @@ -38,7 +43,21 @@ public final class DucklakeMetrics implements DucklakeMetricsInterface {

private static final org.slf4j.Logger LOG =
org.slf4j.LoggerFactory.getLogger(DucklakeMetrics.class);
static final String JMX_DOMAIN = "kafka.connect";
private static final String METRIC_GROUP = "ducklake-sink-task-metrics";

/**
* Creates a Metrics registry that registers MBeans under the kafka.connect JMX domain, so they
* are picked up by the standard JMX Exporter configuration on Kafka Connect workers.
*/
public static Metrics createMetricsRegistry() {
return new Metrics(
new MetricConfig(),
List.of(new JmxReporter()),
Time.SYSTEM,
new KafkaMetricsContext(JMX_DOMAIN));
}

private static final String PREFIX = "ducklake-";

private final Metrics metrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public void start(Map<String, String> map) {
this.config = new DucklakeSinkConfig(DucklakeSinkConfig.CONFIG_DEF, map);
String connectorName = map.getOrDefault("name", "ducklake-sink");
String taskId = map.getOrDefault("task.id", "0");
this.metricsRegistry = new Metrics();
this.metricsRegistry = DucklakeMetrics.createMetricsRegistry();
this.ducklakeMetrics = new DucklakeMetrics(metricsRegistry, connectorName, taskId);
this.connectionFactory = new DucklakeConnectionFactory(config);
this.writers = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ void setUp() throws Exception {
connection = TestHelper.setupDucklakeConnection();

allocator = new RootAllocator(Long.MAX_VALUE);
metricsRegistry = new Metrics();
metricsRegistry = DucklakeMetrics.createMetricsRegistry();
ducklakeMetrics = new DucklakeMetrics(metricsRegistry, "test-connector", "0");
}

Expand Down
21 changes: 20 additions & 1 deletion src/test/java/com/inyo/ducklake/connect/DucklakeMetricsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@
package com.inyo.ducklake.connect;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -32,7 +36,7 @@ class DucklakeMetricsTest {

@BeforeEach
void setUp() {
metricsRegistry = new Metrics();
metricsRegistry = DucklakeMetrics.createMetricsRegistry();
ducklakeMetrics = new DucklakeMetrics(metricsRegistry, "test-connector", "0");
}

Expand Down Expand Up @@ -164,6 +168,21 @@ void testMetricTags() {
}
}

@Test
void testMetricsRegisteredUnderKafkaConnectJmxDomain() throws Exception {
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
var mbeans = mBeanServer.queryNames(new ObjectName(DucklakeMetrics.JMX_DOMAIN + ":*"), null);
assertFalse(
mbeans.isEmpty(),
"Metrics should be registered under " + DucklakeMetrics.JMX_DOMAIN + " JMX domain");

boolean foundDucklakeMetric =
mbeans.stream().anyMatch(name -> name.toString().contains("ducklake-sink-task-metrics"));
assertTrue(
foundDucklakeMetric,
"Should find ducklake-sink-task-metrics MBeans under " + DucklakeMetrics.JMX_DOMAIN);
}

@Test
void testMultipleOperations() {
ducklakeMetrics.recordJdbcQuery(1_000_000L);
Expand Down
Loading