Skip to content

Commit c43be58

Browse files
committed
add new interceptor framework
1 parent 0aa8c44 commit c43be58

File tree

3 files changed

+61
-0
lines changed

3 files changed

+61
-0
lines changed

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ protected void doInit() {
103103
// build topologies
104104
Map<String, KStream<?, ?>> sourceStreams = new HashMap<>();
105105
StreamsBuilder streamsBuilder = new StreamsBuilder();
106+
streamsBuilder =
107+
new StreamsBuilderWithInterceptor(streamsBuilder, List.of(MetricsProcessor::new));
106108
streamsBuilder = buildTopology(streamsConfig, streamsBuilder, sourceStreams);
107109
this.topology = streamsBuilder.build();
108110

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package org.hypertrace.core.kafkastreams.framework;
2+
3+
import java.time.Instant;
4+
import org.apache.kafka.streams.processor.api.Processor;
5+
import org.apache.kafka.streams.processor.api.Record;
6+
7+
public class MetricsProcessor implements Processor {
8+
9+
@Override
10+
public void process(Record record) {
11+
System.out.println("Hello world");
12+
System.out.println(record.timestamp());
13+
Instant timestamp = Instant.ofEpochMilli(record.timestamp());
14+
System.out.println(timestamp); // e.g., 2026-03-10T12:17:00Z
15+
}
16+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package org.hypertrace.core.kafkastreams.framework;
2+
3+
import java.util.Collection;
4+
import java.util.List;
5+
import java.util.regex.Pattern;
6+
import org.apache.kafka.streams.StreamsBuilder;
7+
import org.apache.kafka.streams.Topology;
8+
import org.apache.kafka.streams.kstream.Consumed;
9+
import org.apache.kafka.streams.kstream.KStream;
10+
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
11+
12+
public class StreamsBuilderWithInterceptor extends StreamsBuilder {
13+
14+
private final StreamsBuilder delegate;
15+
private final List<ProcessorSupplier> processorSuppliers;
16+
17+
public StreamsBuilderWithInterceptor(
18+
StreamsBuilder delegate, List<ProcessorSupplier> processorSuppliers) {
19+
this.delegate = delegate;
20+
this.processorSuppliers = processorSuppliers;
21+
}
22+
23+
@Override
24+
public synchronized Topology build() {
25+
return delegate.build();
26+
}
27+
28+
@Override
29+
public synchronized <K, V> KStream<K, V> stream(
30+
final Pattern topicPattern, final Consumed<K, V> consumed) {
31+
KStream<K, V> stream = delegate.stream(topicPattern, consumed);
32+
processorSuppliers.forEach(stream::process);
33+
return stream;
34+
}
35+
36+
@Override
37+
public synchronized <K, V> KStream<K, V> stream(
38+
final Collection<String> topics, final Consumed<K, V> consumed) {
39+
KStream<K, V> stream = delegate.stream(topics, consumed);
40+
processorSuppliers.forEach(stream::process);
41+
return stream;
42+
}
43+
}

0 commit comments

Comments
 (0)