Skip to content

Commit 637c57c

Browse files
tchronoedsiper
authored andcommitted
doc: add in/out kafka example with lua filter
Signed-off-by: Thiago Padilha <[email protected]>
1 parent c748103 commit 637c57c

File tree

9 files changed

+123
-0
lines changed

9 files changed

+123
-0
lines changed

examples/kafka_filter/.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
build/
2+
kafka/

examples/kafka_filter/Makefile

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
CPU_COUNT := $(shell grep -c ^processor /proc/cpuinfo)
2+
3+
KAFKA_TGT = kafka/done.stamp
4+
KAFKA_DIR = $(shell dirname $(KAFKA_TGT))
5+
CMAKE_TGT = build/done.stamp
6+
FLB_TGT = build/bin/fluent-bit
7+
8+
run: $(KAFKA_TGT) $(FLB_TGT)
9+
./kafka-start.sh $(KAFKA_DIR) && tmux a
10+
11+
$(FLB_TGT): $(CMAKE_TGT)
12+
make -C build -j$(CPU_COUNT)
13+
14+
$(CMAKE_TGT):
15+
rm -rf build
16+
mkdir -pv build
17+
cd build && cmake -DFLB_DEV=On -DFLB_IN_KAFKA=On -DFLB_OUT_KAFKA=On ../../..
18+
touch $@
19+
20+
$(KAFKA_TGT):
21+
rm -rf $(KAFKA_DIR)
22+
./kafka-setup.sh $(KAFKA_DIR)
23+
touch $@
24+
25+
.PHONY: run

examples/kafka_filter/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
## Instructions
2+
3+
$ cd examples/kafka_filter
4+
$ make

examples/kafka_filter/common.sh

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
set -euo pipefail
2+
3+
die() {
4+
echo $* >&2
5+
exit 1
6+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{ "name": "kafka", "type": "message bus" }
2+
{ "name": "fluent-bit", "type": "log processor" }
3+
{ "name": "kubernetes", "type": "container orchestration system" }
4+
{ "name": "docker", "type": "container platform" }
5+
{ "name": "ansible", "type": "software provisioning, configuration management, and application-deployment tool" }
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#!/bin/bash
2+
3+
. common.sh
4+
5+
url="https://dlcdn.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz"
6+
sha256="a82728166bbccf406009747a25e1fe52dbcb4d575e4a7a8616429b5818cd02d1"
7+
8+
outdir="$1"
9+
10+
[ -z $outdir ] && die "usage: $0 OUTDIR"
11+
[ -d $outdir ] && die "$outdir already exists"
12+
13+
mkdir -p $outdir
14+
cd $outdir
15+
outdir=$(pwd)
16+
cd -
17+
rmdir $outdir
18+
tmpdir=$(mktemp -d)
19+
cd $tmpdir
20+
curl -L $url | tee kafka.tgz | sha256sum -c <(echo "$sha256 -")
21+
tar xf kafka.tgz
22+
mv kafka_* $outdir
23+
rm -rf $tmpdir
24+
find $outdir/config -type f -name '*.properties' | xargs sed -e "s@=/tmp@=$outdir/tmp@" -i
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#!/bin/bash
2+
3+
. common.sh
4+
5+
java --version &> /dev/null || die "please install java 8+"
6+
7+
if tmux has-session -t kafka; then
8+
exec tmux attach -t kafka
9+
fi
10+
11+
kafka_dir="$1"
12+
[ -z $kafka_dir ] && die "usage: $0 OUTDIR"
13+
[ ! -d $kafka_dir ] && die "$kafka_dir does not exist"
14+
[ ! -e $kafka_dir/bin/zookeeper-server-start.sh ] && die "$kafka_dir doesn't contain kafka"
15+
16+
cd $kafka_dir
17+
tmux new-session -s kafka -n "cli" -d
18+
tmux new-window -n "services" -t kafka
19+
tmux split-window -v -t kafka:0
20+
tmux split-window -v -t kafka:0
21+
tmux split-window -v -t kafka:1
22+
tmux send-keys -t kafka:1.0 './bin/zookeeper-server-start.sh config/zookeeper.properties' C-m
23+
tmux send-keys -t kafka:1.1 'sleep 1 && ./bin/kafka-server-start.sh config/server.properties' C-m
24+
tmux send-keys -t kafka:0.0 'sleep 3 && ./bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic fb-source --bootstrap-server localhost:9092' C-m
25+
tmux send-keys -t kafka:0.1 'sleep 3 && ./bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic fb-sink --bootstrap-server localhost:9092' C-m
26+
tmux send-keys -t kafka:0.0 './bin/kafka-console-consumer.sh --topic fb-sink --from-beginning --bootstrap-server localhost:9092' C-m
27+
tmux send-keys -t kafka:0.1 'sleep 5' C-m
28+
tmux send-keys -t kafka:0.1 'cat ../example-data.json | ./bin/kafka-console-producer.sh --topic fb-source --bootstrap-server localhost:9092' C-m
29+
tmux send-keys -t kafka:0.2 'sleep 4 && ../build/bin/fluent-bit -c ../kafka.conf' C-m
30+
tmux select-window -t kafka:0.1

examples/kafka_filter/kafka.conf

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
[SERVICE]
2+
Flush 0.1
3+
Grace 2
4+
Log_Level info
5+
6+
[INPUT]
7+
Name kafka
8+
brokers localhost:9092
9+
topics fb-source
10+
11+
[FILTER]
12+
Name lua
13+
Match *
14+
script kafka.lua
15+
call modify_kafka_message
16+
17+
[OUTPUT]
18+
Name kafka
19+
brokers localhost:9092
20+
topics fb-sink

examples/kafka_filter/kafka.lua

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
local count = 0
2+
function modify_kafka_message(tag, timestamp, record)
3+
count = count + 1
4+
local payload = record.payload
5+
payload.status = 'processed '..payload.name..', total records: '..tostring(count)
6+
return 1, timestamp, payload
7+
end

0 commit comments

Comments
 (0)