@@ -9,7 +9,15 @@ class Producer
99    # Cache partitions count for 30 seconds 
1010    PARTITIONS_COUNT_TTL  =  30 
1111
12-     private_constant  :PARTITIONS_COUNT_TTL 
12+     # Empty hash used as a default 
13+     EMPTY_HASH  =  { } . freeze 
14+ 
15+     private_constant  :PARTITIONS_COUNT_TTL ,  :EMPTY_HASH 
16+ 
17+     # Raised when there was a critical issue when invoking rd_kafka_topic_new 
18+     # This is a temporary solution until https://github.com/karafka/rdkafka-ruby/issues/451 is 
19+     # resolved and this is normalized in all the places 
20+     class  TopicHandleCreationError  < RuntimeError ;  end 
1321
1422    # @private 
1523    # Returns the current delivery callback, by default this is nil. 
@@ -28,6 +36,8 @@ class Producer
2836    # @param partitioner_name [String, nil] name of the partitioner we want to use or nil to use 
2937    #   the "consistent_random" default 
3038    def  initialize ( native_kafka ,  partitioner_name ) 
39+       @topics_refs_map  =  { } 
40+       @topics_configs  =  { } 
3141      @native_kafka  =  native_kafka 
3242      @partitioner_name  =  partitioner_name  || "consistent_random" 
3343
@@ -54,6 +64,52 @@ def initialize(native_kafka, partitioner_name)
5464      end 
5565    end 
5666
67+     # Sets alternative set of configuration details that can be set per topic 
68+     # @note It is not allowed to re-set the same topic config twice because of the underlying 
69+     #   librdkafka caching 
70+     # @param topic [String] The topic name 
71+     # @param config [Hash] config we want to use per topic basis 
72+     # @param config_hash [Integer] hash of the config. We expect it here instead of computing it, 
73+     #   because it is already computed during the retrieval attempt in the `#produce` flow. 
74+     def  set_topic_config ( topic ,  config ,  config_hash ) 
75+       # Ensure lock on topic reference just in case 
76+       @native_kafka . with_inner  do  |inner |
77+         @topics_refs_map [ topic ]  ||= { } 
78+         @topics_configs [ topic ]  ||= { } 
79+ 
80+         return  if  @topics_configs [ topic ] . key? ( config_hash ) 
81+ 
82+         # If config is empty, we create an empty reference that will be used with defaults 
83+         rd_topic_config  =  if  config . empty? 
84+                             nil 
85+                           else 
86+                             Rdkafka ::Bindings . rd_kafka_topic_conf_new . tap  do  |topic_config |
87+                               config . each  do  |key ,  value |
88+                                 error_buffer  =  FFI ::MemoryPointer . new ( :char ,  256 ) 
89+                                 result  =  Rdkafka ::Bindings . rd_kafka_topic_conf_set ( 
90+                                   topic_config , 
91+                                   key . to_s , 
92+                                   value . to_s , 
93+                                   error_buffer , 
94+                                   256 
95+                                 ) 
96+ 
97+                                 unless  result  == :config_ok 
98+                                   raise  Config ::ConfigError . new ( error_buffer . read_string ) 
99+                                 end 
100+                               end 
101+                             end 
102+                           end 
103+ 
104+         topic_handle  =  Bindings . rd_kafka_topic_new ( inner ,  topic ,  rd_topic_config ) 
105+ 
106+         raise  TopicHandleCreationError . new ( "Error creating topic handle for topic #{ topic }  " )  if  topic_handle . null? 
107+ 
108+         @topics_configs [ topic ] [ config_hash ]  =  config 
109+         @topics_refs_map [ topic ] [ config_hash ]  =  topic_handle 
110+       end 
111+     end 
112+ 
57113    # Starts the native Kafka polling thread and kicks off the init polling 
58114    # @note Not needed to run unless explicit start was disabled 
59115    def  start 
@@ -83,7 +139,18 @@ def delivery_callback=(callback)
83139    def  close 
84140      return  if  closed? 
85141      ObjectSpace . undefine_finalizer ( self ) 
86-       @native_kafka . close 
142+ 
143+       @native_kafka . close  do 
144+         # We need to remove the topics references objects before we destroy the producer, 
145+         # otherwise they would leak out 
146+         @topics_refs_map . each_value  do  |refs |
147+           refs . each_value  do  |ref |
148+             Rdkafka ::Bindings . rd_kafka_topic_destroy ( ref ) 
149+           end 
150+         end 
151+       end 
152+ 
153+       @topics_refs_map . clear 
87154    end 
88155
89156    # Whether this producer has closed 
@@ -182,11 +249,22 @@ def partition_count(topic)
182249    # @param timestamp [Time,Integer,nil] Optional timestamp of this message. Integer timestamp is in milliseconds since Jan 1 1970. 
183250    # @param headers [Hash<String,String>] Optional message headers 
184251    # @param label [Object, nil] a label that can be assigned when producing a message that will be part of the delivery handle and the delivery report 
252+     # @param topic_config [Hash] topic config for given message dispatch. Allows to send messages to topics with different configuration 
185253    # 
186254    # @return [DeliveryHandle] Delivery handle that can be used to wait for the result of producing this message 
187255    # 
188256    # @raise [RdkafkaError] When adding the message to rdkafka's queue failed 
189-     def  produce ( topic :,  payload : nil ,  key : nil ,  partition : nil ,  partition_key : nil ,  timestamp : nil ,  headers : nil ,  label : nil ) 
257+     def  produce ( 
258+       topic :, 
259+       payload : nil , 
260+       key : nil , 
261+       partition : nil , 
262+       partition_key : nil , 
263+       timestamp : nil , 
264+       headers : nil , 
265+       label : nil , 
266+       topic_config : EMPTY_HASH 
267+     ) 
190268      closed_producer_check ( __method__ ) 
191269
192270      # Start by checking and converting the input 
@@ -205,8 +283,20 @@ def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil,
205283                   key . bytesize 
206284                 end 
207285
286+       topic_config_hash  =  topic_config . hash 
287+ 
288+       # Checks if we have the rdkafka topic reference object ready. It saves us on object 
289+       # allocation and allows to use custom config on demand. 
290+       set_topic_config ( topic ,  topic_config ,  topic_config_hash )  unless  @topics_refs_map . dig ( topic ,  topic_config_hash ) 
291+       topic_ref  =  @topics_refs_map . dig ( topic ,  topic_config_hash ) 
292+ 
208293      if  partition_key 
209294        partition_count  =  partition_count ( topic ) 
295+ 
296+         # Check if there are no overrides for the partitioner and use the default one only when 
297+         # no per-topic is present. 
298+         partitioner_name  =  @topics_configs . dig ( topic ,  topic_config_hash ,  :partitioner )  || @partitioner_name 
299+ 
210300        # If the topic is not present, set to -1 
211301        partition  =  Rdkafka ::Bindings . partitioner ( partition_key ,  partition_count ,  @partitioner_name )  if  partition_count . positive? 
212302      end 
@@ -236,7 +326,7 @@ def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil,
236326      DeliveryHandle . register ( delivery_handle ) 
237327
238328      args  =  [ 
239-         :int ,  Rdkafka ::Bindings ::RD_KAFKA_VTYPE_TOPIC ,  :string  ,   topic , 
329+         :int ,  Rdkafka ::Bindings ::RD_KAFKA_VTYPE_RKT ,  :pointer  ,   topic_ref , 
240330        :int ,  Rdkafka ::Bindings ::RD_KAFKA_VTYPE_MSGFLAGS ,  :int ,  Rdkafka ::Bindings ::RD_KAFKA_MSG_F_COPY , 
241331        :int ,  Rdkafka ::Bindings ::RD_KAFKA_VTYPE_VALUE ,  :buffer_in ,  payload ,  :size_t ,  payload_size , 
242332        :int ,  Rdkafka ::Bindings ::RD_KAFKA_VTYPE_KEY ,  :buffer_in ,  key ,  :size_t ,  key_size , 
0 commit comments