Skip to content

Commit 06dcf2c

Browse files
authored
fix null pointer reference segfault (#636)
1 parent 6ef7b31 commit 06dcf2c

File tree

5 files changed

+312
-39
lines changed

5 files changed

+312
-39
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
- **[Feature]** Add precompiled `x86_64-linux-musl` setup.
66
- **[Feature]** Add precompiled `macos_arm64` setup.
77
- [Fix] Fix a case where using empty key on the `musl` architecture would cause a segfault.
8+
- [Fix] Fix for null pointer reference bypass on empty string being too wide causing segfault.
89
- [Enhancement] Allow for producing to non-existing topics with `key` and `partition_key` present.
910
- [Enhancement] Replace TTL-based partition count cache with a global cache that reuses `librdkafka` statistics data when possible.
1011
- [Enhancement] Support producing and consuming of headers with mulitple values (KIP-82).

lib/rdkafka/bindings.rb

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -384,18 +384,16 @@ class NativeErrorDesc < FFI::Struct
384384
hsh[name] = method_name
385385
end
386386

387-
def self.partitioner(str, partition_count, partitioner_name = "consistent_random")
387+
def self.partitioner(topic_ptr, str, partition_count, partitioner = "consistent_random")
388388
# Return RD_KAFKA_PARTITION_UA(unassigned partition) when partition count is nil/zero.
389389
return -1 unless partition_count&.nonzero?
390-
# musl architecture crashes with empty string
391-
return 0 if str.empty?
392390

393-
str_ptr = FFI::MemoryPointer.from_string(str)
394-
method_name = PARTITIONERS.fetch(partitioner_name) do
395-
raise Rdkafka::Config::ConfigError.new("Unknown partitioner: #{partitioner_name}")
391+
str_ptr = str.empty? ? FFI::MemoryPointer::NULL : FFI::MemoryPointer.from_string(str)
392+
method_name = PARTITIONERS.fetch(partitioner) do
393+
raise Rdkafka::Config::ConfigError.new("Unknown partitioner: #{partitioner}")
396394
end
397395

398-
public_send(method_name, nil, str_ptr, str.size, partition_count, nil, nil)
396+
public_send(method_name, topic_ptr, str_ptr, str.size, partition_count, nil, nil)
399397
end
400398

401399
# Create Topics

lib/rdkafka/producer.rb

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,13 @@ class TopicHandleCreationError < RuntimeError; end
5151

5252
# @private
5353
# @param native_kafka [NativeKafka]
54-
# @param partitioner_name [String, nil] name of the partitioner we want to use or nil to use
54+
# @param partitioner [String, nil] name of the partitioner we want to use or nil to use
5555
# the "consistent_random" default
56-
def initialize(native_kafka, partitioner_name)
56+
def initialize(native_kafka, partitioner)
5757
@topics_refs_map = {}
5858
@topics_configs = {}
5959
@native_kafka = native_kafka
60-
@partitioner_name = partitioner_name || "consistent_random"
60+
@partitioner = partitioner || "consistent_random"
6161

6262
# Makes sure, that native kafka gets closed before it gets GCed by Ruby
6363
ObjectSpace.define_finalizer(self, native_kafka.finalizer)
@@ -275,7 +275,8 @@ def produce(
275275
timestamp: nil,
276276
headers: nil,
277277
label: nil,
278-
topic_config: EMPTY_HASH
278+
topic_config: EMPTY_HASH,
279+
partitioner: @partitioner
279280
)
280281
closed_producer_check(__method__)
281282

@@ -307,10 +308,14 @@ def produce(
307308

308309
# Check if there are no overrides for the partitioner and use the default one only when
309310
# no per-topic is present.
310-
partitioner_name = @topics_configs.dig(topic, topic_config_hash, :partitioner) || @partitioner_name
311+
selected_partitioner = @topics_configs.dig(topic, topic_config_hash, :partitioner) || partitioner
311312

312313
# If the topic is not present, set to -1
313-
partition = Rdkafka::Bindings.partitioner(partition_key, partition_count, partitioner_name) if partition_count.positive?
314+
partition = Rdkafka::Bindings.partitioner(
315+
topic_ref,
316+
partition_key,
317+
partition_count,
318+
selected_partitioner) if partition_count.positive?
314319
end
315320

316321
# If partition is nil, use -1 to let librdafka set the partition randomly or

spec/rdkafka/bindings_spec.rb

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -77,30 +77,6 @@
7777
end
7878
end
7979

80-
describe "partitioner" do
81-
let(:partition_key) { ('a'..'z').to_a.shuffle.take(15).join('') }
82-
let(:partition_count) { rand(50) + 1 }
83-
84-
it "should return the same partition for a similar string and the same partition count" do
85-
result_1 = Rdkafka::Bindings.partitioner(partition_key, partition_count)
86-
result_2 = Rdkafka::Bindings.partitioner(partition_key, partition_count)
87-
expect(result_1).to eq(result_2)
88-
end
89-
90-
it "should match the old partitioner" do
91-
result_1 = Rdkafka::Bindings.partitioner(partition_key, partition_count)
92-
result_2 = (Zlib.crc32(partition_key) % partition_count)
93-
expect(result_1).to eq(result_2)
94-
end
95-
96-
it "should return the partition calculated by the specified partitioner" do
97-
result_1 = Rdkafka::Bindings.partitioner(partition_key, partition_count, "murmur2")
98-
ptr = FFI::MemoryPointer.from_string(partition_key)
99-
result_2 = Rdkafka::Bindings.rd_kafka_msg_partitioner_murmur2(nil, ptr, partition_key.size, partition_count, nil, nil)
100-
expect(result_1).to eq(result_2)
101-
end
102-
end
103-
10480
describe "stats callback" do
10581
context "without a stats callback" do
10682
it "should do nothing" do

0 commit comments

Comments
 (0)