Skip to content

Commit da78aa6

Browse files
authored
Wighted group partitioner (#51)
1 parent e670394 commit da78aa6

File tree

21 files changed

+524
-7
lines changed

21 files changed

+524
-7
lines changed

kafka-streams-framework/build.gradle.kts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,14 @@ tasks.test {
1111
}
1212

1313
dependencies {
14-
annotationProcessor("org.projectlombok:lombok:1.18.24")
15-
compileOnly("org.projectlombok:lombok:1.18.24")
16-
1714
api(project(":kafka-streams-serdes"))
1815
api("org.apache.kafka:kafka-streams:7.2.1-ccs")
1916
api("io.confluent:kafka-streams-avro-serde:7.2.1")
2017

21-
implementation("com.google.guava:guava:31.1-jre")
2218
implementation("org.apache.avro:avro:1.11.1")
2319
implementation("org.apache.kafka:kafka-clients:7.2.1-ccs")
24-
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.39")
25-
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.39")
20+
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.47")
21+
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.47")
2622
implementation("org.apache.commons:commons-lang3:3.12.0")
2723

2824
testImplementation("org.apache.kafka:kafka-streams-test-utils:7.2.1-ccs")

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
77
import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
88
import static org.apache.kafka.common.config.TopicConfig.RETENTION_MS_CONFIG;
9+
import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG;
910
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
1011
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG;
1112
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG;
@@ -20,6 +21,8 @@
2021
import com.google.common.collect.Streams;
2122
import com.typesafe.config.Config;
2223
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
24+
import io.micrometer.core.instrument.Tag;
25+
import io.micrometer.core.instrument.Tags;
2326
import io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics;
2427
import java.time.Duration;
2528
import java.util.ArrayList;
@@ -90,7 +93,11 @@ protected void doInit() {
9093
app = new KafkaStreams(topology, streamsConfigProps);
9194

9295
// export kafka streams metrics
93-
metrics = new KafkaStreamsMetrics(app);
96+
metrics =
97+
new KafkaStreamsMetrics(
98+
app,
99+
Tags.of(
100+
Tag.of("kstreams.app", streamsConfigProps.getProperty(APPLICATION_ID_CONFIG))));
94101
metrics.bindTo(PlatformMetricsRegistry.getMeterRegistry());
95102

96103
// useful for resetting local state - during testing or any other scenarios where
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
plugins {
2+
`java-library`
3+
jacoco
4+
id("org.hypertrace.avro-plugin")
5+
id("org.hypertrace.publish-plugin")
6+
id("org.hypertrace.jacoco-report-plugin")
7+
}
8+
9+
tasks.test {
10+
useJUnitPlatform()
11+
}
12+
13+
dependencies {
14+
annotationProcessor("org.projectlombok:lombok:1.18.24")
15+
compileOnly("org.projectlombok:lombok:1.18.24")
16+
17+
implementation("com.google.guava:guava:31.1-jre")
18+
implementation("org.apache.avro:avro:1.11.1")
19+
implementation("com.typesafe:config:1.4.2")
20+
implementation("org.apache.kafka:kafka-clients:7.2.1-ccs")
21+
implementation("org.apache.kafka:kafka-streams:7.2.1-ccs")
22+
implementation("org.slf4j:slf4j-api:1.7.36")
23+
24+
testImplementation("org.junit.jupiter:junit-jupiter:5.8.2")
25+
testImplementation("org.junit-pioneer:junit-pioneer:1.7.1")
26+
testImplementation("org.mockito:mockito-core:4.5.1")
27+
testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:2.17.2")
28+
}
29+
30+
// Disabling compatibility check for the test avro definitions.
31+
tasks.named<org.hypertrace.gradle.avro.CheckAvroCompatibility>("avroCompatibilityCheck") {
32+
enabled = false
33+
setAgainstFiles(null)
34+
}

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/partitioner/AvroFieldValuePartitioner.java renamed to kafka-streams-partitioners/avro-partitioners/src/main/java/org/hypertrace/core/kafkastreams/framework/partitioner/AvroFieldValuePartitioner.java

File renamed without changes.

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/partitioner/AvroFieldValuePartitionerConfig.java renamed to kafka-streams-partitioners/avro-partitioners/src/main/java/org/hypertrace/core/kafkastreams/framework/partitioner/AvroFieldValuePartitionerConfig.java

File renamed without changes.

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/partitioner/FilteredUniformStickyPartitionCache.java renamed to kafka-streams-partitioners/avro-partitioners/src/main/java/org/hypertrace/core/kafkastreams/framework/partitioner/FilteredUniformStickyPartitionCache.java

File renamed without changes.

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/partitioner/FilteredUniformStickyPartitioner.java renamed to kafka-streams-partitioners/avro-partitioners/src/main/java/org/hypertrace/core/kafkastreams/framework/partitioner/FilteredUniformStickyPartitioner.java

File renamed without changes.
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
@namespace("org.hypertrace.core.kafkastreams.framework.partitioner")
2+
protocol TestRecordProtocol {
3+
record TestCustomerRecord {
4+
string customer_id;
5+
string span_id;
6+
string trace_id;
7+
}
8+
9+
record TestTenantRecord {
10+
string tenant_id;
11+
string span_id;
12+
string trace_id;
13+
}
14+
}

kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/partitioner/AvroFieldValuePartitionerTest.java renamed to kafka-streams-partitioners/avro-partitioners/src/test/java/org/hypertrace/core/kafkastreams/framework/partitioner/AvroFieldValuePartitionerTest.java

File renamed without changes.

kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/partitioner/FilteredUniformStickyPartitionerTest.java renamed to kafka-streams-partitioners/avro-partitioners/src/test/java/org/hypertrace/core/kafkastreams/framework/partitioner/FilteredUniformStickyPartitionerTest.java

File renamed without changes.

0 commit comments

Comments
 (0)