Skip to content

Commit fcb58a0

Browse files
joeyjackson authored and vjkoskela committed
Kafka source (#154)
* kafka source init * kafka consumer wrapper and listener * unit test and checkstyle * exception handling * unit tests for kafka source * interfaces and wrappers for kafka source * removed kafka dep * integration tests for kafka source * config deserialization * Fixed deserializer * Fixed deserializer * styling * kafka docker * pipeline config example * style * error checking * error checking * integration test kafka source from config * style * added parser to kafka source * example pipeline * Fail integration test on send fail to kafka server * requested changes * requested changes * configurable backoff time for kafka source * fixed conf deserializer
1 parent 0bc9de1 commit fcb58a0

File tree

20 files changed

+1309
-122
lines changed

20 files changed

+1309
-122
lines changed

config/pipelines/pipeline.conf

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,29 @@ sources=[
123123
"source": {
124124
type="com.arpnetworking.metrics.common.sources.CollectdHttpSourceV1"
125125
name="collectd_http_source"
126+
},
127+
{
128+
type="com.arpnetworking.metrics.common.sources.KafkaSource"
129+
name="kafka_source"
130+
consumer={
131+
type="org.apache.kafka.clients.consumer.Consumer"
132+
topics=["topic"]
133+
configs={
134+
# Set any properties defined at: https://kafka.apache.org/documentation/#consumerconfigs
135+
bootstrap.servers="localhost:9092"
136+
group.id="group0"
137+
client.id="consumer0"
138+
key.deserializer="org.apache.kafka.common.serialization.ByteArrayDeserializer"
139+
value.deserializer="org.apache.kafka.common.serialization.ByteArrayDeserializer"
140+
auto.offset.reset="earliest"
141+
}
142+
}
143+
parser={
144+
type="com.arpnetworking.metrics.mad.parsers.ProtobufV2bytesToRecordParser"
145+
}
146+
pollTime="PT1S"
147+
shutdownAwaitTime="PT10S"
148+
backoffTime="PT1S"
126149
}
127150
}
128151
]

pom.xml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,11 +360,34 @@
360360
<goals>
361361
<goal>push</goal>
362362
</goals>
363+
<configuration>
364+
<filter>arpnetworking/mad:${project.version}</filter>
365+
</configuration>
363366
</execution>
364367
</executions>
365368
<configuration>
366369
<showLogs>true</showLogs>
367370
<images>
371+
<image>
372+
<name>arpnetworking/mad/kafka:${project.version}</name>
373+
<build>
374+
<dockerFile>${project.basedir}/src/main/docker/kafka/Dockerfile</dockerFile>
375+
<tags>
376+
<tag>${buildNumber}</tag>
377+
</tags>
378+
</build>
379+
<run>
380+
<env>
381+
<AUTO_CREATE_TOPICS>true</AUTO_CREATE_TOPICS>
382+
<ADVERTISED_HOST>localhost</ADVERTISED_HOST>
383+
<ADVERTISED_PORT>9092</ADVERTISED_PORT>
384+
</env>
385+
<ports>
386+
<port>2181:2181</port>
387+
<port>9092:9092</port>
388+
</ports>
389+
</run>
390+
</image>
368391
<image>
369392
<name>arpnetworking/mad:${project.version}</name>
370393
<build>

src/main/assembly/docker.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
</formats>
2525
<fileSets>
2626
<fileSet>
27-
<directory>${project.basedir}/src/main/docker</directory>
27+
<directory>${project.basedir}/src/main/docker/mad</directory>
2828
<outputDirectory/>
2929
<filtered>false</filtered>
3030
<fileMode>0644</fileMode>

src/main/docker/kafka/Dockerfile

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# Copyright 2016 Spotify
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# Original: https://github.com/spotify/docker-kafka
16+
17+
# Kafka and Zookeeper
18+
FROM java:openjdk-8-jre
19+
20+
ENV DEBIAN_FRONTEND noninteractive
21+
ENV SCALA_VERSION 2.11
22+
ENV KAFKA_VERSION 2.2.0
23+
ENV KAFKA_HOME /opt/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION"
24+
25+
RUN echo "deb [check-valid-until=no] http://archive.debian.org/debian jessie-backports main" > /etc/apt/sources.list.d/jessie-backports.list
26+
RUN sed -i '/deb http:\/\/deb.debian.org\/debian jessie-updates main/d' /etc/apt/sources.list
27+
28+
# Install Kafka, Zookeeper and other needed things
29+
RUN apt-get -o Acquire::Check-Valid-Until=false update && \
30+
apt-get -o Acquire::Check-Valid-Until=false install -y zookeeper wget supervisor dnsutils && \
31+
rm -rf /var/lib/apt/lists/* && \
32+
apt-get clean && \
33+
wget -q http://apache.mirrors.spacedump.net/kafka/"$KAFKA_VERSION"/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz -O /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz && \
34+
tar xfz /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz -C /opt && \
35+
rm /tmp/kafka_"$SCALA_VERSION"-"$KAFKA_VERSION".tgz
36+
37+
ADD scripts/start-kafka.sh /usr/bin/start-kafka.sh
38+
39+
# Supervisor config
40+
ADD supervisor/kafka.conf supervisor/zookeeper.conf /etc/supervisor/conf.d/
41+
42+
# 2181 is zookeeper, 9092 is kafka
43+
# EXPOSE 2181 9092
44+
45+
CMD ["supervisord", "-n"]

src/main/docker/kafka/scripts/start-kafka.sh

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
#!/bin/sh
2+
3+
# Copyright 2016 Spotify
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
# Original: https://github.com/spotify/docker-kafka
18+
19+
20+
# Optional ENV variables:
21+
# * ADVERTISED_HOST: the external ip for the container, e.g. `docker-machine ip \`docker-machine active\``
22+
# * ADVERTISED_PORT: the external port for Kafka, e.g. 9092
23+
# * ZK_CHROOT: the zookeeper chroot that's used by Kafka (without / prefix), e.g. "kafka"
24+
# * LOG_RETENTION_HOURS: the minimum age of a log file in hours to be eligible for deletion (default is 168, for 1 week)
25+
# * LOG_RETENTION_BYTES: configure the size at which segments are pruned from the log, (default is 1073741824, for 1GB)
26+
# * NUM_PARTITIONS: configure the default number of log partitions per topic
27+
# * AUTO_CREATE_TOPICS: whether to autocreate topics
28+
29+
# Configure advertised host/port if we run in helios
30+
if [ ! -z "$HELIOS_PORT_kafka" ]; then
31+
ADVERTISED_HOST=`echo $HELIOS_PORT_kafka | cut -d':' -f 1 | xargs -n 1 dig +short | tail -n 1`
32+
ADVERTISED_PORT=`echo $HELIOS_PORT_kafka | cut -d':' -f 2`
33+
fi
34+
35+
# Set the external host and port
36+
if [ ! -z "$ADVERTISED_HOST" ]; then
37+
echo "advertised host: $ADVERTISED_HOST"
38+
if grep -q "^advertised.host.name" $KAFKA_HOME/config/server.properties; then
39+
# The enclosing grep matches an *uncommented* advertised.host.name line, so the
# substitution must accept the key with or without a leading '#'; otherwise this
# branch would silently leave the existing value unchanged.
sed -r -i "s/^#?(advertised.host.name)=(.*)/\1=$ADVERTISED_HOST/g" $KAFKA_HOME/config/server.properties
40+
else
41+
echo "\nadvertised.host.name=$ADVERTISED_HOST" >> $KAFKA_HOME/config/server.properties
42+
fi
43+
fi
44+
if [ ! -z "$ADVERTISED_PORT" ]; then
45+
echo "advertised port: $ADVERTISED_PORT"
46+
if grep -q "^advertised.port" $KAFKA_HOME/config/server.properties; then
47+
# The enclosing grep matches an *uncommented* advertised.port line, so the
# substitution must accept the key with or without a leading '#'; otherwise this
# branch would silently leave the existing value unchanged.
sed -r -i "s/^#?(advertised.port)=(.*)/\1=$ADVERTISED_PORT/g" $KAFKA_HOME/config/server.properties
48+
else
49+
echo "\nadvertised.port=$ADVERTISED_PORT" >> $KAFKA_HOME/config/server.properties
50+
fi
51+
fi
52+
53+
# Set the zookeeper chroot
54+
if [ ! -z "$ZK_CHROOT" ]; then
55+
# wait for zookeeper to start up
56+
until /usr/share/zookeeper/bin/zkServer.sh status; do
57+
sleep 0.1
58+
done
59+
60+
# create the chroot node
61+
echo "create /$ZK_CHROOT \"\"" | /usr/share/zookeeper/bin/zkCli.sh || {
62+
echo "can't create chroot in zookeeper, exit"
63+
exit 1
64+
}
65+
66+
# configure kafka
67+
sed -r -i "s/(zookeeper.connect)=(.*)/\1=localhost:2181\/$ZK_CHROOT/g" $KAFKA_HOME/config/server.properties
68+
fi
69+
70+
# Allow specification of log retention policies
71+
if [ ! -z "$LOG_RETENTION_HOURS" ]; then
72+
echo "log retention hours: $LOG_RETENTION_HOURS"
73+
sed -r -i "s/(log.retention.hours)=(.*)/\1=$LOG_RETENTION_HOURS/g" $KAFKA_HOME/config/server.properties
74+
fi
75+
if [ ! -z "$LOG_RETENTION_BYTES" ]; then
76+
echo "log retention bytes: $LOG_RETENTION_BYTES"
77+
sed -r -i "s/#(log.retention.bytes)=(.*)/\1=$LOG_RETENTION_BYTES/g" $KAFKA_HOME/config/server.properties
78+
fi
79+
80+
# Configure the default number of log partitions per topic
81+
if [ ! -z "$NUM_PARTITIONS" ]; then
82+
echo "default number of partition: $NUM_PARTITIONS"
83+
sed -r -i "s/(num.partitions)=(.*)/\1=$NUM_PARTITIONS/g" $KAFKA_HOME/config/server.properties
84+
fi
85+
86+
# Enable/disable auto creation of topics
87+
if [ ! -z "$AUTO_CREATE_TOPICS" ]; then
88+
echo "auto.create.topics.enable: $AUTO_CREATE_TOPICS"
89+
# Append the setting on its own line. The original wrote a literal "/n" prefix
# (typo for "\n"), producing the bogus key "/nauto.create.topics.enable" that the
# broker would not recognize. printf is used so the newline handling does not
# depend on the shell's echo implementation.
printf '\nauto.create.topics.enable=%s\n' "$AUTO_CREATE_TOPICS" >> $KAFKA_HOME/config/server.properties
90+
fi
91+
92+
# Run Kafka
93+
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties

src/main/docker/kafka/supervisor/kafka.conf

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Copyright 2016 Spotify
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# Original: https://github.com/spotify/docker-kafka
16+
17+
[program:kafka]
18+
command=/usr/bin/start-kafka.sh
19+
autostart=true
20+
autorestart=true

src/main/docker/kafka/supervisor/zookeeper.conf

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Copyright 2016 Spotify
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# Original: https://github.com/spotify/docker-kafka
16+
17+
[program:zookeeper]
18+
command=/usr/share/zookeeper/bin/zkServer.sh start-foreground
19+
autostart=true
20+
autorestart=true
File renamed without changes.

src/main/java/com/arpnetworking/metrics/common/kafka/ConsumerDeserializer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@
3737
import java.util.Map;
3838

3939
/**
40-
* Jackson <code>JsonDeserializer</code> implementation for <code>Consumer</code>.
40+
* Jackson {@code JsonDeserializer} implementation for {@code Consumer}.
4141
*
42-
* @param <K> the type of key field in <code>Consumer</code>
43-
* @param <V> the type of value field in <code>Consumer</code>
42+
* @param <K> the type of key field in {@code Consumer}
43+
* @param <V> the type of value field in {@code Consumer}
4444
*
4545
* @author Joey Jackson (jjackson at dropbox dot com)
4646
*/

src/main/java/com/arpnetworking/metrics/common/kafka/ConsumerListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
/**
2121
* Interface for classes that handle the records polled by a kafka
22-
* <code>Consumer</code>.
22+
* {@code Consumer}.
2323
*
2424
* @param <T> the type of the value in the consumer records being handled
2525
*

0 commit comments

Comments
 (0)