Skip to content

Commit ec3a154

Browse files
committed
fix working
1 parent 38dc78a commit ec3a154

File tree

2 files changed

+5
-17
lines changed

2 files changed

+5
-17
lines changed

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,13 +104,10 @@ protected void doInit() {
104104
streamsConfig = mergeProperties(getBaseStreamsConfig(), getJobStreamsConfig(getAppConfig()));
105105
// build topologies
106106
Map<String, KStream<?, ?>> sourceStreams = new HashMap<>();
107-
StreamsBuilder streamsBuilder = new StreamsBuilder();
108-
109107
// initialize MetricsInterceptorFactory
110108
MetricsInterceptorFactory metricsInterceptorFactory = new MetricsInterceptorFactory();
111-
streamsBuilder =
112-
new StreamsBuilderWithInterceptor(
113-
streamsBuilder, List.of(metricsInterceptorFactory::create));
109+
StreamsBuilder streamsBuilder =
110+
new StreamsBuilderWithInterceptor(List.of(metricsInterceptorFactory::create));
114111

115112
streamsBuilder = buildTopology(streamsConfig, streamsBuilder, sourceStreams);
116113
this.topology = streamsBuilder.build();

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/interceptors/StreamsBuilderWithInterceptor.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,39 +4,30 @@
44
import java.util.List;
55
import java.util.regex.Pattern;
66
import org.apache.kafka.streams.StreamsBuilder;
7-
import org.apache.kafka.streams.Topology;
87
import org.apache.kafka.streams.kstream.Consumed;
98
import org.apache.kafka.streams.kstream.KStream;
109
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
1110

1211
public class StreamsBuilderWithInterceptor extends StreamsBuilder {
1312

14-
private final StreamsBuilder delegate;
1513
private final List<ProcessorSupplier> processorSuppliers;
1614

17-
public StreamsBuilderWithInterceptor(
18-
StreamsBuilder delegate, List<ProcessorSupplier> processorSuppliers) {
19-
this.delegate = delegate;
15+
public StreamsBuilderWithInterceptor(List<ProcessorSupplier> processorSuppliers) {
2016
this.processorSuppliers = processorSuppliers;
2117
}
2218

23-
@Override
24-
public synchronized Topology build() {
25-
return delegate.build();
26-
}
27-
2819
@Override
2920
public synchronized <K, V> KStream<K, V> stream(
3021
final Pattern topicPattern, final Consumed<K, V> consumed) {
31-
KStream<K, V> stream = delegate.stream(topicPattern, consumed);
22+
KStream<K, V> stream = super.stream(topicPattern, consumed);
3223
processorSuppliers.forEach(stream::process);
3324
return stream;
3425
}
3526

3627
@Override
3728
public synchronized <K, V> KStream<K, V> stream(
3829
final Collection<String> topics, final Consumed<K, V> consumed) {
39-
KStream<K, V> stream = delegate.stream(topics, consumed);
30+
KStream<K, V> stream = super.stream(topics, consumed);
4031
processorSuppliers.forEach(stream::process);
4132
return stream;
4233
}

0 commit comments

Comments
 (0)