Skip to content

Commit 6f29133

Browse files
Added intercepted channel registry in kafka streams app for its implementers (#55)
1 parent 81a6473 commit 6f29133

File tree

2 files changed

+21
-0
lines changed

2 files changed

+21
-0
lines changed

kafka-streams-framework/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ dependencies {
1414
api(project(":kafka-streams-serdes"))
1515
api("org.apache.kafka:kafka-streams:7.2.1-ccs")
1616
api("io.confluent:kafka-streams-avro-serde:7.2.1")
17+
api("org.hypertrace.core.grpcutils:grpc-client-utils:0.11.2")
1718

1819
implementation("org.apache.avro:avro:1.11.1")
1920
implementation("org.apache.kafka:kafka-clients:7.2.1-ccs")

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package org.hypertrace.core.kafkastreams.framework;
22

3+
import static io.grpc.Deadline.after;
4+
import static java.util.concurrent.TimeUnit.SECONDS;
35
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
46
import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
57
import static org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG;
@@ -23,6 +25,7 @@
2325
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
2426
import io.micrometer.core.instrument.Tag;
2527
import io.micrometer.core.instrument.Tags;
28+
import io.micrometer.core.instrument.binder.grpc.MetricCollectingClientInterceptor;
2629
import io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics;
2730
import java.time.Duration;
2831
import java.util.ArrayList;
@@ -39,6 +42,8 @@
3942
import org.apache.kafka.streams.Topology;
4043
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
4144
import org.apache.kafka.streams.kstream.KStream;
45+
import org.hypertrace.core.grpcutils.client.GrpcChannelRegistry;
46+
import org.hypertrace.core.grpcutils.client.GrpcRegistryConfig;
4247
import org.hypertrace.core.kafkastreams.framework.listeners.LoggingStateListener;
4348
import org.hypertrace.core.kafkastreams.framework.listeners.LoggingStateRestoreListener;
4449
import org.hypertrace.core.kafkastreams.framework.rocksdb.BoundedMemoryConfigSetter;
@@ -58,6 +63,9 @@ public abstract class KafkaStreamsApp extends PlatformService {
5863
public static final String PRE_CREATE_TOPICS = "precreate.topics";
5964
public static final String KAFKA_STREAMS_CONFIG_KEY = "kafka.streams.config";
6065
private static final Logger logger = LoggerFactory.getLogger(KafkaStreamsApp.class);
66+
67+
private final GrpcChannelRegistry grpcChannelRegistry;
68+
6169
protected KafkaStreams app;
6270
private KafkaStreamsMetrics metrics;
6371

@@ -67,6 +75,17 @@ public abstract class KafkaStreamsApp extends PlatformService {
6775

6876
public KafkaStreamsApp(ConfigClient configClient) {
6977
super(configClient);
78+
this.grpcChannelRegistry =
79+
new GrpcChannelRegistry(
80+
GrpcRegistryConfig.builder()
81+
.defaultInterceptor(
82+
new MetricCollectingClientInterceptor(
83+
PlatformMetricsRegistry.getMeterRegistry()))
84+
.build());
85+
}
86+
87+
protected GrpcChannelRegistry getGrpcChannelRegistry() {
88+
return grpcChannelRegistry;
7089
}
7190

7291
@Override
@@ -147,6 +166,7 @@ protected void doStop() {
147166
metrics.close();
148167
}
149168
app.close(Duration.ofSeconds(30));
169+
grpcChannelRegistry.shutdown(after(10, SECONDS));
150170
}
151171

152172
/**

0 commit comments

Comments
 (0)