Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 57 additions & 4 deletions pipeline/inputs/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,38 @@ This plugin uses the official [librdkafka C library](https://github.com/edenhill

## Get started

To subscribe to or collect messages from Apache Kafka, run the plugin from the command line or through the configuration file:
To subscribe to or collect messages from Apache Kafka, run the plugin from the command line or through the configuration file as shown below.

### Command line

The Kafka plugin can read parameters through the `-p` argument (property):

```shell
fluent-bit -i kafka -o stdout -p brokers=192.168.1.3:9092 -p topics=some-topic
$ fluent-bit -i kafka -o stdout -p brokers=192.168.1.3:9092 -p topics=some-topic
```

### Configuration file

In your main configuration file append the following `Input` and `Output` sections:
In your main configuration file append the following:

{% tabs %}
{% tab title="fluent-bit.yaml" %}

```yaml
pipeline:
inputs:
- name: kafka
brokers: 192.168.1.3:9092
topics: some-topic
poll_ms: 100

outputs:
- name: stdout
match: '*'
```

{% endtab %}
{% tab title="fluent-bit.conf" %}

```text
[INPUT]
Expand All @@ -43,12 +62,43 @@ In your main configuration file append the following `Input` and `Output` sectio

[OUTPUT]
Name stdout
Match *
```

{% endtab %}
{% endtabs %}

#### Example of using Kafka input and output plugins

The Fluent Bit source repository contains a full example of using Fluent Bit to process Kafka records:

{% tabs %}
{% tab title="fluent-bit.yaml" %}

```yaml
pipeline:
inputs:
- name: kafka
brokers: kafka-broker:9092
topics: fb-source
poll_ms: 100
format: json

filters:
- name: lua
match: '*'
script: kafka.lua
call: modify_kafka_message

outputs:
- name: kafka
brokers: kafka-broker:9092
topics: fb-sink
```

{% endtab %}
{% tab title="fluent-bit.conf" %}

```text
[INPUT]
Name kafka
Expand All @@ -69,10 +119,13 @@ The Fluent Bit source repository contains a full example of using Fluent Bit to
topics fb-sink
```

{% endtab %}
{% endtabs %}

The previous example will connect to the broker listening on `kafka-broker:9092` and subscribe to the `fb-source` topic, polling for new messages every 100 milliseconds.

Since the payload will be in JSON format, the plugin is configured to parse the payload with `format json`.

Every message received is then processed with `kafka.lua` and sent back to the `fb-sink` topic of the same broker.

The example can be executed locally with `make start` in the `examples/kafka_filter` directory (`docker/compose` is used).
The example can be executed locally with `make start` in the `examples/kafka_filter` directory (`docker/compose` is used).