Skip to content

Commit 1a37b23

Browse files
authored
skip intermediate callback array (#388)
1 parent 26dba3c commit 1a37b23

File tree

2 files changed

+24
-2
lines changed

2 files changed

+24
-2
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
- [Enhancement] Alias `topic_name` as `topic` in the delivery report (mensfeld)
66
- [Enhancement] Provide `label` producer handler and report reference for improved traceability (mensfeld)
77
- [Enhancement] Include the error when invoking `create_result` on producer handle (mensfeld)
8+
- [Enhancement] Skip intermediate array creation on delivery report callback execution (one per message).
89
- [Fix] Fix return type on `#rd_kafka_poll` (mensfeld)
910
- [Fix] `uint8_t` does not exist on Apple Silicon (mensfeld)
1011
- [Fix] Missing ACL `RD_KAFKA_RESOURCE_BROKER` constant reference (mensfeld)

lib/rdkafka/producer.rb

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ class Producer
2323
attr_reader :delivery_callback_arity
2424

2525
# @private
26+
# @param native_kafka [NativeKafka]
27+
# @param partitioner_name [String, nil] name of the partitioner we want to use or nil to use
28+
# the "consistent_random" default
2629
def initialize(native_kafka, partitioner_name)
2730
@native_kafka = native_kafka
2831
@partitioner_name = partitioner_name || "consistent_random"
@@ -258,13 +261,27 @@ def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil,
258261
delivery_handle
259262
end
260263

264+
# Calls (if registered) the delivery callback
265+
#
266+
# @param delivery_report [Producer::DeliveryReport]
267+
# @param delivery_handle [Producer::DeliveryHandle]
261268
def call_delivery_callback(delivery_report, delivery_handle)
262269
return unless @delivery_callback
263270

264-
args = [delivery_report, delivery_handle].take(@delivery_callback_arity)
265-
@delivery_callback.call(*args)
271+
case @delivery_callback_arity
272+
when 0
273+
@delivery_callback.call
274+
when 1
275+
@delivery_callback.call(delivery_report)
276+
else
277+
@delivery_callback.call(delivery_report, delivery_handle)
278+
end
266279
end
267280

281+
# Figures out the arity of a given block/method
282+
#
283+
# @param callback [#call, Proc]
284+
# @return [Integer] arity of the provided block/method
268285
def arity(callback)
269286
return callback.arity if callback.respond_to?(:arity)
270287

@@ -273,6 +290,10 @@ def arity(callback)
273290

274291
private
275292

293+
# Ensures, no operations can happen on a closed producer
294+
#
295+
# @param method [Symbol] name of the method that invoked producer
296+
# @raise [Rdkafka::ClosedProducerError]
276297
def closed_producer_check(method)
277298
raise Rdkafka::ClosedProducerError.new(method) if closed?
278299
end

0 commit comments

Comments
 (0)