139 changes: 69 additions & 70 deletions pipeline/outputs/kafka.md
@@ -1,42 +1,44 @@
# Kafka Producer

Kafka output plugin, producer, allows to ingest your records into an [Apache Kafka](https://kafka.apache.org/) service. This plugin use the official [librdkafka C library](https://github.com/edenhill/librdkafka) \(built-in dependency\)
The _Kafka Producer_ output plugin lets you ingest your records into an [Apache Kafka](https://kafka.apache.org/) service. This plugin uses the official [librdkafka C library](https://github.com/edenhill/librdkafka).

Starting with version 4.0.4, the Kafka input plugin supports authentication with AWS MSK IAM, enabling integration with Amazon MSK (Managed Streaming for Apache Kafka) clusters that require IAM-based access.
In Fluent Bit 4.0.4 and later, the Kafka output plugin supports authentication with AWS MSK IAM, enabling integration with Amazon MSK (Managed Streaming for Apache Kafka) clusters that require IAM-based access.

## Configuration Parameters
## Configuration parameters

| Key | Description | default |
This plugin supports the following parameters:

| Key | Description | Default |
| :--- | :--- | :--- |
| 'format' | Specify data format, options available: json, msgpack, raw. | json |
| 'message_key' | Optional key to store the message | |
| 'message_key_field' | If set, the value of Message_Key_Field in the record will indicate the message key. If not set nor found in the record, Message_Key will be used (if set). | |
| 'timestamp_key' | Set the key to store the record timestamp | @timestamp |
| 'timestamp_format' | Specify timestamp format, should be 'double', '[iso8601](https://en.wikipedia.org/wiki/ISO_8601)' (seconds precision) or 'iso8601_ns' (fractional seconds precision) | double |
| 'brokers' | Single or multiple list of Kafka Brokers, e.g: 192.168.1.3:9092, 192.168.1.4:9092. | |
| 'topics' | Single entry or list of topics separated by comma (,) that Fluent Bit will use to send messages to Kafka. If only one topic is set, that one will be used for all records. Instead if multiple topics exists, the one set in the record by Topic_Key will be used. | fluent-bit |
| 'topic_key' | If multiple Topics exists, the value of Topic_Key in the record will indicate the topic to use. E.g: if Topic_Key is _router_ and the record is {"key1": 123, "router": "route_2"}, Fluent Bit will use topic _route_2_. Note that if the value of Topic_Key is not present in Topics, then by default the first topic in the Topics list will indicate the topic to be used. | |
| 'dynamic_topic' | adds unknown topics (found in Topic_Key) to Topics. So in Topics only a default topic needs to be configured | Off |
| 'queue_full_retries' | Fluent Bit queues data into rdkafka library, if for some reason the underlying library cannot flush the records the queue might fills up blocking new addition of records. The `queue_full_retries` option set the number of local retries to enqueue the data. The default value is 10 times, the interval between each retry is 1 second. Setting the `queue_full_retries` value to `0` set's an unlimited number of retries. | 10 |
| 'rdkafka.{property}' | '{property}' can be any [librdkafka properties](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) | |
| 'raw_log_key' | When using the raw format and set, the value of raw_log_key in the record will be send to kafka as the payload. | |
| 'workers' | The number of [workers](../../administration/multithreading.md#outputs) to perform flush operations for this output. | `0` |
| `format` | Specify data format. Available formats: `json`, `msgpack`, `raw`. | `json` |
| `message_key` | Optional key to store the message. | _none_ |
| `message_key_field` | If set, the value of `message_key_field` in the record will indicate the message key. If not set or not found in the record, `message_key` will be used if set. | _none_ |
| `timestamp_key` | Set the key to store the record timestamp. | `@timestamp` |
| `timestamp_format` | Specify timestamp format. Allowed values: `double`, [`iso8601`](https://en.wikipedia.org/wiki/ISO_8601) (seconds precision), or `iso8601_ns` (fractional seconds precision). | `double` |
| `brokers` | Single entry or list of Kafka brokers. For example, `192.168.1.3:9092`, `192.168.1.4:9092`. | _none_ |
| `topics` | Single entry or comma-separated (`,`) list of topics that Fluent Bit will use to send messages to Kafka. If only one topic is set, that topic will be used for all records. If multiple topics exist, the one set in the record by `topic_key` will be used. | `fluent-bit` |
| `topic_key` | If multiple `topics` exist, the value of `topic_key` in the record will indicate the topic to use. For example, if `topic_key` is `router` and the record is `{"key1": 123, "router": "route_2"}`, Fluent Bit will use the topic `route_2`. If the value of `topic_key` isn't present in `topics`, the first topic in the `topics` list will be used. | _none_ |
| `dynamic_topic` | Adds unknown topics (found in `topic_key`) to `topics`. In `topics`, only a default topic needs to be configured. | `Off` |
| `queue_full_retries` | Fluent Bit queues data into the `rdkafka` library. If the underlying library can't flush the records, the queue might fill up, blocking the addition of new records. `queue_full_retries` sets the number of local retries to enqueue the data. The interval between retries is 1 second. Setting `queue_full_retries` to `0` sets an unlimited number of retries. | `10` |
| `rdkafka.{property}` | `{property}` can be any [librdkafka property](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). | _none_ |
| `raw_log_key` | When using the `raw` format and set, the value of `raw_log_key` in the record will be sent to Kafka as the payload. | _none_ |
| `workers` | The number of [workers](../../administration/multithreading.md#outputs) to perform flush operations for this output. | `0` |

> Setting `rdkafka.log.connection.close` to `false` and `rdkafka.request.required.acks` to 1 are examples of recommended settings of librdfkafka properties.
Setting `rdkafka.log.connection.close` to `false` and `rdkafka.request.required.acks` to `1` are examples of recommended settings of `librdkafka` properties.
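
For example, a minimal sketch of an output section that applies those recommended properties (the broker address and topic are placeholders):

```yaml
pipeline:
  outputs:
    - name: kafka
      match: '*'
      # Placeholder broker and topic values
      brokers: 192.168.1.3:9092
      topics: test
      # Recommended librdkafka properties mentioned above
      rdkafka.log.connection.close: false
      rdkafka.request.required.acks: 1
```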

## Getting Started
## Get started

In order to insert records into Apache Kafka, you can run the plugin from the command line or through the configuration file:
To insert records into Apache Kafka, you can run the plugin from the command line or through the configuration file.

### Command Line
### Command line

The **kafka** plugin can read parameters through the **-p** argument \(property\), e.g:
The Kafka plugin can read parameters through the `-p` argument (property):

```shell
fluent-bit -i cpu -o kafka -p brokers=192.168.1.3:9092 -p topics=test
```

### Configuration File
### Configuration file

In your main configuration file append the following:

@@ -47,7 +49,7 @@ In your main configuration file append the following:
pipeline:
inputs:
- name: cpu

outputs:
- name: kafka
match: '*'
@@ -72,29 +74,26 @@ pipeline:
{% endtab %}
{% endtabs %}

### Avro Support
### Avro support

Fluent-bit comes with support for avro encoding for the out_kafka plugin.
Avro support is optional and must be activated at build-time by using a
build def with cmake: `-DFLB_AVRO_ENCODER=On` such as in the following
Fluent Bit comes with support for Avro encoding for the `out_kafka` plugin.
Avro support is optional and must be activated at build time by using a
build flag with `cmake`, `-DFLB_AVRO_ENCODER=On`, as in the following
example which activates:

* out_kafka with avro encoding
* fluent-bit's prometheus
* metrics via an embedded http endpoint
* debugging support
* builds the test suites
- `out_kafka` with Avro encoding
- Fluent Bit Prometheus metrics using an embedded HTTP endpoint
- Debugging support
- The test suites

```shell
cmake -DFLB_DEV=On -DFLB_OUT_KAFKA=On -DFLB_TLS=On -DFLB_TESTS_RUNTIME=On -DFLB_TESTS_INTERNAL=On -DCMAKE_BUILD_TYPE=Debug -DFLB_HTTP_SERVER=true -DFLB_AVRO_ENCODER=On ../
```

#### Kafka Configuration File with Avro Encoding
#### Kafka configuration file with Avro encoding

This is example fluent-bit config tails kubernetes logs, decorates the
log lines with kubernetes metadata via the kubernetes filter, and then
sends the fully decorated log lines to a kafka broker encoded with a
specific avro schema.
In this example, the Fluent Bit configuration tails Kubernetes logs and updates the log lines with Kubernetes metadata using the Kubernetes filter. It then sends the updated log lines to a Kafka broker encoded with a specific Avro schema.

{% tabs %}
{% tab title="fluent-bit.yaml" %}
@@ -110,7 +109,7 @@ pipeline:
skip_long_lines: on
refresh_interval: 10
parser: some-parser

filters:
- name: kubernetes
match: 'kube.*'
@@ -120,7 +119,7 @@ pipeline:
kube_tag_prefix: kube.var.log.containers.
merge_log: on
merge_log_key: log_processed

outputs:
- name: kafka
match: '*'
@@ -130,7 +129,7 @@ pipeline:
schema_id: some_schema_id
rdkafka.client.id: some_client_id
rdkafka.debug: all
rdkafka.enable.ssl.certificate.verification: true
rdkafka.enable.ssl.certificate.verification: true
rdkafka.ssl.certificate.location: /certs/some.cert
rdkafka.ssl.key.location: /certs/some.key
rdkafka.ssl.ca.location: /certs/some-bundle.crt
@@ -192,11 +191,9 @@ pipeline:
{% endtab %}
{% endtabs %}

#### Kafka Configuration File with Raw format
#### Kafka configuration file with `raw` format

This example Fluent Bit configuration file creates example records with the
_payloadkey_ and _msgkey_ keys. The _msgkey_ value is used as the Kafka message
key, and the _payloadkey_ value as the payload.
This example Fluent Bit configuration file creates example records with the `payloadkey` and `msgkey` keys. The `msgkey` value is used as the Kafka message key, and the `payloadkey` value as the payload.

{% tabs %}
{% tab title="fluent-bit.yaml" %}
@@ -207,7 +204,7 @@ pipeline:
- name: dummy
tag: example.data
dummy: '{"payloadkey":"Data to send to kafka", "msgkey": "Key to use in the message"}'

outputs:
- name: kafka
match: '*'
@@ -241,39 +238,41 @@ pipeline:
{% endtab %}
{% endtabs %}
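
As a rough sketch (the broker address and topic are placeholders, not the exact example configuration), an output section that maps these keys could look like:

```yaml
pipeline:
  outputs:
    - name: kafka
      match: '*'
      brokers: 192.168.1.3:9092
      topics: test
      format: raw
      # Send the value of the 'payloadkey' field as the Kafka payload
      raw_log_key: payloadkey
      # Use the value of the 'msgkey' field as the Kafka message key
      message_key_field: msgkey
```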

## AWS MSK IAM Authentication
## AWS MSK IAM authentication

*Available since Fluent Bit v4.0.4*

Fluent Bit supports authentication to Amazon MSK (Managed Streaming for Apache Kafka) clusters using AWS IAM for the Kafka output plugin. This allows you to securely send data to MSK brokers with AWS credentials, leveraging IAM roles and policies for access control.
Fluent Bit 4.0.4 and later supports authentication to Amazon MSK (Managed Streaming for Apache Kafka) clusters using AWS IAM for the Kafka output plugin. This lets you securely send data to MSK brokers with AWS credentials, leveraging IAM roles and policies for access control.

### Prerequisites

**Build Requirements**
If you are compiling Fluent Bit from source, ensure the following requirements are met to enable AWS MSK IAM support:
If you are compiling Fluent Bit from source, ensure the following requirements are met to enable AWS MSK IAM support:

- Build Requirements:

The packages `libsasl2` and `libsasl2-dev` must be installed on your build environment.

- The packages `libsasl2` and `libsasl2-dev` must be installed on your build environment.
- Runtime Requirements:

**Runtime Requirements**
- **Network Access:** Fluent Bit must be able to reach your MSK broker endpoints (AWS VPC setup).
- **AWS Credentials:** Provide credentials using any supported AWS method:
- IAM roles (recommended for EC2, ECS, or EKS)
- Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`)
- AWS credentials file (`~/.aws/credentials`)
- Instance metadata service (IMDS)
- Network Access: Fluent Bit must be able to reach your MSK broker endpoints (AWS VPC setup).
- AWS Credentials: Provide credentials using any supported AWS method:
- IAM roles (recommended for EC2, ECS, or EKS)
- Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`), as sketched after this list
- AWS credentials file (`~/.aws/credentials`)
- Instance metadata service (IMDS)

Note these credentials are discovery by default when `aws_msk_iam` flag is enabled.
These credentials are discovered by default when the `aws_msk_iam` flag is enabled.

- **IAM Permissions:** The credentials must allow access to the target MSK cluster (see example policy below).
- IAM Permissions: The credentials must allow access to the target MSK cluster.
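
As an illustration of the environment-variable option listed above (the values are placeholders, not real credentials):

```shell
export AWS_ACCESS_KEY_ID=AKIAEXAMPLEKEYID         # placeholder
export AWS_SECRET_ACCESS_KEY=examplesecretkey     # placeholder
fluent-bit -c fluent-bit.yaml
```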

### Configuration Parameters
### AWS MSK IAM configuration parameters

| Property | Description | Type | Required |
This plugin supports the following parameters:

| Property | Description | Type | Default |
|---------------------------|-----------------------------------------------------|---------|-------------------------------|
| `aws_msk_iam` | Enable AWS MSK IAM authentication | Boolean | No (default: false) |
| `aws_msk_iam_cluster_arn` | Full ARN of the MSK cluster for region extraction | String | Yes (if `aws_msk_iam` is true)|
| `aws_msk_iam` | Optional. Enable AWS MSK IAM authentication. | Boolean | `false` |
| `aws_msk_iam_cluster_arn` | Full ARN of the MSK cluster for region extraction. Required if `aws_msk_iam` is set. | String | _none_ |

### Configuration Example
### Configuration example


{% tabs %}
@@ -296,9 +295,9 @@ pipeline:
{% endtab %}
{% endtabs %}
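
As a rough sketch only (the broker endpoint and cluster ARN are placeholders), an output using these parameters could look like:

```yaml
pipeline:
  inputs:
    - name: cpu

  outputs:
    - name: kafka
      match: '*'
      # Placeholder MSK broker endpoint (IAM-authenticated listeners typically use port 9098)
      brokers: b-1.mycluster.abc123.c2.kafka.us-east-1.amazonaws.com:9098
      topics: my-topic
      # Enable AWS MSK IAM authentication
      aws_msk_iam: true
      # Placeholder ARN; required when aws_msk_iam is enabled
      aws_msk_iam_cluster_arn: arn:aws:kafka:us-east-1:111111111111:cluster/my-cluster/00000000-0000-0000-0000-000000000000-1
```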

### Example AWS IAM Policy
### AWS IAM policy

> **Note:** IAM policies and permissions can be complex and may vary depending on your organization's security requirements. If you are unsure about the correct permissions or best practices, please consult with your AWS administrator or an AWS expert who is familiar with MSK and IAM security.
IAM policies and permissions can be complex and can vary depending on your organization's security requirements. If you are unsure about the correct permissions or best practices, consult with your AWS administrator or an AWS expert who is familiar with MSK and IAM security.

The AWS credentials used by Fluent Bit must have permission to connect to your MSK cluster. Here is a minimal example policy:

@@ -320,4 +319,4 @@ The AWS credentials used by Fluent Bit must have permission to connect to your M
}
]
}
```
```
4 changes: 4 additions & 0 deletions vale-styles/FluentBit/Acronyms.yml
@@ -31,6 +31,7 @@ exceptions:
- DPPS
- ECR
- ECS
- EKS
- FAQ
- FIPS
- GCC
@@ -50,6 +51,7 @@
- HTTPS
- IAM
- IDE
- IMDS
- JAR
- JSON
- JSX
@@ -60,6 +62,7 @@
- LTS
- LTSV
- MQTT
- MSK
- NET
- NGINX
- NIC
@@ -104,6 +107,7 @@
- UTC
- UTF
- UUID
- VPC
- WASI
- WMI
- XML
6 changes: 5 additions & 1 deletion vale-styles/FluentBit/Headings.yml
@@ -16,6 +16,8 @@ exceptions:
- Amazon OpenSearch Service
- API
- APIs
- AWS MSK IAM
- AWS IAM
- Azure
- Azure Logs Ingestion API
- Azure Log Analytics
@@ -51,6 +53,9 @@
- gRPC
- I
- InfluxDB
- Kafka
- Kafka Producer
- Kafka REST
- Kinesis
- Kubernetes
- LaunchDarkly
@@ -72,7 +77,6 @@
- ServiceMonitor
- SignalFx
- Slack
- SQL
- SSL
- StatsD
- Studio
1 change: 1 addition & 0 deletions vale-styles/FluentBit/Spelling-exceptions.txt
@@ -8,6 +8,7 @@ Appname
autoscale
autoscaler
autoscaling
Avro
backoff
backpressure
backtrace