@@ -69,6 +69,11 @@ class KafkaProducer: public KafkaClient
6969     */  
7070    enum  class  SendOption  { NoCopyRecordValue, ToCopyRecordValue };
7171
72+     /* *
73+      * Choose the action while the sending buffer is full. 
74+      */  
75+     enum  class  ActionWhileQueueIsFull  { Block, NoBlock };
76+ 
7277    /* *
7378     * Asynchronously send a record to a topic. 
7479     * 
@@ -87,7 +92,10 @@ class KafkaProducer: public KafkaClient
8792     *   Broker errors, 
8893     *     - [Error Codes] (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes) 
8994     */  
90-     void  send (const  producer::ProducerRecord& record, const  producer::Callback& deliveryCb, SendOption option = SendOption::NoCopyRecordValue);
95+     void  send (const  producer::ProducerRecord& record,
96+               const  producer::Callback&       deliveryCb,
97+               SendOption                      option = SendOption::NoCopyRecordValue,
98+               ActionWhileQueueIsFull          action = ActionWhileQueueIsFull::Block);
9199
92100    /* *
93101     * Asynchronously send a record to a topic. 
@@ -107,9 +115,13 @@ class KafkaProducer: public KafkaClient
107115     *   Broker errors, 
108116     *     - [Error Codes] (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes) 
109117     */  
110-     void  send (const  producer::ProducerRecord& record, const  producer::Callback& deliveryCb, Error& error, SendOption option = SendOption::NoCopyRecordValue)
118+     void  send (const  producer::ProducerRecord& record,
119+               const  producer::Callback&       deliveryCb,
120+               Error&                          error,
121+               SendOption                      option = SendOption::NoCopyRecordValue,
122+               ActionWhileQueueIsFull          action = ActionWhileQueueIsFull::Block)
111123    {
112-         try  { send (record, deliveryCb, option); } catch  (const  KafkaException& e) { error = e.error (); }
124+         try  { send (record, deliveryCb, option, action ); } catch  (const  KafkaException& e) { error = e.error (); }
113125    }
114126
115127    /* *
@@ -183,8 +195,6 @@ class KafkaProducer: public KafkaClient
183195        const  producer::Callback                     _deliveryCb;
184196    };
185197
186-     enum  class  ActionWhileQueueIsFull  { Block, NoBlock };
187- 
188198    //  Validate properties (and fix it if necesary)
189199    static  Properties validateAndReformProperties (const  Properties& properties);
190200
@@ -320,10 +330,11 @@ KafkaProducer::deliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmsg,
320330inline  void 
321331KafkaProducer::send (const  producer::ProducerRecord& record,
322332                    const  producer::Callback&       deliveryCb,
323-                     SendOption                      option)
333+                     SendOption                      option,
334+                     ActionWhileQueueIsFull          action)
324335{
325336    auto  deliveryCbOpaque = std::make_unique<DeliveryCbOpaque>(record.id (), deliveryCb);
326-     auto  queueFullAction  = (isWithAutoEventsPolling () ? ActionWhileQueueIsFull::Block  : ActionWhileQueueIsFull::NoBlock);
337+     auto  queueFullAction  = (isWithAutoEventsPolling () ? action  : ActionWhileQueueIsFull::NoBlock);
327338
328339    const  auto * topic     = record.topic ().c_str ();
329340    const  auto   partition = record.partition ();
0 commit comments