@@ -32,6 +32,7 @@ TEST(KafkaSyncProducer, RecordTimestamp)
3232
3333 // Prepare a producer
3434 KafkaSyncProducer producer (KafkaTestUtility::GetKafkaClientCommonConfig ());
35+ producer.setErrorCallback (KafkaTestUtility::DumpError);
3536
3637 constexpr int TIME_LAPSE_THRESHOLD_MS = 1000 ;
3738 using namespace std ::chrono;
@@ -124,6 +125,7 @@ TEST(KafkaAsyncProducer, NoMissedDeliveryCallback)
124125
125126 {
126127 KafkaAsyncProducer producer (KafkaTestUtility::GetKafkaClientCommonConfig ().put (ProducerConfig::MESSAGE_TIMEOUT_MS, " 5000" ));
128+ producer.setErrorCallback (KafkaTestUtility::DumpError);
127129
128130 // Pause the brokers for a while
129131 auto asyncTask = KafkaTestUtility::PauseBrokersForAWhile (std::chrono::seconds (5 ));
@@ -157,6 +159,7 @@ TEST(KafkaAsyncProducer, MightMissDeliveryCallbackIfCloseWithLimitedTimeout)
157159 std::size_t deliveryCount = 0 ;
158160 {
159161 KafkaAsyncProducer producer (KafkaTestUtility::GetKafkaClientCommonConfig ());
162+ producer.setErrorCallback (KafkaTestUtility::DumpError);
160163
161164 KafkaTestUtility::PauseBrokers ();
162165
@@ -202,6 +205,7 @@ TEST(KafkaAsyncProducer, BrokerStopWhileSendingMessages)
202205 std::size_t deliveryCount = 0 ;
203206 {
204207 KafkaAsyncProducer producer (KafkaTestUtility::GetKafkaClientCommonConfig ());
208+ producer.setErrorCallback (KafkaTestUtility::DumpError);
205209
206210 // Pause the brokers for a while (shorter then the default "MESSAGE_TIMEOUT_MS" for producer, which is 10 seconds)
207211 auto asyncTask = KafkaTestUtility::PauseBrokersForAWhile (std::chrono::seconds (5 ));
@@ -249,6 +253,7 @@ TEST(KafkaAsyncProducer, Send_AckTimeout)
249253 {
250254 KafkaAsyncProducer producer (KafkaTestUtility::GetKafkaClientCommonConfig ()
251255 .put (ProducerConfig::MESSAGE_TIMEOUT_MS, " 3000" )); // If with no response, the delivery would fail in a short time
256+ producer.setErrorCallback (KafkaTestUtility::DumpError);
252257
253258 // Pause the brokers for a while
254259 auto asyncTask = KafkaTestUtility::PauseBrokersForAWhile (std::chrono::seconds (5 ));
@@ -287,6 +292,7 @@ TEST(KafkaAsyncProducer, ManuallyPollEvents_AckTimeout)
287292 KafkaAsyncProducer producer (KafkaTestUtility::GetKafkaClientCommonConfig ()
288293 .put (ProducerConfig::MESSAGE_TIMEOUT_MS, " 3000" ), // If with no response, the delivery would fail in a short time
289294 KafkaClient::EventsPollingOption::Manual); // Manually call `pollEvents()`
295+ producer.setErrorCallback (KafkaTestUtility::DumpError);
290296
291297 // Pause the brokers for a while
292298 auto asyncTask = KafkaTestUtility::PauseBrokersForAWhile (std::chrono::seconds (5 ));
@@ -333,6 +339,7 @@ TEST(KafkaAsyncProducer, ManuallyPollEvents_AlwaysFinishClosing)
333339 KafkaAsyncProducer producer (KafkaTestUtility::GetKafkaClientCommonConfig ()
334340 .put (ProducerConfig::MESSAGE_TIMEOUT_MS, " 3000" ), // If with no response, the delivery would fail in a short time
335341 KafkaClient::EventsPollingOption::Manual); // Manually call `pollEvents()`
342+ producer.setErrorCallback (KafkaTestUtility::DumpError);
336343
337344 // Pause the brokers for a while
338345 auto asyncTask = KafkaTestUtility::PauseBrokersForAWhile (std::chrono::seconds (5 ));
@@ -362,6 +369,7 @@ TEST(KafkaSyncProducer, Send_AckTimeout)
362369 KafkaTestUtility::CreateKafkaTopic (topic, 5 , 3 );
363370
364371 KafkaSyncProducer producer (KafkaTestUtility::GetKafkaClientCommonConfig ().put (ProducerConfig::MESSAGE_TIMEOUT_MS, " 3000" ));
372+ producer.setErrorCallback (KafkaTestUtility::DumpError);
365373
366374 // Pause the brokers for a while
367375 auto asyncTask = KafkaTestUtility::PauseBrokersForAWhile (std::chrono::seconds (5 ));
0 commit comments