Skip to content

Commit d8e801b

Browse files
joeyjacksonspeezepearson
authored andcommitted
interfaces and wrappers for kafka source (#153)
* interfaces and wrappers for kafka source * removed kafka dep * config deserialization * Fixed deserializer * error checking * error checking
1 parent c51e27d commit d8e801b

File tree

5 files changed

+368
-0
lines changed

5 files changed

+368
-0
lines changed

pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -813,6 +813,11 @@
813813
<version>${statsd.client.timgroup}</version>
814814
<scope>test</scope>
815815
</dependency>
816+
<dependency>
817+
<groupId>org.apache.kafka</groupId>
818+
<artifactId>kafka-clients</artifactId>
819+
<version>2.2.1</version>
820+
</dependency>
816821
</dependencies>
817822

818823
<profiles>
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright 2019 Dropbox.com
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.arpnetworking.metrics.common.kafka;
17+
18+
import com.arpnetworking.commons.jackson.databind.ObjectMapperFactory;
19+
import com.fasterxml.jackson.core.JsonParser;
20+
import com.fasterxml.jackson.core.JsonProcessingException;
21+
import com.fasterxml.jackson.core.type.TypeReference;
22+
import com.fasterxml.jackson.databind.DeserializationContext;
23+
import com.fasterxml.jackson.databind.JsonDeserializer;
24+
import com.fasterxml.jackson.databind.JsonMappingException;
25+
import com.fasterxml.jackson.databind.JsonNode;
26+
import com.fasterxml.jackson.databind.ObjectMapper;
27+
import com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer;
28+
import com.fasterxml.jackson.databind.exc.MismatchedInputException;
29+
import com.fasterxml.jackson.databind.node.ObjectNode;
30+
import org.apache.kafka.clients.consumer.Consumer;
31+
import org.apache.kafka.clients.consumer.KafkaConsumer;
32+
import org.apache.kafka.common.KafkaException;
33+
import org.apache.kafka.common.config.ConfigException;
34+
35+
import java.io.IOException;
36+
import java.util.List;
37+
import java.util.Map;
38+
39+
/**
40+
* Jackson <code>JsonDeserializer</code> implementation for <code>Consumer</code>.
41+
*
42+
* @param <K> the type of key field in <code>Consumer</code>
43+
* @param <V> the type of value field in <code>Consumer</code>
44+
*
45+
* @author Joey Jackson (jjackson at dropbox dot com)
46+
*/
47+
public class ConsumerDeserializer<K, V> extends JsonDeserializer<Consumer<K, V>> {
48+
49+
@Override
50+
public Consumer<K, V> deserialize(final JsonParser parser, final DeserializationContext context)
51+
throws IOException, JsonProcessingException {
52+
// Parse input json into JsonNode Tree
53+
final JsonDeserializer<? extends JsonNode> deserializer = JsonNodeDeserializer.getDeserializer(ObjectNode.class);
54+
final JsonNode node = deserializer.deserialize(parser, context);
55+
56+
// Pull out configs and topics fields and convert deserialize with standard mapper
57+
final JsonNode configNode = node.get("configs");
58+
final JsonNode topicsNode = node.get("topics");
59+
if (configNode == null) {
60+
throw MismatchedInputException.from(parser, Consumer.class,
61+
"Consumer object missing \"configs\" field");
62+
}
63+
if (topicsNode == null) {
64+
throw MismatchedInputException.from(parser, Consumer.class,
65+
"Consumer object missing \"topics\" field");
66+
}
67+
68+
final ObjectMapper mapper = ObjectMapperFactory.getInstance();
69+
final TypeReference<Map<String, String>> configsType = new TypeReference<Map<String, String>>() {};
70+
final TypeReference<List<String>> topicsType = new TypeReference<List<String>>() {};
71+
final Map<String, Object> configs;
72+
final List<String> topics;
73+
74+
try {
75+
configs = mapper.convertValue(configNode, configsType);
76+
} catch (final IllegalArgumentException e) {
77+
throw new JsonMappingException(parser, "\"configs\" field must be an object", e);
78+
}
79+
80+
try {
81+
topics = mapper.convertValue(topicsNode, topicsType);
82+
} catch (final IllegalArgumentException e) {
83+
throw new JsonMappingException(parser, "\"topics\" field must be a list", e);
84+
}
85+
86+
if (configs == null) {
87+
throw MismatchedInputException.from(parser, Consumer.class,
88+
"\"configs\" field cannot have null value");
89+
}
90+
if (topics == null) {
91+
throw MismatchedInputException.from(parser, Consumer.class,
92+
"\"topics\" field cannot have null value");
93+
}
94+
95+
// Create consumer
96+
try {
97+
final KafkaConsumer<K, V> consumer = new KafkaConsumer<>(configs);
98+
consumer.subscribe(topics);
99+
return consumer;
100+
} catch (final ConfigException e) {
101+
throw new JsonMappingException(parser, "Error in Kafka Consumer configuration", e);
102+
} catch (final KafkaException e) {
103+
throw new JsonMappingException(parser, "Error creating Kafka Consumer", e);
104+
}
105+
106+
}
107+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2019 Dropbox.com
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.arpnetworking.metrics.common.kafka;
17+
18+
import org.apache.kafka.clients.consumer.ConsumerRecord;
19+
20+
/**
21+
* Interface for classes that handle the records polled by a kafka
22+
* <code>Consumer</code>.
23+
*
24+
* @param <T> the type of the value in the consumer records being handled
25+
*
26+
* @author Joey Jackson (jjackson at dropbox dot com)
27+
*/
28+
public interface ConsumerListener<T> {
29+
30+
/**
31+
* Handles a consumer record from the consumer.
32+
*
33+
* @param record the consumer record to be handled
34+
*/
35+
void handle(ConsumerRecord<?, T> record);
36+
37+
/**
38+
* Handles a throwable from the consumer.
39+
*
40+
* @param throwable the throwable to be handled
41+
*/
42+
void handle(Throwable throwable);
43+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2019 Dropbox.com
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.arpnetworking.metrics.common.kafka;
17+
18+
/**
19+
* Interface for classes which use a kafka <code>Consumer</code> to
20+
* continually poll a kafka topic.
21+
*
22+
* @author Joey Jackson (jjackson at dropbox dot com)
23+
*/
24+
public interface RunnableConsumer extends Runnable {
25+
26+
/**
27+
* Stop the <code>RunnableConsumer</code> instance.
28+
*/
29+
void stop();
30+
}
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* Copyright 2019 Dropbox.com
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.arpnetworking.metrics.common.kafka;
17+
18+
import com.arpnetworking.commons.jackson.databind.ObjectMapperFactory;
19+
import com.fasterxml.jackson.core.JsonParseException;
20+
import com.fasterxml.jackson.core.type.TypeReference;
21+
import com.fasterxml.jackson.databind.JsonMappingException;
22+
import com.fasterxml.jackson.databind.ObjectMapper;
23+
import com.fasterxml.jackson.databind.exc.MismatchedInputException;
24+
import com.fasterxml.jackson.databind.module.SimpleModule;
25+
import com.google.common.collect.Sets;
26+
import org.apache.kafka.clients.consumer.Consumer;
27+
import org.junit.Assert;
28+
import org.junit.BeforeClass;
29+
import org.junit.Test;
30+
31+
import java.io.File;
32+
import java.io.IOException;
33+
import java.nio.file.Files;
34+
35+
/**
36+
* Unit Tests for <code>ConsumerDeserializer</code>.
37+
*
38+
* @author Joey Jackson (jjackson at dropbox dot com)
39+
*/
40+
public class ConsumerDeserializerTest {
41+
42+
private static final String VALID_CONFIGS = "\"configs\": {"
43+
+ "\"group.id\":\"group0\","
44+
+ "\"bootstrap.servers\":\"localhost:9092\","
45+
+ "\"key.deserializer\":\"org.apache.kafka.common.serialization.StringDeserializer\","
46+
+ "\"value.deserializer\":\"org.apache.kafka.common.serialization.StringDeserializer\""
47+
+ "}";
48+
private static final String VALID_TOPICS = "\"topics\":[\"test\"]";
49+
50+
/*
51+
* {
52+
* "configs": {
53+
* "group.id":"group0",
54+
* "bootstrap.servers":"localhost:9092",
55+
* "key.deserializer":"org.apache.kafka.common.serialization.StringDeserializer",
56+
* "value.deserializer":"org.apache.kafka.common.serialization.StringDeserializer"
57+
* },
58+
* "topics":["test"]
59+
* }
60+
*/
61+
private static final String CONSUMER_JSON_VALID = makeJsonObject(VALID_CONFIGS, VALID_TOPICS);
62+
private static final String CONSUMER_JSON_MALFORMED = "{" + VALID_CONFIGS + VALID_TOPICS + "}";
63+
private static final String CONSUMER_JSON_NO_KEY_DESERIALIZER = makeJsonObject(
64+
"\"configs\": {\"group.id\":\"group0\",\"bootstrap.servers\":\"localhost:9092\","
65+
+ "\"value.deserializer\":\"org.apache.kafka.common.serialization.StringDeserializer\"}", VALID_TOPICS);
66+
private static final String CONSUMER_JSON_NO_VALUE_DESERIALIZER = makeJsonObject(
67+
"\"configs\": {\"group.id\":\"group0\",\"bootstrap.servers\":\"localhost:9092\","
68+
+ "\"key.deserializer\":\"org.apache.kafka.common.serialization.StringDeserializer\"}", VALID_TOPICS);
69+
private static final String CONSUMER_JSON_NO_BOOTSTRAP_SERVER = makeJsonObject(
70+
"\"configs\": {\"group.id\":\"group0\","
71+
+ "\"key.deserializer\":\"org.apache.kafka.common.serialization.StringDeserializer\","
72+
+ "\"value.deserializer\":\"org.apache.kafka.common.serialization.StringDeserializer\"}", VALID_TOPICS);
73+
private static final String CONSUMER_JSON_NO_CONFIG = makeJsonObject(VALID_TOPICS);
74+
private static final String CONSUMER_JSON_NO_TOPICS = makeJsonObject(VALID_CONFIGS);
75+
private static final String CONSUMER_JSON_CONFIGS_NOT_OBJECT = makeJsonObject("\"configs\": true", VALID_TOPICS);
76+
private static final String CONSUMER_JSON_CONFIGS_NULL = makeJsonObject("\"configs\": null", VALID_TOPICS);
77+
private static final String CONSUMER_JSON_TOPICS_NOT_LIST = makeJsonObject(VALID_CONFIGS, "\"topics\":\"test\"");
78+
private static final String CONSUMER_JSON_TOPICS_NULL = makeJsonObject(VALID_CONFIGS, "\"topics\":null");
79+
80+
private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.createInstance();
81+
82+
@BeforeClass
83+
public static void setUpClass() throws IOException {
84+
Files.createDirectories(new File("./target/tmp/filter/ConsumerDeserializer").toPath());
85+
final SimpleModule module = new SimpleModule("KafkaConsumer");
86+
module.addDeserializer(Consumer.class, new ConsumerDeserializer<>());
87+
OBJECT_MAPPER.registerModule(module);
88+
}
89+
90+
/* --- Success --- */
91+
@Test
92+
public void testDeserializeConsumerSuccess() throws IOException {
93+
final Consumer<String, String> consumer = createConsumerFromJSON(CONSUMER_JSON_VALID);
94+
Assert.assertEquals(Sets.newHashSet("test"), consumer.subscription());
95+
}
96+
97+
/* --- Invalid JSON --- */
98+
@Test(expected = JsonParseException.class)
99+
public void testDeserializeConsumerMalformedJson() throws IOException {
100+
createConsumerFromJSON(CONSUMER_JSON_MALFORMED);
101+
}
102+
103+
/* --- Not JSON Object --- */
104+
@Test(expected = MismatchedInputException.class)
105+
public void testDeserializeConsumerNotJsonObjectNumber() throws IOException {
106+
createConsumerFromJSON("1000");
107+
}
108+
109+
@Test(expected = MismatchedInputException.class)
110+
public void testDeserializeConsumerNotJsonObjectList() throws IOException {
111+
createConsumerFromJSON("[10, 10]");
112+
}
113+
114+
@Test(expected = MismatchedInputException.class)
115+
public void testDeserializeConsumerNotJsonObjectBoolean() throws IOException {
116+
createConsumerFromJSON("true");
117+
}
118+
119+
/* --- Valid JSON Object but Wrong Field Types--- */
120+
@Test(expected = JsonMappingException.class)
121+
public void testDeserializeConsumerTopicsNotList() throws IOException {
122+
createConsumerFromJSON(CONSUMER_JSON_TOPICS_NOT_LIST);
123+
}
124+
125+
@Test(expected = JsonMappingException.class)
126+
public void testDeserializeConsumerConfigsNotObject() throws IOException {
127+
createConsumerFromJSON(CONSUMER_JSON_CONFIGS_NOT_OBJECT);
128+
}
129+
130+
@Test(expected = MismatchedInputException.class)
131+
public void testDeserializeConsumerTopicsNotListNull() throws IOException {
132+
createConsumerFromJSON(CONSUMER_JSON_TOPICS_NULL);
133+
}
134+
135+
@Test(expected = MismatchedInputException.class)
136+
public void testDeserializeConsumerConfigsNotObjectNull() throws IOException {
137+
createConsumerFromJSON(CONSUMER_JSON_CONFIGS_NULL);
138+
}
139+
140+
/* --- Valid JSON Object but Missing Fields --- */
141+
@Test(expected = MismatchedInputException.class)
142+
public void testDeserializeConsumerMissingConfigsField() throws IOException {
143+
createConsumerFromJSON(CONSUMER_JSON_NO_CONFIG);
144+
}
145+
146+
@Test(expected = MismatchedInputException.class)
147+
public void testDeserializeConsumerMissingTopicsField() throws IOException {
148+
createConsumerFromJSON(CONSUMER_JSON_NO_TOPICS);
149+
}
150+
151+
/* --- Valid JSON but Configs Missing Needed Fields --- */
152+
@Test(expected = JsonMappingException.class)
153+
public void testDeserializeConsumerMissingKeyDeserializer() throws IOException {
154+
createConsumerFromJSON(CONSUMER_JSON_NO_KEY_DESERIALIZER);
155+
}
156+
157+
@Test(expected = JsonMappingException.class)
158+
public void testDeserializeConsumerMissingValueDeserializer() throws IOException {
159+
createConsumerFromJSON(CONSUMER_JSON_NO_VALUE_DESERIALIZER);
160+
}
161+
162+
@Test(expected = JsonMappingException.class)
163+
public void testDeserializeConsumerMissingBootstrapServer() throws IOException {
164+
createConsumerFromJSON(CONSUMER_JSON_NO_BOOTSTRAP_SERVER);
165+
}
166+
167+
private static Consumer<String, String> createConsumerFromJSON(final String jsonString)
168+
throws IOException {
169+
return OBJECT_MAPPER.readValue(jsonString, new TypeReference<Consumer<String, String>>() {});
170+
}
171+
172+
private static String makeJsonObject(final String... fields) {
173+
final StringBuilder jsonObject = new StringBuilder("{");
174+
for (int i = 0; i < fields.length; i++) {
175+
if (i != 0) {
176+
jsonObject.append(",");
177+
}
178+
jsonObject.append(fields[i]);
179+
}
180+
jsonObject.append("}");
181+
return jsonObject.toString();
182+
}
183+
}

0 commit comments

Comments
 (0)