Skip to content

Commit 374608f

Browse files
author
Eduardo Poleo
committed
ISSUE-525/764: Add multi subscription round robin assignment strategy.
The existing assignment strategy assumes identical subscriptions among consumers within the same consumer group. We need a more general implementation that accounts for different topic subscriptions to be able to perform correct assignments. The new implementation presented here is heavily inspired by the Kafka Java client RoundRobinAssignor: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
1 parent 65e4af5 commit 374608f

File tree

2 files changed

+254
-15
lines changed

2 files changed

+254
-15
lines changed

lib/kafka/round_robin_assignment_strategy.rb

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
# frozen_string_literal: true
2-
31
module Kafka
42

5-
# A consumer group partition assignment strategy that assigns partitions to
6-
# consumers in a round-robin fashion.
3+
# A round robin assignment strategy inspired by the
4+
# original java client round robin assignor. It's capable
5+
# of handling identical as well as different topic subscriptions
6+
# across the same consumer group.
77
class RoundRobinAssignmentStrategy
88
def protocol_name
99
"roundrobin"
@@ -19,13 +19,34 @@ def protocol_name
1919
# @return [Hash<String, Array<Kafka::ConsumerGroup::Assignor::Partition>>] a hash
2020
# mapping member ids to partitions.
2121
def call(cluster:, members:, partitions:)
22-
member_ids = members.keys
2322
partitions_per_member = Hash.new {|h, k| h[k] = [] }
24-
partitions.each_with_index do |partition, index|
25-
partitions_per_member[member_ids[index % member_ids.count]] << partition
23+
relevant_partitions = valid_sorted_partitions(members, partitions)
24+
members_ids = members.keys
25+
iterator = (0...members.size).cycle
26+
idx = iterator.next
27+
28+
relevant_partitions.each do |partition|
29+
topic = partition.topic
30+
31+
while !members[members_ids[idx]].topics.include?(topic)
32+
idx = iterator.next
33+
end
34+
35+
partitions_per_member[members_ids[idx]] << partition
36+
idx = iterator.next
2637
end
2738

2839
partitions_per_member
2940
end
41+
42+
def valid_sorted_partitions(members, partitions)
43+
subscribed_topics = members.map do |id, metadata|
44+
metadata && metadata.topics
45+
end.flatten.compact
46+
47+
partitions
48+
.select { |partition| subscribed_topics.include?(partition.topic) }
49+
.sort_by { |partition| partition.topic }
50+
end
3051
end
3152
end

spec/round_robin_assignment_strategy_spec.rb

Lines changed: 226 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
let(:strategy) { described_class.new }
55

66
it "assigns all partitions" do
7-
members = Hash[(0...10).map {|i| ["member#{i}", nil] }]
7+
members = Hash[(0...10).map {|i| ["member#{i}", double(topics: ['greetings'])] }]
88
partitions = (0...30).map {|i| double(:"partition#{i}", topic: "greetings", partition_id: i) }
99

1010
assignments = strategy.call(cluster: nil, members: members, partitions: partitions)
@@ -21,8 +21,8 @@
2121
end
2222

2323
it "spreads all partitions between members" do
24-
members = Hash[(0...10).map {|i| ["member#{i}", nil] }]
2524
topics = ["topic1", "topic2"]
25+
members = Hash[(0...10).map {|i| ["member#{i}", double(topics: topics)] }]
2626
partitions = topics.product((0...5).to_a).map {|topic, i|
2727
double(:"partition#{i}", topic: topic, partition_id: i)
2828
}
@@ -46,36 +46,50 @@
4646
expect(num_partitions_assigned).to all eq(1)
4747
end
4848

49+
Metadata = Struct.new(:topics)
4950
[
5051
{
5152
name: "uneven topics",
5253
topics: { "topic1" => [0], "topic2" => (0..50).to_a },
53-
members: { "member1" => nil, "member2" => nil },
54+
members: {
55+
"member1" => Metadata.new(["topic1", "topic2"]),
56+
"member2" => Metadata.new(["topic1", "topic2"])
57+
},
5458
},
5559
{
5660
name: "only one partition",
5761
topics: { "topic1" => [0] },
58-
members: { "member1" => nil, "member2" => nil },
62+
members: {
63+
"member1" => Metadata.new(["topic1"]),
64+
"member2" => Metadata.new(["topic1"])
65+
},
5966
},
6067
{
6168
name: "lots of partitions",
6269
topics: { "topic1" => (0..100).to_a },
63-
members: { "member1" => nil },
70+
members: { "member1" => Metadata.new(["topic1"]) },
6471
},
6572
{
6673
name: "lots of members",
6774
topics: { "topic1" => (0..10).to_a, "topic2" => (0..10).to_a },
68-
members: Hash[(0..50).map { |i| ["member#{i}", nil] }]
75+
members: Hash[(0..50).map { |i| ["member#{i}", Metadata.new(["topic1", "topic2"])] }]
6976
},
7077
{
7178
name: "odd number of partitions",
7279
topics: { "topic1" => (0..14).to_a },
73-
members: { "member1" => nil, "member2" => nil },
80+
members: {
81+
"member1" => Metadata.new(["topic1"]),
82+
"member2" => Metadata.new(["topic1"])
83+
},
7484
},
7585
{
7686
name: "five topics, 10 partitions, 3 consumers",
7787
topics: { "topic1" => [0, 1], "topic2" => [0, 1], "topic3" => [0, 1], "topic4" => [0, 1], "topic5" => [0, 1] },
78-
members: { "member1" => nil, "member2" => nil, "member3" => nil },
88+
members: {
89+
"member1" => Metadata.new(["topic1", "topic2", "topic3", "topic4", "topic5"]),
90+
"member2" => Metadata.new(["topic1", "topic2", "topic3", "topic4", "topic5"]),
91+
"member3" => Metadata.new(["topic1", "topic2", "topic3", "topic4", "topic5"])
92+
},
7993
}
8094
].each do |options|
8195
name, topics, members = options[:name], options[:topics], options[:members]
@@ -113,4 +127,208 @@ def expect_even_assignments(topics, assignments)
113127
expect(num_assigned).to be_within(1).of(num_partitions.to_f / assignments.count)
114128
end
115129
end
130+
131+
context 'one consumer no subscriptions or topics / partitions' do
132+
it 'returns empty assignments' do
133+
members = { 'member1' => nil }
134+
partitions = []
135+
136+
assignments = strategy.call(cluster: nil, members: members, partitions: partitions)
137+
138+
expect(assignments).to eq({})
139+
end
140+
end
141+
142+
context 'one consumer with subscription but no matching topic partition' do
143+
it 'returns empty assignments' do
144+
members = { 'member1' => double(topics: ['topic1']) }
145+
partitions = []
146+
147+
assignments = strategy.call(cluster: nil, members: members, partitions: partitions)
148+
149+
expect(assignments).to eq({})
150+
end
151+
end
152+
153+
context 'one consumer subscribed to one topic with one partition' do
154+
it 'assigns the partition to the consumer' do
155+
members = { 'member1' => double(topics: ['topic1']) }
156+
partitions = [
157+
t1p0 = double(:"t1p0", topic: "topic1", partition_id: 0),
158+
]
159+
160+
assignments = strategy.call(cluster: nil, members: members, partitions: partitions)
161+
162+
expect(assignments).to eq({
163+
'member1' => [t1p0]
164+
})
165+
end
166+
end
167+
168+
context 'one consumer subscribed to one topic with multiple partitions' do
169+
it 'assigns all partitions to the consumer' do
170+
members = { 'member1' => double(topics: ['topic1']) }
171+
partitions = [
172+
t1p0 = double(:"t1p0", topic: "topic1", partition_id: 0),
173+
t1p1 = double(:"t1p1", topic: "topic1", partition_id: 1),
174+
]
175+
176+
assignments = strategy.call(cluster: nil, members: members, partitions: partitions)
177+
178+
expect(assignments).to eq({
179+
'member1' => [t1p0, t1p1]
180+
})
181+
end
182+
end
183+
184+
context 'one consumer subscribed to one topic but with multiple different topic partitions' do
185+
it 'only assigns partitions for the subscribed topic' do
186+
members = { 'member1' => double(topics: ['topic1']) }
187+
partitions = [
188+
t1p0 = double(:"t1p0", topic: "topic1", partition_id: 0),
189+
t1p1 = double(:"t1p1", topic: "topic1", partition_id: 1),
190+
t2p0 = double(:"t2p0", topic: "topic2", partition_id: 0),
191+
]
192+
193+
assignments = strategy.call(cluster: nil, members: members, partitions: partitions)
194+
195+
expect(assignments).to eq({
196+
'member1' => [t1p0, t1p1]
197+
})
198+
end
199+
end
200+
201+
context 'one consumer subscribed to multiple topics' do
202+
it 'assigns all the topics partitions to the consumer' do
203+
members = { 'member1' => double(topics: ['topic1', 'topic2']) }
204+
205+
partitions = [
206+
t1p0 = double(:"t1p0", topic: "topic1", partition_id: 0),
207+
t1p1 = double(:"t1p1", topic: "topic1", partition_id: 1),
208+
t2p0 = double(:"t2p0", topic: "topic2", partition_id: 0),
209+
]
210+
211+
assignments = strategy.call(cluster: nil, members: members, partitions: partitions)
212+
213+
expect(assignments).to eq({
214+
'member1' => [t1p0, t1p1, t2p0]
215+
})
216+
end
217+
end
218+
219+
context 'two consumers with one topic and only one partition' do
220+
it 'only assigns the partition to one consumer' do
221+
members = {
222+
'member1' => double(topics: ['topic1']),
223+
'member2' => double(topics: ['topic1'])
224+
}
225+
partitions = [
226+
t1p0 = double(:"t1p0", topic: "topic1", partition_id: 0),
227+
]
228+
229+
assignments = strategy.call(cluster: nil, members: members, partitions: partitions)
230+
231+
expect(assignments).to eq({
232+
'member1' => [t1p0]
233+
})
234+
end
235+
end
236+
237+
context 'two consumers subscribed to one topic with two partitions' do
238+
it 'assigns a partition to each consumer' do
239+
members = {
240+
'member1' => double(topics: ['topic1']),
241+
'member2' => double(topics: ['topic1'])
242+
}
243+
partitions = [
244+
t1p0 = double(:"t1p0", topic: "topic1", partition_id: 0),
245+
t1p1 = double(:"t1p1", topic: "topic1", partition_id: 1),
246+
]
247+
248+
assignments = strategy.call(cluster: nil, members: members, partitions: partitions)
249+
250+
expect(assignments).to eq({
251+
'member1' => [t1p0],
252+
'member2' => [t1p1]
253+
})
254+
end
255+
end
256+
257+
context 'multiple consumers with mixed topics subscriptions' do
258+
it 'creates a balanced assignment' do
259+
members = {
260+
'member1' => double(topics: ['topic1']),
261+
'member2' => double(topics: ['topic1', 'topic2']),
262+
'member3' => double(topics: ['topic1'])
263+
}
264+
partitions = [
265+
t1p0 = double(:"t1p0", topic: "topic1", partition_id: 0),
266+
t1p1 = double(:"t1p1", topic: "topic1", partition_id: 1),
267+
t1p2 = double(:"t1p2", topic: "topic1", partition_id: 2),
268+
t2p0 = double(:"t2p0", topic: "topic2", partition_id: 0),
269+
t2p1 = double(:"t2p1", topic: "topic2", partition_id: 1),
270+
]
271+
272+
assignments = strategy.call(cluster: nil, members: members, partitions: partitions)
273+
274+
expect(assignments).to eq({
275+
'member1' => [t1p0],
276+
'member2' => [t1p1, t2p0, t2p1],
277+
'member3' => [t1p2]
278+
})
279+
end
280+
end
281+
282+
context 'two consumers subscribed to two topics with three partitions each' do
283+
it 'creates a balanced assignment' do
284+
members = {
285+
'member1' => double(topics: ['topic1', 'topic2']),
286+
'member2' => double(topics: ['topic1', 'topic2'])
287+
}
288+
partitions = [
289+
t1p0 = double(:"t1p0", topic: "topic1", partition_id: 0),
290+
t1p1 = double(:"t1p1", topic: "topic1", partition_id: 1),
291+
t1p2 = double(:"t1p2", topic: "topic1", partition_id: 2),
292+
t2p0 = double(:"t2p0", topic: "topic2", partition_id: 0),
293+
t2p1 = double(:"t2p1", topic: "topic2", partition_id: 1),
294+
t2p2 = double(:"t2p2", topic: "topic2", partition_id: 2),
295+
]
296+
297+
assignments = strategy.call(cluster: nil, members: members, partitions: partitions)
298+
299+
expect(assignments).to eq({
300+
'member1' => [t1p0, t1p2, t2p1],
301+
'member2' => [t1p1, t2p0, t2p2]
302+
})
303+
end
304+
end
305+
306+
context 'many consumers subscribed to one topic with partitions given out of order' do
307+
it 'produces balanced assignments' do
308+
members = {
309+
'member1' => double(topics: ['topic1']),
310+
'member2' => double(topics: ['topic1']),
311+
'member3' => double(topics: ['topic2']),
312+
}
313+
314+
partitions = [
315+
t2p0 = double(:"t2p0", topic: "topic2", partition_id: 0),
316+
t1p0 = double(:"t1p0", topic: "topic1", partition_id: 0),
317+
t2p1 = double(:"t2p1", topic: "topic2", partition_id: 1),
318+
t1p1 = double(:"t1p1", topic: "topic1", partition_id: 1),
319+
]
320+
321+
assignments = strategy.call(cluster: nil, members: members, partitions: partitions)
322+
323+
# Without sorting the partitions by topic this input would produce a non balanced assignment:
324+
# member1 => [t1p0, t1p1]
325+
# member2 => []
326+
# member3 => [t2p0, t2p1]
327+
expect(assignments).to eq({
328+
'member1' => [t1p0],
329+
'member2' => [t1p1],
330+
'member3' => [t2p0, t2p1]
331+
})
332+
end
333+
end
116334
end

0 commit comments

Comments
 (0)