Skip to content

Commit d57e23a

Browse files
authored
Use specified partitioner for partition key (#173)
Rdkafka allows us to specify the partition key but always calculates the partition using consistent_random partitioner even if a partitioner is specified in the configuration. This commit makes it use the specified partitioner.
1 parent 77c0134 commit d57e23a

File tree

4 files changed

+21
-6
lines changed

4 files changed

+21
-6
lines changed

lib/rdkafka/bindings.rb

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -246,14 +246,21 @@ class TopicPartitionList < FFI::Struct
246246
attach_function :rd_kafka_conf_set_dr_msg_cb, [:pointer, :delivery_cb], :void
247247

248248
# Partitioner
249-
attach_function :rd_kafka_msg_partitioner_consistent_random, [:pointer, :pointer, :size_t, :int32, :pointer, :pointer], :int32
249+
PARTITIONERS = %w(random consistent consistent_random murmur2 murmur2_random fnv1a fnv1a_random).each_with_object({}) do |name, hsh|
250+
method_name = "rd_kafka_msg_partitioner_#{name}".to_sym
251+
attach_function method_name, [:pointer, :pointer, :size_t, :int32, :pointer, :pointer], :int32
252+
hsh[name] = method_name
253+
end
250254

251-
def self.partitioner(str, partition_count)
255+
def self.partitioner(str, partition_count, partitioner_name = "consistent_random")
252256
# Return RD_KAFKA_PARTITION_UA(unassigned partition) when partition count is nil/zero.
253257
return -1 unless partition_count&.nonzero?
254258

255259
str_ptr = FFI::MemoryPointer.from_string(str)
256-
rd_kafka_msg_partitioner_consistent_random(nil, str_ptr, str.size, partition_count, nil, nil)
260+
method_name = PARTITIONERS.fetch(partitioner_name) do
261+
raise Rdkafka::Config::ConfigError.new("Unknown partitioner: #{partitioner_name}")
262+
end
263+
public_send(method_name, nil, str_ptr, str.size, partition_count, nil, nil)
257264
end
258265

259266
# Create Topics

lib/rdkafka/config.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ def producer
179179
# Set callback to receive delivery reports on config
180180
Rdkafka::Bindings.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::Callbacks::DeliveryCallbackFunction)
181181
# Return producer with Kafka client
182-
Rdkafka::Producer.new(Rdkafka::Producer::Client.new(native_kafka(config, :rd_kafka_producer))).tap do |producer|
182+
Rdkafka::Producer.new(Rdkafka::Producer::Client.new(native_kafka(config, :rd_kafka_producer)), self[:partitioner]).tap do |producer|
183183
opaque.producer = producer
184184
end
185185
end

lib/rdkafka/producer.rb

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ class Producer
1010
attr_reader :delivery_callback
1111

1212
# @private
13-
def initialize(client)
13+
def initialize(client, partitioner_name)
1414
@client = client
15+
@partitioner_name = partitioner_name || "consistent_random"
1516

1617
# Makes sure, that the producer gets closed before it gets GCed by Ruby
1718
ObjectSpace.define_finalizer(self, client.finalizer)
@@ -85,7 +86,7 @@ def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil,
8586
if partition_key
8687
partition_count = partition_count(topic)
8788
# If the topic is not present, set to -1
88-
partition = Rdkafka::Bindings.partitioner(partition_key, partition_count) if partition_count
89+
partition = Rdkafka::Bindings.partitioner(partition_key, partition_count, @partitioner_name) if partition_count
8990
end
9091

9192
# If partition is nil, use -1 to let librdafka set the partition randomly or

spec/rdkafka/bindings_spec.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,13 @@
7676
result_2 = (Zlib.crc32(partition_key) % partition_count)
7777
expect(result_1).to eq(result_2)
7878
end
79+
80+
it "should return the partition calculated by the specified partitioner" do
81+
result_1 = Rdkafka::Bindings.partitioner(partition_key, partition_count, "murmur2")
82+
ptr = FFI::MemoryPointer.from_string(partition_key)
83+
result_2 = Rdkafka::Bindings.rd_kafka_msg_partitioner_murmur2(nil, ptr, partition_key.size, partition_count, nil, nil)
84+
expect(result_1).to eq(result_2)
85+
end
7986
end
8087

8188
describe "stats callback" do

0 commit comments

Comments
 (0)