Skip to content

Commit 73d6622

Browse files
committed
Pipeline: output: kafka producer: style
Signed-off-by: Lynette Miles <[email protected]>
1 parent 6eee081 commit 73d6622

File tree

4 files changed

+79
-71
lines changed

4 files changed

+79
-71
lines changed

pipeline/outputs/kafka.md

Lines changed: 69 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,44 @@
11
# Kafka Producer
22

3-
Kafka output plugin, producer, allows to ingest your records into an [Apache Kafka](https://kafka.apache.org/) service. This plugin use the official [librdkafka C library](https://github.com/edenhill/librdkafka) \(built-in dependency\)
3+
The _Kafka Producer_ output plugin lets you ingest your records into an [Apache Kafka](https://kafka.apache.org/) service. This plugin uses the official [librdkafka C library](https://github.com/edenhill/librdkafka).
44

5-
Starting with version 4.0.4, the Kafka input plugin supports authentication with AWS MSK IAM, enabling integration with Amazon MSK (Managed Streaming for Apache Kafka) clusters that require IAM-based access.
5+
In Fluent Bit 4.0.4 and later, the Kafka input plugin supports authentication with AWS MSK IAM, enabling integration with Amazon MSK (Managed Streaming for Apache Kafka) clusters that require IAM-based access.
66

7-
## Configuration Parameters
7+
## Configuration parameters
88

9-
| Key | Description | default |
9+
This plugin supports the following parameters:
10+
11+
| Key | Description | Default |
1012
| :--- | :--- | :--- |
11-
| 'format' | Specify data format, options available: json, msgpack, raw. | json |
12-
| 'message_key' | Optional key to store the message | |
13-
| 'message_key_field' | If set, the value of Message_Key_Field in the record will indicate the message key. If not set nor found in the record, Message_Key will be used (if set). | |
14-
| 'timestamp_key' | Set the key to store the record timestamp | @timestamp |
15-
| 'timestamp_format' | Specify timestamp format, should be 'double', '[iso8601](https://en.wikipedia.org/wiki/ISO_8601)' (seconds precision) or 'iso8601_ns' (fractional seconds precision) | double |
16-
| 'brokers' | Single or multiple list of Kafka Brokers, e.g: 192.168.1.3:9092, 192.168.1.4:9092. | |
17-
| 'topics' | Single entry or list of topics separated by comma (,) that Fluent Bit will use to send messages to Kafka. If only one topic is set, that one will be used for all records. Instead if multiple topics exists, the one set in the record by Topic_Key will be used. | fluent-bit |
18-
| 'topic_key' | If multiple Topics exists, the value of Topic_Key in the record will indicate the topic to use. E.g: if Topic_Key is _router_ and the record is {"key1": 123, "router": "route_2"}, Fluent Bit will use topic _route_2_. Note that if the value of Topic_Key is not present in Topics, then by default the first topic in the Topics list will indicate the topic to be used. | |
19-
| 'dynamic_topic' | adds unknown topics (found in Topic_Key) to Topics. So in Topics only a default topic needs to be configured | Off |
20-
| 'queue_full_retries' | Fluent Bit queues data into rdkafka library, if for some reason the underlying library cannot flush the records the queue might fills up blocking new addition of records. The `queue_full_retries` option set the number of local retries to enqueue the data. The default value is 10 times, the interval between each retry is 1 second. Setting the `queue_full_retries` value to `0` set's an unlimited number of retries. | 10 |
21-
| 'rdkafka.{property}' | '{property}' can be any [librdkafka properties](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) | |
22-
| 'raw_log_key' | When using the raw format and set, the value of raw_log_key in the record will be send to kafka as the payload. | |
23-
| 'workers' | The number of [workers](../../administration/multithreading.md#outputs) to perform flush operations for this output. | `0` |
13+
| `format` | Specify data format. Available formats: `json`, `msgpack`, `raw`. | `json` |
14+
| `message_key` | Optional key to store the message | _none_ |
15+
| `message_key_field` | If set, the value of `message_key_field` in the record will indicate the message key. If not set nor found in the record, `message_key` will be used if set. | _none_ |
16+
| `timestamp_key` | Set the key to store the record timestamp | `@timestamp` |
17+
| `timestamp_format` | Specify timestamp format. Allowed values:`double`, `[iso8601](https://en.wikipedia.org/wiki/ISO_8601)` (seconds precision) or `iso8601_ns` (fractional seconds precision). | `double` |
18+
| `brokers` | Single or multiple list of Kafka Brokers. For example, `192.168.1.3:9092`, `192.168.1.4:9092`. | _none_ |
19+
| `topics` | Single entry or list of topics separated by comma (,) that Fluent Bit will use to send messages to Kafka. If only one topic is set, that one will be used for all records. Instead if multiple topics exists, the one set in the record by `Topic_Key` will be used. | fluent-bit |
20+
| `topic_key` | If multiple `topics` exist, the value of `Topic_Key` in the record will indicate the topic to use. For example, if `Topic_Key` is `router` and the record is `{"key1": 123, "router": "route_2"}`, Fluent Bit will use `topic _route_2_`. If the value of `Topic_Key` isn't present in `topics`, then the first topic in the `topics` list will indicate the topic to be used. | _none_ |
21+
| `dynamic_topic` | Adds unknown topics (found in `Topic_Key`) to `topics`. In `topics`, only a default topic needs to be configured. | `Off` |
22+
| `queue_full_retries` | Fluent Bit queues data into `rdkafka` library. If the underlying library can't flush the records the queue might fill up, blocking new addition of records. `queue_full_retries` sets the number of local retries to enqueue the data. The interval between retries is 1 second. Setting the `queue_full_retries` value to `0` sets an unlimited number of retries. | `10` |
23+
| `rdkafka.{property}` | `{property}` can be any [librdkafka properties](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) | |
24+
| `raw_log_key` | When using the raw format and set, the value of `raw_log_key` in the record will be send to Kafka as the payload. | _none_ |
25+
| `workers` | The number of [workers](../../administration/multithreading.md#outputs) to perform flush operations for this output. | `0` |
2426

25-
> Setting `rdkafka.log.connection.close` to `false` and `rdkafka.request.required.acks` to 1 are examples of recommended settings of librdfkafka properties.
27+
Setting `rdkafka.log.connection.close` to `false` and `rdkafka.request.required.acks` to `1` are examples of recommended settings of `librdfkafka` properties.
2628

27-
## Getting Started
29+
## Get started
2830

29-
In order to insert records into Apache Kafka, you can run the plugin from the command line or through the configuration file:
31+
To insert records into Apache Kafka, you can run the plugin from the command line or through the configuration file.
3032

31-
### Command Line
33+
### Command line
3234

33-
The **kafka** plugin can read parameters through the **-p** argument \(property\), e.g:
35+
The Kafka plugin can read parameters through the `-p` argument (property):
3436

3537
```shell
3638
fluent-bit -i cpu -o kafka -p brokers=192.168.1.3:9092 -p topics=test
3739
```
3840

39-
### Configuration File
41+
### Configuration file
4042

4143
In your main configuration file append the following:
4244

@@ -47,7 +49,7 @@ In your main configuration file append the following:
4749
pipeline:
4850
inputs:
4951
- name: cpu
50-
52+
5153
outputs:
5254
- name: kafka
5355
match: '*'
@@ -72,29 +74,26 @@ pipeline:
7274
{% endtab %}
7375
{% endtabs %}
7476

75-
### Avro Support
77+
### Avro support
7678

77-
Fluent-bit comes with support for avro encoding for the out_kafka plugin.
78-
Avro support is optional and must be activated at build-time by using a
79-
build def with cmake: `-DFLB_AVRO_ENCODER=On` such as in the following
79+
Fluent Bit comes with support for Avro encoding for the `out_kafka` plugin.
80+
Avro support is optional and must be activated at build time by using a
81+
build def with `cmake`: `-DFLB_AVRO_ENCODER=On` such as in the following
8082
example which activates:
8183

82-
* out_kafka with avro encoding
83-
* fluent-bit's prometheus
84-
* metrics via an embedded http endpoint
85-
* debugging support
86-
* builds the test suites
84+
- `out_kafka` with Avro encoding
85+
- Fluent Bit Prometheus
86+
- Metrics using an embedded HTTP endpoint
87+
- Debugging support
88+
- Builds the test suites
8789

8890
```shell
8991
cmake -DFLB_DEV=On -DFLB_OUT_KAFKA=On -DFLB_TLS=On -DFLB_TESTS_RUNTIME=On -DFLB_TESTS_INTERNAL=On -DCMAKE_BUILD_TYPE=Debug -DFLB_HTTP_SERVER=true -DFLB_AVRO_ENCODER=On ../
9092
```
9193

92-
#### Kafka Configuration File with Avro Encoding
94+
#### Kafka configuration file with Avro encoding
9395

94-
This is example fluent-bit config tails kubernetes logs, decorates the
95-
log lines with kubernetes metadata via the kubernetes filter, and then
96-
sends the fully decorated log lines to a kafka broker encoded with a
97-
specific avro schema.
96+
In this example, the Fluent Bit configuration tails Kubernetes logs, updates the log lines with Kubernetes metadata using the Kubernetes filter. It then sends the updated log lines to a Kafka broker encoded with a specific Avro schema.
9897

9998
{% tabs %}
10099
{% tab title="fluent-bit.yaml" %}
@@ -110,7 +109,7 @@ pipeline:
110109
skip_long_lines: on
111110
refresh_interval: 10
112111
parser: some-parser
113-
112+
114113
filters:
115114
- name: kubernetes
116115
match: 'kube.*'
@@ -120,7 +119,7 @@ pipeline:
120119
kube_tag_prefix: kube.var.log.containers.
121120
merge_log: on
122121
merge_log_key: log_processed
123-
122+
124123
outputs:
125124
- name: kafka
126125
match: '*'
@@ -130,7 +129,7 @@ pipeline:
130129
schema_id: some_schema_id
131130
rdkafka.client.id: some_client_id
132131
rdkafka.debug: all
133-
rdkafka.enable.ssl.certificate.verification: true
132+
rdkafka.enable.ssl.certificate.verification: true
134133
rdkafka.ssl.certificate.location: /certs/some.cert
135134
rdkafka.ssl.key.location: /certs/some.key
136135
rdkafka.ssl.ca.location: /certs/some-bundle.crt
@@ -192,11 +191,9 @@ pipeline:
192191
{% endtab %}
193192
{% endtabs %}
194193

195-
#### Kafka Configuration File with Raw format
194+
#### Kafka configuration file with `raw`format
196195

197-
This example Fluent Bit configuration file creates example records with the
198-
_payloadkey_ and _msgkey_ keys. The _msgkey_ value is used as the Kafka message
199-
key, and the _payloadkey_ value as the payload.
196+
This example Fluent Bit configuration file creates example records with the `payloadkey` and `msgkey` keys. The `msgkey` value is used as the Kafka message key, and the `payloadkey` value as the payload.
200197

201198
{% tabs %}
202199
{% tab title="fluent-bit.yaml" %}
@@ -207,7 +204,7 @@ pipeline:
207204
- name: dummy
208205
tag: example.data
209206
dummy: '{"payloadkey":"Data to send to kafka", "msgkey": "Key to use in the message"}'
210-
207+
211208
outputs:
212209
- name: kafka
213210
match: '*'
@@ -241,39 +238,41 @@ pipeline:
241238
{% endtab %}
242239
{% endtabs %}
243240

244-
## AWS MSK IAM Authentication
241+
## AWS MSK IAM authentication
245242

246-
*Available since Fluent Bit v4.0.4*
247-
248-
Fluent Bit supports authentication to Amazon MSK (Managed Streaming for Apache Kafka) clusters using AWS IAM for the Kafka output plugin. This allows you to securely send data to MSK brokers with AWS credentials, leveraging IAM roles and policies for access control.
243+
Fluent Bit 4.0.4 and later supports authentication to Amazon MSK (Managed Streaming for Apache Kafka) clusters using AWS IAM for the Kafka output plugin. This lets you securely send data to MSK brokers with AWS credentials, leveraging IAM roles and policies for access control.
249244

250245
### Prerequisites
251246

252-
**Build Requirements**
253-
If you are compiling Fluent Bit from source, ensure the following requirements are met to enable AWS MSK IAM support:
247+
If you are compiling Fluent Bit from source, ensure the following requirements are met to enable AWS MSK IAM support:
248+
249+
- Build Requirements
250+
251+
The packages `libsasl2` and `libsasl2-dev` must be installed on your build environment.
254252

255-
- The packages `libsasl2` and `libsasl2-dev` must be installed on your build environment.
253+
- Runtime Requirements:
256254

257-
**Runtime Requirements**
258-
- **Network Access:** Fluent Bit must be able to reach your MSK broker endpoints (AWS VPC setup).
259-
- **AWS Credentials:** Provide credentials using any supported AWS method:
260-
- IAM roles (recommended for EC2, ECS, or EKS)
261-
- Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`)
262-
- AWS credentials file (`~/.aws/credentials`)
263-
- Instance metadata service (IMDS)
255+
- Network Access: Fluent Bit must be able to reach your MSK broker endpoints (AWS VPC setup).
256+
- AWS Credentials: Provide credentials using any supported AWS method:
257+
- IAM roles (recommended for EC2, ECS, or EKS)
258+
- Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`)
259+
- AWS credentials file (`~/.aws/credentials`)
260+
- Instance metadata service (IMDS)
264261

265-
Note these credentials are discovery by default when `aws_msk_iam` flag is enabled.
262+
These credentials are discovered by default when `aws_msk_iam` flag is enabled.
266263

267-
- **IAM Permissions:** The credentials must allow access to the target MSK cluster (see example policy below).
264+
- IAM Permissions: The credentials must allow access to the target MSK cluster.
268265

269-
### Configuration Parameters
266+
### AWS MSK IAM configuration parameters
270267

271-
| Property | Description | Type | Required |
268+
This plugin supports the following parameters:
269+
270+
| Property | Description | Type | Default |
272271
|---------------------------|-----------------------------------------------------|---------|-------------------------------|
273-
| `aws_msk_iam` | Enable AWS MSK IAM authentication | Boolean | No (default: false) |
274-
| `aws_msk_iam_cluster_arn` | Full ARN of the MSK cluster for region extraction | String | Yes (if `aws_msk_iam` is true)|
272+
| `aws_msk_iam` | Optional. Enable AWS MSK IAM authentication. | Boolean | `false` |
273+
| `aws_msk_iam_cluster_arn` | Full ARN of the MSK cluster for region extraction. Required if `aws_msk_iam` is set. | String | _none_ |
275274

276-
### Configuration Example
275+
### Configuration example
277276

278277

279278
{% tabs %}
@@ -296,9 +295,9 @@ pipeline:
296295
{% endtab %}
297296
{% endtabs %}
298297
299-
### Example AWS IAM Policy
298+
### AWS IAM policy
300299
301-
> **Note:** IAM policies and permissions can be complex and may vary depending on your organization's security requirements. If you are unsure about the correct permissions or best practices, please consult with your AWS administrator or an AWS expert who is familiar with MSK and IAM security.
300+
IAM policies and permissions can be complex and can vary depending on your organization's security requirements. If you are unsure about the correct permissions or best practices, consult with your AWS administrator or an AWS expert who is familiar with MSK and IAM security.
302301
303302
The AWS credentials used by Fluent Bit must have permission to connect to your MSK cluster. Here is a minimal example policy:
304303
@@ -320,4 +319,4 @@ The AWS credentials used by Fluent Bit must have permission to connect to your M
320319
}
321320
]
322321
}
323-
```
322+
```

vale-styles/FluentBit/Acronyms.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ exceptions:
3131
- DPPS
3232
- ECR
3333
- ECS
34+
- EKS
3435
- FAQ
3536
- FIPS
3637
- GCC
@@ -50,6 +51,7 @@ exceptions:
5051
- HTTPS
5152
- IAM
5253
- IDE
54+
- IMDS
5355
- JAR
5456
- JSON
5557
- JSX
@@ -60,6 +62,7 @@ exceptions:
6062
- LTS
6163
- LTSV
6264
- MQTT
65+
- MSK
6366
- NET
6467
- NGINX
6568
- NIC
@@ -104,6 +107,7 @@ exceptions:
104107
- UTC
105108
- UTF
106109
- UUID
110+
- VPC
107111
- WASI
108112
- WMI
109113
- XML

vale-styles/FluentBit/Headings.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ exceptions:
1616
- Amazon OpenSearch Service
1717
- API
1818
- APIs
19+
- AWS MSK IAM
20+
- AWS IAM
1921
- Azure
2022
- Azure Logs Ingestion API
2123
- Azure Log Analytics
@@ -51,6 +53,9 @@ exceptions:
5153
- gRPC
5254
- I
5355
- InfluxDB
56+
- Kafka
57+
- Kafka Producer
58+
- Kafka REST
5459
- Kinesis
5560
- Kubernetes
5661
- LaunchDarkly
@@ -72,7 +77,6 @@ exceptions:
7277
- ServiceMonitor
7378
- SignalFx
7479
- Slack
75-
- SQL
7680
- SSL
7781
- StatsD
7882
- Studio

vale-styles/FluentBit/Spelling-exceptions.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ Appname
88
autoscale
99
autoscaler
1010
autoscaling
11+
Avro
1112
backoff
1213
backpressure
1314
backtrace

0 commit comments

Comments
 (0)