Skip to content

Commit 5d7dbb8

Browse files
committed
in_rdkafka_group: support regexp pattern in topics
Signed-off-by: Shizuo Fujita <[email protected]>
1 parent 50e3d17 commit 5d7dbb8

File tree

3 files changed

+139
-3
lines changed

3 files changed

+139
-3
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,9 @@ With the introduction of the rdkafka-ruby based input plugin we hope to support
177177

178178
See also [rdkafka-ruby](https://github.com/appsignal/rdkafka-ruby) and [librdkafka](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) for more detailed documentation about Kafka consumer options.
179179

180+
`topics` supports POSIX Extended Regular Expression patterns (not Ruby regex syntax) since v0.19.6. To use a regex pattern, wrap it in slashes, e.g. `/foo.*/`. <br>
181+
**Note**: A caret (`^`) is automatically added to the beginning of the pattern.
182+
180183
The name of the consumed topic is used as the event tag. So when the target topic name is `app_event`, the tag is `app_event`. If you want to modify the tag, use the `add_prefix` or `add_suffix` parameter. With `add_prefix kafka`, the tag is `kafka.app_event`.
181184

182185
### Output plugin

lib/fluent/plugin/in_rdkafka_group.rb

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,19 +70,29 @@ def initialize
7070
end
7171

7272
# Splits a comma-separated configuration value into an array of entries.
#
# Each entry is stripped of surrounding whitespace, blank entries are
# dropped, and every remaining entry is passed through
# `_config_regex_pattern` so that `/pattern/` entries are converted.
#
# NOTE: the value is split on ',' before any regex handling, so a regex
# entry that itself contains a comma (e.g. `/a{1,2}/`) is split apart.
#
# @param config [String] comma-separated list, e.g. "topic_a, /topic_b.*/"
# @return [Array<String>] the non-empty, converted entries
# @raise [Fluent::ConfigError] if no non-blank entry remains
def _config_to_array(config)
  config_array = config.split(',')
                       .map(&:strip)
                       .reject(&:empty?) # "a, ,b" must not yield an empty name
                       .map { |entry| _config_regex_pattern(entry) }
  if config_array.empty?
    raise Fluent::ConfigError, "kafka_group: '#{config}' is a required parameter"
  end

  config_array
end
79+
private :_config_to_array
80+
81+
# Converts a `/pattern/` entry into the form librdkafka treats as a regex.
#
# librdkafka recognizes string as regex pattern if the topic name starts with '^'.
# https://github.com/confluentinc/librdkafka/blob/570c785e9e35812db8f50254bd2f7e0cf47def39/src/rdkafka.h#L4148
# https://github.com/confluentinc/librdkafka/blob/e1db7eaa517f0a6438bc846a9c49ede73b9ea211/src/rdkafka_topic.c#L2064
#
# @param topic [String] a plain topic name, or a pattern wrapped in slashes
# @return [String] the pattern prefixed with '^' when `topic` is `/pattern/`;
#   otherwise `topic` unchanged
def _config_regex_pattern(topic)
  # \A and \z anchor the whole string; ^ and $ would match per line and could
  # pick up only the last line of a multi-line value.
  match = %r{\A/(.+)/\z}.match(topic)
  return topic unless match

  pattern = match[1]
  # Do not double the caret when the user already wrote one (e.g. `/^foo/`).
  pattern.start_with?('^') ? pattern : "^#{pattern}"
end
90+
private :_config_regex_pattern
7991

8092
# Always reports this plugin as ready to run with multiple workers.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end
8395

84-
private :_config_to_array
85-
8696
def configure(conf)
8797
compat_parameters_convert(conf, :parser)
8898

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
require 'helper'
2+
require 'fluent/test/driver/input'
3+
require 'securerandom'
4+
5+
# Tests for the rdkafka-based consumer-group input plugin.
#
# The nested consume tests talk to a Kafka broker at localhost:9092 and are
# omitted (like the configuration tests) when the plugin cannot be loaded.
class RdkafkaGroupInputTest < Test::Unit::TestCase
  # Tries to load the plugin under test.
  #
  # @return [Boolean] true when the plugin file loads, false on LoadError
  def have_rdkafka
    require 'fluent/plugin/in_rdkafka_group'
    true
  rescue LoadError
    false
  end

  def setup
    omit_unless(have_rdkafka, "rdkafka isn't installed")
    Fluent::Test.setup
  end

  TOPIC_NAME = "kafka-input-#{SecureRandom.uuid}"

  CONFIG = %[
    topics #{TOPIC_NAME}
    kafka_configs {"bootstrap.servers": "localhost:9092", "group.id": "test_group"}
    <parse>
      @type none
    </parse>
  ]

  # Builds an input test driver configured with +conf+.
  def create_driver(conf = CONFIG)
    Fluent::Test::Driver::Input.new(Fluent::Plugin::RdKafkaGroupInput).configure(conf)
  end

  def test_configure
    d = create_driver
    assert_equal [TOPIC_NAME], d.instance.topics
    assert_equal 'localhost:9092', d.instance.kafka_configs['bootstrap.servers']
  end

  def test_multi_worker_support
    d = create_driver
    assert_true d.instance.multi_workers_ready?
  end

  # Consumes a single message from a topic given by its literal name.
  class ConsumeTest < self
    TOPIC_NAME = "kafka-input-#{SecureRandom.uuid}"

    def setup
      # Run the parent setup first so the rdkafka omission check and
      # Fluent::Test.setup also apply to this nested test case.
      super
      @kafka = Kafka.new(["localhost:9092"], client_id: 'kafka')
      @producer = @kafka.producer
      @kafka.create_topic(TOPIC_NAME)
    end

    def teardown
      # @kafka is nil when setup stopped before creating the client.
      return unless @kafka

      begin
        @kafka.delete_topic(TOPIC_NAME)
      ensure
        @kafka.close
      end
    end

    def test_consume
      conf = %[
        topics #{TOPIC_NAME}
        kafka_configs {"bootstrap.servers": "localhost:9092", "group.id": "test_group"}
        <parse>
          @type none
        </parse>
      ]

      d = create_driver(conf)

      d.run(expect_records: 1, timeout: 10) do
        sleep 0.1
        @producer.produce("Hello, fluent-plugin-kafka!", topic: TOPIC_NAME)
        @producer.deliver_messages
      end

      expected = {'message' => 'Hello, fluent-plugin-kafka!'}
      assert_equal expected, d.events[0][2]
    end
  end

  # Consumes from two topics selected by one `/pattern/` entry in `topics`.
  class ConsumeTopicWithRegexpTest < self
    TOPIC_NAME1 = "kafka-input-1-#{SecureRandom.uuid}"
    TOPIC_NAME2 = "kafka-input-2-#{SecureRandom.uuid}"
    TOPIC_NAME_REGEXP = "/kafka-input-(1|2)-.*/"

    def setup
      # Run the parent setup first so the rdkafka omission check and
      # Fluent::Test.setup also apply to this nested test case.
      super
      @kafka = Kafka.new(["localhost:9092"], client_id: 'kafka')
      @producer = @kafka.producer
      @kafka.create_topic(TOPIC_NAME1)
      @kafka.create_topic(TOPIC_NAME2)
    end

    def teardown
      # @kafka is nil when setup stopped before creating the client.
      return unless @kafka

      begin
        @kafka.delete_topic(TOPIC_NAME1)
        @kafka.delete_topic(TOPIC_NAME2)
      ensure
        @kafka.close
      end
    end

    def test_consume_with_regexp
      conf = %[
        topics #{TOPIC_NAME_REGEXP}
        kafka_configs {"bootstrap.servers": "localhost:9092", "group.id": "test_group"}
        <parse>
          @type none
        </parse>
      ]
      d = create_driver(conf)

      d.run(expect_records: 2, timeout: 10) do
        sleep 0.1
        @producer.produce("Hello, fluent-plugin-kafka! in topic 1", topic: TOPIC_NAME1)
        @producer.produce("Hello, fluent-plugin-kafka! in topic 2", topic: TOPIC_NAME2)
        @producer.deliver_messages
      end
      expected_message_pattern = /Hello, fluent-plugin-kafka! in topic [12]/
      assert_equal 2, d.events.size
      assert_match(expected_message_pattern, d.events[0][2]['message'])
      assert_match(expected_message_pattern, d.events[1][2]['message'])
    end
  end
end

0 commit comments

Comments
 (0)