Skip to content

Commit 9405bf9

Browse files
authored
Merge pull request #466 from raytung/fix/out_kafka2_shared_producer_send_size
fix(out_rdkafka2): fixed a bug where it did not respect the Buffer section's `chunk_limit_records` and `chunk_limit_size` options
2 parents eb74021 + 74c70a4 commit 9405bf9

File tree

2 files changed

+15
-13
lines changed

2 files changed

+15
-13
lines changed

lib/fluent/plugin/kafka_producer_ext.rb

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,15 @@ def produce_for_buffered(value, key: nil, topic:, partition: nil, partition_key:
3838
end
3939

4040
# for out_kafka2
41+
# Majority (if not all) of this code is lifted from https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/producer.rb
42+
# with the main difference being that we have removed any checks regarding max_buffer_bytesize and max_buffer_size
43+
# The reason for doing this is to provide a better UX for our users where they only need to set those bounds in
44+
# the Buffer section using `chunk_limit_size` and `chunk_limit_records`.
45+
#
46+
# We should reconsider this in the future in case the `ruby-kafka` library drastically changes its internals.
4147
module Kafka
4248
class Client
43-
def topic_producer(topic, compression_codec: nil, compression_threshold: 1, ack_timeout: 5, required_acks: :all, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000, max_buffer_bytesize: 10_000_000, idempotent: false, transactional: false, transactional_id: nil, transactional_timeout: 60)
49+
def custom_producer(compression_codec: nil, compression_threshold: 1, ack_timeout: 5, required_acks: :all, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000, max_buffer_bytesize: 10_000_000, idempotent: false, transactional: false, transactional_id: nil, transactional_timeout: 60)
4450
cluster = initialize_cluster
4551
compressor = Compressor.new(
4652
codec_name: compression_codec,
@@ -57,8 +63,7 @@ def topic_producer(topic, compression_codec: nil, compression_threshold: 1, ack_
5763
transactional_timeout: transactional_timeout,
5864
)
5965

60-
TopicProducer.new(topic,
61-
cluster: cluster,
66+
CustomProducer.new(cluster: cluster,
6267
transaction_manager: transaction_manager,
6368
logger: @logger,
6469
instrumenter: @instrumenter,
@@ -74,8 +79,8 @@ def topic_producer(topic, compression_codec: nil, compression_threshold: 1, ack_
7479
end
7580
end
7681

77-
class TopicProducer
78-
def initialize(topic, cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:, partitioner:)
82+
class CustomProducer
83+
def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:, partitioner:)
7984
@cluster = cluster
8085
@transaction_manager = transaction_manager
8186
@logger = logger
@@ -88,23 +93,19 @@ def initialize(topic, cluster:, transaction_manager:, logger:, instrumenter:, co
8893
@max_buffer_bytesize = max_buffer_bytesize
8994
@compressor = compressor
9095
@partitioner = partitioner
91-
92-
@topic = topic
93-
@cluster.add_target_topics(Set.new([topic]))
94-
9596
# A buffer organized by topic/partition.
9697
@buffer = MessageBuffer.new
9798

9899
# Messages added by `#produce` but not yet assigned a partition.
99100
@pending_message_queue = PendingMessageQueue.new
100101
end
101102

102-
def produce(value, key: nil, partition: nil, partition_key: nil, headers: EMPTY_HEADER, create_time: Time.now)
103+
def produce(value, key: nil, partition: nil, partition_key: nil, headers: EMPTY_HEADER, create_time: Time.now, topic: nil)
103104
message = PendingMessage.new(
104105
value: value,
105106
key: key,
106107
headers: headers,
107-
topic: @topic,
108+
topic: topic,
108109
partition: partition,
109110
partition_key: partition_key,
110111
create_time: create_time
@@ -245,12 +246,13 @@ def deliver_messages_with_retries
245246

246247
def assign_partitions!
247248
failed_messages = []
248-
partition_count = @cluster.partitions_for(@topic).count
249249

250250
@pending_message_queue.each do |message|
251251
partition = message.partition
252252

253253
begin
254+
partition_count = @cluster.partitions_for(message.topic).count
255+
254256
if partition.nil?
255257
partition = @partitioner.call(partition_count, message)
256258
end

lib/fluent/plugin/out_kafka2.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ def multi_workers_ready?
207207
end
208208

209209
def create_producer
210-
@kafka.producer(**@producer_opts)
210+
@kafka.custom_producer(**@producer_opts)
211211
end
212212

213213
def start

0 commit comments

Comments
 (0)