Skip to content

Commit 0884864

Browse files
authored
improve object allocations (#456)
1 parent 9660001 commit 0884864

File tree

6 files changed

+16
-21
lines changed

6 files changed

+16
-21
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## 0.16.0 (Unreleased)
44
- **[Breaking]** Retire support for Ruby 2.7.
5+
- **[Breaking]** Messages without headers returned by `#poll` contain frozen empty hash.
6+
- **[Breaking]** `HashWithSymbolKeysTreatedLikeStrings` has been removed so headers are regular hashes with string keys.
57
- **[Feature]** Support incremental config describe + alter API.
68
- **[Feature]** Oauthbearer token refresh callback (bruce-szalwinski-he)
79
- **[Feature]** Provide ability to use topic config on a producer for custom behaviors per dispatch.
@@ -10,6 +12,7 @@
1012
- [Enhancement] Replace time poll based wait engine with an event based to improve response times on blocking operations and wait (nijikon + mensfeld)
1113
- [Enhancement] Allow for usage of the second regex engine of librdkafka by setting `RDKAFKA_DISABLE_REGEX_EXT` during build (mensfeld)
1214
- [Enhancement] name polling Thread as `rdkafka.native_kafka#<name>` (nijikon)
15+
- [Enhancement] Save two objects on message produced and lower CPU usage on message produced with small improvements.
1316
- [Change] Allow for native kafka thread operations deferring and manual start for consumer, producer and admin.
1417
- [Change] The `wait_timeout` argument in `AbstractHandle.wait` method is deprecated and will be removed in future versions without replacement. We don't rely on it's value anymore (nijikon)
1518
- [Fix] Background logger stops working after forking causing memory leaks (mensfeld)

lib/rdkafka/consumer/headers.rb

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,7 @@ module Rdkafka
44
class Consumer
55
# Interface to return headers for a consumer message
66
module Headers
7-
class HashWithSymbolKeysTreatedLikeStrings < Hash
8-
def [](key)
9-
if key.is_a?(Symbol)
10-
Kernel.warn("rdkafka deprecation warning: header access with Symbol key #{key.inspect} treated as a String. " \
11-
"Please change your code to use String keys to avoid this warning. Symbol keys will break in version 1.")
12-
super(key.to_s)
13-
else
14-
super
15-
end
16-
end
17-
end
7+
EMPTY_HEADERS = {}.freeze
188

199
# Reads a librdkafka native message's headers and returns them as a Ruby Hash
2010
#
@@ -28,7 +18,7 @@ def self.from_native(native_message)
2818
err = Rdkafka::Bindings.rd_kafka_message_headers(native_message, headers_ptrptr)
2919

3020
if err == Rdkafka::Bindings::RD_KAFKA_RESP_ERR__NOENT
31-
return {}
21+
return EMPTY_HEADERS
3222
elsif err != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
3323
raise Rdkafka::RdkafkaError.new(err, "Error reading message headers")
3424
end
@@ -39,7 +29,7 @@ def self.from_native(native_message)
3929
value_ptrptr = FFI::MemoryPointer.new(:pointer)
4030
size_ptr = Rdkafka::Bindings::SizePtr.new
4131

42-
headers = HashWithSymbolKeysTreatedLikeStrings.new
32+
headers = {}
4333

4434
idx = 0
4535
loop do

lib/rdkafka/producer.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,7 @@ def produce(
319319

320320
delivery_handle = DeliveryHandle.new
321321
delivery_handle.label = label
322+
delivery_handle.topic = topic
322323
delivery_handle[:pending] = true
323324
delivery_handle[:response] = -1
324325
delivery_handle[:partition] = -1
@@ -342,7 +343,7 @@ def produce(
342343
args << :int << Rdkafka::Bindings::RD_KAFKA_VTYPE_HEADER
343344
args << :string << key
344345
args << :pointer << value
345-
args << :size_t << value.bytes.size
346+
args << :size_t << value.bytesize
346347
end
347348
end
348349

lib/rdkafka/producer/delivery_handle.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ class DeliveryHandle < Rdkafka::AbstractHandle
1414
# @return [Object, nil] label set during message production or nil by default
1515
attr_accessor :label
1616

17+
# @return [String] topic where we are trying to send the message
18+
# We use this instead of reading from `topic_name` pointer to save on memory allocations
19+
attr_accessor :topic
20+
1721
# @return [String] the name of the operation (e.g. "delivery")
1822
def operation_name
1923
"delivery"
@@ -26,7 +30,7 @@ def create_result
2630
self[:offset],
2731
# For part of errors, we will not get a topic name reference and in cases like this
2832
# we should not return it
29-
self[:topic_name].null? ? nil : self[:topic_name].read_string,
33+
topic,
3034
self[:response] != 0 ? RdkafkaError.new(self[:response]) : nil,
3135
label
3236
)

spec/rdkafka/consumer/headers_spec.rb

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,8 @@
5050
expect(subject['version']).to eq("2.1.3")
5151
end
5252

53-
it 'allows Symbol key, but warns' do
54-
expect(Kernel).to \
55-
receive(:warn).with("rdkafka deprecation warning: header access with Symbol key :version treated as a String. " \
56-
"Please change your code to use String keys to avoid this warning. Symbol keys will break in version 1.")
57-
expect(subject[:version]).to eq("2.1.3")
53+
it 'does not support symbols mappings' do
54+
expect(subject.key?(:version)).to eq(false)
5855
end
5956
end
6057
end

spec/rdkafka/producer/delivery_handle_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
handle[:response] = response
1010
handle[:partition] = 2
1111
handle[:offset] = 100
12-
handle[:topic_name] = FFI::MemoryPointer.from_string("produce_test_topic")
12+
handle.topic = "produce_test_topic"
1313
end
1414
end
1515

0 commit comments

Comments
 (0)