Skip to content

Commit cc6bec0

Browse files
committed
add unit tests
1 parent ec3a154 commit cc6bec0

File tree

6 files changed

+166
-3
lines changed

6 files changed

+166
-3
lines changed

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/interceptors/metrics/MetricsInterceptor.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ public class MetricsInterceptor implements Processor {
1616

1717
@Override
1818
public void process(Record record) {
19-
System.out.println("Inside custom processor");
2019
timeLagCounter.increment(System.currentTimeMillis() - record.timestamp());
2120
numRecordsCounter.increment();
2221
}

kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/interceptors/metrics/MetricsInterceptorFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry;
66

77
public class MetricsInterceptorFactory {
8-
private static final String TIME_LAG_COUNTER_NAME = "time_lag";
9-
private static final String NUM_RECORDS_COUNTER_NAME = "num_records";
8+
private static final String TIME_LAG_COUNTER_NAME = "kafka_records_time_lag";
9+
private static final String NUM_RECORDS_COUNTER_NAME = "kafka_records_count";
1010

1111
private final Counter timeLagCounter;
1212
private final Counter numRecordsCounter;
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package org.hypertrace.core.kafkastreams.framework.interceptors;
2+
3+
import java.util.List;
4+
import org.apache.kafka.streams.KeyValue;
5+
import org.apache.kafka.streams.processor.api.Processor;
6+
import org.apache.kafka.streams.processor.api.Record;
7+
8+
public class CachingInterceptor implements Processor<String, String, Void, Void> {
9+
10+
private List<KeyValue<String, String>> pairs;
11+
12+
public CachingInterceptor(List<KeyValue<String, String>> pairs) {
13+
this.pairs = pairs;
14+
}
15+
16+
@Override
17+
public void process(Record<String, String> record) {
18+
pairs.add(new KeyValue<>(record.key(), record.value()));
19+
}
20+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package org.hypertrace.core.kafkastreams.framework.interceptors;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
import org.apache.kafka.streams.KeyValue;
6+
7+
public class CachingInterceptorFactory {
8+
private List<KeyValue<String, String>> pairs;
9+
10+
CachingInterceptorFactory() {
11+
this.pairs = new ArrayList<>();
12+
}
13+
14+
CachingInterceptor create() {
15+
return new CachingInterceptor(pairs);
16+
}
17+
18+
List<KeyValue<String, String>> getCache() {
19+
return pairs;
20+
}
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package org.hypertrace.core.kafkastreams.framework.interceptors;
2+
3+
import static org.hamcrest.CoreMatchers.equalTo;
4+
import static org.hamcrest.CoreMatchers.is;
5+
import static org.hamcrest.MatcherAssert.assertThat;
6+
import static org.junit.jupiter.api.Assertions.assertEquals;
7+
8+
import java.util.List;
9+
import java.util.Properties;
10+
import org.apache.kafka.common.serialization.Serdes;
11+
import org.apache.kafka.streams.KeyValue;
12+
import org.apache.kafka.streams.StreamsBuilder;
13+
import org.apache.kafka.streams.TestInputTopic;
14+
import org.apache.kafka.streams.TestOutputTopic;
15+
import org.apache.kafka.streams.Topology;
16+
import org.apache.kafka.streams.TopologyTestDriver;
17+
import org.apache.kafka.streams.kstream.KStream;
18+
import org.junit.jupiter.api.AfterEach;
19+
import org.junit.jupiter.api.BeforeEach;
20+
import org.junit.jupiter.api.Test;
21+
22+
public class StreamsBuilderWithInterceptorTest {
23+
24+
private static final String INPUT_TOPIC = "input";
25+
private static final String OUTPUT_TOPIC = "output";
26+
27+
private StreamsBuilder streamsBuilder;
28+
private Topology topology;
29+
private TopologyTestDriver td;
30+
private TestInputTopic<String, String> inputTopic;
31+
private TestOutputTopic<String, String> outputTopic;
32+
33+
private Properties streamsConfig;
34+
35+
CachingInterceptorFactory factory;
36+
37+
@BeforeEach
38+
public void setup() {
39+
40+
factory = new CachingInterceptorFactory();
41+
streamsBuilder = new StreamsBuilderWithInterceptor(List.of(factory::create));
42+
KStream<String, String> stream = streamsBuilder.stream(INPUT_TOPIC);
43+
stream.to(OUTPUT_TOPIC);
44+
topology = streamsBuilder.build();
45+
46+
streamsConfig = new Properties();
47+
streamsConfig.put("application.id", "test-interceptor");
48+
streamsConfig.put("bootstrap.servers", "dummy:1234");
49+
streamsConfig.put("default.key.serde", Serdes.StringSerde.class.getName());
50+
streamsConfig.put("default.value.serde", Serdes.StringSerde.class.getName());
51+
td = new TopologyTestDriver(topology, streamsConfig);
52+
}
53+
54+
@AfterEach
55+
public void tearDown() {
56+
td.close();
57+
}
58+
59+
@Test
60+
public void interceptorTest() {
61+
inputTopic =
62+
td.createInputTopic(
63+
INPUT_TOPIC, Serdes.String().serializer(), Serdes.String().serializer());
64+
outputTopic =
65+
td.createOutputTopic(
66+
OUTPUT_TOPIC, Serdes.String().deserializer(), Serdes.String().deserializer());
67+
68+
assertThat(outputTopic.isEmpty(), is(true));
69+
70+
inputTopic.pipeInput("fooooo", "barrrrr");
71+
assertThat(outputTopic.readValue(), equalTo("barrrrr"));
72+
assertThat(outputTopic.isEmpty(), is(true));
73+
74+
inputTopic.pipeInput("foo", "bar");
75+
assertThat(outputTopic.readValue(), equalTo("bar"));
76+
assertThat(outputTopic.isEmpty(), is(true));
77+
78+
List<KeyValue<String, String>> cache = factory.getCache();
79+
assertThat(cache.size(), is(2));
80+
KeyValue<String, String> pair = cache.get(0);
81+
assertEquals("fooooo", pair.key);
82+
assertEquals("barrrrr", pair.value);
83+
84+
pair = cache.get(1);
85+
assertEquals("foo", pair.key);
86+
assertEquals("bar", pair.value);
87+
}
88+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package org.hypertrace.core.kafkastreams.framework.interceptors.metrics;
2+
3+
import static org.mockito.ArgumentMatchers.anyDouble;
4+
import static org.mockito.Mockito.mock;
5+
import static org.mockito.Mockito.times;
6+
import static org.mockito.Mockito.verify;
7+
8+
import io.micrometer.core.instrument.Counter;
9+
import org.apache.kafka.streams.processor.api.Record;
10+
import org.junit.jupiter.api.BeforeEach;
11+
import org.junit.jupiter.api.Test;
12+
13+
public class MetricsInterceptorTest {
14+
15+
private Counter timeLagCounter;
16+
private Counter numRecordsCounter;
17+
private MetricsInterceptor interceptor;
18+
19+
@BeforeEach
20+
void setup() {
21+
timeLagCounter = mock(Counter.class);
22+
numRecordsCounter = mock(Counter.class);
23+
interceptor = new MetricsInterceptor(numRecordsCounter, timeLagCounter);
24+
}
25+
26+
@Test
27+
void shouldIncrementCounters() {
28+
Record<Object, Object> record =
29+
new Record<>("key", "value", System.currentTimeMillis() - 50000);
30+
interceptor.process(record);
31+
32+
verify(numRecordsCounter, times(1)).increment();
33+
verify(timeLagCounter, times(1)).increment(anyDouble());
34+
}
35+
}

0 commit comments

Comments
 (0)