Commit be54ab6

Rework metrics: Prometheus-friendly counters, flush duration, diagnostics (#48)
* Translate METRICS.md to English, retain Portuguese as METRICS-pt.md

* Wire up unrecorded metrics: schema ops, batch processing, record counts

  Sensors for createTable, evolveSchema, schema-operation-*, batch-size-*, and records-processed-* were registered but never recorded to in production code. This connects them:
  - DucklakeTableManager: instrument createTable/evolveTableSchema with schema operation timers
  - DucklakeSinkTask: create DucklakeMetrics in start(), record batch/record metrics in flushBatches(), close in stop()
  - DucklakeWriter: pass metrics through to DucklakeTableManager
  - MetricTimer.close(): narrow to no checked exception

* Remove Portuguese METRICS-pt.md and translate remaining PT comment

* Reformat METRICS.md to use tables instead of bulleted lists

* Add per-partition flush duration metric using cumulative counters

  Tracks flush write time (excluding consolidation) per partition via flush-duration-total-ms and flush-count cumulative sums. These are Prometheus-friendly: use rate() to derive averages, and they aggregate correctly across partitions/tasks, unlike windowed Avg/Max stats.

* Convert all metrics from windowed Avg/Max/Rate to CumulativeSum counters

  Windowed stats (Avg, Max, Rate) don't aggregate correctly through the JMX Exporter → Prometheus pipeline. Replace them with cumulative counters that work with Prometheus rate() for correct cross-partition and cross-task aggregation.

  Renamed metrics:
  - jdbc-query-time-avg/max/rate → jdbc-query-duration-total-ms
  - schema-operation-time-avg/max → schema-operation-duration-total-ms
  - operation-time-avg/max, operation-rate → operation-duration-total-ms
  - records-processed-rate → removed (use rate(records-processed-total))
  - batch-size-avg/max → batch-records-total + batch-count

* Prefix all metric names with ducklake- to avoid collisions

  Kafka Connect shares a single Metrics registry across all connectors in a worker. Generic names like jdbc-query-count could collide with other connectors. Prefixing with ducklake- makes them unambiguous regardless of tag filtering.

* Add diagnostic metrics: schema mismatch, flush skips, DLQ, spill, consolidation

  New metrics for diagnosing performance and correctness issues:
  - schema-mismatch-count: detects the many-small-files problem
  - flush-skip-count: lock contention precursor to rebalance
  - errant-record-count: records sent to DLQ (silent data loss signal)
  - spill-batch-count/spill-bytes-total: disk spill volume
  - consolidation-duration-total-ms/count: batch consolidation overhead

* Fix thread-safety: use ConcurrentHashMap for per-partition flush sensors

  Flush sensors are lazily created via computeIfAbsent and accessed from the put thread, the scheduled flush thread, and partition executor threads. HashMap.computeIfAbsent is not thread-safe and can corrupt internal state under concurrent access.

* Fix thread leak: close Metrics registry in DucklakeMetrics.close()

  new Metrics() starts an internal MetricsReporter thread and JMX registrations. Without closing it, each task rebalance leaks a thread and stale JMX beans, leading to OOM over hours in busy clusters.

* Only record evolveSchema metric when actual DDL executes

  Previously the timer wrapped the entire method, including the PRAGMA metadata check that runs on every batch. Now it only fires when columns are actually added, making the metric meaningful.

* Standardize all counter metric names with -total suffix

  OpenMetrics/Prometheus convention: counters end with _total. Without it, metrics like ducklake_batch_count look like gauges after the JMX Exporter converts hyphens to underscores.

* Use DucklakeMetricsInterface instead of concrete class in Writer/Factory

  Allows injecting mock or no-op metrics implementations for testing.

* Add NoopDucklakeMetrics to eliminate null-check duplication

  The null object pattern replaces all if-metrics-not-null branches with unconditional calls. This reduces code duplication at every instrumentation site and prevents forgetting a null check when adding new metrics.

* Change recordBatchProcessed to accept long instead of int

  Avoids silent overflow on the long-to-int cast, even though current defaults make it unlikely to hit.

* Remove reference to non-existent Grafana dashboard JSON

* Switch DucklakeMetrics from System.Logger to SLF4J

  System.Logger uses JDK Platform Logging, which may not bridge to the same backend as SLF4J in Kafka Connect. Log output from DucklakeMetrics (including unknown-operation-type warnings) could be lost or go to a different handler.

* Add tests for new metrics: consolidation, spill, mismatch, DLQ, flush skip

  Covers all metrics added in this branch, plus concurrent flush sensor creation and cleanup after close.

* Fix Metrics registry ownership: close at call site, not in DucklakeMetrics

  DucklakeMetrics should not close a registry it doesn't own: if a shared registry were ever passed in, close() would kill it for all consumers. Move the registry lifecycle to DucklakeSinkTask.stop(), where the registry was created. Also store the registry reference so it is closed even if start() fails partway through.

* Remove duplicate batch-records-total metric

  records-processed-total and batch-records-total recorded identical values. Keep records-processed-total + batch-count-total; derive average batch size via rate(records) / rate(batch_count).

* Fix schema mismatch metric: record 1 per event, not output batch count

  The metric now counts flushes that had schema mismatches rather than recording the number of output batches. Simpler, matches the docs, and works correctly as a counter with rate().

* Clean up nits: dead null checks, unmodifiable maps, sensor naming, comment

  - Remove dead null checks on final fields in close()
  - Make operationTimingSensors/operationCountSensors unmodifiable after construction to prevent accidental mutation from other threads
  - Normalize internal sensor names (remove the inconsistent -sensor suffix)
  - Add a comment explaining the load-bearing NoopDucklakeMetrics initializer

* Include type upgrades in evolveSchema metric timing

  performTypeUpgrade() executes ALTER COLUMN DDL but was called inside the field loop before the timer started. If only type upgrades occurred (no new columns, _inserted_at exists), the method returned early without recording the schema operation. Defer upgrades to execute inside the timed block alongside column additions.

* Drop unused partition parameter from recordFlushSkip

  The partition key was computed at every call site but ignored by the implementation. The overall skip rate is sufficient for alerting; correlate it with per-partition flush duration to identify the hot partition.

* Move metrics close to finally block in stop() to prevent leak on error

  connectionFactory.close() can throw, which would skip metrics cleanup in the same try block. Moving it to finally ensures the Metrics registry thread and JMX registrations are always cleaned up.
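The ConcurrentHashMap fix described above is worth illustrating. Below is a minimal, self-contained sketch of lazily creating per-partition flush sensors safely from multiple threads; the class and field names are illustrative stand-ins, not the connector's actual code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

public class PerPartitionSensors {

    // Hypothetical stand-in for a per-partition flush sensor pair
    // (cumulative duration + cumulative count, Prometheus-style).
    static final class FlushSensor {
        final LongAdder totalMs = new LongAdder();
        final LongAdder count = new LongAdder();

        void record(long durationMs) {
            totalMs.add(durationMs);
            count.increment();
        }
    }

    // ConcurrentHashMap.computeIfAbsent is atomic and safe when called from
    // the put thread, the scheduled flush thread, and partition executors.
    // HashMap.computeIfAbsent is not, and can corrupt the map's internal
    // state under concurrent access.
    private final Map<String, FlushSensor> flushSensors = new ConcurrentHashMap<>();

    void recordFlushDuration(String partitionKey, long durationMs) {
        flushSensors.computeIfAbsent(partitionKey, k -> new FlushSensor())
                .record(durationMs);
    }

    public static void main(String[] args) {
        PerPartitionSensors sensors = new PerPartitionSensors();
        sensors.recordFlushDuration("topic-0", 12);
        sensors.recordFlushDuration("topic-0", 8);
        System.out.println(sensors.flushSensors.get("topic-0").count.sum());
        System.out.println(sensors.flushSensors.get("topic-0").totalMs.sum());
    }
}
```

LongAdder is used here because, like the cumulative counters the commit introduces, it only ever sums; an external scraper can derive rates from the monotonic totals.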
1 parent e61e884 commit be54ab6

File tree

11 files changed: +828 −620 lines


METRICS.md

Lines changed: 146 additions & 237 deletions
Large diffs are not rendered by default.

src/main/java/com/inyo/ducklake/connect/DucklakeMetrics.java

Lines changed: 232 additions & 251 deletions
Large diffs are not rendered by default.

src/main/java/com/inyo/ducklake/connect/DucklakeMetricsInterface.java

Lines changed: 21 additions & 2 deletions
@@ -26,7 +26,7 @@ public interface DucklakeMetricsInterface extends AutoCloseable {

   void recordSchemaOperation(long durationNanos, String operationType);

-  void recordBatchProcessed(int recordCount);
+  void recordBatchProcessed(long recordCount);

   MetricTimer startJdbcQueryTimer();

@@ -36,6 +36,25 @@ public interface DucklakeMetricsInterface extends AutoCloseable {

   MetricTimer startSchemaOperationTimer(String operationType);

+  void recordFlushDuration(long durationNanos, String partition);
+
+  MetricTimer startFlushTimer(String partition);
+
+  void recordSchemaMismatch();
+
+  void recordFlushSkip();
+
+  void recordErrantRecord();
+
+  void recordSpill(long bytes);
+
+  void recordConsolidationDuration(long durationNanos);
+
+  MetricTimer startConsolidationTimer();
+
   /** Simple marker interface for timers returned by the metrics implementation. */
-  interface MetricTimer extends AutoCloseable {}
+  interface MetricTimer extends AutoCloseable {
+    @Override
+    void close();
+  }
 }
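The MetricTimer narrowing in the diff above (overriding close() to drop the checked exception declared by AutoCloseable) is what makes catch-free try-with-resources possible at instrumentation sites. A standalone sketch of the pattern, with illustrative names rather than the connector's real classes:

```java
import java.util.concurrent.atomic.AtomicLong;

public class TimerDemo {

    // Same shape as the interface in the diff: close() declares no checked
    // exception, unlike AutoCloseable.close(), which throws Exception.
    interface MetricTimer extends AutoCloseable {
        @Override
        void close();
    }

    static final AtomicLong totalNanos = new AtomicLong();

    static MetricTimer startTimer() {
        long start = System.nanoTime();
        // A lambda works because MetricTimer has a single abstract method.
        return () -> totalNanos.addAndGet(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        // No catch block required: the compiler knows close() cannot throw
        // a checked exception.
        try (MetricTimer t = startTimer()) {
            Math.sqrt(42.0); // stand-in for the timed work
        }
        System.out.println(totalNanos.get() >= 0);
    }
}
```

Without the override, every `try (var timer = ...)` in the connector would need a catch or a `throws Exception` clause just to satisfy the compiler.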

src/main/java/com/inyo/ducklake/connect/DucklakeSinkTask.java

Lines changed: 44 additions & 6 deletions
@@ -41,6 +41,7 @@
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.ErrantRecordReporter;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -88,6 +89,9 @@ public class DucklakeSinkTask extends SinkTask {

   // Errant record reporter for sending bad records to DLQ
   private ErrantRecordReporter errantRecordReporter;
+  private Metrics metricsRegistry;
+  // Initialized to no-op so stop() can call close() unconditionally even if start() fails
+  private DucklakeMetricsInterface ducklakeMetrics = NoopDucklakeMetrics.INSTANCE;

   // Jitter configuration to stagger flushes and avoid PG row-level contention
   // Each partition gets a random jitter offset (0 to maxJitterMs) applied to its flush timing
@@ -150,6 +154,10 @@ public String version() {
   @Override
   public void start(Map<String, String> map) {
     this.config = new DucklakeSinkConfig(DucklakeSinkConfig.CONFIG_DEF, map);
+    String connectorName = map.getOrDefault("name", "ducklake-sink");
+    String taskId = map.getOrDefault("task.id", "0");
+    this.metricsRegistry = new Metrics();
+    this.ducklakeMetrics = new DucklakeMetrics(metricsRegistry, connectorName, taskId);
     this.connectionFactory = new DucklakeConnectionFactory(config);
     this.writers = new HashMap<>();
     this.buffers = new HashMap<>();
@@ -299,6 +307,7 @@ private void checkTimeBasedFlushSpillable(long now) {
       AtomicInteger skips =
           consecutiveFlushSkips.computeIfAbsent(partition, k -> new AtomicInteger(0));
       int skipCount = skips.incrementAndGet();
+      ducklakeMetrics.recordFlushSkip();
       if (skipCount >= MAX_CONSECUTIVE_SKIPS_BEFORE_WARNING) {
         LOG.warn(
             "Flush check for partition {} skipped {} times - possible lock contention",
@@ -376,6 +385,7 @@ private void checkTimeBasedFlushInMemory(long now) {
       AtomicInteger skips =
           consecutiveFlushSkips.computeIfAbsent(partition, k -> new AtomicInteger(0));
       int skipCount = skips.incrementAndGet();
+      ducklakeMetrics.recordFlushSkip();
       if (skipCount >= MAX_CONSECUTIVE_SKIPS_BEFORE_WARNING) {
         LOG.warn(
             "Flush check for partition {} skipped {} times - possible lock contention",
@@ -427,7 +437,8 @@ public void open(Collection<TopicPartition> partitions) {
     super.open(partitions);
     try {
       this.connectionFactory.create();
-      this.writerFactory = new DucklakeWriterFactory(config, connectionFactory.getConnection());
+      this.writerFactory =
+          new DucklakeWriterFactory(config, connectionFactory.getConnection(), ducklakeMetrics);

       // Create one writer and buffer for each partition
       for (TopicPartition partition : partitions) {
@@ -453,7 +464,8 @@ public void open(Collection<TopicPartition> partitions) {
           }
         }

-        spillableBuffers.put(partition, new SpillablePartitionBuffer(partitionSpillDir));
+        spillableBuffers.put(
+            partition, new SpillablePartitionBuffer(partitionSpillDir, ducklakeMetrics));
         LOG.info("Created writer and spillable buffer for partition: {}", partition);
       } else {
         buffers.put(partition, new PartitionBuffer());
@@ -782,19 +794,30 @@ private void flushBatches(TopicPartition partition, FlushData flushData) {

     List<VectorSchemaRoot> consolidated;
     try {
-      consolidated = BatchConsolidator.consolidate(flushData.batches);
+      try (var timer = ducklakeMetrics.startConsolidationTimer()) {
+        consolidated = BatchConsolidator.consolidate(flushData.batches);
+      }
+      if (consolidated.size() > 1) {
+        ducklakeMetrics.recordSchemaMismatch();
+      }
     } catch (RuntimeException e) {
       // Consolidation failed mid-append — source batches may be partially consumed.
       // Close all original batches to avoid leaking Arrow memory.
       closeBatches(flushData.batches);
       throw e;
     }
     try {
-      for (VectorSchemaRoot root : consolidated) {
-        if (root.getRowCount() > 0) {
-          writer.write(root);
+      String partitionKey = partition.topic() + "-" + partition.partition();
+      try (var timer = ducklakeMetrics.startFlushTimer(partitionKey)) {
+        for (VectorSchemaRoot root : consolidated) {
+          if (root.getRowCount() > 0) {
+            writer.write(root);
+          }
         }
       }
+      if (flushData.recordCount > 0) {
+        ducklakeMetrics.recordBatchProcessed(flushData.recordCount);
+      }
     } catch (RuntimeException e) {
       LOG.error("Failed to write buffered data for partition: {}", partition, e);
       throw e;
@@ -1058,6 +1081,7 @@ private void handleSchemaConflictWithDLQ(

     try {
       errantRecordReporter.report(record, recordError);
+      ducklakeMetrics.recordErrantRecord();
     } catch (Exception dlqError) {
       LOG.error(
           "Failed to report record to DLQ: topic={}, partition={}, offset={}",
@@ -1208,6 +1232,20 @@ public void stop() {
       }
     } catch (Exception e) {
       throw new RuntimeException("Failed to stop DucklakeSinkTask", e);
+    } finally {
+      try {
+        ducklakeMetrics.close();
+      } catch (Exception e) {
+        LOG.warn("Failed closing metrics: {}", e.getMessage());
+      }
+      if (metricsRegistry != null) {
+        try {
+          metricsRegistry.close();
+        } catch (Exception e) {
+          LOG.warn("Failed closing metrics registry: {}", e.getMessage());
+        }
+        metricsRegistry = null;
+      }
     }
   }
 }
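The stop() change in the diff above moves cleanup into a finally block. A minimal sketch of why that ordering matters when an earlier close can throw; the method names below are illustrative stand-ins, not the connector's real classes:

```java
public class StopOrdering {

    static final StringBuilder log = new StringBuilder();

    // Stand-in for connectionFactory.close(), which can throw.
    static void closeConnectionFactory() {
        log.append("conn;");
        throw new RuntimeException("connection close failed");
    }

    // Stand-in for closing the Kafka Metrics registry
    // (reporter thread + JMX bean registrations).
    static void closeMetricsRegistry() {
        log.append("metrics;");
    }

    public static void main(String[] args) {
        try {
            closeConnectionFactory(); // throws before any later statement runs
        } catch (RuntimeException e) {
            log.append("caught;");
        } finally {
            // Runs regardless of the exception, so the registry thread and
            // JMX registrations cannot leak on a failed shutdown.
            closeMetricsRegistry();
        }
        System.out.println(log);
    }
}
```

Had closeMetricsRegistry() been a plain statement after closeConnectionFactory() inside the same try, the exception would have skipped it, which is exactly the leak the commit fixes.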

src/main/java/com/inyo/ducklake/connect/DucklakeWriterFactory.java

Lines changed: 3 additions & 3 deletions
@@ -24,14 +24,14 @@ public final class DucklakeWriterFactory {

   private final DucklakeSinkConfig config;
   private final DuckDBConnection conn;
-  private final DucklakeMetrics metrics;
+  private final DucklakeMetricsInterface metrics;

   public DucklakeWriterFactory(DucklakeSinkConfig config, DuckDBConnection conn) {
-    this(config, conn, null);
+    this(config, conn, NoopDucklakeMetrics.INSTANCE);
   }

   public DucklakeWriterFactory(
-      DucklakeSinkConfig config, DuckDBConnection conn, DucklakeMetrics metrics) {
+      DucklakeSinkConfig config, DuckDBConnection conn, DucklakeMetricsInterface metrics) {
     this.config = config;
     this.metrics = metrics;
     try {
src/main/java/com/inyo/ducklake/connect/NoopDucklakeMetrics.java

Lines changed: 92 additions & 0 deletions
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2025 Inyo Contributors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.inyo.ducklake.connect;
+
+/** No-op metrics implementation that discards all recordings. */
+public final class NoopDucklakeMetrics implements DucklakeMetricsInterface {
+
+  public static final NoopDucklakeMetrics INSTANCE = new NoopDucklakeMetrics();
+
+  private static final MetricTimer NOOP_TIMER = () -> {};
+
+  private NoopDucklakeMetrics() {}
+
+  @Override
+  public void recordJdbcQuery(long durationNanos) {}
+
+  @Override
+  public void recordJdbcQuery(long durationNanos, String operationType) {}
+
+  @Override
+  public void recordSchemaOperation(long durationNanos) {}
+
+  @Override
+  public void recordSchemaOperation(long durationNanos, String operationType) {}
+
+  @Override
+  public void recordBatchProcessed(long recordCount) {}
+
+  @Override
+  public MetricTimer startJdbcQueryTimer() {
+    return NOOP_TIMER;
+  }
+
+  @Override
+  public MetricTimer startJdbcQueryTimer(String operationType) {
+    return NOOP_TIMER;
+  }
+
+  @Override
+  public MetricTimer startSchemaOperationTimer() {
+    return NOOP_TIMER;
+  }
+
+  @Override
+  public MetricTimer startSchemaOperationTimer(String operationType) {
+    return NOOP_TIMER;
+  }
+
+  @Override
+  public void recordFlushDuration(long durationNanos, String partition) {}
+
+  @Override
+  public MetricTimer startFlushTimer(String partition) {
+    return NOOP_TIMER;
+  }
+
+  @Override
+  public void recordSchemaMismatch() {}
+
+  @Override
+  public void recordFlushSkip() {}
+
+  @Override
+  public void recordErrantRecord() {}
+
+  @Override
+  public void recordSpill(long bytes) {}
+
+  @Override
+  public void recordConsolidationDuration(long durationNanos) {}
+
+  @Override
+  public MetricTimer startConsolidationTimer() {
+    return NOOP_TIMER;
+  }
+
+  @Override
+  public void close() {}
+}
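A hedged sketch of how a null object like the one above removes per-site null checks. The interface here is a simplified stand-in for DucklakeMetricsInterface, not the real thing:

```java
public class Buffer {

    interface Metrics {
        void recordSpill(long bytes);
    }

    // Null object: implements the same interface, every method a deliberate no-op.
    enum NoopMetrics implements Metrics {
        INSTANCE;

        @Override
        public void recordSpill(long bytes) {}
    }

    private final Metrics metrics;

    Buffer() {
        this(NoopMetrics.INSTANCE); // default is a no-op, never null
    }

    Buffer(Metrics metrics) {
        this.metrics = metrics;
    }

    void add(long byteSize) {
        // Unconditional call: no `if (metrics != null)` branch to forget
        // when a new metric is added.
        metrics.recordSpill(byteSize);
    }

    public static void main(String[] args) {
        new Buffer().add(1024); // safe with no real metrics wired up
        System.out.println("ok");
    }
}
```

The enum-singleton form is one common way to get a single shared instance; the commit uses a private constructor with a static INSTANCE field, which achieves the same thing.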

src/main/java/com/inyo/ducklake/connect/SpillablePartitionBuffer.java

Lines changed: 10 additions & 0 deletions
@@ -44,6 +44,7 @@ public class SpillablePartitionBuffer {
   private static final Logger LOG = LoggerFactory.getLogger(SpillablePartitionBuffer.class);

   private final Path spillDirectory;
+  private final DucklakeMetricsInterface metrics;
   private final List<SpilledBatch> spilledBatches = new ArrayList<>();
   private long recordCount = 0;
   private long estimatedBytes = 0;
@@ -73,7 +74,15 @@ private static class SpilledBatch {
       value = "CT_CONSTRUCTOR_THROW",
       justification = "Failing fast on invalid spill directory is intentional")
   public SpillablePartitionBuffer(Path spillDirectory) {
+    this(spillDirectory, NoopDucklakeMetrics.INSTANCE);
+  }
+
+  @SuppressFBWarnings(
+      value = "CT_CONSTRUCTOR_THROW",
+      justification = "Failing fast on invalid spill directory is intentional")
+  public SpillablePartitionBuffer(Path spillDirectory, DucklakeMetricsInterface metrics) {
     this.spillDirectory = spillDirectory;
+    this.metrics = metrics;
     try {
       Files.createDirectories(spillDirectory);
     } catch (IOException e) {
@@ -106,6 +115,7 @@ public void add(VectorSchemaRoot root) {
     recordCount += root.getRowCount();
     estimatedBytes += byteSize;

+    metrics.recordSpill(byteSize);
     LOG.debug(
         "Spilled batch to {}: {} rows, {} bytes",
         spillFile.getFileName(),
