Skip to content

Commit f2639de

Browse files
committed
add real counters
1 parent c43be58 commit f2639de

File tree

5 files changed

+55
-18
lines changed

5 files changed

+55
-18
lines changed

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import org.apache.kafka.streams.kstream.KStream;
4747
import org.hypertrace.core.grpcutils.client.GrpcChannelRegistry;
4848
import org.hypertrace.core.grpcutils.client.GrpcRegistryConfig;
49+
import org.hypertrace.core.kafkastreams.framework.interceptors.StreamsBuilderWithInterceptor;
50+
import org.hypertrace.core.kafkastreams.framework.interceptors.metrics.MetricsInterceptorFactory;
4951
import org.hypertrace.core.kafkastreams.framework.listeners.LoggingStateListener;
5052
import org.hypertrace.core.kafkastreams.framework.listeners.LoggingStateRestoreListener;
5153
import org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetter;
@@ -103,8 +105,13 @@ protected void doInit() {
103105
// build topologies
104106
Map<String, KStream<?, ?>> sourceStreams = new HashMap<>();
105107
StreamsBuilder streamsBuilder = new StreamsBuilder();
108+
109+
// initialize MetricsInterceptorFactory
110+
MetricsInterceptorFactory metricsInterceptorFactory = new MetricsInterceptorFactory();
106111
streamsBuilder =
107-
new StreamsBuilderWithInterceptor(streamsBuilder, List.of(MetricsProcessor::new));
112+
new StreamsBuilderWithInterceptor(
113+
streamsBuilder, List.of(metricsInterceptorFactory::create));
114+
108115
streamsBuilder = buildTopology(streamsConfig, streamsBuilder, sourceStreams);
109116
this.topology = streamsBuilder.build();
110117

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/MetricsProcessor.java

Lines changed: 0 additions & 16 deletions
This file was deleted.

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/StreamsBuilderWithInterceptor.java renamed to kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/interceptors/StreamsBuilderWithInterceptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package org.hypertrace.core.kafkastreams.framework;
1+
package org.hypertrace.core.kafkastreams.framework.interceptors;
22

33
import java.util.Collection;
44
import java.util.List;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package org.hypertrace.core.kafkastreams.framework.interceptors.metrics;
2+
3+
import io.micrometer.core.instrument.Counter;
4+
import org.apache.kafka.streams.processor.api.Processor;
5+
import org.apache.kafka.streams.processor.api.Record;
6+
7+
public class MetricsInterceptor implements Processor {
8+
9+
private final Counter timeLagCounter;
10+
private final Counter numRecordsCounter;
11+
12+
MetricsInterceptor(Counter numRecordsCounter, Counter timeLagCounter) {
13+
this.numRecordsCounter = numRecordsCounter;
14+
this.timeLagCounter = timeLagCounter;
15+
}
16+
17+
@Override
18+
public void process(Record record) {
19+
timeLagCounter.increment(System.currentTimeMillis() - record.timestamp());
20+
numRecordsCounter.increment();
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package org.hypertrace.core.kafkastreams.framework.interceptors.metrics;
2+
3+
import io.micrometer.core.instrument.Counter;
4+
import java.util.Collections;
5+
import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry;
6+
7+
public class MetricsInterceptorFactory {
8+
private static final String TIME_LAG_COUNTER_NAME = "time_lag";
9+
private static final String NUM_RECORDS_COUNTER_NAME = "num_records";
10+
11+
private final Counter timeLagCounter;
12+
private final Counter numRecordsCounter;
13+
14+
public MetricsInterceptorFactory() {
15+
this.numRecordsCounter =
16+
PlatformMetricsRegistry.registerCounter(NUM_RECORDS_COUNTER_NAME, Collections.emptyMap());
17+
this.timeLagCounter =
18+
PlatformMetricsRegistry.registerCounter(TIME_LAG_COUNTER_NAME, Collections.emptyMap());
19+
}
20+
21+
public MetricsInterceptor create() {
22+
return new MetricsInterceptor(numRecordsCounter, timeLagCounter);
23+
}
24+
}

0 commit comments

Comments
 (0)