Skip to content

Commit cf013c2

Browse files
authored
Merge pull request #468 from raytung/fix/rdkafka0-12-0
fix(out_rdkafka2): fix compatibility with rdkafka 0.12.0
2 parents 0d239f8 + 95b58eb commit cf013c2

File tree

3 files changed

+35
-2
lines changed

3 files changed

+35
-2
lines changed

.github/workflows/linux.yml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@ jobs:
1515
ruby: [ '3.2', '3.1', '3.0', '2.7' ]
1616
os:
1717
- ubuntu-latest
18-
name: Ruby ${{ matrix.ruby }} unit testing on ${{ matrix.os }}
18+
rdkafka_versions:
19+
- { min: '>= 0.6.0', max: '< 0.12.0' }
20+
- { min: '>= 0.12.0', max: '>= 0.12.0' }
21+
name: Ruby ${{ matrix.ruby }} unit testing on ${{ matrix.os }} with rdkafka gem version (min ${{ matrix.rdkafka_versions.min }} max ${{ matrix.rdkafka_versions.max }})
1922
steps:
2023
- uses: actions/checkout@v3
2124
- uses: ruby/setup-ruby@v1
@@ -33,6 +36,8 @@ jobs:
3336
- name: unit testing
3437
env:
3538
CI: true
39+
RDKAFKA_VERSION_MIN_RANGE: ${{ matrix.rdkafka_versions.min }}
40+
RDKAFKA_VERSION_MAX_RANGE: ${{ matrix.rdkafka_versions.max }}
3641
run: |
3742
sudo ./ci/prepare-kafka-server.sh
3843
gem install bundler rake

Gemfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@ source 'https://rubygems.org'
33
# Specify your gem's dependencies in fluent-plugin-kafka.gemspec
44
gemspec
55

6-
gem 'rdkafka', '>= 0.6.0' if ENV["USE_RDKAFKA"]
6+
gem 'rdkafka', ENV['RDKAFKA_VERSION_MIN_RANGE'], ENV['RDKAFKA_VERSION_MAX_RANGE'] if ENV['USE_RDKAFKA']

lib/fluent/plugin/out_rdkafka2.rb

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,37 @@
55

66
require 'rdkafka'
77

8+
# This is required for `rdkafka` version >= 0.12.0
9+
# Overriding the close method in order to provide a time limit for when it should be forcibly closed
10+
class Rdkafka::Producer::Client
11+
# return false if producer is forcefully closed, otherwise return true
12+
def close(timeout=nil)
13+
return unless @native
14+
15+
# Indicate to polling thread that we're closing
16+
@polling_thread[:closing] = true
17+
# Wait for the polling thread to finish up
18+
thread = @polling_thread.join(timeout)
19+
20+
Rdkafka::Bindings.rd_kafka_destroy(@native)
21+
22+
@native = nil
23+
24+
return !thread.nil?
25+
end
26+
end
27+
828
class Rdkafka::Producer
929
# return false if producer is forcefully closed, otherwise return true
1030
def close(timeout = nil)
31+
rdkafka_version = Rdkafka::VERSION || '0.0.0'
32+
# Rdkafka version >= 0.12.0 changed its internals
33+
if Gem::Version::create(rdkafka_version) >= Gem::Version.create('0.12.0')
34+
ObjectSpace.undefine_finalizer(self)
35+
36+
return @client.close(timeout)
37+
end
38+
1139
@closing = true
1240
# Wait for the polling thread to finish up
1341
# If the broker isn't alive, the thread doesn't exit

0 commit comments

Comments
 (0)