Commit 02229c7

Adding YAML examples and layout fixes for Kafka output plugin. Part of issue #1916.
Signed-off-by: Eric D. Schabell <[email protected]>
1 parent 5e3cd44 commit 02229c7

1 file changed
pipeline/outputs/kafka.md

Lines changed: 160 additions & 58 deletions
@@ -32,26 +32,45 @@ In order to insert records into Apache Kafka, you can run the plugin from the co

 The **kafka** plugin can read parameters through the **-p** argument \(property\), e.g:

-```text
+```shell
 fluent-bit -i cpu -o kafka -p brokers=192.168.1.3:9092 -p topics=test
 ```

 ### Configuration File

-In your main configuration file append the following _Input_ & _Output_ sections:
+In your main configuration file append the following:
+
+{% tabs %}
+{% tab title="fluent-bit.yaml" %}
+
+```yaml
+pipeline:
+  inputs:
+    - name: cpu
+
+  outputs:
+    - name: kafka
+      match: '*'
+      brokers: 192.168.1.3:9092
+      topics: test
+```
+
+{% endtab %}
+{% tab title="fluent-bit.conf" %}

 ```text
 [INPUT]
-    Name cpu
+    Name cpu

 [OUTPUT]
-    Name kafka
-    Match *
-    Brokers 192.168.1.3:9092
-    Topics test
+    Name kafka
+    Match *
+    Brokers 192.168.1.3:9092
+    Topics test
 ```

-
+{% endtab %}
+{% endtabs %}

 ### Avro Support

@@ -66,7 +85,7 @@ example which activates:
 * debugging support
 * builds the test suites

-```
+```shell
 cmake -DFLB_DEV=On -DFLB_OUT_KAFKA=On -DFLB_TLS=On -DFLB_TESTS_RUNTIME=On -DFLB_TESTS_INTERNAL=On -DCMAKE_BUILD_TYPE=Debug -DFLB_HTTP_SERVER=true -DFLB_AVRO_ENCODER=On ../
 ```

@@ -77,75 +96,151 @@ log lines with kubernetes metadata via the kubernetes filter, and then
 sends the fully decorated log lines to a kafka broker encoded with a
 specific avro schema.

+{% tabs %}
+{% tab title="fluent-bit.yaml" %}
+
+```yaml
+pipeline:
+  inputs:
+    - name: tail
+      tag: kube.*
+      alias: some-alias
+      path: /logdir/*.log
+      db: /dbdir/some.db
+      skip_long_lines: on
+      refresh_interval: 10
+      parser: some-parser
+
+  filters:
+    - name: kubernetes
+      match: 'kube.*'
+      kube_url: https://some_kube_api:443
+      kube_ca_file: /certs/ca.crt
+      kube_token_file: /tokens/token
+      kube_tag_prefix: kube.var.log.containers.
+      merge_log: on
+      merge_log_key: log_processed
+
+  outputs:
+    - name: kafka
+      match: '*'
+      brokers: 192.168.1.3:9092
+      topics: test
+      schema_str: '{"name":"avro_logging","type":"record","fields":[{"name":"timestamp","type":"string"},{"name":"stream","type":"string"},{"name":"log","type":"string"},{"name":"kubernetes","type":{"name":"krec","type":"record","fields":[{"name":"pod_name","type":"string"},{"name":"namespace_name","type":"string"},{"name":"pod_id","type":"string"},{"name":"labels","type":{"type":"map","values":"string"}},{"name":"annotations","type":{"type":"map","values":"string"}},{"name":"host","type":"string"},{"name":"container_name","type":"string"},{"name":"docker_id","type":"string"},{"name":"container_hash","type":"string"},{"name":"container_image","type":"string"}]}},{"name":"cluster_name","type":"string"},{"name":"fabric","type":"string"}]}'
+      schema_id: some_schema_id
+      rdkafka.client.id: some_client_id
+      rdkafka.debug: all
+      rdkafka.enable.ssl.certificate.verification: true
+      rdkafka.ssl.certificate.location: /certs/some.cert
+      rdkafka.ssl.key.location: /certs/some.key
+      rdkafka.ssl.ca.location: /certs/some-bundle.crt
+      rdkafka.security.protocol: ssl
+      rdkafka.request.required.acks: 1
+      rdkafka.log.connection.close: false
+      format: avro
+      rdkafka.log_level: 7
+      rdkafka.metadata.broker.list: 192.168.1.3:9092
+```
+
+{% endtab %}
+{% tab title="fluent-bit.conf" %}
+
 ```text
 [INPUT]
-    Name tail
-    Tag kube.*
-    Alias some-alias
-    Path /logdir/*.log
-    DB /dbdir/some.db
-    Skip_Long_Lines On
-    Refresh_Interval 10
-    Parser some-parser
+    Name tail
+    Tag kube.*
+    Alias some-alias
+    Path /logdir/*.log
+    DB /dbdir/some.db
+    Skip_Long_Lines On
+    Refresh_Interval 10
+    Parser some-parser

 [FILTER]
-    Name kubernetes
-    Match kube.*
-    Kube_URL https://some_kube_api:443
-    Kube_CA_File /certs/ca.crt
-    Kube_Token_File /tokens/token
-    Kube_Tag_Prefix kube.var.log.containers.
-    Merge_Log On
-    Merge_Log_Key log_processed
+    Name kubernetes
+    Match kube.*
+    Kube_URL https://some_kube_api:443
+    Kube_CA_File /certs/ca.crt
+    Kube_Token_File /tokens/token
+    Kube_Tag_Prefix kube.var.log.containers.
+    Merge_Log On
+    Merge_Log_Key log_processed

 [OUTPUT]
-    Name kafka
-    Match *
-    Brokers 192.168.1.3:9092
-    Topics test
-    Schema_str {"name":"avro_logging","type":"record","fields":[{"name":"timestamp","type":"string"},{"name":"stream","type":"string"},{"name":"log","type":"string"},{"name":"kubernetes","type":{"name":"krec","type":"record","fields":[{"name":"pod_name","type":"string"},{"name":"namespace_name","type":"string"},{"name":"pod_id","type":"string"},{"name":"labels","type":{"type":"map","values":"string"}},{"name":"annotations","type":{"type":"map","values":"string"}},{"name":"host","type":"string"},{"name":"container_name","type":"string"},{"name":"docker_id","type":"string"},{"name":"container_hash","type":"string"},{"name":"container_image","type":"string"}]}},{"name":"cluster_name","type":"string"},{"name":"fabric","type":"string"}]}
-    Schema_id some_schema_id
-    rdkafka.client.id some_client_id
-    rdkafka.debug All
-    rdkafka.enable.ssl.certificate.verification true
-
-    rdkafka.ssl.certificate.location /certs/some.cert
-    rdkafka.ssl.key.location /certs/some.key
-    rdkafka.ssl.ca.location /certs/some-bundle.crt
-    rdkafka.security.protocol ssl
-    rdkafka.request.required.acks 1
-    rdkafka.log.connection.close false
-
-    Format avro
-    rdkafka.log_level 7
-    rdkafka.metadata.broker.list 192.168.1.3:9092
+    Name kafka
+    Match *
+    Brokers 192.168.1.3:9092
+    Topics test
+    Schema_str {"name":"avro_logging","type":"record","fields":[{"name":"timestamp","type":"string"},{"name":"stream","type":"string"},{"name":"log","type":"string"},{"name":"kubernetes","type":{"name":"krec","type":"record","fields":[{"name":"pod_name","type":"string"},{"name":"namespace_name","type":"string"},{"name":"pod_id","type":"string"},{"name":"labels","type":{"type":"map","values":"string"}},{"name":"annotations","type":{"type":"map","values":"string"}},{"name":"host","type":"string"},{"name":"container_name","type":"string"},{"name":"docker_id","type":"string"},{"name":"container_hash","type":"string"},{"name":"container_image","type":"string"}]}},{"name":"cluster_name","type":"string"},{"name":"fabric","type":"string"}]}
+    Schema_id some_schema_id
+    rdkafka.client.id some_client_id
+    rdkafka.debug All
+    rdkafka.enable.ssl.certificate.verification true
+
+    rdkafka.ssl.certificate.location /certs/some.cert
+    rdkafka.ssl.key.location /certs/some.key
+    rdkafka.ssl.ca.location /certs/some-bundle.crt
+    rdkafka.security.protocol ssl
+    rdkafka.request.required.acks 1
+    rdkafka.log.connection.close false
+
+    Format avro
+    rdkafka.log_level 7
+    rdkafka.metadata.broker.list 192.168.1.3:9092
 ```

+{% endtab %}
+{% endtabs %}
+
 #### Kafka Configuration File with Raw format

 This example Fluent Bit configuration file creates example records with the
 _payloadkey_ and _msgkey_ keys. The _msgkey_ value is used as the Kafka message
 key, and the _payloadkey_ value as the payload.

+{% tabs %}
+{% tab title="fluent-bit.yaml" %}
+
+```yaml
+pipeline:
+  inputs:
+    - name: dummy
+      tag: example.data
+      dummy: '{"payloadkey":"Data to send to kafka", "msgkey": "Key to use in the message"}'
+
+  outputs:
+    - name: kafka
+      match: '*'
+      brokers: 192.168.1.3:9092
+      topics: test
+      format: raw
+      raw_log_key: payloadkey
+      message_key_field: msgkey
+```
+
+{% endtab %}
+{% tab title="fluent-bit.conf" %}

 ```text
 [INPUT]
-    Name example
-    Tag example.data
-    Dummy {"payloadkey":"Data to send to kafka", "msgkey": "Key to use in the message"}
+    Name dummy
+    Tag example.data
+    Dummy {"payloadkey":"Data to send to kafka", "msgkey": "Key to use in the message"}


 [OUTPUT]
-    Name kafka
-    Match *
-    Brokers 192.168.1.3:9092
-    Topics test
-    Format raw
-
-    Raw_Log_Key payloadkey
-    Message_Key_Field msgkey
+    Name kafka
+    Match *
+    Brokers 192.168.1.3:9092
+    Topics test
+    Format raw
+    Raw_Log_Key payloadkey
+    Message_Key_Field msgkey
 ```

+{% endtab %}
+{% endtabs %}
+
 ## AWS MSK IAM Authentication

 *Available since Fluent Bit v4.0.4*
@@ -180,20 +275,27 @@ If you are compiling Fluent Bit from source, ensure the following requirements a

 ### Configuration Example

+
+{% tabs %}
+{% tab title="fluent-bit.yaml" %}
+
 ```yaml
 pipeline:
   inputs:
     - name: random

   outputs:
     - name: kafka
-      match: "*"
+      match: '*'
       brokers: my-cluster.abcdef.c1.kafka.us-east-1.amazonaws.com:9098
       topics: my-topic
       aws_msk_iam: true
       aws_msk_iam_cluster_arn: arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abcdef-1234-5678-9012-abcdefghijkl-s3
 ```

+{% endtab %}
+{% endtabs %}
+
 ### Example AWS IAM Policy

 > **Note:** IAM policies and permissions can be complex and may vary depending on your organization's security requirements. If you are unsure about the correct permissions or best practices, please consult with your AWS administrator or an AWS expert who is familiar with MSK and IAM security.
@@ -218,4 +320,4 @@ The AWS credentials used by Fluent Bit must have permission to connect to your M
     }
   ]
 }
-```
+```
