Skip to content

Commit 83fd74a

Browse files
#210 clone codec once per input thread since it is not threadsafe
Fixes #211
1 parent e0b3b2d commit 83fd74a

File tree

3 files changed

+7
-3
lines changed

3 files changed

+7
-3
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 6.3.4
2+
- Fix an issue that led to random failures in decoding messages when using more than one input thread
3+
14
## 6.3.3
25
- Upgrade Kafka client to version 0.11.0.0
36

lib/logstash/inputs/kafka.rb

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,10 +247,11 @@ def thread_runner(logstash_queue, consumer)
247247
else
248248
consumer.subscribe(topics);
249249
end
250+
codec_instance = @codec.clone
250251
while !stop?
251-
records = consumer.poll(poll_timeout_ms);
252+
records = consumer.poll(poll_timeout_ms)
252253
for record in records do
253-
@codec.decode(record.value.to_s) do |event|
254+
codec_instance.decode(record.value.to_s) do |event|
254255
decorate(event)
255256
if @decorate_events
256257
event.set("[kafka][topic]", record.topic)

logstash-input-kafka.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-input-kafka'
3-
s.version = '6.3.3'
3+
s.version = '6.3.4'
44
s.licenses = ['Apache License (2.0)']
55
s.summary = 'This input will read events from a Kafka topic. It uses the high level consumer API provided by Kafka to read messages from the broker'
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

0 commit comments

Comments
 (0)