
Commit d9108fd

Author: Suyog Rao
breaking: Nest decorated fields under @metadata (#213)
* breaking: Nest decorated fields under `@metadata`. Fixes #198
* add docs about the metadata information
1 parent 83fd74a commit d9108fd

5 files changed (+27 -10 lines)

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
@@ -1,3 +1,7 @@
+## 7.0.0
+  - Breaking: Nest the decorated fields under `@metadata` field to avoid mapping conflicts with beats.
+    Fixes #198, #180
+
 ## 6.3.4
   - Fix an issue that led to random failures in decoding messages when using more than one input thread

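Because this is a breaking change, any pipeline configuration that referenced the old decorated fields has to be updated to the `@metadata` path. A minimal, hypothetical sketch in Logstash configuration; the topic name matches the one used in the integration spec, and the tag name is made up for illustration:

    filter {
      # up to 6.x a decorated event exposed the topic as a regular field:
      #   if [kafka][topic] == "logstash_topic_plain" { ... }
      # from 7.0.0 the same check must read the @metadata path instead:
      if [@metadata][kafka][topic] == "logstash_topic_plain" {
        mutate { add_tag => ["from_plain_topic"] }   # hypothetical tag name
      }
    }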
docs/index.asciidoc

Lines changed: 13 additions & 0 deletions
@@ -64,6 +64,19 @@ For more information see http://kafka.apache.org/documentation.html#theconsumer
 
 Kafka consumer configuration: http://kafka.apache.org/documentation.html#consumerconfigs
 
+==== Metadata fields
+
+The following metadata from the Kafka broker is added under the `[@metadata]` field:
+
+* `[@metadata][kafka][topic]`: Original Kafka topic from where the message was consumed.
+* `[@metadata][kafka][consumer_group]`: Consumer group
+* `[@metadata][kafka][partition]`: Partition info for this message.
+* `[@metadata][kafka][offset]`: Original record offset for this message.
+* `[@metadata][kafka][key]`: Record key, if any.
+* `[@metadata][kafka][timestamp]`: Timestamp when this message was received by the Kafka broker.
+
+Please note that `@metadata` fields are not part of any of your events at output time. If you need this information to be
+inserted into your original event, you'll have to use the `mutate` filter to manually copy the required fields into your `event`.
 
 [id="plugins-{type}s-{plugin}-options"]
 ==== Kafka Input Configuration Options
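To make the last note in the added docs concrete: since `@metadata` is dropped at output time, a `mutate` filter can copy whichever of these fields should survive into the event itself. A minimal sketch, assuming the target field names `kafka_topic` and `kafka_offset` (both made up for illustration):

    filter {
      mutate {
        # copy selected Kafka metadata into regular event fields so they reach the output
        add_field => {
          "kafka_topic"  => "%{[@metadata][kafka][topic]}"
          "kafka_offset" => "%{[@metadata][kafka][offset]}"
        }
      }
    }

For debugging, the `rubydebug` codec's `metadata => true` option on a stdout output will print the `@metadata` tree even though regular outputs never ship it.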

lib/logstash/inputs/kafka.rb

Lines changed: 6 additions & 6 deletions
@@ -254,12 +254,12 @@ def thread_runner(logstash_queue, consumer)
       codec_instance.decode(record.value.to_s) do |event|
         decorate(event)
         if @decorate_events
-          event.set("[kafka][topic]", record.topic)
-          event.set("[kafka][consumer_group]", @group_id)
-          event.set("[kafka][partition]", record.partition)
-          event.set("[kafka][offset]", record.offset)
-          event.set("[kafka][key]", record.key)
-          event.set("[kafka][timestamp]", record.timestamp)
+          event.set("[@metadata][kafka][topic]", record.topic)
+          event.set("[@metadata][kafka][consumer_group]", @group_id)
+          event.set("[@metadata][kafka][partition]", record.partition)
+          event.set("[@metadata][kafka][offset]", record.offset)
+          event.set("[@metadata][kafka][key]", record.key)
+          event.set("[@metadata][kafka][timestamp]", record.timestamp)
         end
         logstash_queue << event
       end
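As the guard in the diff shows, these fields are only set when `decorate_events` is enabled on the input. A minimal input configuration sketch; the broker address and topic are placeholders:

    input {
      kafka {
        bootstrap_servers => "localhost:9092"         # placeholder broker
        topics            => ["logstash_topic_plain"]
        group_id          => "logstash"
        decorate_events   => true                      # populates [@metadata][kafka][...]
      }
    }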

logstash-input-kafka.gemspec

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 Gem::Specification.new do |s|
   s.name = 'logstash-input-kafka'
-  s.version = '6.3.4'
+  s.version = '7.0.0'
   s.licenses = ['Apache License (2.0)']
   s.summary = 'This input will read events from a Kafka topic. It uses the high level consumer API provided by Kafka to read messages from the broker'
   s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/integration/inputs/kafka_spec.rb

Lines changed: 3 additions & 3 deletions
@@ -134,9 +134,9 @@ def thread_it(kafka_input, queue)
       wait(timeout_seconds).for {queue.length}.to eq(num_events)
       expect(queue.length).to eq(num_events)
       event = queue.shift
-      expect(event.get("kafka")["topic"]).to eq("logstash_topic_plain")
-      expect(event.get("kafka")["consumer_group"]).to eq(group_id_3)
-      expect(event.get("kafka")["timestamp"]).to be >= start
+      expect(event.get("[@metadata][kafka][topic]")).to eq("logstash_topic_plain")
+      expect(event.get("[@metadata][kafka][consumer_group]")).to eq(group_id_3)
+      expect(event.get("[@metadata][kafka][timestamp]")).to be >= start
     ensure
       t.kill
       t.join(30_000)
